mirror of https://github.com/LemmyNet/lemmy.git
add debug to make localhost urls not valid in ap crate, add some debug logs
parent
2e2345e6f7
commit
e3fef895a1
|
@ -3113,6 +3113,7 @@ name = "lemmy_federate"
|
||||||
version = "0.19.5"
|
version = "0.19.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"activitypub_federation",
|
"activitypub_federation",
|
||||||
|
"actix-web",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"chrono",
|
"chrono",
|
||||||
"diesel",
|
"diesel",
|
||||||
|
@ -3131,6 +3132,8 @@ dependencies = [
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
"tracing-test",
|
||||||
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -6336,6 +6339,27 @@ dependencies = [
|
||||||
"tracing-serde",
|
"tracing-serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tracing-test"
|
||||||
|
version = "0.2.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
|
||||||
|
dependencies = [
|
||||||
|
"tracing-core",
|
||||||
|
"tracing-subscriber",
|
||||||
|
"tracing-test-macro",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tracing-test-macro"
|
||||||
|
version = "0.2.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
|
||||||
|
dependencies = [
|
||||||
|
"quote",
|
||||||
|
"syn 2.0.66",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "triomphe"
|
name = "triomphe"
|
||||||
version = "0.1.12"
|
version = "0.1.12"
|
||||||
|
|
|
@ -55,7 +55,7 @@ impl LemmyContext {
|
||||||
/// Initialize a context for use in tests which blocks federation network calls.
|
/// Initialize a context for use in tests which blocks federation network calls.
|
||||||
///
|
///
|
||||||
/// Do not use this in production code.
|
/// Do not use this in production code.
|
||||||
pub async fn init_test_context() -> Data<LemmyContext> {
|
pub async fn init_test_federation_config() -> FederationConfig<LemmyContext> {
|
||||||
// call this to run migrations
|
// call this to run migrations
|
||||||
let pool = build_db_pool_for_tests().await;
|
let pool = build_db_pool_for_tests().await;
|
||||||
|
|
||||||
|
@ -73,11 +73,16 @@ impl LemmyContext {
|
||||||
let config = FederationConfig::builder()
|
let config = FederationConfig::builder()
|
||||||
.domain(context.settings().hostname.clone())
|
.domain(context.settings().hostname.clone())
|
||||||
.app_data(context)
|
.app_data(context)
|
||||||
|
.debug(true)
|
||||||
// Dont allow any network fetches
|
// Dont allow any network fetches
|
||||||
.http_fetch_limit(0)
|
.http_fetch_limit(0)
|
||||||
.build()
|
.build()
|
||||||
.await
|
.await
|
||||||
.expect("build federation config");
|
.expect("build federation config");
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
pub async fn init_test_context() -> Data<LemmyContext> {
|
||||||
|
let config = Self::init_test_federation_config().await;
|
||||||
config.to_request_data()
|
config.to_request_data()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,3 +37,6 @@ tokio-util = "0.7.11"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
serial_test = { workspace = true }
|
serial_test = { workspace = true }
|
||||||
|
url.workspace = true
|
||||||
|
actix-web.workspace = true
|
||||||
|
tracing-test = "0.2.5"
|
||||||
|
|
|
@ -99,6 +99,11 @@ impl CommunityInboxCollector {
|
||||||
.filter(|&u| (u.domain() == Some(&self.domain)))
|
.filter(|&u| (u.domain() == Some(&self.domain)))
|
||||||
.map(|u| u.inner().clone()),
|
.map(|u| u.inner().clone()),
|
||||||
);
|
);
|
||||||
|
tracing::trace!(
|
||||||
|
"get_inbox_urls: {:?}, send_inboxes: {:?}",
|
||||||
|
inbox_urls,
|
||||||
|
activity.send_inboxes
|
||||||
|
);
|
||||||
Ok(inbox_urls.into_iter().collect())
|
Ok(inbox_urls.into_iter().collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,8 +33,11 @@ use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
/// Save state to db after this time has passed since the last state (so if the server crashes or is
|
/// Save state to db after this time has passed since the last state (so if the server crashes or is
|
||||||
/// SIGKILLed, less than X seconds of activities are resent)
|
/// SIGKILLed, less than X seconds of activities are resent)
|
||||||
|
#[cfg(not(test))]
|
||||||
static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60);
|
static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60);
|
||||||
|
#[cfg(test)]
|
||||||
|
/// in test mode, we want it to save state and send it to print_stats after every send
|
||||||
|
static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(0);
|
||||||
/// Maximum number of successful sends to allow out of order
|
/// Maximum number of successful sends to allow out of order
|
||||||
const MAX_SUCCESSFULS: usize = 1000;
|
const MAX_SUCCESSFULS: usize = 1000;
|
||||||
|
|
||||||
|
@ -391,3 +394,216 @@ impl InstanceWorker {
|
||||||
DbPool::Pool(&self.pool)
|
DbPool::Pool(&self.pool)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
#[allow(clippy::unwrap_used)]
|
||||||
|
#[allow(clippy::indexing_slicing)]
|
||||||
|
mod test {
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
use activitypub_federation::{
|
||||||
|
http_signatures::generate_actor_keypair,
|
||||||
|
protocol::context::WithContext,
|
||||||
|
};
|
||||||
|
use actix_web::{web, App, HttpResponse, HttpServer};
|
||||||
|
use lemmy_api_common::utils::{generate_inbox_url, generate_shared_inbox_url};
|
||||||
|
use lemmy_db_schema::{
|
||||||
|
newtypes::DbUrl,
|
||||||
|
source::{
|
||||||
|
activity::{ActorType, SentActivity, SentActivityForm},
|
||||||
|
person::{Person, PersonInsertForm},
|
||||||
|
},
|
||||||
|
traits::Crud,
|
||||||
|
};
|
||||||
|
use lemmy_utils::error::LemmyResult;
|
||||||
|
use reqwest::StatusCode;
|
||||||
|
use serde_json::Value;
|
||||||
|
use serial_test::serial;
|
||||||
|
use std::{fs::File, io::BufReader};
|
||||||
|
use tokio::{
|
||||||
|
select,
|
||||||
|
spawn,
|
||||||
|
sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver},
|
||||||
|
};
|
||||||
|
use tracing_test::traced_test;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
struct Data {
|
||||||
|
context: activitypub_federation::config::Data<LemmyContext>,
|
||||||
|
instance: Instance,
|
||||||
|
person: Person,
|
||||||
|
stats_receiver: UnboundedReceiver<FederationQueueStateWithDomain>,
|
||||||
|
inbox_receiver: UnboundedReceiver<String>,
|
||||||
|
cancel: CancellationToken,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Data {
|
||||||
|
async fn init() -> LemmyResult<Self> {
|
||||||
|
let context = LemmyContext::init_test_federation_config().await;
|
||||||
|
let instance = Instance::read_or_create(&mut context.pool(), "localhost".to_string()).await?;
|
||||||
|
|
||||||
|
let actor_keypair = generate_actor_keypair()?;
|
||||||
|
let actor_id: DbUrl = Url::parse("http://local.com/u/alice")?.into();
|
||||||
|
let person_form = PersonInsertForm {
|
||||||
|
actor_id: Some(actor_id.clone()),
|
||||||
|
private_key: (Some(actor_keypair.private_key)),
|
||||||
|
inbox_url: Some(generate_inbox_url(&actor_id)?),
|
||||||
|
shared_inbox_url: Some(generate_shared_inbox_url(context.settings())?),
|
||||||
|
..PersonInsertForm::new("alice".to_string(), actor_keypair.public_key, instance.id)
|
||||||
|
};
|
||||||
|
let person = Person::create(&mut context.pool(), &person_form).await?;
|
||||||
|
|
||||||
|
let cancel = CancellationToken::new();
|
||||||
|
let (stats_sender, stats_receiver) = unbounded_channel();
|
||||||
|
let (inbox_sender, inbox_receiver) = unbounded_channel();
|
||||||
|
|
||||||
|
// listen for received activities in background
|
||||||
|
let cancel_ = cancel.clone();
|
||||||
|
listen_activities(inbox_sender, cancel_).await?;
|
||||||
|
|
||||||
|
let fed_config = FederationWorkerConfig {
|
||||||
|
concurrent_sends_per_instance: 1,
|
||||||
|
};
|
||||||
|
spawn(InstanceWorker::init_and_loop(
|
||||||
|
instance.clone(),
|
||||||
|
context.clone(),
|
||||||
|
fed_config,
|
||||||
|
cancel.clone(),
|
||||||
|
stats_sender,
|
||||||
|
));
|
||||||
|
// wait for startup
|
||||||
|
sleep(*WORK_FINISHED_RECHECK_DELAY).await;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
context: context.to_request_data(),
|
||||||
|
instance,
|
||||||
|
person,
|
||||||
|
stats_receiver,
|
||||||
|
inbox_receiver,
|
||||||
|
cancel,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn cleanup(&self) -> LemmyResult<()> {
|
||||||
|
self.cancel.cancel();
|
||||||
|
sleep(*WORK_FINISHED_RECHECK_DELAY).await;
|
||||||
|
Instance::delete_all(&mut self.context.pool()).await?;
|
||||||
|
Person::delete(&mut self.context.pool(), self.person.id).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[traced_test]
|
||||||
|
#[serial]
|
||||||
|
async fn test_stats() -> LemmyResult<()> {
|
||||||
|
let mut data = Data::init().await?;
|
||||||
|
tracing::debug!("hello world");
|
||||||
|
|
||||||
|
// first receive at startup
|
||||||
|
let rcv = data.stats_receiver.recv().await.unwrap();
|
||||||
|
tracing::debug!("received first stats");
|
||||||
|
assert_eq!(data.instance.id, rcv.state.instance_id);
|
||||||
|
assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id);
|
||||||
|
|
||||||
|
let sent = send_activity(data.person.actor_id.clone(), &data.context).await?;
|
||||||
|
tracing::debug!("sent activity");
|
||||||
|
// receive for successfully sent activity
|
||||||
|
let inbox_rcv = data.inbox_receiver.recv().await.unwrap();
|
||||||
|
let parsed_activity = serde_json::from_str::<WithContext<Value>>(&inbox_rcv)?;
|
||||||
|
assert_eq!(&sent.data, parsed_activity.inner());
|
||||||
|
tracing::debug!("received activity");
|
||||||
|
|
||||||
|
let rcv = data.stats_receiver.recv().await.unwrap();
|
||||||
|
assert_eq!(data.instance.id, rcv.state.instance_id);
|
||||||
|
assert_eq!(Some(sent.id), rcv.state.last_successful_id);
|
||||||
|
|
||||||
|
data.cleanup().await?;
|
||||||
|
|
||||||
|
// it also sends state on shutdown
|
||||||
|
let rcv = data.stats_receiver.try_recv();
|
||||||
|
assert!(rcv.is_ok());
|
||||||
|
|
||||||
|
// nothing further received
|
||||||
|
let rcv = data.stats_receiver.try_recv();
|
||||||
|
assert_eq!(Some(TryRecvError::Disconnected), rcv.err());
|
||||||
|
let inbox_rcv = data.inbox_receiver.try_recv();
|
||||||
|
assert_eq!(Some(TryRecvError::Empty), inbox_rcv.err());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[serial]
|
||||||
|
async fn test_update_instance() -> LemmyResult<()> {
|
||||||
|
let mut data = Data::init().await?;
|
||||||
|
|
||||||
|
let form = InstanceForm::builder()
|
||||||
|
.domain(data.instance.domain.clone())
|
||||||
|
.updated(None)
|
||||||
|
.build();
|
||||||
|
Instance::update(&mut data.context.pool(), data.instance.id, form).await?;
|
||||||
|
|
||||||
|
send_activity(data.person.actor_id.clone(), &data.context).await?;
|
||||||
|
data.inbox_receiver.recv().await.unwrap();
|
||||||
|
|
||||||
|
let instance =
|
||||||
|
Instance::read_or_create(&mut data.context.pool(), data.instance.domain.clone()).await?;
|
||||||
|
|
||||||
|
assert!(instance.updated.is_some());
|
||||||
|
|
||||||
|
data.cleanup().await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn listen_activities(
|
||||||
|
inbox_sender: UnboundedSender<String>,
|
||||||
|
cancel: CancellationToken,
|
||||||
|
) -> LemmyResult<()> {
|
||||||
|
let run = HttpServer::new(move || {
|
||||||
|
App::new()
|
||||||
|
.app_data(actix_web::web::Data::new(inbox_sender.clone()))
|
||||||
|
.route(
|
||||||
|
"/inbox",
|
||||||
|
web::post().to(
|
||||||
|
|inbox_sender: actix_web::web::Data<UnboundedSender<String>>, body: String| async move {
|
||||||
|
tracing::debug!("received activity: {:?}", body);
|
||||||
|
inbox_sender.send(body.clone()).unwrap();
|
||||||
|
HttpResponse::new(StatusCode::OK)
|
||||||
|
},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.bind(("127.0.0.1", 8085))?
|
||||||
|
.run();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
select! {
|
||||||
|
_ = run => {},
|
||||||
|
_ = cancel.cancelled() => {}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_activity(actor_id: DbUrl, context: &LemmyContext) -> LemmyResult<SentActivity> {
|
||||||
|
// create outgoing activity
|
||||||
|
let file = File::open("../apub/assets/lemmy/activities/voting/like_note.json")?;
|
||||||
|
let reader = BufReader::new(file);
|
||||||
|
let form = SentActivityForm {
|
||||||
|
ap_id: Url::parse("http://local.com/activity/1")?.into(),
|
||||||
|
data: serde_json::from_reader(reader)?,
|
||||||
|
sensitive: false,
|
||||||
|
send_inboxes: vec![Some(Url::parse("http://localhost:8085/inbox")?.into())],
|
||||||
|
send_all_instances: false,
|
||||||
|
send_community_followers_of: None,
|
||||||
|
actor_type: ActorType::Person,
|
||||||
|
actor_apub_id: actor_id,
|
||||||
|
};
|
||||||
|
let sent = SentActivity::create(&mut context.pool(), form).await?;
|
||||||
|
|
||||||
|
sleep(*WORK_FINISHED_RECHECK_DELAY * 2).await;
|
||||||
|
|
||||||
|
Ok(sent)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@ source scripts/start_dev_db.sh
|
||||||
# so to load the config we need to traverse to the repo root
|
# so to load the config we need to traverse to the repo root
|
||||||
export LEMMY_CONFIG_LOCATION=../../config/config.hjson
|
export LEMMY_CONFIG_LOCATION=../../config/config.hjson
|
||||||
export RUST_BACKTRACE=1
|
export RUST_BACKTRACE=1
|
||||||
|
export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min
|
||||||
|
|
||||||
if [ -n "$PACKAGE" ];
|
if [ -n "$PACKAGE" ];
|
||||||
then
|
then
|
||||||
|
|
Loading…
Reference in New Issue