From 4f1240487e5c02dbeb2ac255552aacce9a4d16c2 Mon Sep 17 00:00:00 2001 From: Dessalines Date: Tue, 12 Sep 2023 15:54:49 -0400 Subject: [PATCH 01/43] Fixing high CPU usage on federation worker recheck. Fixes #3958 --- crates/federate/src/worker.rs | 9 +++++---- .../down.sql | 2 ++ .../2023-09-12-194850_add_federation_worker_index/up.sql | 2 ++ 3 files changed, 9 insertions(+), 4 deletions(-) create mode 100644 migrations/2023-09-12-194850_add_federation_worker_index/down.sql create mode 100644 migrations/2023-09-12-194850_add_federation_worker_index/up.sql diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index a2bdf33c2..f1fbbb9c3 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -32,10 +32,10 @@ static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_secs(1); static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_secs(30); #[cfg(debug_assertions)] static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = - Lazy::new(|| chrono::Duration::seconds(1)); + Lazy::new(|| chrono::Duration::minutes(1)); #[cfg(not(debug_assertions))] static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = - Lazy::new(|| chrono::Duration::minutes(1)); + Lazy::new(|| chrono::Duration::minutes(5)); static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy = Lazy::new(|| chrono::Duration::hours(1)); pub(crate) struct InstanceWorker { @@ -254,7 +254,8 @@ impl InstanceWorker { .send_inboxes .iter() .filter_map(std::option::Option::as_ref) - .filter_map(|u| (u.domain() == Some(&self.instance.domain)).then(|| u.inner().clone())), + .filter(|&u| (u.domain() == Some(&self.instance.domain))) + .map(|u| u.inner().clone()), ); Ok(inbox_urls) } @@ -295,7 +296,7 @@ impl InstanceWorker { .await? .into_iter() .fold(HashMap::new(), |mut map, (c, u)| { - map.entry(c).or_insert_with(HashSet::new).insert(u.into()); + map.entry(c).or_default().insert(u.into()); map }), new_last_fetch, diff --git a/migrations/2023-09-12-194850_add_federation_worker_index/down.sql b/migrations/2023-09-12-194850_add_federation_worker_index/down.sql new file mode 100644 index 000000000..a203e8087 --- /dev/null +++ b/migrations/2023-09-12-194850_add_federation_worker_index/down.sql @@ -0,0 +1,2 @@ +DROP INDEX idx_person_local_instance; + diff --git a/migrations/2023-09-12-194850_add_federation_worker_index/up.sql b/migrations/2023-09-12-194850_add_federation_worker_index/up.sql new file mode 100644 index 000000000..bbbab0b1f --- /dev/null +++ b/migrations/2023-09-12-194850_add_federation_worker_index/up.sql @@ -0,0 +1,2 @@ +CREATE INDEX idx_person_local_instance ON person (local DESC, instance_id); + From dca43dcfd9e376ee63e5416a4421477371d8d44c Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 13 Sep 2023 10:54:10 +0000 Subject: [PATCH 02/43] fix a bug where after an hour community follows would be overwritten completely by an incremental upgrade --- crates/federate/src/worker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index f1fbbb9c3..3bc6218bb 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -264,7 +264,7 @@ impl InstanceWorker { if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY { // process removals every hour (self.followed_communities, self.last_full_communities_fetch) = self - .get_communities(pool, self.instance.id, self.last_full_communities_fetch) + .get_communities(pool, self.instance.id, Utc.timestamp_nanos(0)) .await?; self.last_incremental_communities_fetch = self.last_full_communities_fetch; } From b09ffa7197cb28c5de72ff0497103f14b20ec558 Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 13 Sep 2023 11:20:09 +0000 Subject: [PATCH 03/43] instead of changing fed delays in debug mode, change them via env variable --- api_tests/run-federation-test.sh | 1 + crates/federate/src/worker.rs | 54 ++++++++++++++++++++++++-------- 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/api_tests/run-federation-test.sh b/api_tests/run-federation-test.sh index 3042fd344..aca57bc5f 100755 --- a/api_tests/run-federation-test.sh +++ b/api_tests/run-federation-test.sh @@ -2,6 +2,7 @@ set -e export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432 +export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min pushd .. cargo build rm target/lemmy_server || true diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 3bc6218bb..b52e4dbbf 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -22,20 +22,47 @@ use std::{ }; use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio_util::sync::CancellationToken; -/// save state to db every n sends if there's no failures (otherwise state is saved after every attempt) + +/// Decrease the delays of the federation queue. +/// Should only be used for federation tests since it significantly increases CPU and DB load of the federation queue. +static LEMMY_TEST_FAST_FEDERATION: Lazy = Lazy::new(|| { + std::env::var("LEMMY_TEST_FAST_FEDERATION") + .map(|s| !s.is_empty()) + .unwrap_or(false) +}); + +/// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt) +/// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB. static CHECK_SAVE_STATE_EVERY_IT: i64 = 100; +/// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent) static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); -/// recheck for new federation work every n seconds -#[cfg(debug_assertions)] -static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_secs(1); -#[cfg(not(debug_assertions))] -static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_secs(30); -#[cfg(debug_assertions)] -static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = - Lazy::new(|| chrono::Duration::minutes(1)); -#[cfg(not(debug_assertions))] -static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = - Lazy::new(|| chrono::Duration::minutes(5)); +/// Recheck for new federation work every n seconds. +/// +/// When the queue is processed faster than new activities are added and it reaches the current time with an empty batch, +/// this is the delay the queue waits before it checks if new activities have been added to the sent_activities table. +/// This delay is only applied if no federated activity happens during sending activities of the last batch. +static WORK_FINISHED_RECHECK_DELAY: Lazy = Lazy::new(|| { + if *LEMMY_TEST_FAST_FEDERATION { + Duration::from_secs(1) + } else { + Duration::from_secs(30) + } +}); +/// interval with which new additions to community_followers are queried. +/// +/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears), +/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url. +/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate. +/// (see https://github.com/LemmyNet/lemmy/issues/3958) +static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { + if *LEMMY_TEST_FAST_FEDERATION { + chrono::Duration::seconds(1) + } else { + chrono::Duration::minutes(2) + } +}); +/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community. +/// This is expected to happen pretty rarely and updating it in a timely manner is not too important. static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy = Lazy::new(|| chrono::Duration::hours(1)); pub(crate) struct InstanceWorker { @@ -121,6 +148,7 @@ impl InstanceWorker { } Ok(()) } + /// send out a batch of CHECK_SAVE_STATE_EVERY_IT activities async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> { let latest_id = get_latest_activity_id(pool).await?; if self.state.last_successful_id == -1 { @@ -134,7 +162,7 @@ impl InstanceWorker { if id == latest_id { // no more work to be done, wait before rechecking tokio::select! { - () = sleep(WORK_FINISHED_RECHECK_DELAY) => {}, + () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {}, () = self.stop.cancelled() => {} } return Ok(()); From c86173577d8c620e403c5002948b6432a08ee3d4 Mon Sep 17 00:00:00 2001 From: phiresky Date: Fri, 15 Sep 2023 15:51:31 +0000 Subject: [PATCH 05/43] fix export location --- api_tests/prepare-drone-federation-test.sh | 2 ++ api_tests/run-federation-test.sh | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/api_tests/prepare-drone-federation-test.sh b/api_tests/prepare-drone-federation-test.sh index ba6dd324b..ac204ed31 100755 --- a/api_tests/prepare-drone-federation-test.sh +++ b/api_tests/prepare-drone-federation-test.sh @@ -6,6 +6,8 @@ set -e export RUST_BACKTRACE=1 export RUST_LOG="warn,lemmy_server=debug,lemmy_federate=debug,lemmy_api=debug,lemmy_api_common=debug,lemmy_api_crud=debug,lemmy_apub=debug,lemmy_db_schema=debug,lemmy_db_views=debug,lemmy_db_views_actor=debug,lemmy_db_views_moderator=debug,lemmy_routes=debug,lemmy_utils=debug,lemmy_websocket=debug" +export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min + for INSTANCE in lemmy_alpha lemmy_beta lemmy_gamma lemmy_delta lemmy_epsilon; do echo "DB URL: ${LEMMY_DATABASE_URL} INSTANCE: $INSTANCE" psql "${LEMMY_DATABASE_URL}/lemmy" -c "DROP DATABASE IF EXISTS $INSTANCE" diff --git a/api_tests/run-federation-test.sh b/api_tests/run-federation-test.sh index aca57bc5f..3042fd344 100755 --- a/api_tests/run-federation-test.sh +++ b/api_tests/run-federation-test.sh @@ -2,7 +2,6 @@ set -e export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432 -export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min pushd .. cargo build rm target/lemmy_server || true From 08401fc85fd98b1512909a41462a2970f5afc6cb Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 18 Sep 2023 14:25:35 +0000 Subject: [PATCH 07/43] Revert "remove synchronous federation" This reverts commit 2767ab4a6fed9aa8d197eda0c6a25a1d617d192d. --- api_tests/run-federation-test.sh | 1 + crates/api_common/src/send_activity.rs | 59 ++++++++++++++++++++++---- crates/api_crud/src/post/create.rs | 7 ++- crates/apub/src/activities/mod.rs | 14 +++++- crates/utils/src/lib.rs | 11 +++++ src/lib.rs | 9 +++- 6 files changed, 89 insertions(+), 12 deletions(-) diff --git a/api_tests/run-federation-test.sh b/api_tests/run-federation-test.sh index 3042fd344..ff74744a1 100755 --- a/api_tests/run-federation-test.sh +++ b/api_tests/run-federation-test.sh @@ -2,6 +2,7 @@ set -e export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432 +export LEMMY_SYNCHRONOUS_FEDERATION=1 # currently this is true in debug by default, but still. pushd .. cargo build rm target/lemmy_server || true diff --git a/crates/api_common/src/send_activity.rs b/crates/api_common/src/send_activity.rs index 84b2efb2d..897f102fe 100644 --- a/crates/api_common/src/send_activity.rs +++ b/crates/api_common/src/send_activity.rs @@ -17,14 +17,22 @@ use lemmy_db_schema::{ }, }; use lemmy_db_views::structs::PrivateMessageView; -use lemmy_utils::error::LemmyResult; -use once_cell::sync::OnceCell; +use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION}; +use once_cell::sync::{Lazy, OnceCell}; +use tokio::{ + sync::{ + mpsc, + mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender}, + Mutex, + }, + task::JoinHandle, +}; use url::Url; type MatchOutgoingActivitiesBoxed = Box fn(SendActivityData, &'a Data) -> BoxFuture<'a, LemmyResult<()>>>; -/// This static is necessary so that the api_common crates don't need to depend on lemmy_apub +/// This static is necessary so that activities can be sent out synchronously for tests and the api_common crates don't need to depend on lemmy_apub pub static MATCH_OUTGOING_ACTIVITIES: OnceCell = OnceCell::new(); #[derive(Debug)] @@ -54,16 +62,51 @@ pub enum SendActivityData { CreateReport(Url, Person, Community, String), } -pub struct ActivityChannel; +// TODO: instead of static, move this into LemmyContext. make sure that stopping the process with +// ctrl+c still works. +static ACTIVITY_CHANNEL: Lazy = Lazy::new(|| { + let (sender, receiver) = mpsc::unbounded_channel(); + let weak_sender = sender.downgrade(); + ActivityChannel { + weak_sender, + receiver: Mutex::new(receiver), + keepalive_sender: Mutex::new(Some(sender)), + } +}); + +pub struct ActivityChannel { + weak_sender: WeakUnboundedSender, + receiver: Mutex>, + keepalive_sender: Mutex>>, +} impl ActivityChannel { + pub async fn retrieve_activity() -> Option { + let mut lock = ACTIVITY_CHANNEL.receiver.lock().await; + lock.recv().await + } + pub async fn submit_activity( data: SendActivityData, context: &Data, ) -> LemmyResult<()> { - MATCH_OUTGOING_ACTIVITIES - .get() - .expect("retrieve function pointer")(data, context) - .await + if *SYNCHRONOUS_FEDERATION { + MATCH_OUTGOING_ACTIVITIES + .get() + .expect("retrieve function pointer")(data, context) + .await?; + } + // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender, + // not sure which way is more efficient + else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() { + sender.send(data)?; + } + Ok(()) + } + + pub async fn close(outgoing_activities_task: JoinHandle>) -> LemmyResult<()> { + ACTIVITY_CHANNEL.keepalive_sender.lock().await.take(); + outgoing_activities_task.await??; + Ok(()) } } diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs index 8cc6ffe62..d0b0f368c 100644 --- a/crates/api_crud/src/post/create.rs +++ b/crates/api_crud/src/post/create.rs @@ -37,6 +37,7 @@ use lemmy_utils::{ slurs::{check_slurs, check_slurs_opt}, validation::{check_url_scheme, clean_url_params, is_valid_body_field, is_valid_post_title}, }, + SYNCHRONOUS_FEDERATION, }; use tracing::Instrument; use url::Url; @@ -189,7 +190,11 @@ pub async fn create_post( Err(e) => Err(e).with_lemmy_type(LemmyErrorType::CouldntSendWebmention), } }; - spawn_try_task(task); + if *SYNCHRONOUS_FEDERATION { + task.await?; + } else { + spawn_try_task(task); + } }; build_post_response(&context, community_id, person_id, post_id).await diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index 29ec5bd30..53adc78df 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -45,6 +45,7 @@ use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView}; use lemmy_utils::{ error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult}, spawn_try_task, + SYNCHRONOUS_FEDERATION, }; use serde::Serialize; use std::{ops::Deref, time::Duration}; @@ -220,6 +221,13 @@ where Ok(()) } +pub async fn handle_outgoing_activities(context: Data) -> LemmyResult<()> { + while let Some(data) = ActivityChannel::retrieve_activity().await { + match_outgoing_activities(data, &context.reset_request_count()).await? + } + Ok(()) +} + pub async fn match_outgoing_activities( data: SendActivityData, context: &Data, @@ -324,6 +332,10 @@ pub async fn match_outgoing_activities( } } }; - spawn_try_task(fed_task); + if *SYNCHRONOUS_FEDERATION { + fed_task.await?; + } else { + spawn_try_task(fed_task); + } Ok(()) } diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index c0553de31..1ef8a842c 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -18,6 +18,7 @@ pub mod version; use error::LemmyError; use futures::Future; +use once_cell::sync::Lazy; use std::time::Duration; use tracing::Instrument; @@ -37,6 +38,16 @@ macro_rules! location_info { }; } +/// if true, all federation should happen synchronously. useful for debugging and testing. +/// defaults to true on debug mode, false on releasemode +/// override to true by setting env LEMMY_SYNCHRONOUS_FEDERATION=1 +/// override to false by setting env LEMMY_SYNCHRONOUS_FEDERATION="" +pub static SYNCHRONOUS_FEDERATION: Lazy = Lazy::new(|| { + std::env::var("LEMMY_SYNCHRONOUS_FEDERATION") + .map(|s| !s.is_empty()) + .unwrap_or(cfg!(debug_assertions)) +}); + /// tokio::spawn, but accepts a future that may fail and also /// * logs errors /// * attaches the spawned task to the tracing span of the caller for better logging diff --git a/src/lib.rs b/src/lib.rs index e1c6d1fae..9ce1bfa00 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,14 +28,14 @@ use lemmy_api_common::{ context::LemmyContext, lemmy_db_views::structs::SiteView, request::build_user_agent, - send_activity::MATCH_OUTGOING_ACTIVITIES, + send_activity::{ActivityChannel, MATCH_OUTGOING_ACTIVITIES}, utils::{ check_private_instance_and_federation_enabled, local_site_rate_limit_to_rate_limit_config, }, }; use lemmy_apub::{ - activities::match_outgoing_activities, + activities::{handle_outgoing_activities, match_outgoing_activities}, VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT, }; @@ -203,6 +203,8 @@ pub async fn start_lemmy_server(args: CmdArgs) -> Result<(), LemmyError> { Box::pin(match_outgoing_activities(d, c)) })) .expect("set function pointer"); + let request_data = federation_config.to_request_data(); + let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(request_data)); let server = if args.http_server { Some(create_http_server( @@ -245,6 +247,9 @@ pub async fn start_lemmy_server(args: CmdArgs) -> Result<(), LemmyError> { federate.cancel().await?; } + // Wait for outgoing apub sends to complete + ActivityChannel::close(outgoing_activities_task).await?; + Ok(()) } From 27141553b6d17925050bf1e16ac1a8c1ea1afeb5 Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 18 Sep 2023 14:28:49 +0000 Subject: [PATCH 08/43] fix after revert --- api_tests/prepare-drone-federation-test.sh | 23 +++++++++++----------- api_tests/run-federation-test.sh | 1 - crates/apub/src/activities/mod.rs | 5 ++++- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/api_tests/prepare-drone-federation-test.sh b/api_tests/prepare-drone-federation-test.sh index ac204ed31..fc19ee8c8 100755 --- a/api_tests/prepare-drone-federation-test.sh +++ b/api_tests/prepare-drone-federation-test.sh @@ -6,7 +6,8 @@ set -e export RUST_BACKTRACE=1 export RUST_LOG="warn,lemmy_server=debug,lemmy_federate=debug,lemmy_api=debug,lemmy_api_common=debug,lemmy_api_crud=debug,lemmy_apub=debug,lemmy_db_schema=debug,lemmy_db_views=debug,lemmy_db_views_actor=debug,lemmy_db_views_moderator=debug,lemmy_routes=debug,lemmy_utils=debug,lemmy_websocket=debug" -export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min +export LEMMY_SYNCHRONOUS_FEDERATION=1 # currently this is true in debug by default, but still. +export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min for INSTANCE in lemmy_alpha lemmy_beta lemmy_gamma lemmy_delta lemmy_epsilon; do echo "DB URL: ${LEMMY_DATABASE_URL} INSTANCE: $INSTANCE" @@ -36,30 +37,30 @@ echo "$PWD" echo "start alpha" LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_alpha.hjson \ -LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_alpha" \ -target/lemmy_server >/tmp/lemmy_alpha.out 2>&1 & + LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_alpha" \ + target/lemmy_server >/tmp/lemmy_alpha.out 2>&1 & echo "start beta" LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_beta.hjson \ -LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_beta" \ -target/lemmy_server >/tmp/lemmy_beta.out 2>&1 & + LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_beta" \ + target/lemmy_server >/tmp/lemmy_beta.out 2>&1 & echo "start gamma" LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_gamma.hjson \ -LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_gamma" \ -target/lemmy_server >/tmp/lemmy_gamma.out 2>&1 & + LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_gamma" \ + target/lemmy_server >/tmp/lemmy_gamma.out 2>&1 & echo "start delta" # An instance with only an allowlist for beta LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_delta.hjson \ -LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_delta" \ -target/lemmy_server >/tmp/lemmy_delta.out 2>&1 & + LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_delta" \ + target/lemmy_server >/tmp/lemmy_delta.out 2>&1 & echo "start epsilon" # An instance who has a blocklist, with lemmy-alpha blocked LEMMY_CONFIG_LOCATION=./docker/federation/lemmy_epsilon.hjson \ -LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_epsilon" \ -target/lemmy_server >/tmp/lemmy_epsilon.out 2>&1 & + LEMMY_DATABASE_URL="${LEMMY_DATABASE_URL}/lemmy_epsilon" \ + target/lemmy_server >/tmp/lemmy_epsilon.out 2>&1 & echo "wait for all instances to start" while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'lemmy-alpha:8541/api/v3/site')" != "200" ]]; do sleep 1; done diff --git a/api_tests/run-federation-test.sh b/api_tests/run-federation-test.sh index ff74744a1..3042fd344 100755 --- a/api_tests/run-federation-test.sh +++ b/api_tests/run-federation-test.sh @@ -2,7 +2,6 @@ set -e export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432 -export LEMMY_SYNCHRONOUS_FEDERATION=1 # currently this is true in debug by default, but still. pushd .. cargo build rm target/lemmy_server || true diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index 53adc78df..81bf806a6 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -33,7 +33,10 @@ use activitypub_federation::{ traits::{ActivityHandler, Actor}, }; use anyhow::anyhow; -use lemmy_api_common::{context::LemmyContext, send_activity::SendActivityData}; +use lemmy_api_common::{ + context::LemmyContext, + send_activity::{ActivityChannel, SendActivityData}, +}; use lemmy_db_schema::{ newtypes::CommunityId, source::{ From 943b960c32a4b56c38c81f3112bfa12603c2d184 Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 18 Sep 2023 16:29:08 +0000 Subject: [PATCH 10/43] fix waits after all follow actions --- api_tests/src/comment.spec.ts | 13 ++++--------- api_tests/src/community.spec.ts | 10 ---------- api_tests/src/post.spec.ts | 2 -- api_tests/src/shared.ts | 25 +++++++++++++++++-------- 4 files changed, 21 insertions(+), 29 deletions(-) diff --git a/api_tests/src/comment.spec.ts b/api_tests/src/comment.spec.ts index 000c0b0ab..9011fb8f1 100644 --- a/api_tests/src/comment.spec.ts +++ b/api_tests/src/comment.spec.ts @@ -42,10 +42,7 @@ let postOnAlphaRes: PostResponse; beforeAll(async () => { await setupLogins(); await unfollows(); - await followBeta(alpha); - await followBeta(gamma); - // wait for FOLLOW_ADDITIONS_RECHECK_DELAY - await delay(2000); + await Promise.all([followBeta(alpha), followBeta(gamma)]); let betaCommunity = (await resolveBetaCommunity(alpha)).community; if (betaCommunity) { postOnAlphaRes = await createPost(alpha, betaCommunity.community.id); @@ -560,8 +557,6 @@ test("Check that activity from another instance is sent to third instance", asyn () => resolveBetaCommunity(gamma), c => c.community?.subscribed === "Subscribed", ); - // FOLLOW_ADDITIONS_RECHECK_DELAY - await delay(2000); // Create a post on beta let betaPost = await createPost(beta, 2); @@ -607,8 +602,7 @@ test("Check that activity from another instance is sent to third instance", asyn commentRes.comment_view, ); - await unfollowRemotes(alpha); - await unfollowRemotes(gamma); + await Promise.all([unfollowRemotes(alpha), unfollowRemotes(gamma)]); }); test("Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedded comments, A subs to B, B updates the lowest level comment, A fetches both the post and all the inreplyto comments for that post.", async () => { @@ -671,7 +665,8 @@ test("Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde () => getComments(alpha, alphaPostB!.post.id), c => c.comments[1]?.comment.content === - parentCommentRes.comment_view.comment.content, + parentCommentRes.comment_view.comment.content && + c.comments[0]?.comment.content === updateRes.comment_view.comment.content, ); expect(alphaPost.post_view.post.name).toBeDefined(); assertCommentFederation( diff --git a/api_tests/src/community.spec.ts b/api_tests/src/community.spec.ts index a2af84440..c078cd9b0 100644 --- a/api_tests/src/community.spec.ts +++ b/api_tests/src/community.spec.ts @@ -87,12 +87,6 @@ test("Delete community", async () => { // Make sure the follow response went through expect(follow.community_view.community.local).toBe(false); - await waitUntil( - () => resolveCommunity(alpha, searchShort), - g => g.community?.subscribed === "Subscribed", - ); - // wait FOLLOW_ADDITIONS_RECHECK_DELAY - await delay(2000); let deleteCommunityRes = await deleteCommunity( beta, true, @@ -145,10 +139,6 @@ test("Remove community", async () => { // Make sure the follow response went through expect(follow.community_view.community.local).toBe(false); - await waitUntil( - () => resolveCommunity(alpha, searchShort), - g => g.community?.subscribed === "Subscribed", - ); let removeCommunityRes = await removeCommunity( beta, true, diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index cd3eec71b..1696ef998 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -220,8 +220,6 @@ test("Lock a post", async () => { () => resolveBetaCommunity(alpha), c => c.community?.subscribed === "Subscribed", ); - // wait FOLLOW_ADDITIONS_RECHECK_DELAY (there's no API to wait for this currently) - await delay(2_000); let postRes = await createPost(alpha, betaCommunity.community.id); // wait for federation diff --git a/api_tests/src/shared.ts b/api_tests/src/shared.ts index d116d329f..7e6124025 100644 --- a/api_tests/src/shared.ts +++ b/api_tests/src/shared.ts @@ -447,7 +447,14 @@ export async function followCommunity( follow, auth: api.auth, }; - return api.client.followCommunity(form); + const res = await api.client.followCommunity(form); + await waitUntil( + () => resolveCommunity(api, res.community_view.community.actor_id), + g => g.community?.subscribed === (follow ? "Subscribed" : "NotSubscribed"), + ); + // wait FOLLOW_ADDITIONS_RECHECK_DELAY (there's no API to wait for this currently) + await delay(2000); + return res; } export async function likePost( @@ -745,9 +752,9 @@ export async function unfollowRemotes(api: API): Promise { let site = await getSite(api); let remoteFollowed = site.my_user?.follows.filter(c => c.community.local == false) ?? []; - for (let cu of remoteFollowed) { - await followCommunity(api, false, cu.community.id); - } + await Promise.all( + remoteFollowed.map(cu => followCommunity(api, false, cu.community.id)), + ); let siteRes = await getSite(api); return siteRes; } @@ -841,10 +848,12 @@ export function randomString(length: number): string { } export async function unfollows() { - await unfollowRemotes(alpha); - await unfollowRemotes(gamma); - await unfollowRemotes(delta); - await unfollowRemotes(epsilon); + await Promise.all([ + unfollowRemotes(alpha), + unfollowRemotes(gamma), + unfollowRemotes(delta), + unfollowRemotes(epsilon), + ]); } export function getCommentParentId(comment: Comment): number | undefined { From 9e886fba4a93139bdd10fd1b85d7288615f0455d Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 18 Sep 2023 16:49:55 +0000 Subject: [PATCH 11/43] delay shorter --- api_tests/src/post.spec.ts | 12 +++++++++--- api_tests/src/shared.ts | 6 ++++-- crates/federate/src/util.rs | 26 +++++++++++++++++++++++++- crates/federate/src/worker.rs | 29 ++++++++--------------------- 4 files changed, 46 insertions(+), 27 deletions(-) diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index 1696ef998..b2d07c291 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -279,8 +279,9 @@ test("Delete a post", async () => { // Make sure lemmy beta sees post is deleted // This will be undefined because of the tombstone - await expect(resolvePost(beta, postRes.post_view.post)).rejects.toBe( - "couldnt_find_object", + await waitUntil( + () => resolvePost(beta, postRes.post_view.post).catch(e => e), + e => e === "couldnt_find_object", ); // Undelete @@ -288,7 +289,12 @@ test("Delete a post", async () => { expect(undeletedPost.post_view.post.deleted).toBe(false); // Make sure lemmy beta sees post is undeleted - let betaPost2 = (await resolvePost(beta, postRes.post_view.post)).post; + let betaPost2 = ( + await waitUntil( + () => resolvePost(beta, postRes.post_view.post).catch(e => e), + e => e !== "couldnt_find_object", + ) + ).post; if (!betaPost2) { throw "Missing beta post 2"; } diff --git a/api_tests/src/shared.ts b/api_tests/src/shared.ts index 7e6124025..6898221f3 100644 --- a/api_tests/src/shared.ts +++ b/api_tests/src/shared.ts @@ -872,13 +872,15 @@ export async function waitUntil( fetcher: () => Promise, checker: (t: T) => boolean, retries = 10, - delaySeconds = 2, + delaySeconds = [0.2, 0.5, 1, 2, 3], ) { let retry = 0; while (retry++ < retries) { const result = await fetcher(); if (checker(result)) return result; - await delay(delaySeconds * 1000); + await delay( + delaySeconds[Math.min(retry - 1, delaySeconds.length - 1)] * 1000, + ); } throw Error( `Failed "${fetcher}": "${checker}" did not return true after ${retries} retries (delayed ${delaySeconds}s each)`, diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index 4f260708d..f00ccadb3 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -31,6 +31,26 @@ use std::{ use tokio::{task::JoinHandle, time::sleep}; use tokio_util::sync::CancellationToken; +/// Decrease the delays of the federation queue. +/// Should only be used for federation tests since it significantly increases CPU and DB load of the federation queue. +pub(crate) static LEMMY_TEST_FAST_FEDERATION: Lazy = Lazy::new(|| { + std::env::var("LEMMY_TEST_FAST_FEDERATION") + .map(|s| !s.is_empty()) + .unwrap_or(false) +}); +/// Recheck for new federation work every n seconds. +/// +/// When the queue is processed faster than new activities are added and it reaches the current time with an empty batch, +/// this is the delay the queue waits before it checks if new activities have been added to the sent_activities table. +/// This delay is only applied if no federated activity happens during sending activities of the last batch. +pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy = Lazy::new(|| { + if *LEMMY_TEST_FAST_FEDERATION { + Duration::from_millis(100) + } else { + Duration::from_secs(30) + } +}); + pub struct CancellableTask { f: Pin> + Send + 'static>>, ended: Arc>, @@ -162,7 +182,11 @@ pub(crate) async fn get_activity_cached( pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result { static CACHE: Lazy> = Lazy::new(|| { Cache::builder() - .time_to_live(Duration::from_secs(1)) + .time_to_live(if *LEMMY_TEST_FAST_FEDERATION { + *WORK_FINISHED_RECHECK_DELAY + } else { + Duration::from_secs(1) + }) .build() }); CACHE diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index b52e4dbbf..b6e174c04 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -1,6 +1,13 @@ use crate::{ federation_queue_state::FederationQueueState, - util::{get_activity_cached, get_actor_cached, get_latest_activity_id, retry_sleep_duration}, + util::{ + get_activity_cached, + get_actor_cached, + get_latest_activity_id, + retry_sleep_duration, + LEMMY_TEST_FAST_FEDERATION, + WORK_FINISHED_RECHECK_DELAY, + }, }; use activitypub_federation::{activity_sending::SendActivityTask, config::Data}; use anyhow::{Context, Result}; @@ -23,31 +30,11 @@ use std::{ use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio_util::sync::CancellationToken; -/// Decrease the delays of the federation queue. -/// Should only be used for federation tests since it significantly increases CPU and DB load of the federation queue. -static LEMMY_TEST_FAST_FEDERATION: Lazy = Lazy::new(|| { - std::env::var("LEMMY_TEST_FAST_FEDERATION") - .map(|s| !s.is_empty()) - .unwrap_or(false) -}); - /// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt) /// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB. static CHECK_SAVE_STATE_EVERY_IT: i64 = 100; /// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent) static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); -/// Recheck for new federation work every n seconds. -/// -/// When the queue is processed faster than new activities are added and it reaches the current time with an empty batch, -/// this is the delay the queue waits before it checks if new activities have been added to the sent_activities table. -/// This delay is only applied if no federated activity happens during sending activities of the last batch. -static WORK_FINISHED_RECHECK_DELAY: Lazy = Lazy::new(|| { - if *LEMMY_TEST_FAST_FEDERATION { - Duration::from_secs(1) - } else { - Duration::from_secs(30) - } -}); /// interval with which new additions to community_followers are queried. /// /// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears), From 954b3f443d18bca428f01c2a24447b312851420d Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 18 Sep 2023 17:04:46 +0000 Subject: [PATCH 12/43] fix wait --- api_tests/src/comment.spec.ts | 2 +- api_tests/src/post.spec.ts | 14 ++++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/api_tests/src/comment.spec.ts b/api_tests/src/comment.spec.ts index 9011fb8f1..2314205d1 100644 --- a/api_tests/src/comment.spec.ts +++ b/api_tests/src/comment.spec.ts @@ -511,7 +511,7 @@ test("A and G subscribe to B (center) A posts, G mentions B, it gets announced t // Make sure alpha sees it let alphaPostComments2 = await waitUntil( () => getComments(alpha, alphaPost.post_view.post.id), - e => !!e.comments[0], + e => e.comments[0]?.counts.score === 1, ); expect(alphaPostComments2.comments[0].comment.content).toBe(commentContent); expect(alphaPostComments2.comments[0].community.local).toBe(true); diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index b2d07c291..89dfaa35f 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -365,16 +365,22 @@ test("Remove a post from admin and community on same instance", async () => { expect(removePostRes.post_view.post.removed).toBe(true); // Make sure lemmy alpha sees post is removed - // let alphaPost = await getPost(alpha, postRes.post_view.post.id); - // expect(alphaPost.post_view.post.removed).toBe(true); // TODO this shouldn't be commented - // assertPostFederation(alphaPost.post_view, removePostRes.post_view); + let alphaPost = await waitUntil( + () => getPost(alpha, postRes.post_view.post.id), + p => p.post_view.post.removed, + ); + expect(alphaPost.post_view.post.removed).toBe(true); + assertPostFederation(alphaPost.post_view, removePostRes.post_view); // Undelete let undeletedPost = await removePost(beta, false, betaPost.post); expect(undeletedPost.post_view.post.removed).toBe(false); // Make sure lemmy alpha sees post is undeleted - let alphaPost2 = await getPost(alpha, postRes.post_view.post.id); + let alphaPost2 = await waitUntil( + () => getPost(alpha, postRes.post_view.post.id), + p => !p.post_view.post.removed, + ); expect(alphaPost2.post_view.post.removed).toBe(false); assertPostFederation(alphaPost2.post_view, undeletedPost.post_view); await unfollowRemotes(alpha); From 007f4f53198938907e4e64f6052a0e8a51391070 Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 18 Sep 2023 17:30:02 +0000 Subject: [PATCH 13/43] wait on score --- api_tests/src/comment.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api_tests/src/comment.spec.ts b/api_tests/src/comment.spec.ts index 2314205d1..9685d654f 100644 --- a/api_tests/src/comment.spec.ts +++ b/api_tests/src/comment.spec.ts @@ -591,7 +591,7 @@ test("Check that activity from another instance is sent to third instance", asyn // Make sure alpha sees it let alphaPostComments2 = await waitUntil( () => getComments(alpha, alphaPost!.post.id), - e => !!e.comments[0], + e => e.comments[0]?.counts.score === 1, ); expect(alphaPostComments2.comments[0].comment.content).toBe(commentContent); expect(alphaPostComments2.comments[0].community.local).toBe(false); From d58e2e9db7205f0e035e66ffbd3db4ef444471a5 Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 18 Sep 2023 18:09:18 +0000 Subject: [PATCH 14/43] minor test improvement --- api_tests/src/comment.spec.ts | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/api_tests/src/comment.spec.ts b/api_tests/src/comment.spec.ts index 9685d654f..214988198 100644 --- a/api_tests/src/comment.spec.ts +++ b/api_tests/src/comment.spec.ts @@ -36,14 +36,16 @@ import { delay, } from "./shared"; import { CommentView } from "lemmy-js-client/dist/types/CommentView"; +import { CommunityView } from "lemmy-js-client"; +let betaCommunity: CommunityView | undefined; let postOnAlphaRes: PostResponse; beforeAll(async () => { await setupLogins(); await unfollows(); await Promise.all([followBeta(alpha), followBeta(gamma)]); - let betaCommunity = (await resolveBetaCommunity(alpha)).community; + betaCommunity = (await resolveBetaCommunity(alpha)).community; if (betaCommunity) { postOnAlphaRes = await createPost(alpha, betaCommunity.community.id); } @@ -397,7 +399,7 @@ test("Reply to a comment from another instance, get notification", async () => { () => getUnreadCount(alpha), e => e.replies >= 1, ); - expect(alphaUnreadCountRes.replies).toBe(1); + expect(alphaUnreadCountRes.replies).toBeGreaterThanOrEqual(1); // check inbox of replies on alpha, fetching read/unread both let alphaRepliesRes = await getReplies(alpha); @@ -414,6 +416,8 @@ test("Reply to a comment from another instance, get notification", async () => { }); test("Mention beta from alpha", async () => { + if (!betaCommunity) throw Error("no community"); + const postOnAlphaRes = await createPost(alpha, betaCommunity.community.id); // Create a new branch, trunk-level comment branch, from alpha instance let commentRes = await createComment(alpha, postOnAlphaRes.post_view.post.id); // Create a reply comment to previous comment, this has a mention in body @@ -440,9 +444,9 @@ test("Mention beta from alpha", async () => { // Make sure that both new comments are seen on beta and have parent/child relationship let betaPostComments = await waitUntil( () => getComments(beta, betaPost!.post.id), - c => c.comments[1].counts.score === 1, + c => c.comments[1]?.counts.score === 1, ); - expect(betaPostComments.comments.length).toBeGreaterThanOrEqual(2); + expect(betaPostComments.comments.length).toEqual(2); // the trunk-branch root comment will be older than the mention reply comment, so index 1 let betaRootComment = betaPostComments.comments[1]; // the trunk-branch root comment should not have a parent @@ -700,16 +704,17 @@ test("Report a comment", async () => { throw "Missing alpha comment"; } - let alphaReport = ( - await reportComment(alpha, alphaComment.id, randomString(10)) - ).comment_report_view.comment_report; + const reason = randomString(10); + let alphaReport = (await reportComment(alpha, alphaComment.id, reason)) + .comment_report_view.comment_report; - let betaReport = ( - await waitUntil( - () => listCommentReports(beta), - e => !!e.comment_reports[0], - ) - ).comment_reports[0].comment_report; + let betaReport = (await waitUntil( + () => + listCommentReports(beta).then(r => + r.comment_reports.find(rep => rep.comment_report.reason === reason), + ), + e => !!e, + ))!.comment_report; expect(betaReport).toBeDefined(); expect(betaReport.resolved).toBe(false); expect(betaReport.original_comment_text).toBe( From 9868065089dd0ff7751ae73dc9f12a5409011e28 Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 18 Sep 2023 18:23:55 +0000 Subject: [PATCH 15/43] minor test improvement --- api_tests/src/comment.spec.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/api_tests/src/comment.spec.ts b/api_tests/src/comment.spec.ts index 214988198..36bf5809e 100644 --- a/api_tests/src/comment.spec.ts +++ b/api_tests/src/comment.spec.ts @@ -374,16 +374,17 @@ test("Reply to a comment from another instance, get notification", async () => { expect(replyRes.comment_view.counts.score).toBe(1); // Make sure that reply comment is seen on alpha - // TODO not sure why, but a searchComment back to alpha, for the ap_id of betas - // comment, isn't working. - // let searchAlpha = await searchComment(alpha, replyRes.comment); + let commentSearch = await waitUntil( + () => resolveComment(alpha, replyRes.comment_view.comment), + c => c.comment?.counts.score === 1, + ); + let alphaComment = commentSearch.comment!; let postComments = await waitUntil( () => getComments(alpha, postOnAlphaRes.post_view.post.id), pc => pc.comments.length >= 2, ); // Note: this test fails when run twice and this count will differ expect(postComments.comments.length).toBeGreaterThanOrEqual(2); - let alphaComment = postComments.comments[0]; expect(alphaComment.comment.content).toBeDefined(); expect(getCommentParentId(alphaComment.comment)).toBe( From 2f0ad53b60e773c194716e8b122b53f7a22b2a65 Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 18 Sep 2023 18:39:09 +0000 Subject: [PATCH 16/43] wait longer for fed init --- api_tests/src/shared.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api_tests/src/shared.ts b/api_tests/src/shared.ts index 6898221f3..9d2bd3e02 100644 --- a/api_tests/src/shared.ts +++ b/api_tests/src/shared.ts @@ -205,7 +205,7 @@ export async function setupLogins() { // otherwise the first few federated events may be missed // (because last_successful_id is set to current id when federation to an instance is first started) // only needed the first time so do in this try - await delay(6_000); + await delay(10_000); } catch (_) { console.log("Communities already exist"); } From 7dd857e00ba99bca8b8cf4ea9d961b859c26a10f Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 18 Sep 2023 19:19:40 +0000 Subject: [PATCH 17/43] log result --- api_tests/src/shared.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/api_tests/src/shared.ts b/api_tests/src/shared.ts index 9d2bd3e02..df5d299c6 100644 --- a/api_tests/src/shared.ts +++ b/api_tests/src/shared.ts @@ -875,13 +875,15 @@ export async function waitUntil( delaySeconds = [0.2, 0.5, 1, 2, 3], ) { let retry = 0; + let result; while (retry++ < retries) { - const result = await fetcher(); + result = await fetcher(); if (checker(result)) return result; await delay( delaySeconds[Math.min(retry - 1, delaySeconds.length - 1)] * 1000, ); } + console.error("result", result); throw Error( `Failed "${fetcher}": "${checker}" did not return true after ${retries} retries (delayed ${delaySeconds}s each)`, ); From dff54d5e390e460767fe2bbbdd8dad66cb48fc83 Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 18 Sep 2023 19:58:20 +0000 Subject: [PATCH 18/43] tweak tests more --- api_tests/src/comment.spec.ts | 25 +++++++++++++-------- api_tests/src/community.spec.ts | 2 +- api_tests/src/post.spec.ts | 39 +++++++++++++++++++-------------- api_tests/src/shared.ts | 3 ++- 4 files changed, 42 insertions(+), 27 deletions(-) diff --git a/api_tests/src/comment.spec.ts b/api_tests/src/comment.spec.ts index 36bf5809e..3c8578d80 100644 --- a/api_tests/src/comment.spec.ts +++ b/api_tests/src/comment.spec.ts @@ -342,6 +342,8 @@ test("Federated comment like", async () => { }); test("Reply to a comment from another instance, get notification", async () => { + await alpha.client.markAllAsRead({ auth: alpha.auth }); + let betaCommunity = (await resolveBetaCommunity(alpha)).community; if (!betaCommunity) { throw "Missing beta community"; @@ -404,16 +406,20 @@ test("Reply to a comment from another instance, get notification", async () => { // check inbox of replies on alpha, fetching read/unread both let alphaRepliesRes = await getReplies(alpha); - expect(alphaRepliesRes.replies.length).toBe(1); - expect(alphaRepliesRes.replies[0].comment.content).toBeDefined(); - expect(alphaRepliesRes.replies[0].community.local).toBe(false); - expect(alphaRepliesRes.replies[0].creator.local).toBe(false); - expect(alphaRepliesRes.replies[0].counts.score).toBe(1); + const alphaReply = alphaRepliesRes.replies.find( + r => r.comment.id === alphaComment.comment.id, + ); + expect(alphaReply).toBeDefined(); + if (!alphaReply) throw Error(); + expect(alphaReply.comment.content).toBeDefined(); + expect(alphaReply.community.local).toBe(false); + expect(alphaReply.creator.local).toBe(false); + expect(alphaReply.counts.score).toBe(1); // ToDo: interesting alphaRepliesRes.replies[0].comment_reply.id is 1, meaning? how did that come about? - expect(alphaRepliesRes.replies[0].comment.id).toBe(alphaComment.comment.id); + expect(alphaReply.comment.id).toBe(alphaComment.comment.id); // this is a new notification, getReplies fetch was for read/unread both, confirm it is unread. - expect(alphaRepliesRes.replies[0].comment_reply.read).toBe(false); - assertCommentFederation(alphaRepliesRes.replies[0], replyRes.comment_view); + expect(alphaReply.comment_reply.read).toBe(false); + assertCommentFederation(alphaReply, replyRes.comment_view); }); test("Mention beta from alpha", async () => { @@ -494,7 +500,8 @@ test("A and G subscribe to B (center) A posts, G mentions B, it gets announced t expect(alphaPost.post_view.community.local).toBe(true); // Make sure gamma sees it - let gammaPost = (await resolvePost(gamma, alphaPost.post_view.post)).post; + let gammaPost = (await resolvePost(gamma, alphaPost.post_view.post, false))! + .post; if (!gammaPost) { throw "Missing gamma post"; diff --git a/api_tests/src/community.spec.ts b/api_tests/src/community.spec.ts index c078cd9b0..a6ecd1ab8 100644 --- a/api_tests/src/community.spec.ts +++ b/api_tests/src/community.spec.ts @@ -237,7 +237,7 @@ test("Admin actions in remote community are not federated to origin", async () = expect(banRes.banned).toBe(true); // ban doesnt federate to community's origin instance alpha - let alphaPost = (await resolvePost(alpha, gammaPost.post)).post; + let alphaPost = (await resolvePost(alpha, gammaPost.post, false)).post; expect(alphaPost?.creator_banned_from_community).toBe(false); // and neither to gamma diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index 89dfaa35f..1edabdcc8 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -83,10 +83,10 @@ test("Create a post", async () => { // Make sure that post is liked on beta const res = await waitUntil( - () => resolvePost(beta, postRes.post_view.post), - res => res.post?.counts.score === 1, + () => resolvePost(beta, postRes.post_view.post).catch(e => null), + res => res?.post?.counts.score === 1, ); - let betaPost = res.post; + let betaPost = res?.post; expect(betaPost).toBeDefined(); expect(betaPost?.community.local).toBe(true); @@ -177,7 +177,7 @@ test("Sticky a post", async () => { } let postRes = await createPost(alpha, betaCommunity.community.id); - let betaPost1 = (await resolvePost(beta, postRes.post_view.post)).post; + let betaPost1 = (await resolvePost(beta, postRes.post_view.post, false)).post; if (!betaPost1) { throw "Missing beta post1"; } @@ -201,7 +201,8 @@ test("Sticky a post", async () => { expect(betaPost2?.post.featured_community).toBe(false); // Make sure that gamma cannot sticky the post on beta - let gammaPost = (await resolvePost(gamma, postRes.post_view.post)).post; + let gammaPost = (await resolvePost(gamma, postRes.post_view.post, false)) + .post; if (!gammaPost) { throw "Missing gamma post"; } @@ -320,7 +321,8 @@ test("Remove a post from admin and community on different instance", async () => } let postRes = await createPost(gamma, gammaCommunity.id); - let alphaPost = (await resolvePost(alpha, postRes.post_view.post)).post; + let alphaPost = (await resolvePost(alpha, postRes.post_view.post, false)) + .post; if (!alphaPost) { throw "Missing alpha post"; } @@ -329,7 +331,7 @@ test("Remove a post from admin and community on different instance", async () => expect(removedPost.post_view.post.name).toBe(postRes.post_view.post.name); // Make sure lemmy beta sees post is NOT removed - let betaPost = (await resolvePost(beta, postRes.post_view.post)).post; + let betaPost = (await resolvePost(beta, postRes.post_view.post, false)).post; if (!betaPost) { throw "Missing beta post"; } @@ -533,7 +535,7 @@ test("A and G subscribe to B (center) A posts, it gets announced to G", async () let postRes = await createPost(alpha, betaCommunity.community.id); expect(postRes.post_view.post).toBeDefined(); - let betaPost = (await resolvePost(gamma, postRes.post_view.post)).post; + let betaPost = (await resolvePost(gamma, postRes.post_view.post, false)).post; expect(betaPost?.post.name).toBeDefined(); }); @@ -546,7 +548,8 @@ test("Report a post", async () => { let postRes = await createPost(beta, betaCommunity.community.id); expect(postRes.post_view.post).toBeDefined(); - let alphaPost = (await resolvePost(alpha, postRes.post_view.post)).post; + let alphaPost = (await resolvePost(alpha, postRes.post_view.post, false)) + .post; if (!alphaPost) { throw "Missing alpha post"; } @@ -554,12 +557,16 @@ test("Report a post", async () => { await reportPost(alpha, alphaPost.post.id, randomString(10)) ).post_report_view.post_report; - let betaReport = ( - await waitUntil( - () => listPostReports(beta), - res => !!res.post_reports[0], - ) - ).post_reports[0].post_report; + let betaReport = (await waitUntil( + () => + listPostReports(beta).then(p => + p.post_reports.find( + r => + r.post_report.original_post_name === alphaReport.original_post_name, + ), + ), + res => !!res, + ))!.post_report; expect(betaReport).toBeDefined(); expect(betaReport.resolved).toBe(false); expect(betaReport.original_post_name).toBe(alphaReport.original_post_name); @@ -588,7 +595,7 @@ test("Sanitize HTML", async () => { "<script>alert('xss');</script> hello &"'", ); - let alphaPost = (await resolvePost(alpha, post.post_view.post)).post; + let alphaPost = (await resolvePost(alpha, post.post_view.post, false)).post; // second escaping over federation, avoid double escape of & expect(alphaPost?.post.body).toBe( "<script>alert('xss');</script> hello &"'", diff --git a/api_tests/src/shared.ts b/api_tests/src/shared.ts index df5d299c6..b3f9f637f 100644 --- a/api_tests/src/shared.ts +++ b/api_tests/src/shared.ts @@ -296,10 +296,11 @@ export async function lockPost( export async function resolvePost( api: API, post: Post, + localOnly = true, ): Promise { let form: ResolveObject = { q: post.ap_id, - auth: api.auth, + auth: localOnly ? null : api.auth, }; return api.client.resolveObject(form); } From afde8e757681d37309795bf4cccdc9500a40ffc9 Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 18 Sep 2023 20:31:12 +0000 Subject: [PATCH 19/43] tweak more tests --- api_tests/src/comment.spec.ts | 5 ++++- api_tests/src/post.spec.ts | 1 - 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/api_tests/src/comment.spec.ts b/api_tests/src/comment.spec.ts index 3c8578d80..24822d1d1 100644 --- a/api_tests/src/comment.spec.ts +++ b/api_tests/src/comment.spec.ts @@ -468,7 +468,10 @@ test("Mention beta from alpha", async () => { expect(betaRootComment.counts.score).toBe(1); assertCommentFederation(betaRootComment, commentRes.comment_view); - let mentionsRes = await getMentions(beta); + let mentionsRes = await waitUntil( + () => getMentions(beta), + m => !!m.mentions[0], + ); expect(mentionsRes.mentions[0].comment.content).toBeDefined(); expect(mentionsRes.mentions[0].community.local).toBe(true); expect(mentionsRes.mentions[0].creator.local).toBe(false); diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index 1edabdcc8..228925d06 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -287,7 +287,6 @@ test("Delete a post", async () => { // Undelete let undeletedPost = await deletePost(alpha, false, postRes.post_view.post); - expect(undeletedPost.post_view.post.deleted).toBe(false); // Make sure lemmy beta sees post is undeleted let betaPost2 = ( From 990445bf8465aee7f1bf77fcad5d53ee2a131ecb Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 18 Sep 2023 21:08:17 +0000 Subject: [PATCH 21/43] tmp tail lemmy log --- .woodpecker.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.woodpecker.yml b/.woodpecker.yml index 49fbe3db2..c180d8d58 100644 --- a/.woodpecker.yml +++ b/.woodpecker.yml @@ -204,7 +204,7 @@ steps: - bash api_tests/prepare-drone-federation-test.sh - cd api_tests/ - yarn - - yarn api-test + - tail -f /tmp/lemmy*.out & yarn api-test when: *slow_check_paths rebuild-cache: From 2e7d2d1956744acc3b0a444f1ebec848c9808adc Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 20 Sep 2023 10:46:25 +0000 Subject: [PATCH 22/43] wait for post api test function, better announce activity id --- api_tests/src/post.spec.ts | 118 ++++++------------ api_tests/src/shared.ts | 13 ++ .../apub/src/activities/community/announce.rs | 13 +- crates/apub/src/activities/mod.rs | 17 ++- 4 files changed, 79 insertions(+), 82 deletions(-) diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index 228925d06..54faa1714 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -35,7 +35,7 @@ import { unfollows, resolveCommunity, waitUntil, - delay, + waitForPost, } from "./shared"; import { PostView } from "lemmy-js-client/dist/types/PostView"; import { CreatePost } from "lemmy-js-client/dist/types/CreatePost"; @@ -82,11 +82,11 @@ test("Create a post", async () => { expect(postRes.post_view.counts.score).toBe(1); // Make sure that post is liked on beta - const res = await waitUntil( - () => resolvePost(beta, postRes.post_view.post).catch(e => null), - res => res?.post?.counts.score === 1, + const betaPost = await waitForPost( + beta, + postRes.post_view.post, + res => res?.counts.score === 1, ); - let betaPost = res?.post; expect(betaPost).toBeDefined(); expect(betaPost?.community.local).toBe(true); @@ -122,12 +122,12 @@ test("Unlike a post", async () => { expect(unlike2.post_view.counts.score).toBe(0); // Make sure that post is unliked on beta - const betaPost = ( - await waitUntil( - () => resolvePost(beta, postRes.post_view.post), - b => b.post?.counts.score === 0, - ) - ).post; + const betaPost = await waitForPost( + beta, + postRes.post_view.post, + post => post.counts.score === 0, + ); + expect(betaPost).toBeDefined(); expect(betaPost?.community.local).toBe(true); expect(betaPost?.creator.local).toBe(false); @@ -140,26 +140,16 @@ test("Update a post", async () => { throw "Missing beta community"; } let postRes = await createPost(alpha, betaCommunity.community.id); - await waitUntil( - () => resolvePost(beta, postRes.post_view.post), - res => !!res.post, - ); + await waitForPost(beta, postRes.post_view.post); let updatedName = "A jest test federated post, updated"; let updatedPost = await editPost(alpha, postRes.post_view.post); - await waitUntil( - () => resolvePost(beta, postRes.post_view.post), - res => res.post?.post.name === updatedName, - ); expect(updatedPost.post_view.post.name).toBe(updatedName); expect(updatedPost.post_view.community.local).toBe(false); expect(updatedPost.post_view.creator.local).toBe(true); // Make sure that post is updated on beta - let betaPost = (await resolvePost(beta, postRes.post_view.post)).post; - if (!betaPost) { - throw "Missing beta post"; - } + let betaPost = await waitForPost(beta, updatedPost.post_view.post); expect(betaPost.community.local).toBe(true); expect(betaPost.creator.local).toBe(false); expect(betaPost.post.name).toBe(updatedName); @@ -223,26 +213,17 @@ test("Lock a post", async () => { ); let postRes = await createPost(alpha, betaCommunity.community.id); - // wait for federation - await waitUntil( - () => searchPostLocal(beta, postRes.post_view.post), - res => !!res.posts[0], - ); + let betaPost1 = await waitForPost(beta, postRes.post_view.post); // Lock the post - let betaPost1 = (await resolvePost(beta, postRes.post_view.post)).post; - if (!betaPost1) { - throw "Missing beta post1"; - } let lockedPostRes = await lockPost(beta, true, betaPost1.post); expect(lockedPostRes.post_view.post.locked).toBe(true); // Make sure that post is locked on alpha - let searchAlpha = await waitUntil( - () => searchPostLocal(alpha, postRes.post_view.post), - res => res.posts[0]?.post.locked, + let alphaPost1 = await waitForPost( + alpha, + postRes.post_view.post, + post => post.post.locked, ); - let alphaPost1 = searchAlpha.posts[0]; - expect(alphaPost1.post.locked).toBe(true); // Try to make a new comment there, on alpha await expect(createComment(alpha, alphaPost1.post.id)).rejects.toBe("locked"); @@ -252,11 +233,11 @@ test("Lock a post", async () => { expect(unlockedPost.post_view.post.locked).toBe(false); // Make sure that post is unlocked on alpha - let searchAlpha2 = await waitUntil( - () => searchPostLocal(alpha, postRes.post_view.post), - res => !res.posts[0]?.post.locked, + let alphaPost2 = await waitForPost( + alpha, + postRes.post_view.post, + post => !post.post.locked, ); - let alphaPost2 = searchAlpha2.posts[0]; expect(alphaPost2.community.local).toBe(false); expect(alphaPost2.creator.local).toBe(true); expect(alphaPost2.post.locked).toBe(false); @@ -280,21 +261,14 @@ test("Delete a post", async () => { // Make sure lemmy beta sees post is deleted // This will be undefined because of the tombstone - await waitUntil( - () => resolvePost(beta, postRes.post_view.post).catch(e => e), - e => e === "couldnt_find_object", - ); + await waitForPost(beta, postRes.post_view.post, p => !p); // Undelete let undeletedPost = await deletePost(alpha, false, postRes.post_view.post); // Make sure lemmy beta sees post is undeleted - let betaPost2 = ( - await waitUntil( - () => resolvePost(beta, postRes.post_view.post).catch(e => e), - e => e !== "couldnt_find_object", - ) - ).post; + let betaPost2 = await waitForPost(beta, postRes.post_view.post); + if (!betaPost2) { throw "Missing beta post 2"; } @@ -354,11 +328,7 @@ test("Remove a post from admin and community on same instance", async () => { let postRes = await createPost(alpha, betaCommunity.community.id); expect(postRes.post_view.post).toBeDefined(); // Get the id for beta - let searchBeta = await waitUntil( - () => searchPostLocal(beta, postRes.post_view.post), - res => !!res.posts[0], - ); - let betaPost = searchBeta.posts[0]; + let betaPost = await waitForPost(beta, postRes.post_view.post); expect(betaPost).toBeDefined(); // The beta admin removes it (the community lives on beta) @@ -366,24 +336,26 @@ test("Remove a post from admin and community on same instance", async () => { expect(removePostRes.post_view.post.removed).toBe(true); // Make sure lemmy alpha sees post is removed - let alphaPost = await waitUntil( - () => getPost(alpha, postRes.post_view.post.id), - p => p.post_view.post.removed, + let alphaPost = await waitForPost( + alpha, + postRes.post_view.post, + p => p.post.removed, ); - expect(alphaPost.post_view.post.removed).toBe(true); - assertPostFederation(alphaPost.post_view, removePostRes.post_view); + expect(alphaPost.post.removed).toBe(true); + assertPostFederation(alphaPost, removePostRes.post_view); // Undelete let undeletedPost = await removePost(beta, false, betaPost.post); expect(undeletedPost.post_view.post.removed).toBe(false); // Make sure lemmy alpha sees post is undeleted - let alphaPost2 = await waitUntil( - () => getPost(alpha, postRes.post_view.post.id), - p => !p.post_view.post.removed, + let alphaPost2 = await waitForPost( + alpha, + postRes.post_view.post, + p => !p.post.removed, ); - expect(alphaPost2.post_view.post.removed).toBe(false); - assertPostFederation(alphaPost2.post_view, undeletedPost.post_view); + expect(alphaPost2.post.removed).toBe(false); + assertPostFederation(alphaPost2, undeletedPost.post_view); await unfollowRemotes(alpha); }); @@ -424,11 +396,7 @@ test("Enforce site ban for federated user", async () => { // alpha makes post in beta community, it federates to beta instance let postRes1 = await createPost(alpha_user, betaCommunity.community.id); - let searchBeta1 = await waitUntil( - () => searchPostLocal(beta, postRes1.post_view.post), - res => !!res.posts[0], - ); - expect(searchBeta1.posts[0]).toBeDefined(); + let searchBeta1 = await waitForPost(beta, postRes1.post_view.post); // ban alpha from its instance let banAlpha = await banPersonFromSite( @@ -447,7 +415,7 @@ test("Enforce site ban for federated user", async () => { expect(alphaUserOnBeta1.person?.person.banned).toBe(true); // existing alpha post should be removed on beta - let searchBeta2 = await getPost(beta, searchBeta1.posts[0].post.id); + let searchBeta2 = await getPost(beta, searchBeta1.post.id); expect(searchBeta2.post_view.post.removed).toBe(true); // Unban alpha @@ -461,11 +429,7 @@ test("Enforce site ban for federated user", async () => { // alpha makes new post in beta community, it federates let postRes2 = await createPost(alpha_user, betaCommunity.community.id); - let searchBeta3 = await waitUntil( - () => searchPostLocal(beta, postRes2.post_view.post), - e => !!e.posts[0], - ); - expect(searchBeta3.posts[0]).toBeDefined(); + let searchBeta3 = await waitForPost(beta, postRes2.post_view.post); let alphaUserOnBeta2 = await resolvePerson(beta, alphaUserActorId); expect(alphaUserOnBeta2.person?.person.banned).toBe(false); diff --git a/api_tests/src/shared.ts b/api_tests/src/shared.ts index b3f9f637f..1166b5134 100644 --- a/api_tests/src/shared.ts +++ b/api_tests/src/shared.ts @@ -4,6 +4,7 @@ import { GetUnreadCount, GetUnreadCountResponse, LemmyHttp, + PostView, } from "lemmy-js-client"; import { CreatePost } from "lemmy-js-client/dist/types/CreatePost"; import { DeletePost } from "lemmy-js-client/dist/types/DeletePost"; @@ -318,6 +319,18 @@ export async function searchPostLocal( return api.client.search(form); } +/// wait for a post to appear locally without pulling it +export async function waitForPost( + api: API, + post: Post, + checker: (t: PostView) => boolean = p => !!p, +) { + return waitUntil( + () => searchPostLocal(api, post).then(p => p.posts[0] as PostView), + checker, + ); +} + export async function getPost( api: API, post_id: number, diff --git a/crates/apub/src/activities/community/announce.rs b/crates/apub/src/activities/community/announce.rs index 5939c023a..ab82eb1de 100644 --- a/crates/apub/src/activities/community/announce.rs +++ b/crates/apub/src/activities/community/announce.rs @@ -1,6 +1,7 @@ use crate::{ activities::{ generate_activity_id, + generate_announce_activity_id, send_lemmy_activity, verify_is_public, verify_person_in_community, @@ -75,16 +76,20 @@ impl AnnounceActivity { community: &ApubCommunity, context: &Data, ) -> Result { + let inner_kind = object + .other + .get("type") + .and_then(|e| e.as_str()) + .unwrap_or("other"); + let id = + generate_announce_activity_id(inner_kind, &context.settings().get_protocol_and_hostname())?; Ok(AnnounceActivity { actor: community.id().into(), to: vec![public()], object: IdOrNestedObject::NestedObject(object), cc: vec![community.followers_url.clone().into()], kind: AnnounceType::Announce, - id: generate_activity_id( - &AnnounceType::Announce, - &context.settings().get_protocol_and_hostname(), - )?, + id, }) } diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index 81bf806a6..2e26e0678 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -28,7 +28,7 @@ use crate::{ use activitypub_federation::{ config::Data, fetch::object_id::ObjectId, - kinds::public, + kinds::{activity::AnnounceType, public}, protocol::context::WithContext, traits::{ActivityHandler, Actor}, }; @@ -185,6 +185,21 @@ where Url::parse(&id) } +/// like generate_activity_id but also add the inner kind for easier debugging +fn generate_announce_activity_id( + inner_kind: &str, + protocol_and_hostname: &str, +) -> Result { + let id = format!( + "{}/activities/{}/{}/{}", + protocol_and_hostname, + AnnounceType::Announce.to_string().to_lowercase(), + inner_kind, + Uuid::new_v4() + ); + Url::parse(&id) +} + pub(crate) trait GetActorType { fn actor_type(&self) -> ActorType; } From ae3b8f4f76b35bb2450b4ab98a301e02628072cb Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 20 Sep 2023 11:21:47 +0000 Subject: [PATCH 23/43] fix --- api_tests/src/post.spec.ts | 21 ++++++++++----------- api_tests/src/shared.ts | 6 +++--- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index 54faa1714..3b0e24129 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -125,7 +125,7 @@ test("Unlike a post", async () => { const betaPost = await waitForPost( beta, postRes.post_view.post, - post => post.counts.score === 0, + post => post?.counts.score === 0, ); expect(betaPost).toBeDefined(); @@ -222,7 +222,7 @@ test("Lock a post", async () => { let alphaPost1 = await waitForPost( alpha, postRes.post_view.post, - post => post.post.locked, + post => !!post && post.post.locked, ); // Try to make a new comment there, on alpha @@ -236,7 +236,7 @@ test("Lock a post", async () => { let alphaPost2 = await waitForPost( alpha, postRes.post_view.post, - post => !post.post.locked, + post => !!post && !post.post.locked, ); expect(alphaPost2.community.local).toBe(false); expect(alphaPost2.creator.local).toBe(true); @@ -336,13 +336,12 @@ test("Remove a post from admin and community on same instance", async () => { expect(removePostRes.post_view.post.removed).toBe(true); // Make sure lemmy alpha sees post is removed - let alphaPost = await waitForPost( - alpha, - postRes.post_view.post, - p => p.post.removed, + let alphaPost = await waitUntil( + () => getPost(alpha, postRes.post_view.post.id), + p => p?.post_view.post.removed ?? false, ); - expect(alphaPost.post.removed).toBe(true); - assertPostFederation(alphaPost, removePostRes.post_view); + expect(alphaPost.post_view?.post.removed).toBe(true); + assertPostFederation(alphaPost.post_view, removePostRes.post_view); // Undelete let undeletedPost = await removePost(beta, false, betaPost.post); @@ -352,7 +351,7 @@ test("Remove a post from admin and community on same instance", async () => { let alphaPost2 = await waitForPost( alpha, postRes.post_view.post, - p => !p.post.removed, + p => !!p && !p.post.removed, ); expect(alphaPost2.post.removed).toBe(false); assertPostFederation(alphaPost2, undeletedPost.post_view); @@ -367,7 +366,7 @@ test("Search for a post", async () => { let postRes = await createPost(alpha, betaCommunity.community.id); expect(postRes.post_view.post).toBeDefined(); - let betaPost = (await resolvePost(beta, postRes.post_view.post)).post; + let betaPost = await waitForPost(beta, postRes.post_view.post); expect(betaPost?.post.name).toBeDefined(); }); diff --git a/api_tests/src/shared.ts b/api_tests/src/shared.ts index 1166b5134..54dfb97a0 100644 --- a/api_tests/src/shared.ts +++ b/api_tests/src/shared.ts @@ -323,10 +323,10 @@ export async function searchPostLocal( export async function waitForPost( api: API, post: Post, - checker: (t: PostView) => boolean = p => !!p, + checker: (t: PostView | undefined) => boolean = p => !!p, ) { - return waitUntil( - () => searchPostLocal(api, post).then(p => p.posts[0] as PostView), + return waitUntil( + () => searchPostLocal(api, post).then(p => p.posts[0]), checker, ); } From c792f46c59147b6dbe64b620e117c2bf3416edf1 Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 20 Sep 2023 11:38:12 +0000 Subject: [PATCH 24/43] more wait for --- api_tests/src/comment.spec.ts | 7 ++++--- api_tests/src/post.spec.ts | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/api_tests/src/comment.spec.ts b/api_tests/src/comment.spec.ts index 24822d1d1..2afbc9fc7 100644 --- a/api_tests/src/comment.spec.ts +++ b/api_tests/src/comment.spec.ts @@ -34,6 +34,7 @@ import { getUnreadCount, waitUntil, delay, + waitForPost, } from "./shared"; import { CommentView } from "lemmy-js-client/dist/types/CommentView"; import { CommunityView } from "lemmy-js-client"; @@ -441,7 +442,7 @@ test("Mention beta from alpha", async () => { expect(mentionRes.comment_view.counts.score).toBe(1); // get beta's localized copy of the alpha post - let betaPost = (await resolvePost(beta, postOnAlphaRes.post_view.post)).post; + let betaPost = await waitForPost(beta, postOnAlphaRes.post_view.post); if (!betaPost) { throw "unable to locate post on beta"; } @@ -578,13 +579,13 @@ test("Check that activity from another instance is sent to third instance", asyn expect(betaPost.post_view.community.local).toBe(true); // Make sure gamma and alpha see it - let gammaPost = (await resolvePost(gamma, betaPost.post_view.post)).post; + let gammaPost = await waitForPost(gamma, betaPost.post_view.post); if (!gammaPost) { throw "Missing gamma post"; } expect(gammaPost.post).toBeDefined(); - let alphaPost = (await resolvePost(alpha, betaPost.post_view.post)).post; + let alphaPost = await waitForPost(alpha, betaPost.post_view.post); if (!alphaPost) { throw "Missing alpha post"; } diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index 3b0e24129..602aba8cf 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -167,7 +167,7 @@ test("Sticky a post", async () => { } let postRes = await createPost(alpha, betaCommunity.community.id); - let betaPost1 = (await resolvePost(beta, postRes.post_view.post, false)).post; + let betaPost1 = await waitForPost(beta, postRes.post_view.post); if (!betaPost1) { throw "Missing beta post1"; } From 78a8a7b8b79880af28e43ad350f435bc45ef67f4 Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 20 Sep 2023 11:41:34 +0000 Subject: [PATCH 25/43] clippy --- crates/apub/src/activities/community/announce.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/apub/src/activities/community/announce.rs b/crates/apub/src/activities/community/announce.rs index ab82eb1de..e84a970f3 100644 --- a/crates/apub/src/activities/community/announce.rs +++ b/crates/apub/src/activities/community/announce.rs @@ -79,7 +79,7 @@ impl AnnounceActivity { let inner_kind = object .other .get("type") - .and_then(|e| e.as_str()) + .and_then(serde_json::Value::as_str) .unwrap_or("other"); let id = generate_announce_activity_id(inner_kind, &context.settings().get_protocol_and_hostname())?; From a59a94492d9541ad18f1a7dfdcfe4b09cb5a484e Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 20 Sep 2023 12:29:09 +0000 Subject: [PATCH 26/43] more debug log --- crates/apub/src/activities/mod.rs | 2 +- crates/federate/src/worker.rs | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index 2e26e0678..fcb90b13a 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -194,7 +194,7 @@ fn generate_announce_activity_id( "{}/activities/{}/{}/{}", protocol_and_hostname, AnnounceType::Announce.to_string().to_lowercase(), - inner_kind, + inner_kind.to_lowercase(), Uuid::new_v4() ); Url::parse(&id) diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index b6e174c04..249fc100e 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -168,6 +168,12 @@ impl InstanceWorker { self.state.last_successful_id = id; continue; }; + tracing::info!( + "processing send of {} {} for {}", + ele.0.id, + ele.0.ap_id, + self.instance.domain + ); if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await { tracing::warn!( "sending {} errored internally, skipping activity: {:?}", @@ -197,6 +203,13 @@ impl InstanceWorker { .get_inbox_urls(pool, activity) .await .context("failed figuring out inbox urls")?; + tracing::info!( + "inboxes of {} for {} {}: {:?}", + self.instance.domain, + activity.id, + activity.ap_id, + inbox_urls + ); if inbox_urls.is_empty() { self.state.last_successful_id = activity.id; return Ok(()); From a61d56473271eeeec5242c4c8082537c0599e1dd Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 20 Sep 2023 13:07:36 +0000 Subject: [PATCH 27/43] fix delete test --- api_tests/src/post.spec.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index 602aba8cf..c8affbab9 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -261,13 +261,17 @@ test("Delete a post", async () => { // Make sure lemmy beta sees post is deleted // This will be undefined because of the tombstone - await waitForPost(beta, postRes.post_view.post, p => !p); + await waitForPost(beta, postRes.post_view.post, p => !p || p.post.deleted); // Undelete let undeletedPost = await deletePost(alpha, false, postRes.post_view.post); // Make sure lemmy beta sees post is undeleted - let betaPost2 = await waitForPost(beta, postRes.post_view.post); + let betaPost2 = await waitForPost( + beta, + postRes.post_view.post, + p => !!p && !p.post.deleted, + ); if (!betaPost2) { throw "Missing beta post 2"; From 1a0c866b5169cfc8130e53df69dc6e1dbc190361 Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 20 Sep 2023 13:37:18 +0000 Subject: [PATCH 28/43] logging temporary --- crates/apub/src/activities/mod.rs | 7 ++++--- crates/federate/src/worker.rs | 2 ++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index fcb90b13a..b7cbbd500 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -217,12 +217,12 @@ where ActorT: Actor + GetActorType, Activity: ActivityHandler, { - info!("Sending activity {}", activity.id().to_string()); + info!("Saving outgoing activity to queue {}", activity.id()); let activity = WithContext::new(activity, CONTEXT.deref().clone()); let form = SentActivityForm { ap_id: activity.id().clone().into(), - data: serde_json::to_value(activity.clone())?, + data: serde_json::to_value(activity)?, sensitive, send_inboxes: send_targets .inboxes @@ -234,7 +234,8 @@ where actor_type: actor.actor_type(), actor_apub_id: actor.id().into(), }; - SentActivity::create(&mut data.pool(), form).await?; + let created = SentActivity::create(&mut data.pool(), form).await?; + info!("Queued for send: {:?}", created); Ok(()) } diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 249fc100e..14c86a7d3 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -161,10 +161,12 @@ impl InstanceWorker { { id += 1; processed_activities += 1; + tracing::info!("looking at activity {id}, proc={processed_activities}, latest={latest_id}"); let Some(ele) = get_activity_cached(pool, id) .await .context("failed reading activity from db")? else { + tracing::info!("activity {id} empty, marking latest"); self.state.last_successful_id = id; continue; }; From 3d649e1d3e986ab1541f13183ef5d1cdc3b1aaf0 Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 20 Sep 2023 14:18:31 +0000 Subject: [PATCH 29/43] remove synchronous federation flag --- api_tests/prepare-drone-federation-test.sh | 3 +-- api_tests/src/post.spec.ts | 1 + crates/api_common/src/send_activity.rs | 14 ++++---------- crates/api_crud/src/post/create.rs | 10 ++-------- crates/apub/src/activities/mod.rs | 12 ++---------- crates/utils/src/lib.rs | 11 ----------- 6 files changed, 10 insertions(+), 41 deletions(-) diff --git a/api_tests/prepare-drone-federation-test.sh b/api_tests/prepare-drone-federation-test.sh index fc19ee8c8..4044ba0dd 100755 --- a/api_tests/prepare-drone-federation-test.sh +++ b/api_tests/prepare-drone-federation-test.sh @@ -6,8 +6,7 @@ set -e export RUST_BACKTRACE=1 export RUST_LOG="warn,lemmy_server=debug,lemmy_federate=debug,lemmy_api=debug,lemmy_api_common=debug,lemmy_api_crud=debug,lemmy_apub=debug,lemmy_db_schema=debug,lemmy_db_views=debug,lemmy_db_views_actor=debug,lemmy_db_views_moderator=debug,lemmy_routes=debug,lemmy_utils=debug,lemmy_websocket=debug" -export LEMMY_SYNCHRONOUS_FEDERATION=1 # currently this is true in debug by default, but still. -export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min +export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min for INSTANCE in lemmy_alpha lemmy_beta lemmy_gamma lemmy_delta lemmy_epsilon; do echo "DB URL: ${LEMMY_DATABASE_URL} INSTANCE: $INSTANCE" diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index c8affbab9..1530d6bb0 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -254,6 +254,7 @@ test("Delete a post", async () => { let postRes = await createPost(alpha, betaCommunity.community.id); expect(postRes.post_view.post).toBeDefined(); + await waitForPost(beta, postRes.post_view.post); let deletedPost = await deletePost(alpha, true, postRes.post_view.post); expect(deletedPost.post_view.post.deleted).toBe(true); diff --git a/crates/api_common/src/send_activity.rs b/crates/api_common/src/send_activity.rs index 897f102fe..6d9c722a1 100644 --- a/crates/api_common/src/send_activity.rs +++ b/crates/api_common/src/send_activity.rs @@ -17,7 +17,7 @@ use lemmy_db_schema::{ }, }; use lemmy_db_views::structs::PrivateMessageView; -use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION}; +use lemmy_utils::error::LemmyResult; use once_cell::sync::{Lazy, OnceCell}; use tokio::{ sync::{ @@ -32,7 +32,7 @@ use url::Url; type MatchOutgoingActivitiesBoxed = Box fn(SendActivityData, &'a Data) -> BoxFuture<'a, LemmyResult<()>>>; -/// This static is necessary so that activities can be sent out synchronously for tests and the api_common crates don't need to depend on lemmy_apub +/// This static is necessary so that the api_common crates don't need to depend on lemmy_apub pub static MATCH_OUTGOING_ACTIVITIES: OnceCell = OnceCell::new(); #[derive(Debug)] @@ -88,17 +88,11 @@ impl ActivityChannel { pub async fn submit_activity( data: SendActivityData, - context: &Data, + _context: &Data, ) -> LemmyResult<()> { - if *SYNCHRONOUS_FEDERATION { - MATCH_OUTGOING_ACTIVITIES - .get() - .expect("retrieve function pointer")(data, context) - .await?; - } // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender, // not sure which way is more efficient - else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() { + if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() { sender.send(data)?; } Ok(()) diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs index d0b0f368c..3da75589a 100644 --- a/crates/api_crud/src/post/create.rs +++ b/crates/api_crud/src/post/create.rs @@ -37,7 +37,6 @@ use lemmy_utils::{ slurs::{check_slurs, check_slurs_opt}, validation::{check_url_scheme, clean_url_params, is_valid_body_field, is_valid_post_title}, }, - SYNCHRONOUS_FEDERATION, }; use tracing::Instrument; use url::Url; @@ -176,7 +175,7 @@ pub async fn create_post( mark_post_as_read(person_id, post_id, &mut context.pool()).await?; if let Some(url) = updated_post.url.clone() { - let task = async move { + spawn_try_task(async move { let mut webmention = Webmention::new::(updated_post.ap_id.clone().into(), url.clone().into())?; webmention.set_checked(true); @@ -189,12 +188,7 @@ pub async fn create_post( Ok(_) => Ok(()), Err(e) => Err(e).with_lemmy_type(LemmyErrorType::CouldntSendWebmention), } - }; - if *SYNCHRONOUS_FEDERATION { - task.await?; - } else { - spawn_try_task(task); - } + }); }; build_post_response(&context, community_id, person_id, post_id).await diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index b7cbbd500..54a96014e 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -45,11 +45,7 @@ use lemmy_db_schema::{ }, }; use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView}; -use lemmy_utils::{ - error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult}, - spawn_try_task, - SYNCHRONOUS_FEDERATION, -}; +use lemmy_utils::error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult}; use serde::Serialize; use std::{ops::Deref, time::Duration}; use tracing::info; @@ -351,10 +347,6 @@ pub async fn match_outgoing_activities( } } }; - if *SYNCHRONOUS_FEDERATION { - fed_task.await?; - } else { - spawn_try_task(fed_task); - } + fed_task.await?; Ok(()) } diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 1ef8a842c..c0553de31 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -18,7 +18,6 @@ pub mod version; use error::LemmyError; use futures::Future; -use once_cell::sync::Lazy; use std::time::Duration; use tracing::Instrument; @@ -38,16 +37,6 @@ macro_rules! location_info { }; } -/// if true, all federation should happen synchronously. useful for debugging and testing. -/// defaults to true on debug mode, false on releasemode -/// override to true by setting env LEMMY_SYNCHRONOUS_FEDERATION=1 -/// override to false by setting env LEMMY_SYNCHRONOUS_FEDERATION="" -pub static SYNCHRONOUS_FEDERATION: Lazy = Lazy::new(|| { - std::env::var("LEMMY_SYNCHRONOUS_FEDERATION") - .map(|s| !s.is_empty()) - .unwrap_or(cfg!(debug_assertions)) -}); - /// tokio::spawn, but accepts a future that may fail and also /// * logs errors /// * attaches the spawned task to the tracing span of the caller for better logging From a808d3208a25908edd2da7e6f9cd3f2634e9432f Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 20 Sep 2023 14:51:40 +0000 Subject: [PATCH 30/43] use max(id) instead of seq max value to prevent uncommitted transactions from causing skipped activities --- crates/federate/src/util.rs | 27 +++++---------------------- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index f00ccadb3..f744d45f4 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -1,8 +1,5 @@ use anyhow::{anyhow, Context, Result}; -use diesel::{ - prelude::*, - sql_types::{Bool, Int8}, -}; +use diesel::prelude::*; use diesel_async::RunQueryDsl; use lemmy_apub::{ activity_lists::SharedInboxActivities, @@ -191,17 +188,11 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result = sent_activity.select(max(id)).get_result(conn).await?; + let latest_id = seq.unwrap_or(0); anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId) }) .await @@ -212,11 +203,3 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result Duration { Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count))) } - -#[derive(QueryableByName)] -struct Sequence { - #[diesel(sql_type = Int8)] - last_value: i64, // this value is bigint for some reason even if sequence is int4 - #[diesel(sql_type = Bool)] - is_called: bool, -} From ae37ca0137b2139cc84285464c3002bf4c7ee6e0 Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 20 Sep 2023 15:33:09 +0000 Subject: [PATCH 31/43] re-remove debug comments --- crates/apub/src/activities/mod.rs | 3 +-- crates/federate/src/worker.rs | 15 --------------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index 54a96014e..958065ffa 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -230,8 +230,7 @@ where actor_type: actor.actor_type(), actor_apub_id: actor.id().into(), }; - let created = SentActivity::create(&mut data.pool(), form).await?; - info!("Queued for send: {:?}", created); + SentActivity::create(&mut data.pool(), form).await?; Ok(()) } diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 14c86a7d3..b6e174c04 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -161,21 +161,13 @@ impl InstanceWorker { { id += 1; processed_activities += 1; - tracing::info!("looking at activity {id}, proc={processed_activities}, latest={latest_id}"); let Some(ele) = get_activity_cached(pool, id) .await .context("failed reading activity from db")? else { - tracing::info!("activity {id} empty, marking latest"); self.state.last_successful_id = id; continue; }; - tracing::info!( - "processing send of {} {} for {}", - ele.0.id, - ele.0.ap_id, - self.instance.domain - ); if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await { tracing::warn!( "sending {} errored internally, skipping activity: {:?}", @@ -205,13 +197,6 @@ impl InstanceWorker { .get_inbox_urls(pool, activity) .await .context("failed figuring out inbox urls")?; - tracing::info!( - "inboxes of {} for {} {}: {:?}", - self.instance.domain, - activity.id, - activity.ap_id, - inbox_urls - ); if inbox_urls.is_empty() { self.state.last_successful_id = activity.id; return Ok(()); From ac0c0c88564b1cb6c6aab7ec3aeff8ccc2e2b15f Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 20 Sep 2023 15:33:31 +0000 Subject: [PATCH 32/43] re-remove lemmy logs --- .woodpecker.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.woodpecker.yml b/.woodpecker.yml index c180d8d58..49fbe3db2 100644 --- a/.woodpecker.yml +++ b/.woodpecker.yml @@ -204,7 +204,7 @@ steps: - bash api_tests/prepare-drone-federation-test.sh - cd api_tests/ - yarn - - tail -f /tmp/lemmy*.out & yarn api-test + - yarn api-test when: *slow_check_paths rebuild-cache: From 1b7ab96887d7e7ca7b2b7f217d91faa1ab9dbf95 Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 20 Sep 2023 20:26:00 +0000 Subject: [PATCH 33/43] wait until ban post removed --- api_tests/src/post.spec.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index 1530d6bb0..68ddbb730 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -419,8 +419,10 @@ test("Enforce site ban for federated user", async () => { expect(alphaUserOnBeta1.person?.person.banned).toBe(true); // existing alpha post should be removed on beta - let searchBeta2 = await getPost(beta, searchBeta1.post.id); - expect(searchBeta2.post_view.post.removed).toBe(true); + let searchBeta2 = await waitUntil( + () => getPost(beta, searchBeta1.post.id), + s => s.post_view.post.removed, + ); // Unban alpha let unBanAlpha = await banPersonFromSite( From f16aa9228d3e47d101898607cdccbe0d4772220e Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 20 Sep 2023 20:55:43 +0000 Subject: [PATCH 34/43] community fix --- api_tests/src/community.spec.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/api_tests/src/community.spec.ts b/api_tests/src/community.spec.ts index a8ea3073a..a313d97d2 100644 --- a/api_tests/src/community.spec.ts +++ b/api_tests/src/community.spec.ts @@ -27,6 +27,7 @@ import { blockInstance, waitUntil, delay, + waitForPost, } from "./shared"; beforeAll(async () => { @@ -351,8 +352,9 @@ test("User blocks instance, communities are hidden", async () => { expect(postRes.post_view.post.id).toBeDefined(); // fetch post to alpha - let alphaPost = await resolvePost(alpha, postRes.post_view.post); - expect(alphaPost.post?.post).toBeDefined(); + let alphaPost = (await resolvePost(alpha, postRes.post_view.post, false)) + .post!; + expect(alphaPost.post).toBeDefined(); // post should be included in listing let listing = await getPosts(alpha, "All"); @@ -360,7 +362,7 @@ test("User blocks instance, communities are hidden", async () => { expect(listing_ids).toContain(postRes.post_view.post.ap_id); // block the beta instance - await blockInstance(alpha, alphaPost.post!.community.instance_id, true); + await blockInstance(alpha, alphaPost.community.instance_id, true); // after blocking, post should not be in listing let listing2 = await getPosts(alpha, "All"); @@ -368,7 +370,7 @@ test("User blocks instance, communities are hidden", async () => { expect(listing_ids2.indexOf(postRes.post_view.post.ap_id)).toBe(-1); // unblock instance again - await blockInstance(alpha, alphaPost.post!.community.instance_id, false); + await blockInstance(alpha, alphaPost.community.instance_id, false); // post should be included in listing let listing3 = await getPosts(alpha, "All"); From 459d5a372648f1e2b58244a1694a5d2c2ba93a34 Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 21 Sep 2023 11:07:20 +0000 Subject: [PATCH 36/43] Revert "re-remove lemmy logs" This reverts commit ac0c0c88564b1cb6c6aab7ec3aeff8ccc2e2b15f. --- .woodpecker.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.woodpecker.yml b/.woodpecker.yml index 49fbe3db2..c180d8d58 100644 --- a/.woodpecker.yml +++ b/.woodpecker.yml @@ -204,7 +204,7 @@ steps: - bash api_tests/prepare-drone-federation-test.sh - cd api_tests/ - yarn - - yarn api-test + - tail -f /tmp/lemmy*.out & yarn api-test when: *slow_check_paths rebuild-cache: From 6d519ef376cf45d1231b18c2aa6cfa641e09b411 Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 21 Sep 2023 11:07:36 +0000 Subject: [PATCH 37/43] Revert "re-remove debug comments" This reverts commit ae37ca0137b2139cc84285464c3002bf4c7ee6e0. --- crates/apub/src/activities/mod.rs | 3 ++- crates/federate/src/worker.rs | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index 958065ffa..54a96014e 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -230,7 +230,8 @@ where actor_type: actor.actor_type(), actor_apub_id: actor.id().into(), }; - SentActivity::create(&mut data.pool(), form).await?; + let created = SentActivity::create(&mut data.pool(), form).await?; + info!("Queued for send: {:?}", created); Ok(()) } diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index b6e174c04..14c86a7d3 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -161,13 +161,21 @@ impl InstanceWorker { { id += 1; processed_activities += 1; + tracing::info!("looking at activity {id}, proc={processed_activities}, latest={latest_id}"); let Some(ele) = get_activity_cached(pool, id) .await .context("failed reading activity from db")? else { + tracing::info!("activity {id} empty, marking latest"); self.state.last_successful_id = id; continue; }; + tracing::info!( + "processing send of {} {} for {}", + ele.0.id, + ele.0.ap_id, + self.instance.domain + ); if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await { tracing::warn!( "sending {} errored internally, skipping activity: {:?}", @@ -197,6 +205,13 @@ impl InstanceWorker { .get_inbox_urls(pool, activity) .await .context("failed figuring out inbox urls")?; + tracing::info!( + "inboxes of {} for {} {}: {:?}", + self.instance.domain, + activity.id, + activity.ap_id, + inbox_urls + ); if inbox_urls.is_empty() { self.state.last_successful_id = activity.id; return Ok(()); From dec25f9ee8387f3ef78a882e1fcc400344f63364 Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 21 Sep 2023 11:11:54 +0000 Subject: [PATCH 38/43] ensure overlap --- crates/federate/src/worker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 14c86a7d3..dea82ea3f 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -320,7 +320,7 @@ impl InstanceWorker { instance_id: InstanceId, last_fetch: DateTime, ) -> Result<(HashMap>, DateTime)> { - let new_last_fetch = Utc::now(); // update to time before fetch to ensure overlap + let new_last_fetch = Utc::now() - chrono::Duration::seconds(10); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact Ok(( CommunityFollowerView::get_instance_followed_community_inboxes(pool, instance_id, last_fetch) .await? From 2c283d90ca60b5b8e39d71562e1865ad652e6c72 Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 21 Sep 2023 11:45:33 +0000 Subject: [PATCH 39/43] Revert "Revert "re-remove debug comments"" This reverts commit 6d519ef376cf45d1231b18c2aa6cfa641e09b411. --- crates/apub/src/activities/mod.rs | 3 +-- crates/federate/src/worker.rs | 15 --------------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index 54a96014e..958065ffa 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -230,8 +230,7 @@ where actor_type: actor.actor_type(), actor_apub_id: actor.id().into(), }; - let created = SentActivity::create(&mut data.pool(), form).await?; - info!("Queued for send: {:?}", created); + SentActivity::create(&mut data.pool(), form).await?; Ok(()) } diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index dea82ea3f..3eda2e746 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -161,21 +161,13 @@ impl InstanceWorker { { id += 1; processed_activities += 1; - tracing::info!("looking at activity {id}, proc={processed_activities}, latest={latest_id}"); let Some(ele) = get_activity_cached(pool, id) .await .context("failed reading activity from db")? else { - tracing::info!("activity {id} empty, marking latest"); self.state.last_successful_id = id; continue; }; - tracing::info!( - "processing send of {} {} for {}", - ele.0.id, - ele.0.ap_id, - self.instance.domain - ); if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await { tracing::warn!( "sending {} errored internally, skipping activity: {:?}", @@ -205,13 +197,6 @@ impl InstanceWorker { .get_inbox_urls(pool, activity) .await .context("failed figuring out inbox urls")?; - tracing::info!( - "inboxes of {} for {} {}: {:?}", - self.instance.domain, - activity.id, - activity.ap_id, - inbox_urls - ); if inbox_urls.is_empty() { self.state.last_successful_id = activity.id; return Ok(()); From 07ffdc3d53ff56cc80209985588bb2dbdc891d19 Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 21 Sep 2023 11:45:43 +0000 Subject: [PATCH 40/43] Revert "Revert "re-remove lemmy logs"" This reverts commit 459d5a372648f1e2b58244a1694a5d2c2ba93a34. --- .woodpecker.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.woodpecker.yml b/.woodpecker.yml index c180d8d58..49fbe3db2 100644 --- a/.woodpecker.yml +++ b/.woodpecker.yml @@ -204,7 +204,7 @@ steps: - bash api_tests/prepare-drone-federation-test.sh - cd api_tests/ - yarn - - tail -f /tmp/lemmy*.out & yarn api-test + - yarn api-test when: *slow_check_paths rebuild-cache: From 31f3677270af509ca48965de736e09e2df939778 Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 21 Sep 2023 12:01:22 +0000 Subject: [PATCH 41/43] fix after merge --- api_tests/src/comment.spec.ts | 7 +++---- api_tests/src/community.spec.ts | 5 ++--- api_tests/src/post.spec.ts | 15 ++++++--------- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/api_tests/src/comment.spec.ts b/api_tests/src/comment.spec.ts index 47321f9cc..fe8c3943e 100644 --- a/api_tests/src/comment.spec.ts +++ b/api_tests/src/comment.spec.ts @@ -504,8 +504,7 @@ test("A and G subscribe to B (center) A posts, G mentions B, it gets announced t expect(alphaPost.post_view.community.local).toBe(true); // Make sure gamma sees it - let gammaPost = (await resolvePost(gamma, alphaPost.post_view.post, false))! - .post; + let gammaPost = (await resolvePost(gamma, alphaPost.post_view.post))!.post; if (!gammaPost) { throw "Missing gamma post"; @@ -670,8 +669,8 @@ test("Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde expect(updateRes.comment_view.comment.content).toBe(updatedCommentContent); // Get the post from alpha - let alphaPostB = (await resolvePost(alpha, postOnBetaRes.post_view.post)) - .post; + let alphaPostB = await waitForPost(alpha, postOnBetaRes.post_view.post); + if (!alphaPostB) { throw "Missing alpha post B"; } diff --git a/api_tests/src/community.spec.ts b/api_tests/src/community.spec.ts index f9d83c8cf..b81dd900c 100644 --- a/api_tests/src/community.spec.ts +++ b/api_tests/src/community.spec.ts @@ -240,7 +240,7 @@ test("Admin actions in remote community are not federated to origin", async () = expect(banRes.banned).toBe(true); // ban doesnt federate to community's origin instance alpha - let alphaPost = (await resolvePost(alpha, gammaPost.post, false)).post; + let alphaPost = (await resolvePost(alpha, gammaPost.post)).post; expect(alphaPost?.creator_banned_from_community).toBe(false); // and neither to gamma @@ -352,8 +352,7 @@ test("User blocks instance, communities are hidden", async () => { expect(postRes.post_view.post.id).toBeDefined(); // fetch post to alpha - let alphaPost = (await resolvePost(alpha, postRes.post_view.post, false)) - .post!; + let alphaPost = (await resolvePost(alpha, postRes.post_view.post)).post!; expect(alphaPost.post).toBeDefined(); // post should be included in listing diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index 5e656e2f4..51a10293b 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -192,8 +192,7 @@ test("Sticky a post", async () => { expect(betaPost2?.post.featured_community).toBe(false); // Make sure that gamma cannot sticky the post on beta - let gammaPost = (await resolvePost(gamma, postRes.post_view.post, false)) - .post; + let gammaPost = (await resolvePost(gamma, postRes.post_view.post)).post; if (!gammaPost) { throw "Missing gamma post"; } @@ -300,8 +299,7 @@ test("Remove a post from admin and community on different instance", async () => } let postRes = await createPost(gamma, gammaCommunity.id); - let alphaPost = (await resolvePost(alpha, postRes.post_view.post, false)) - .post; + let alphaPost = (await resolvePost(alpha, postRes.post_view.post)).post; if (!alphaPost) { throw "Missing alpha post"; } @@ -310,7 +308,7 @@ test("Remove a post from admin and community on different instance", async () => expect(removedPost.post_view.post.name).toBe(postRes.post_view.post.name); // Make sure lemmy beta sees post is NOT removed - let betaPost = (await resolvePost(beta, postRes.post_view.post, false)).post; + let betaPost = (await resolvePost(beta, postRes.post_view.post)).post; if (!betaPost) { throw "Missing beta post"; } @@ -504,7 +502,7 @@ test("A and G subscribe to B (center) A posts, it gets announced to G", async () let postRes = await createPost(alpha, betaCommunity.community.id); expect(postRes.post_view.post).toBeDefined(); - let betaPost = (await resolvePost(gamma, postRes.post_view.post, false)).post; + let betaPost = (await resolvePost(gamma, postRes.post_view.post)).post; expect(betaPost?.post.name).toBeDefined(); }); @@ -517,8 +515,7 @@ test("Report a post", async () => { let postRes = await createPost(beta, betaCommunity.community.id); expect(postRes.post_view.post).toBeDefined(); - let alphaPost = (await resolvePost(alpha, postRes.post_view.post, false)) - .post; + let alphaPost = (await resolvePost(alpha, postRes.post_view.post)).post; if (!alphaPost) { throw "Missing alpha post"; } @@ -563,7 +560,7 @@ test("Sanitize HTML", async () => { "<script>alert('xss');</script> hello &"'", ); - let alphaPost = (await resolvePost(alpha, post.post_view.post, false)).post; + let alphaPost = (await resolvePost(alpha, post.post_view.post)).post; // second escaping over federation, avoid double escape of & expect(alphaPost?.post.body).toBe( "<script>alert('xss');</script> hello &"'", From fe40adfc39d09273145bd8649374ca43ed81ab5e Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 21 Sep 2023 12:32:41 +0000 Subject: [PATCH 42/43] fix after merge --- api_tests/src/shared.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api_tests/src/shared.ts b/api_tests/src/shared.ts index f50b64e64..a1868f8f2 100644 --- a/api_tests/src/shared.ts +++ b/api_tests/src/shared.ts @@ -291,7 +291,7 @@ export async function searchPostLocal( /// wait for a post to appear locally without pulling it export async function waitForPost( - api: API, + api: LemmyHttp, post: Post, checker: (t: PostView | undefined) => boolean = p => !!p, ) { From 9bcadadede08cc5e724bd394e9d21e17cbf458e1 Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 21 Sep 2023 12:52:10 +0000 Subject: [PATCH 43/43] fix after merge --- api_tests/src/comment.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api_tests/src/comment.spec.ts b/api_tests/src/comment.spec.ts index fe8c3943e..6ced2bf33 100644 --- a/api_tests/src/comment.spec.ts +++ b/api_tests/src/comment.spec.ts @@ -343,7 +343,7 @@ test("Federated comment like", async () => { }); test("Reply to a comment from another instance, get notification", async () => { - await alpha.client.markAllAsRead({ auth: alpha.auth }); + await alpha.markAllAsRead(); let betaCommunity = (await resolveBetaCommunity(alpha)).community; if (!betaCommunity) {