From a0b0a7a0d993c811884b1a79af6edf11773cd1c9 Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 30 May 2024 12:38:17 +0200 Subject: [PATCH] lint fix --- crates/federate/src/inboxes.rs | 25 +++++--- crates/federate/src/lib.rs | 9 +-- crates/federate/src/send.rs | 3 +- crates/federate/src/worker.rs | 91 ++++++++++------------------ crates/utils/src/settings/structs.rs | 3 +- 5 files changed, 57 insertions(+), 74 deletions(-) diff --git a/crates/federate/src/inboxes.rs b/crates/federate/src/inboxes.rs index dde096053..6519a7dd2 100644 --- a/crates/federate/src/inboxes.rs +++ b/crates/federate/src/inboxes.rs @@ -13,10 +13,13 @@ use std::collections::{HashMap, HashSet}; /// interval with which new additions to community_followers are queried. /// -/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears), -/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url. -/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate. -/// (see https://github.com/LemmyNet/lemmy/issues/3958) +/// The first time some user on an instance follows a specific remote community (or, more precisely: +/// the first time a (followed_community_id, follower_inbox_url) tuple appears), this delay limits +/// the maximum time until the follow actually results in activities from that community id being +/// sent to that inbox url. This delay currently needs to not be too small because the DB load is +/// currently fairly high because of the current structure of storing inboxes for every person, not +/// having a separate list of shared_inboxes, and the architecture of having every instance queue be +/// fully separate. (see https://github.com/LemmyNet/lemmy/issues/3958) static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { if *LEMMY_TEST_FAST_FEDERATION { chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds") @@ -24,8 +27,9 @@ static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds") } }); -/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community. -/// This is expected to happen pretty rarely and updating it in a timely manner is not too important. +/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance +/// unfollows a specific remote community. This is expected to happen pretty rarely and updating it +/// in a timely manner is not too important. static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy = Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds")); @@ -73,7 +77,8 @@ impl CommunityInboxCollector { self.site_loaded = true; } if let Some(site) = &self.site { - // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these activities. So handling it like this is fine. + // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these + // activities. So handling it like this is fine. inbox_urls.insert(site.inbox_url.inner().clone()); } } @@ -120,14 +125,16 @@ impl CommunityInboxCollector { Ok(()) } - /// get a list of local communities with the remote inboxes on the given instance that cares about them + /// get a list of local communities with the remote inboxes on the given instance that cares about + /// them async fn get_communities( &mut self, instance_id: InstanceId, last_fetch: DateTime, ) -> Result<(HashMap>, DateTime)> { let new_last_fetch = - Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact + Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if + // published date is not exact Ok(( CommunityFollowerView::get_instance_followed_community_inboxes( &mut self.pool(), diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index b4fe49db9..88870af83 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -1,9 +1,7 @@ use crate::{util::CancellableTask, worker::InstanceWorker}; use activitypub_federation::config::FederationConfig; -use chrono::{Local, Timelike}; use lemmy_api_common::{ context::LemmyContext, - federate_retry_sleep_duration, lemmy_utils::settings::structs::FederationWorkerConfig, }; use lemmy_db_schema::{newtypes::InstanceId, source::instance::Instance}; @@ -72,7 +70,11 @@ impl SendManager { } } - pub fn run(opts: Opts, context: FederationConfig, config: FederationWorkerConfig) -> CancellableTask { + pub fn run( + opts: Opts, + context: FederationConfig, + config: FederationWorkerConfig, + ) -> CancellableTask { CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| { let opts = opts.clone(); let config = config.clone(); @@ -133,7 +135,6 @@ impl SendManager { } // create new worker let context = self.context.clone(); - let context = self.context.clone(); let stats_sender = self.stats_sender.clone(); let federation_worker_config = self.federation_worker_config.clone(); diff --git a/crates/federate/src/send.rs b/crates/federate/src/send.rs index 41516eef7..2c9fc9c64 100644 --- a/crates/federate/src/send.rs +++ b/crates/federate/src/send.rs @@ -46,7 +46,8 @@ pub(crate) struct SendRetryTask<'a> { pub inbox_urls: Vec, /// report to the main instance worker pub report: &'a mut UnboundedSender, - /// the first request will be sent immediately, but the next one will be delayed according to the number of previous fails + 1 + /// the first request will be sent immediately, but the next one will be delayed according to the + /// number of previous fails + 1 pub initial_fail_count: i32, /// for logging pub domain: String, diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 6a5c11126..a6783c2fc 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -1,36 +1,16 @@ -<<<<<<< HEAD use crate::{ inboxes::CommunityInboxCollector, send::{SendActivityResult, SendRetryTask, SendSuccessInfo}, - util::{get_activity_cached, get_latest_activity_id, WORK_FINISHED_RECHECK_DELAY}, -||||||| 51970ffc8 -use crate::util::{ - get_activity_cached, - get_actor_cached, - get_latest_activity_id, - LEMMY_TEST_FAST_FEDERATION, - WORK_FINISHED_RECHECK_DELAY, + util::{ + get_activity_cached, + get_latest_activity_id, + FederationQueueStateWithDomain, + WORK_FINISHED_RECHECK_DELAY, + }, }; use activitypub_federation::{ - activity_sending::SendActivityTask, - config::Data, - protocol::context::WithContext, -======= -use crate::util::{ - get_activity_cached, - get_actor_cached, - get_latest_activity_id, - FederationQueueStateWithDomain, - LEMMY_TEST_FAST_FEDERATION, - WORK_FINISHED_RECHECK_DELAY, + config::{FederationConfig}, }; -use activitypub_federation::{ - activity_sending::SendActivityTask, - config::Data, - protocol::context::WithContext, ->>>>>>> origin/main -}; -use activitypub_federation::config::FederationConfig; use anyhow::{Context, Result}; use chrono::{DateTime, Days, TimeZone, Utc}; use lemmy_api_common::{ @@ -53,7 +33,8 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; -/// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent) +/// Save state to db after this time has passed since the last state (so if the server crashes or is +/// SIGKILLed, less than X seconds of activities are resent) static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); /// Maximum number of successful sends to allow out of order @@ -62,13 +43,9 @@ const MAX_SUCCESSFULS: usize = 1000; pub(crate) struct InstanceWorker { instance: Instance, stop: CancellationToken, - context: Data, stats_sender: UnboundedSender, - last_full_communities_fetch: DateTime, - last_incremental_communities_fetch: DateTime, federation_lib_config: FederationConfig, federation_worker_config: FederationWorkerConfig, - stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, state: FederationQueueState, last_state_insert: DateTime, pool: ActualDbPool, @@ -81,7 +58,7 @@ impl InstanceWorker { config: FederationConfig, federation_worker_config: FederationWorkerConfig, stop: CancellationToken, - stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, + stats_sender: UnboundedSender, ) -> Result<(), anyhow::Error> { let pool = config.to_request_data().inner_pool().clone(); let state = FederationQueueState::load(&mut DbPool::Pool(&pool), instance.id).await?; @@ -103,7 +80,8 @@ impl InstanceWorker { worker.loop_until_stopped().await } /// loop fetch new activities from db and send them to the inboxes of the given instances - /// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit) + /// this worker only returns if (a) there is an internal error or (b) the cancellation token is + /// cancelled (graceful exit) async fn loop_until_stopped(&mut self) -> Result<()> { self.initial_fail_sleep().await?; let (mut last_sent_id, mut newest_id) = self.get_latest_ids().await?; @@ -119,8 +97,9 @@ impl InstanceWorker { tokio::sync::mpsc::unbounded_channel::(); while !self.stop.is_cancelled() { // check if we need to wait for a send to finish before sending the next one - // we wait if (a) the last request failed, only if a request is already in flight (not at the start of the loop) - // or (b) if we have too many successfuls in memory or (c) if we have too many in flight + // we wait if (a) the last request failed, only if a request is already in flight (not at the + // start of the loop) or (b) if we have too many successfuls in memory or (c) if we have + // too many in flight let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0) || successfuls.len() >= MAX_SUCCESSFULS || in_flight >= self.federation_worker_config.concurrent_sends_per_instance; @@ -131,8 +110,8 @@ impl InstanceWorker { self .handle_send_results(&mut receive_send_result, &mut successfuls, &mut in_flight) .await?; - // handle_send_results does not guarantee that we are now in a condition where we want to send a new one, - // so repeat this check until the if no longer applies + // handle_send_results does not guarantee that we are now in a condition where we want to + // send a new one, so repeat this check until the if no longer applies continue; } else { // send a new activity if there is one @@ -214,8 +193,8 @@ impl InstanceWorker { if let Some(last) = self.state.last_successful_id { Ok((last, latest_id)) } else { - // this is the initial creation (instance first seen) of the federation queue for this instance - // skip all past activities: + // this is the initial creation (instance first seen) of the federation queue for this + // instance skip all past activities: self.state.last_successful_id = Some(latest_id); // save here to ensure it's not read as 0 again later if no activities have happened self.save_and_send_state().await?; @@ -245,7 +224,8 @@ impl InstanceWorker { } SendActivityResult::Failure { fail_count, .. } => { if fail_count > self.state.fail_count { - // override fail count - if multiple activities are currently sending this value may get conflicting info but that's fine + // override fail count - if multiple activities are currently sending this value may get + // conflicting info but that's fine self.state.fail_count = fail_count; self.state.last_retry = Some(Utc::now()); force_write = true; @@ -272,15 +252,19 @@ impl InstanceWorker { } Ok(()) } - /// Checks that sequential activities `last_successful_id + 1`, `last_successful_id + 2` etc have been sent successfully. - /// In that case updates `last_successful_id` and saves the state to the database if the time since the last save is greater than `SAVE_STATE_EVERY_TIME`. + /// Checks that sequential activities `last_successful_id + 1`, `last_successful_id + 2` etc have + /// been sent successfully. In that case updates `last_successful_id` and saves the state to the + /// database if the time since the last save is greater than `SAVE_STATE_EVERY_TIME`. async fn pop_successfuls_and_write( &mut self, successfuls: &mut BinaryHeap, force_write: bool, ) -> Result<()> { let Some(mut last_id) = self.state.last_successful_id else { - tracing::warn!("{} should be impossible: last successful id is None", self.instance.domain); + tracing::warn!( + "{} should be impossible: last successful id is None", + self.instance.domain + ); return Ok(()); }; tracing::debug!( @@ -292,7 +276,7 @@ impl InstanceWorker { ); while successfuls .peek() - .map(|a| &a.activity_id == &ActivityId(last_id.0 + 1)) + .map(|a| a.activity_id == ActivityId(last_id.0 + 1)) .unwrap_or(false) { let next = successfuls.pop().unwrap(); @@ -308,8 +292,9 @@ impl InstanceWorker { Ok(()) } - /// we collect the relevant inboxes in the main instance worker task, and only spawn the send task if we have inboxes to send to - /// this limits CPU usage and reduces overhead for the (many) cases where we don't have any inboxes + /// we collect the relevant inboxes in the main instance worker task, and only spawn the send task + /// if we have inboxes to send to this limits CPU usage and reduces overhead for the (many) + /// cases where we don't have any inboxes async fn spawn_send_if_needed( &mut self, activity_id: ActivityId, @@ -381,23 +366,11 @@ impl InstanceWorker { async fn save_and_send_state(&mut self) -> Result<()> { tracing::debug!("{}: saving and sending state", self.instance.domain); self.last_state_insert = Utc::now(); -<<<<<<< HEAD FederationQueueState::upsert(&mut self.pool(), &self.state).await?; - self - .stats_sender - .send((self.instance.domain.clone(), self.state.clone()))?; -||||||| 51970ffc8 - FederationQueueState::upsert(&mut self.context.pool(), &self.state).await?; - self - .stats_sender - .send((self.instance.id, self.state.clone()))?; -======= - FederationQueueState::upsert(&mut self.context.pool(), &self.state).await?; self.stats_sender.send(FederationQueueStateWithDomain { state: self.state.clone(), domain: self.instance.domain.clone(), })?; ->>>>>>> origin/main Ok(()) } diff --git a/crates/utils/src/settings/structs.rs b/crates/utils/src/settings/structs.rs index 8a2c7dd27..dcacc37b6 100644 --- a/crates/utils/src/settings/structs.rs +++ b/crates/utils/src/settings/structs.rs @@ -239,7 +239,8 @@ pub struct PrometheusConfig { // named federation"worker"config to disambiguate from the activitypub library configuration pub struct FederationWorkerConfig { /// Limit to the number of concurrent outgoing federation requests per target instance. - /// Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities per second) and if a receiving instance is not keeping up. + /// Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities + /// per second) and if a receiving instance is not keeping up. #[default(1)] pub concurrent_sends_per_instance: i64, }