instead of changing fed delays in debug mode, change them via env variable

add_federation_worker_index
phiresky 2023-09-13 11:20:09 +00:00
parent dca43dcfd9
commit b09ffa7197
2 changed files with 42 additions and 13 deletions

View File

@ -2,6 +2,7 @@
set -e set -e
export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432 export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432
export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min
pushd .. pushd ..
cargo build cargo build
rm target/lemmy_server || true rm target/lemmy_server || true

View File

@ -22,20 +22,47 @@ use std::{
}; };
use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio::{sync::mpsc::UnboundedSender, time::sleep};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
/// save state to db every n sends if there's no failures (otherwise state is saved after every attempt)
/// Decrease the delays of the federation queue.
/// Should only be used for federation tests since it significantly increases CPU and DB load of the federation queue.
static LEMMY_TEST_FAST_FEDERATION: Lazy<bool> = Lazy::new(|| {
std::env::var("LEMMY_TEST_FAST_FEDERATION")
.map(|s| !s.is_empty())
.unwrap_or(false)
});
/// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt)
/// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB.
static CHECK_SAVE_STATE_EVERY_IT: i64 = 100; static CHECK_SAVE_STATE_EVERY_IT: i64 = 100;
/// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent)
static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60);
/// recheck for new federation work every n seconds /// Recheck for new federation work every n seconds.
#[cfg(debug_assertions)] ///
static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_secs(1); /// When the queue is processed faster than new activities are added and it reaches the current time with an empty batch,
#[cfg(not(debug_assertions))] /// this is the delay the queue waits before it checks if new activities have been added to the sent_activities table.
static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_secs(30); /// This delay is only applied if no federated activity happens during sending activities of the last batch.
#[cfg(debug_assertions)] static WORK_FINISHED_RECHECK_DELAY: Lazy<Duration> = Lazy::new(|| {
static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::Duration> = if *LEMMY_TEST_FAST_FEDERATION {
Lazy::new(|| chrono::Duration::minutes(1)); Duration::from_secs(1)
#[cfg(not(debug_assertions))] } else {
static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::Duration> = Duration::from_secs(30)
Lazy::new(|| chrono::Duration::minutes(5)); }
});
/// interval with which new additions to community_followers are queried.
///
/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears),
/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url.
/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate.
/// (see https://github.com/LemmyNet/lemmy/issues/3958)
static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::Duration> = Lazy::new(|| {
if *LEMMY_TEST_FAST_FEDERATION {
chrono::Duration::seconds(1)
} else {
chrono::Duration::minutes(2)
}
});
/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community.
/// This is expected to happen pretty rarely and updating it in a timely manner is not too important.
static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::Duration> = static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::Duration> =
Lazy::new(|| chrono::Duration::hours(1)); Lazy::new(|| chrono::Duration::hours(1));
pub(crate) struct InstanceWorker { pub(crate) struct InstanceWorker {
@ -121,6 +148,7 @@ impl InstanceWorker {
} }
Ok(()) Ok(())
} }
/// send out a batch of CHECK_SAVE_STATE_EVERY_IT activities
async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> { async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> {
let latest_id = get_latest_activity_id(pool).await?; let latest_id = get_latest_activity_id(pool).await?;
if self.state.last_successful_id == -1 { if self.state.last_successful_id == -1 {
@ -134,7 +162,7 @@ impl InstanceWorker {
if id == latest_id { if id == latest_id {
// no more work to be done, wait before rechecking // no more work to be done, wait before rechecking
tokio::select! { tokio::select! {
() = sleep(WORK_FINISHED_RECHECK_DELAY) => {}, () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {},
() = self.stop.cancelled() => {} () = self.stop.cancelled() => {}
} }
return Ok(()); return Ok(());