From a3d705f0d68e12b72d05f2123661650af8bc3895 Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 15 Apr 2024 18:39:07 +0200 Subject: [PATCH] split inbox functions into separate file --- crates/federate/src/inboxes.rs | 149 +++++++++++++++++++++++++++++++++ crates/federate/src/lib.rs | 1 + crates/federate/src/worker.rs | 148 +++++--------------------------- 3 files changed, 170 insertions(+), 128 deletions(-) create mode 100644 crates/federate/src/inboxes.rs diff --git a/crates/federate/src/inboxes.rs b/crates/federate/src/inboxes.rs new file mode 100644 index 000000000..dde096053 --- /dev/null +++ b/crates/federate/src/inboxes.rs @@ -0,0 +1,149 @@ +use crate::util::LEMMY_TEST_FAST_FEDERATION; +use anyhow::Result; +use chrono::{DateTime, TimeZone, Utc}; +use lemmy_db_schema::{ + newtypes::{CommunityId, InstanceId}, + source::{activity::SentActivity, site::Site}, + utils::{ActualDbPool, DbPool}, +}; +use lemmy_db_views_actor::structs::CommunityFollowerView; +use once_cell::sync::Lazy; +use reqwest::Url; +use std::collections::{HashMap, HashSet}; + +/// interval with which new additions to community_followers are queried. +/// +/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears), +/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url. +/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate. +/// (see https://github.com/LemmyNet/lemmy/issues/3958) +static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { + if *LEMMY_TEST_FAST_FEDERATION { + chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds") + } else { + chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds") + } +}); +/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community. +/// This is expected to happen pretty rarely and updating it in a timely manner is not too important. +static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy = + Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds")); + +pub(crate) struct CommunityInboxCollector { + // load site lazily because if an instance is first seen due to being on allowlist, + // the corresponding row in `site` may not exist yet since that is only added once + // `fetch_instance_actor_for_object` is called. + // (this should be unlikely to be relevant outside of the federation tests) + site_loaded: bool, + site: Option, + followed_communities: HashMap>, + last_full_communities_fetch: DateTime, + last_incremental_communities_fetch: DateTime, + instance_id: InstanceId, + domain: String, + pool: ActualDbPool, +} +impl CommunityInboxCollector { + pub fn new( + pool: ActualDbPool, + instance_id: InstanceId, + domain: String, + ) -> CommunityInboxCollector { + CommunityInboxCollector { + pool, + site_loaded: false, + site: None, + followed_communities: HashMap::new(), + last_full_communities_fetch: Utc.timestamp_nanos(0), + last_incremental_communities_fetch: Utc.timestamp_nanos(0), + instance_id, + domain, + } + } + /// get inbox urls of sending the given activity to the given instance + /// most often this will return 0 values (if instance doesn't care about the activity) + /// or 1 value (the shared inbox) + /// > 1 values only happens for non-lemmy software + pub async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result> { + let mut inbox_urls: HashSet = HashSet::new(); + + if activity.send_all_instances { + if !self.site_loaded { + self.site = Site::read_from_instance_id(&mut self.pool(), self.instance_id).await?; + self.site_loaded = true; + } + if let Some(site) = &self.site { + // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these activities. So handling it like this is fine. + inbox_urls.insert(site.inbox_url.inner().clone()); + } + } + if let Some(t) = &activity.send_community_followers_of { + if let Some(urls) = self.followed_communities.get(t) { + inbox_urls.extend(urls.iter().cloned()); + } + } + inbox_urls.extend( + activity + .send_inboxes + .iter() + .filter_map(std::option::Option::as_ref) + .filter(|&u| (u.domain() == Some(&self.domain))) + .map(|u| u.inner().clone()), + ); + Ok(inbox_urls.into_iter().collect()) + } + + pub async fn update_communities(&mut self) -> Result<()> { + if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY { + tracing::debug!("{}: fetching full list of communities", self.domain); + // process removals every hour + (self.followed_communities, self.last_full_communities_fetch) = self + .get_communities(self.instance_id, Utc.timestamp_nanos(0)) + .await?; + self.last_incremental_communities_fetch = self.last_full_communities_fetch; + } + if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY { + // process additions every minute + let (news, time) = self + .get_communities(self.instance_id, self.last_incremental_communities_fetch) + .await?; + if !news.is_empty() { + tracing::debug!( + "{}: fetched {} incremental new followed communities", + self.domain, + news.len() + ); + } + self.followed_communities.extend(news); + self.last_incremental_communities_fetch = time; + } + Ok(()) + } + + /// get a list of local communities with the remote inboxes on the given instance that cares about them + async fn get_communities( + &mut self, + instance_id: InstanceId, + last_fetch: DateTime, + ) -> Result<(HashMap>, DateTime)> { + let new_last_fetch = + Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact + Ok(( + CommunityFollowerView::get_instance_followed_community_inboxes( + &mut self.pool(), + instance_id, + last_fetch, + ) + .await? + .into_iter() + .fold(HashMap::new(), |mut map, (c, u)| { + map.entry(c).or_default().insert(u.into()); + map + }), + new_last_fetch, + )) + } + fn pool(&self) -> DbPool<'_> { + DbPool::Pool(&self.pool) + } +} diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index 6bcd453f1..ebc6c783e 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -14,6 +14,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; +mod inboxes; mod util; mod worker; diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 6f151257f..5d952cc64 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -1,9 +1,11 @@ -use crate::util::{ - get_activity_cached, - get_actor_cached, - get_latest_activity_id, - LEMMY_TEST_FAST_FEDERATION, - WORK_FINISHED_RECHECK_DELAY, +use crate::{ + inboxes::CommunityInboxCollector, + util::{ + get_activity_cached, + get_actor_cached, + get_latest_activity_id, + WORK_FINISHED_RECHECK_DELAY, + }, }; use activitypub_federation::{ activity_sending::SendActivityTask, @@ -15,20 +17,18 @@ use chrono::{DateTime, Days, TimeZone, Utc}; use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT}; use lemmy_db_schema::{ - newtypes::{ActivityId, CommunityId, InstanceId}, + newtypes::{ActivityId}, source::{ activity::SentActivity, federation_queue_state::FederationQueueState, instance::{Instance, InstanceForm}, - site::Site, }, utils::{naive_now, ActualDbPool, DbPool}, }; -use lemmy_db_views_actor::structs::CommunityFollowerView; use once_cell::sync::Lazy; use reqwest::Url; use std::{ - collections::{BinaryHeap, HashMap, HashSet}, + collections::{BinaryHeap}, ops::{Add, Deref}, time::Duration, }; @@ -40,23 +40,6 @@ use tokio_util::sync::CancellationToken; /// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent) static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); -/// interval with which new additions to community_followers are queried. -/// -/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears), -/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url. -/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate. -/// (see https://github.com/LemmyNet/lemmy/issues/3958) -static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { - if *LEMMY_TEST_FAST_FEDERATION { - chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds") - } else { - chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds") - } -}); -/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community. -/// This is expected to happen pretty rarely and updating it in a timely manner is not too important. -static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy = - Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds")); static CONCURRENT_SENDS: Lazy = Lazy::new(|| { std::env::var("LEMMY_FEDERATION_CONCURRENT_SENDS_PER_INSTANCE") @@ -69,21 +52,13 @@ const MAX_SUCCESSFULS: usize = 1000; pub(crate) struct InstanceWorker { instance: Instance, - // load site lazily because if an instance is first seen due to being on allowlist, - // the corresponding row in `site` may not exist yet since that is only added once - // `fetch_instance_actor_for_object` is called. - // (this should be unlikely to be relevant outside of the federation tests) - site_loaded: bool, - site: Option, - followed_communities: HashMap>, stop: CancellationToken, config: FederationConfig, stats_sender: UnboundedSender<(String, FederationQueueState)>, - last_full_communities_fetch: DateTime, - last_incremental_communities_fetch: DateTime, state: FederationQueueState, last_state_insert: DateTime, pool: ActualDbPool, + inbox_collector: CommunityInboxCollector, } #[derive(Debug, PartialEq, Eq)] @@ -120,15 +95,15 @@ impl InstanceWorker { let pool = config.to_request_data().inner_pool().clone(); let state = FederationQueueState::load(&mut DbPool::Pool(&pool), instance.id).await?; let mut worker = InstanceWorker { + inbox_collector: CommunityInboxCollector::new( + pool.clone(), + instance.id, + instance.domain.clone(), + ), instance, - site_loaded: false, - site: None, - followed_communities: HashMap::new(), stop, config, stats_sender, - last_full_communities_fetch: Utc.timestamp_nanos(0), - last_incremental_communities_fetch: Utc.timestamp_nanos(0), state, last_state_insert: Utc.timestamp_nanos(0), pool, @@ -169,7 +144,7 @@ impl InstanceWorker { continue; } else { // send a new activity if there is one - self.update_communities().await?; + self.inbox_collector.update_communities().await?; let next_id = { // calculate next id to send based on the last id and the in flight requests let last_successful_id = self @@ -344,6 +319,7 @@ impl InstanceWorker { }; let activity = &ele.0; let inbox_urls = self + .inbox_collector .get_inbox_urls(activity) .await .context("failed figuring out inbox urls")?; @@ -403,6 +379,8 @@ impl InstanceWorker { context: Data, stop: CancellationToken, ) -> Result<()> { + debug_assert!(!inbox_urls.is_empty()); + let pool = &mut context.pool(); let Some(actor_apub_id) = &activity.actor_apub_id else { return Err(anyhow::anyhow!("activity is from before lemmy 0.19")); @@ -447,92 +425,7 @@ impl InstanceWorker { }))?; Ok(()) } - /// get inbox urls of sending the given activity to the given instance - /// most often this will return 0 values (if instance doesn't care about the activity) - /// or 1 value (the shared inbox) - /// > 1 values only happens for non-lemmy software - async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result> { - let mut inbox_urls: HashSet = HashSet::new(); - if activity.send_all_instances { - if !self.site_loaded { - self.site = Site::read_from_instance_id(&mut self.pool(), self.instance.id).await?; - self.site_loaded = true; - } - if let Some(site) = &self.site { - // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these activities. So handling it like this is fine. - inbox_urls.insert(site.inbox_url.inner().clone()); - } - } - if let Some(t) = &activity.send_community_followers_of { - if let Some(urls) = self.followed_communities.get(t) { - inbox_urls.extend(urls.iter().cloned()); - } - } - inbox_urls.extend( - activity - .send_inboxes - .iter() - .filter_map(std::option::Option::as_ref) - .filter(|&u| (u.domain() == Some(&self.instance.domain))) - .map(|u| u.inner().clone()), - ); - Ok(inbox_urls.into_iter().collect()) - } - - async fn update_communities(&mut self) -> Result<()> { - if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY { - tracing::debug!( - "{}: fetching full list of communities", - self.instance.domain - ); - // process removals every hour - (self.followed_communities, self.last_full_communities_fetch) = self - .get_communities(self.instance.id, Utc.timestamp_nanos(0)) - .await?; - self.last_incremental_communities_fetch = self.last_full_communities_fetch; - } - if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY { - // process additions every minute - let (news, time) = self - .get_communities(self.instance.id, self.last_incremental_communities_fetch) - .await?; - if !news.is_empty() { - tracing::debug!( - "{}: fetched {} incremental new followed communities", - self.instance.domain, - news.len() - ); - } - self.followed_communities.extend(news); - self.last_incremental_communities_fetch = time; - } - Ok(()) - } - - /// get a list of local communities with the remote inboxes on the given instance that cares about them - async fn get_communities( - &mut self, - instance_id: InstanceId, - last_fetch: DateTime, - ) -> Result<(HashMap>, DateTime)> { - let new_last_fetch = - Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact - Ok(( - CommunityFollowerView::get_instance_followed_community_inboxes( - &mut self.pool(), - instance_id, - last_fetch, - ) - .await? - .into_iter() - .fold(HashMap::new(), |mut map, (c, u)| { - map.entry(c).or_default().insert(u.into()); - map - }), - new_last_fetch, - )) - } async fn save_and_send_state(&mut self) -> Result<()> { tracing::debug!("{}: saving and sending state", self.instance.domain); self.last_state_insert = Utc::now(); @@ -544,7 +437,6 @@ impl InstanceWorker { } fn pool(&self) -> DbPool<'_> { - //self.config.to_request_data() DbPool::Pool(&self.pool) } }