diff --git a/Cargo.lock b/Cargo.lock index a14147e09..3ae9cde90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3113,6 +3113,7 @@ name = "lemmy_federate" version = "0.19.5" dependencies = [ "activitypub_federation", + "actix-web", "anyhow", "chrono", "diesel", @@ -3131,6 +3132,8 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "tracing-test", + "url", ] [[package]] @@ -6336,6 +6339,27 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote", + "syn 2.0.66", +] + [[package]] name = "triomphe" version = "0.1.12" diff --git a/crates/api_common/src/context.rs b/crates/api_common/src/context.rs index f4ac41db1..77d430b20 100644 --- a/crates/api_common/src/context.rs +++ b/crates/api_common/src/context.rs @@ -55,7 +55,7 @@ impl LemmyContext { /// Initialize a context for use in tests which blocks federation network calls. /// /// Do not use this in production code. - pub async fn init_test_context() -> Data { + pub async fn init_test_federation_config() -> FederationConfig { // call this to run migrations let pool = build_db_pool_for_tests().await; @@ -73,11 +73,16 @@ impl LemmyContext { let config = FederationConfig::builder() .domain(context.settings().hostname.clone()) .app_data(context) + .debug(true) // Dont allow any network fetches .http_fetch_limit(0) .build() .await .expect("build federation config"); + return config; + } + pub async fn init_test_context() -> Data { + let config = Self::init_test_federation_config().await; config.to_request_data() } } diff --git a/crates/federate/Cargo.toml b/crates/federate/Cargo.toml index 2405d3af0..e63dea443 100644 --- a/crates/federate/Cargo.toml +++ b/crates/federate/Cargo.toml @@ -37,3 +37,6 @@ tokio-util = "0.7.11" [dev-dependencies] serial_test = { workspace = true } +url.workspace = true +actix-web.workspace = true +tracing-test = "0.2.5" diff --git a/crates/federate/src/inboxes.rs b/crates/federate/src/inboxes.rs index 72e65fdf5..45fce8119 100644 --- a/crates/federate/src/inboxes.rs +++ b/crates/federate/src/inboxes.rs @@ -99,6 +99,11 @@ impl CommunityInboxCollector { .filter(|&u| (u.domain() == Some(&self.domain))) .map(|u| u.inner().clone()), ); + tracing::trace!( + "get_inbox_urls: {:?}, send_inboxes: {:?}", + inbox_urls, + activity.send_inboxes + ); Ok(inbox_urls.into_iter().collect()) } diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index ebdda0bfc..13c945525 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -33,8 +33,11 @@ use tokio_util::sync::CancellationToken; /// Save state to db after this time has passed since the last state (so if the server crashes or is /// SIGKILLed, less than X seconds of activities are resent) +#[cfg(not(test))] static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); - +#[cfg(test)] +/// in test mode, we want it to save state and send it to print_stats after every send +static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(0); /// Maximum number of successful sends to allow out of order const MAX_SUCCESSFULS: usize = 1000; @@ -391,3 +394,216 @@ impl InstanceWorker { DbPool::Pool(&self.pool) } } + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +#[allow(clippy::indexing_slicing)] +mod test { + + use super::*; + use activitypub_federation::{ + http_signatures::generate_actor_keypair, + protocol::context::WithContext, + }; + use actix_web::{web, App, HttpResponse, HttpServer}; + use lemmy_api_common::utils::{generate_inbox_url, generate_shared_inbox_url}; + use lemmy_db_schema::{ + newtypes::DbUrl, + source::{ + activity::{ActorType, SentActivity, SentActivityForm}, + person::{Person, PersonInsertForm}, + }, + traits::Crud, + }; + use lemmy_utils::error::LemmyResult; + use reqwest::StatusCode; + use serde_json::Value; + use serial_test::serial; + use std::{fs::File, io::BufReader}; + use tokio::{ + select, + spawn, + sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver}, + }; + use tracing_test::traced_test; + use url::Url; + + struct Data { + context: activitypub_federation::config::Data, + instance: Instance, + person: Person, + stats_receiver: UnboundedReceiver, + inbox_receiver: UnboundedReceiver, + cancel: CancellationToken, + } + + impl Data { + async fn init() -> LemmyResult { + let context = LemmyContext::init_test_federation_config().await; + let instance = Instance::read_or_create(&mut context.pool(), "localhost".to_string()).await?; + + let actor_keypair = generate_actor_keypair()?; + let actor_id: DbUrl = Url::parse("http://local.com/u/alice")?.into(); + let person_form = PersonInsertForm { + actor_id: Some(actor_id.clone()), + private_key: (Some(actor_keypair.private_key)), + inbox_url: Some(generate_inbox_url(&actor_id)?), + shared_inbox_url: Some(generate_shared_inbox_url(context.settings())?), + ..PersonInsertForm::new("alice".to_string(), actor_keypair.public_key, instance.id) + }; + let person = Person::create(&mut context.pool(), &person_form).await?; + + let cancel = CancellationToken::new(); + let (stats_sender, stats_receiver) = unbounded_channel(); + let (inbox_sender, inbox_receiver) = unbounded_channel(); + + // listen for received activities in background + let cancel_ = cancel.clone(); + listen_activities(inbox_sender, cancel_).await?; + + let fed_config = FederationWorkerConfig { + concurrent_sends_per_instance: 1, + }; + spawn(InstanceWorker::init_and_loop( + instance.clone(), + context.clone(), + fed_config, + cancel.clone(), + stats_sender, + )); + // wait for startup + sleep(*WORK_FINISHED_RECHECK_DELAY).await; + + Ok(Self { + context: context.to_request_data(), + instance, + person, + stats_receiver, + inbox_receiver, + cancel, + }) + } + + async fn cleanup(&self) -> LemmyResult<()> { + self.cancel.cancel(); + sleep(*WORK_FINISHED_RECHECK_DELAY).await; + Instance::delete_all(&mut self.context.pool()).await?; + Person::delete(&mut self.context.pool(), self.person.id).await?; + Ok(()) + } + } + + #[tokio::test] + #[traced_test] + #[serial] + async fn test_stats() -> LemmyResult<()> { + let mut data = Data::init().await?; + tracing::debug!("hello world"); + + // first receive at startup + let rcv = data.stats_receiver.recv().await.unwrap(); + tracing::debug!("received first stats"); + assert_eq!(data.instance.id, rcv.state.instance_id); + assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id); + + let sent = send_activity(data.person.actor_id.clone(), &data.context).await?; + tracing::debug!("sent activity"); + // receive for successfully sent activity + let inbox_rcv = data.inbox_receiver.recv().await.unwrap(); + let parsed_activity = serde_json::from_str::>(&inbox_rcv)?; + assert_eq!(&sent.data, parsed_activity.inner()); + tracing::debug!("received activity"); + + let rcv = data.stats_receiver.recv().await.unwrap(); + assert_eq!(data.instance.id, rcv.state.instance_id); + assert_eq!(Some(sent.id), rcv.state.last_successful_id); + + data.cleanup().await?; + + // it also sends state on shutdown + let rcv = data.stats_receiver.try_recv(); + assert!(rcv.is_ok()); + + // nothing further received + let rcv = data.stats_receiver.try_recv(); + assert_eq!(Some(TryRecvError::Disconnected), rcv.err()); + let inbox_rcv = data.inbox_receiver.try_recv(); + assert_eq!(Some(TryRecvError::Empty), inbox_rcv.err()); + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn test_update_instance() -> LemmyResult<()> { + let mut data = Data::init().await?; + + let form = InstanceForm::builder() + .domain(data.instance.domain.clone()) + .updated(None) + .build(); + Instance::update(&mut data.context.pool(), data.instance.id, form).await?; + + send_activity(data.person.actor_id.clone(), &data.context).await?; + data.inbox_receiver.recv().await.unwrap(); + + let instance = + Instance::read_or_create(&mut data.context.pool(), data.instance.domain.clone()).await?; + + assert!(instance.updated.is_some()); + + data.cleanup().await?; + + Ok(()) + } + + async fn listen_activities( + inbox_sender: UnboundedSender, + cancel: CancellationToken, + ) -> LemmyResult<()> { + let run = HttpServer::new(move || { + App::new() + .app_data(actix_web::web::Data::new(inbox_sender.clone())) + .route( + "/inbox", + web::post().to( + |inbox_sender: actix_web::web::Data>, body: String| async move { + tracing::debug!("received activity: {:?}", body); + inbox_sender.send(body.clone()).unwrap(); + HttpResponse::new(StatusCode::OK) + }, + ), + ) + }) + .bind(("127.0.0.1", 8085))? + .run(); + tokio::spawn(async move { + select! { + _ = run => {}, + _ = cancel.cancelled() => {} + } + }); + Ok(()) + } + + async fn send_activity(actor_id: DbUrl, context: &LemmyContext) -> LemmyResult { + // create outgoing activity + let file = File::open("../apub/assets/lemmy/activities/voting/like_note.json")?; + let reader = BufReader::new(file); + let form = SentActivityForm { + ap_id: Url::parse("http://local.com/activity/1")?.into(), + data: serde_json::from_reader(reader)?, + sensitive: false, + send_inboxes: vec![Some(Url::parse("http://localhost:8085/inbox")?.into())], + send_all_instances: false, + send_community_followers_of: None, + actor_type: ActorType::Person, + actor_apub_id: actor_id, + }; + let sent = SentActivity::create(&mut context.pool(), form).await?; + + sleep(*WORK_FINISHED_RECHECK_DELAY * 2).await; + + Ok(sent) + } +} diff --git a/scripts/test.sh b/scripts/test.sh index 9bb6acaa8..04cc94f9d 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -14,6 +14,7 @@ source scripts/start_dev_db.sh # so to load the config we need to traverse to the repo root export LEMMY_CONFIG_LOCATION=../../config/config.hjson export RUST_BACKTRACE=1 +export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min if [ -n "$PACKAGE" ]; then