federation-send-parallel
phiresky 2024-05-30 12:38:17 +02:00
parent ffb99cd036
commit a0b0a7a0d9
5 changed files with 57 additions and 74 deletions

View File

@ -13,10 +13,13 @@ use std::collections::{HashMap, HashSet};
/// interval with which new additions to community_followers are queried.
///
-/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears),
-/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url.
-/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate.
-/// (see https://github.com/LemmyNet/lemmy/issues/3958)
+/// The first time some user on an instance follows a specific remote community (or, more precisely:
+/// the first time a (followed_community_id, follower_inbox_url) tuple appears), this delay limits
+/// the maximum time until the follow actually results in activities from that community id being
+/// sent to that inbox url. This delay currently needs to not be too small because the DB load is
+/// currently fairly high because of the current structure of storing inboxes for every person, not
+/// having a separate list of shared_inboxes, and the architecture of having every instance queue be
+/// fully separate. (see https://github.com/LemmyNet/lemmy/issues/3958)
static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::TimeDelta> = Lazy::new(|| {
  if *LEMMY_TEST_FAST_FEDERATION {
    chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds")
@ -24,8 +27,9 @@ static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::TimeDelta> = Lazy::new(|| {
    chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds")
  }
});
-/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community.
-/// This is expected to happen pretty rarely and updating it in a timely manner is not too important.
+/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance
+/// unfollows a specific remote community. This is expected to happen pretty rarely and updating it
+/// in a timely manner is not too important.
static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::TimeDelta> =
  Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds"));
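The two recheck delays above bound how stale the collector's view of community_followers can get, and LEMMY_TEST_FAST_FEDERATION shrinks the additions delay from two minutes to one second for tests. A minimal, self-contained sketch of that gating pattern (illustrative names only, not Lemmy's actual CommunityInboxCollector API):

// Hypothetical sketch: the follower/inbox map is only re-queried once the configured
// delay has elapsed, so a new follow becomes visible to the sender after at most that delay.
use chrono::{DateTime, TimeDelta, Utc};

struct FollowRecheck {
  last_fetch: DateTime<Utc>,
  recheck_delay: TimeDelta,
}

impl FollowRecheck {
  fn needs_refetch(&self, now: DateTime<Utc>) -> bool {
    now - self.last_fetch > self.recheck_delay
  }
}

fn main() {
  let recheck = FollowRecheck {
    last_fetch: Utc::now() - TimeDelta::try_minutes(5).expect("TimeDelta out of bounds"),
    recheck_delay: TimeDelta::try_minutes(2).expect("TimeDelta out of bounds"),
  };
  // five minutes since the last fetch with a two-minute delay: time to query again
  assert!(recheck.needs_refetch(Utc::now()));
}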
@ -73,7 +77,8 @@ impl CommunityInboxCollector {
      self.site_loaded = true;
    }
    if let Some(site) = &self.site {
-      // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these activities. So handling it like this is fine.
+      // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these
+      // activities. So handling it like this is fine.
      inbox_urls.insert(site.inbox_url.inner().clone());
    }
  }
@ -120,14 +125,16 @@ impl CommunityInboxCollector {
    Ok(())
  }
-  /// get a list of local communities with the remote inboxes on the given instance that cares about them
+  /// get a list of local communities with the remote inboxes on the given instance that cares about
+  /// them
  async fn get_communities(
    &mut self,
    instance_id: InstanceId,
    last_fetch: DateTime<Utc>,
  ) -> Result<(HashMap<CommunityId, HashSet<Url>>, DateTime<Utc>)> {
    let new_last_fetch =
-      Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact
+      Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if
+    // published date is not exact
    Ok((
      CommunityFollowerView::get_instance_followed_community_inboxes(
        &mut self.pool(),
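The 10-second subtraction in get_communities exists so that consecutive fetch windows overlap. A simplified sketch of that invariant, with hypothetical helper names (the real code runs a DB query via CommunityFollowerView):

// The next fetch mark is taken before the query and pushed 10 seconds into the past, so
// rows whose `published` timestamp lags wall-clock time slightly are still caught by the
// following run instead of being skipped forever.
use chrono::{DateTime, TimeDelta, Utc};

fn next_fetch_mark(now: DateTime<Utc>) -> DateTime<Utc> {
  now - TimeDelta::try_seconds(10).expect("TimeDelta out of bounds")
}

fn is_new_follow(published: DateTime<Utc>, last_fetch: DateTime<Utc>) -> bool {
  // duplicates from the 10s overlap window are harmless, since results land in a
  // HashMap<CommunityId, HashSet<Url>> and simply overwrite themselves
  published > last_fetch
}

fn main() {
  let last_fetch = next_fetch_mark(Utc::now());
  let published = Utc::now() - TimeDelta::try_seconds(3).expect("TimeDelta out of bounds");
  assert!(is_new_follow(published, last_fetch));
}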

View File

@ -1,9 +1,7 @@
use crate::{util::CancellableTask, worker::InstanceWorker};
use activitypub_federation::config::FederationConfig;
-use chrono::{Local, Timelike};
use lemmy_api_common::{
  context::LemmyContext,
-  federate_retry_sleep_duration,
  lemmy_utils::settings::structs::FederationWorkerConfig,
};
use lemmy_db_schema::{newtypes::InstanceId, source::instance::Instance};
@ -72,7 +70,11 @@ impl SendManager {
    }
  }
-  pub fn run(opts: Opts, context: FederationConfig<LemmyContext>, config: FederationWorkerConfig) -> CancellableTask {
+  pub fn run(
+    opts: Opts,
+    context: FederationConfig<LemmyContext>,
+    config: FederationWorkerConfig,
+  ) -> CancellableTask {
    CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| {
      let opts = opts.clone();
      let config = config.clone();
@ -133,7 +135,6 @@ impl SendManager {
      }
      // create new worker
      let context = self.context.clone();
-      let context = self.context.clone();
      let stats_sender = self.stats_sender.clone();
      let federation_worker_config = self.federation_worker_config.clone();

View File

@ -46,7 +46,8 @@ pub(crate) struct SendRetryTask<'a> {
  pub inbox_urls: Vec<Url>,
  /// report to the main instance worker
  pub report: &'a mut UnboundedSender<SendActivityResult>,
-  /// the first request will be sent immediately, but the next one will be delayed according to the number of previous fails + 1
+  /// the first request will be sent immediately, but the next one will be delayed according to the
+  /// number of previous fails + 1
  pub initial_fail_count: i32,
  /// for logging
  pub domain: String,
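initial_fail_count feeds the retry pacing for this inbox batch: the first attempt goes out immediately and later attempts back off based on the accumulated failure count. The sketch below only illustrates that shape; the real delay comes from lemmy_api_common::federate_retry_sleep_duration, whose exact formula is not shown in this diff, so the constants here are placeholders.

// Assumed illustration only, not Lemmy's actual backoff constants.
use std::time::Duration;

fn assumed_backoff(fail_count: i32) -> Duration {
  // placeholder: grows with the number of previous failures, capped to stay finite
  Duration::from_secs_f64(10.0 * 2f64.powi(fail_count.clamp(0, 10)))
}

async fn retry_send_sketch(initial_fail_count: i32, max_attempts: i32) {
  for attempt in 0..max_attempts {
    if attempt > 0 {
      // first request goes out immediately; later ones wait according to previous fails + 1
      tokio::time::sleep(assumed_backoff(initial_fail_count + attempt)).await;
    }
    // ... perform the HTTP delivery here and break on success ...
  }
}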

View File

@ -1,36 +1,16 @@
-<<<<<<< HEAD
use crate::{
  inboxes::CommunityInboxCollector,
  send::{SendActivityResult, SendRetryTask, SendSuccessInfo},
-  util::{get_activity_cached, get_latest_activity_id, WORK_FINISHED_RECHECK_DELAY},
+  util::{
-||||||| 51970ffc8
-use crate::util::{
    get_activity_cached,
-  get_actor_cached,
-  get_latest_activity_id,
-  LEMMY_TEST_FAST_FEDERATION,
-  WORK_FINISHED_RECHECK_DELAY,
-};
-use activitypub_federation::{
-  activity_sending::SendActivityTask,
-  config::Data,
-  protocol::context::WithContext,
-=======
-use crate::util::{
-  get_activity_cached,
-  get_actor_cached,
    get_latest_activity_id,
    FederationQueueStateWithDomain,
-  LEMMY_TEST_FAST_FEDERATION,
    WORK_FINISHED_RECHECK_DELAY,
+  },
};
use activitypub_federation::{
-  activity_sending::SendActivityTask,
+  config::{FederationConfig},
-  config::Data,
-  protocol::context::WithContext,
->>>>>>> origin/main
};
-use activitypub_federation::config::FederationConfig;
use anyhow::{Context, Result};
use chrono::{DateTime, Days, TimeZone, Utc};
use lemmy_api_common::{
@ -53,7 +33,8 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;
-/// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent)
+/// Save state to db after this time has passed since the last state (so if the server crashes or is
+/// SIGKILLed, less than X seconds of activities are resent)
static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60);
/// Maximum number of successful sends to allow out of order
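SAVE_STATE_EVERY_TIME bounds how much work can be repeated after a crash: the queue position is persisted at most once per interval unless a write is forced. A simplified sketch of that time gate (illustrative struct, not the actual InstanceWorker):

// State is written out when a write is forced or when the interval has passed, so a crash
// or SIGKILL loses at most roughly that window of progress.
use std::time::{Duration, Instant};

struct StateSaver {
  last_state_insert: Instant,
  save_interval: Duration, // plays the role of SAVE_STATE_EVERY_TIME (60s above)
}

impl StateSaver {
  fn should_persist(&self, force_write: bool) -> bool {
    force_write || self.last_state_insert.elapsed() > self.save_interval
  }
}

fn main() {
  let saver = StateSaver {
    last_state_insert: Instant::now(),
    save_interval: Duration::from_secs(60),
  };
  // immediately after a save, only a forced write goes through
  assert!(!saver.should_persist(false));
  assert!(saver.should_persist(true));
}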
@ -62,13 +43,9 @@ const MAX_SUCCESSFULS: usize = 1000;
pub(crate) struct InstanceWorker {
  instance: Instance,
  stop: CancellationToken,
-  context: Data<LemmyContext>,
  stats_sender: UnboundedSender<FederationQueueStateWithDomain>,
-  last_full_communities_fetch: DateTime<Utc>,
-  last_incremental_communities_fetch: DateTime<Utc>,
  federation_lib_config: FederationConfig<LemmyContext>,
  federation_worker_config: FederationWorkerConfig,
-  stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>,
  state: FederationQueueState,
  last_state_insert: DateTime<Utc>,
  pool: ActualDbPool,
@ -81,7 +58,7 @@ impl InstanceWorker {
    config: FederationConfig<LemmyContext>,
    federation_worker_config: FederationWorkerConfig,
    stop: CancellationToken,
-    stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>,
+    stats_sender: UnboundedSender<FederationQueueStateWithDomain>,
  ) -> Result<(), anyhow::Error> {
    let pool = config.to_request_data().inner_pool().clone();
    let state = FederationQueueState::load(&mut DbPool::Pool(&pool), instance.id).await?;
@ -103,7 +80,8 @@ impl InstanceWorker {
    worker.loop_until_stopped().await
  }
  /// loop fetch new activities from db and send them to the inboxes of the given instances
-  /// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit)
+  /// this worker only returns if (a) there is an internal error or (b) the cancellation token is
+  /// cancelled (graceful exit)
  async fn loop_until_stopped(&mut self) -> Result<()> {
    self.initial_fail_sleep().await?;
    let (mut last_sent_id, mut newest_id) = self.get_latest_ids().await?;
@ -119,8 +97,9 @@ impl InstanceWorker {
      tokio::sync::mpsc::unbounded_channel::<SendActivityResult>();
    while !self.stop.is_cancelled() {
      // check if we need to wait for a send to finish before sending the next one
-      // we wait if (a) the last request failed, only if a request is already in flight (not at the start of the loop)
-      // or (b) if we have too many successfuls in memory or (c) if we have too many in flight
+      // we wait if (a) the last request failed, only if a request is already in flight (not at the
+      // start of the loop) or (b) if we have too many successfuls in memory or (c) if we have
+      // too many in flight
      let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0)
        || successfuls.len() >= MAX_SUCCESSFULS
        || in_flight >= self.federation_worker_config.concurrent_sends_per_instance;
@ -131,8 +110,8 @@ impl InstanceWorker {
        self
          .handle_send_results(&mut receive_send_result, &mut successfuls, &mut in_flight)
          .await?;
-        // handle_send_results does not guarantee that we are now in a condition where we want to send a new one,
-        // so repeat this check until the if no longer applies
+        // handle_send_results does not guarantee that we are now in a condition where we want to
+        // send a new one, so repeat this check until the if no longer applies
        continue;
      } else {
        // send a new activity if there is one
@ -214,8 +193,8 @@ impl InstanceWorker {
    if let Some(last) = self.state.last_successful_id {
      Ok((last, latest_id))
    } else {
-      // this is the initial creation (instance first seen) of the federation queue for this instance
-      // skip all past activities:
+      // this is the initial creation (instance first seen) of the federation queue for this
+      // instance skip all past activities:
      self.state.last_successful_id = Some(latest_id);
      // save here to ensure it's not read as 0 again later if no activities have happened
      self.save_and_send_state().await?;
@ -245,7 +224,8 @@ impl InstanceWorker {
        }
        SendActivityResult::Failure { fail_count, .. } => {
          if fail_count > self.state.fail_count {
-            // override fail count - if multiple activities are currently sending this value may get conflicting info but that's fine
+            // override fail count - if multiple activities are currently sending this value may get
+            // conflicting info but that's fine
            self.state.fail_count = fail_count;
            self.state.last_retry = Some(Utc::now());
            force_write = true;
@ -272,15 +252,19 @@ impl InstanceWorker {
    }
    Ok(())
  }
-  /// Checks that sequential activities `last_successful_id + 1`, `last_successful_id + 2` etc have been sent successfully.
-  /// In that case updates `last_successful_id` and saves the state to the database if the time since the last save is greater than `SAVE_STATE_EVERY_TIME`.
+  /// Checks that sequential activities `last_successful_id + 1`, `last_successful_id + 2` etc have
+  /// been sent successfully. In that case updates `last_successful_id` and saves the state to the
+  /// database if the time since the last save is greater than `SAVE_STATE_EVERY_TIME`.
  async fn pop_successfuls_and_write(
    &mut self,
    successfuls: &mut BinaryHeap<SendSuccessInfo>,
    force_write: bool,
  ) -> Result<()> {
    let Some(mut last_id) = self.state.last_successful_id else {
-      tracing::warn!("{} should be impossible: last successful id is None", self.instance.domain);
+      tracing::warn!(
+        "{} should be impossible: last successful id is None",
+        self.instance.domain
+      );
      return Ok(());
    };
    tracing::debug!(
@ -292,7 +276,7 @@ impl InstanceWorker {
    );
    while successfuls
      .peek()
-      .map(|a| &a.activity_id == &ActivityId(last_id.0 + 1))
+      .map(|a| a.activity_id == ActivityId(last_id.0 + 1))
      .unwrap_or(false)
    {
      let next = successfuls.pop().unwrap();
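The loop above only advances last_successful_id while the next consecutive activity id is sitting on top of the successfuls heap, which is how out-of-order completions from parallel sends are reconciled back into a single resume position. A self-contained sketch of the same idea, using std::cmp::Reverse for a min-heap (the real SendSuccessInfo defines its own ordering):

// Out-of-order completions are parked in a heap; the persisted id only moves forward
// while the next consecutive id is present, so no gap is ever skipped.
use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn advance_last_successful(last: &mut u64, done: &mut BinaryHeap<Reverse<u64>>) {
  while done.peek().map(|Reverse(id)| *id == *last + 1).unwrap_or(false) {
    let Reverse(id) = done.pop().expect("peeked, so pop succeeds");
    *last = id;
  }
}

fn main() {
  let mut last = 10;
  // ids 12 and 13 finished before 11; nothing can be persisted yet
  let mut done: BinaryHeap<Reverse<u64>> = [12, 13].into_iter().map(Reverse).collect();
  advance_last_successful(&mut last, &mut done);
  assert_eq!(last, 10);
  // once 11 arrives, the whole run 11..=13 is released in order
  done.push(Reverse(11));
  advance_last_successful(&mut last, &mut done);
  assert_eq!(last, 13);
}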
@ -308,8 +292,9 @@ impl InstanceWorker {
    Ok(())
  }
-  /// we collect the relevant inboxes in the main instance worker task, and only spawn the send task if we have inboxes to send to
-  /// this limits CPU usage and reduces overhead for the (many) cases where we don't have any inboxes
+  /// we collect the relevant inboxes in the main instance worker task, and only spawn the send task
+  /// if we have inboxes to send to this limits CPU usage and reduces overhead for the (many)
+  /// cases where we don't have any inboxes
  async fn spawn_send_if_needed(
    &mut self,
    activity_id: ActivityId,
@ -381,23 +366,11 @@ impl InstanceWorker {
  async fn save_and_send_state(&mut self) -> Result<()> {
    tracing::debug!("{}: saving and sending state", self.instance.domain);
    self.last_state_insert = Utc::now();
-<<<<<<< HEAD
    FederationQueueState::upsert(&mut self.pool(), &self.state).await?;
-    self
-      .stats_sender
-      .send((self.instance.domain.clone(), self.state.clone()))?;
-||||||| 51970ffc8
-    FederationQueueState::upsert(&mut self.context.pool(), &self.state).await?;
-    self
-      .stats_sender
-      .send((self.instance.id, self.state.clone()))?;
-=======
-    FederationQueueState::upsert(&mut self.context.pool(), &self.state).await?;
    self.stats_sender.send(FederationQueueStateWithDomain {
      state: self.state.clone(),
      domain: self.instance.domain.clone(),
    })?;
->>>>>>> origin/main
    Ok(())
  }

View File

@ -239,7 +239,8 @@ pub struct PrometheusConfig {
// named federation"worker"config to disambiguate from the activitypub library configuration
pub struct FederationWorkerConfig {
  /// Limit to the number of concurrent outgoing federation requests per target instance.
-  /// Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities per second) and if a receiving instance is not keeping up.
+  /// Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities
+  /// per second) and if a receiving instance is not keeping up.
  #[default(1)]
  pub concurrent_sends_per_instance: i64,
}
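For illustration, a small sketch of how this limit is meant to behave: with the default of 1 the per-instance queue stays strictly sequential, and raising it only admits more in-flight requests to the same target. The struct construction here is hypothetical; admins would set the value through the Lemmy config file rather than in code.

// Illustrative only; field and struct names mirror the diff above.
#[derive(Clone, Debug)]
pub struct FederationWorkerConfig {
  pub concurrent_sends_per_instance: i64,
}

impl Default for FederationWorkerConfig {
  fn default() -> Self {
    // matches #[default(1)] above: strictly ordered sending unless explicitly raised
    Self { concurrent_sends_per_instance: 1 }
  }
}

fn may_dispatch_more(in_flight: i64, config: &FederationWorkerConfig) -> bool {
  in_flight < config.concurrent_sends_per_instance
}

fn main() {
  let config = FederationWorkerConfig::default();
  assert!(may_dispatch_more(0, &config));
  assert!(!may_dispatch_more(1, &config)); // with the default of 1, sends stay sequential
}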