delay shorter

add_federation_worker_index
phiresky 2023-09-18 16:49:55 +00:00
parent 88931227b0
commit 9e886fba4a
4 changed files with 46 additions and 27 deletions

View File

@ -279,8 +279,9 @@ test("Delete a post", async () => {
// Make sure lemmy beta sees post is deleted // Make sure lemmy beta sees post is deleted
// This will be undefined because of the tombstone // This will be undefined because of the tombstone
await expect(resolvePost(beta, postRes.post_view.post)).rejects.toBe( await waitUntil(
"couldnt_find_object", () => resolvePost(beta, postRes.post_view.post).catch(e => e),
e => e === "couldnt_find_object",
); );
// Undelete // Undelete
@ -288,7 +289,12 @@ test("Delete a post", async () => {
expect(undeletedPost.post_view.post.deleted).toBe(false); expect(undeletedPost.post_view.post.deleted).toBe(false);
// Make sure lemmy beta sees post is undeleted // Make sure lemmy beta sees post is undeleted
let betaPost2 = (await resolvePost(beta, postRes.post_view.post)).post; let betaPost2 = (
await waitUntil(
() => resolvePost(beta, postRes.post_view.post).catch(e => e),
e => e !== "couldnt_find_object",
)
).post;
if (!betaPost2) { if (!betaPost2) {
throw "Missing beta post 2"; throw "Missing beta post 2";
} }

View File

@ -872,13 +872,15 @@ export async function waitUntil<T>(
fetcher: () => Promise<T>, fetcher: () => Promise<T>,
checker: (t: T) => boolean, checker: (t: T) => boolean,
retries = 10, retries = 10,
delaySeconds = 2, delaySeconds = [0.2, 0.5, 1, 2, 3],
) { ) {
let retry = 0; let retry = 0;
while (retry++ < retries) { while (retry++ < retries) {
const result = await fetcher(); const result = await fetcher();
if (checker(result)) return result; if (checker(result)) return result;
await delay(delaySeconds * 1000); await delay(
delaySeconds[Math.min(retry - 1, delaySeconds.length - 1)] * 1000,
);
} }
throw Error( throw Error(
`Failed "${fetcher}": "${checker}" did not return true after ${retries} retries (delayed ${delaySeconds}s each)`, `Failed "${fetcher}": "${checker}" did not return true after ${retries} retries (delayed ${delaySeconds}s each)`,

View File

@ -31,6 +31,26 @@ use std::{
use tokio::{task::JoinHandle, time::sleep}; use tokio::{task::JoinHandle, time::sleep};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
/// Decrease the delays of the federation queue.
/// Should only be used for federation tests since it significantly increases CPU and DB load of the federation queue.
pub(crate) static LEMMY_TEST_FAST_FEDERATION: Lazy<bool> = Lazy::new(|| {
std::env::var("LEMMY_TEST_FAST_FEDERATION")
.map(|s| !s.is_empty())
.unwrap_or(false)
});
/// Recheck for new federation work every n seconds.
///
/// When the queue is processed faster than new activities are added and it reaches the current time with an empty batch,
/// this is the delay the queue waits before it checks if new activities have been added to the sent_activities table.
/// This delay is only applied if no federated activity happens during sending activities of the last batch.
pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy<Duration> = Lazy::new(|| {
if *LEMMY_TEST_FAST_FEDERATION {
Duration::from_millis(100)
} else {
Duration::from_secs(30)
}
});
pub struct CancellableTask<R: Send + 'static> { pub struct CancellableTask<R: Send + 'static> {
f: Pin<Box<dyn Future<Output = Result<R, anyhow::Error>> + Send + 'static>>, f: Pin<Box<dyn Future<Output = Result<R, anyhow::Error>> + Send + 'static>>,
ended: Arc<RwLock<bool>>, ended: Arc<RwLock<bool>>,
@ -162,7 +182,11 @@ pub(crate) async fn get_activity_cached(
pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<ActivityId> { pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<ActivityId> {
static CACHE: Lazy<Cache<(), ActivityId>> = Lazy::new(|| { static CACHE: Lazy<Cache<(), ActivityId>> = Lazy::new(|| {
Cache::builder() Cache::builder()
.time_to_live(Duration::from_secs(1)) .time_to_live(if *LEMMY_TEST_FAST_FEDERATION {
*WORK_FINISHED_RECHECK_DELAY
} else {
Duration::from_secs(1)
})
.build() .build()
}); });
CACHE CACHE

View File

@ -1,6 +1,13 @@
use crate::{ use crate::{
federation_queue_state::FederationQueueState, federation_queue_state::FederationQueueState,
util::{get_activity_cached, get_actor_cached, get_latest_activity_id, retry_sleep_duration}, util::{
get_activity_cached,
get_actor_cached,
get_latest_activity_id,
retry_sleep_duration,
LEMMY_TEST_FAST_FEDERATION,
WORK_FINISHED_RECHECK_DELAY,
},
}; };
use activitypub_federation::{activity_sending::SendActivityTask, config::Data}; use activitypub_federation::{activity_sending::SendActivityTask, config::Data};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
@ -23,31 +30,11 @@ use std::{
use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio::{sync::mpsc::UnboundedSender, time::sleep};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
/// Decrease the delays of the federation queue.
/// Should only be used for federation tests since it significantly increases CPU and DB load of the federation queue.
static LEMMY_TEST_FAST_FEDERATION: Lazy<bool> = Lazy::new(|| {
std::env::var("LEMMY_TEST_FAST_FEDERATION")
.map(|s| !s.is_empty())
.unwrap_or(false)
});
/// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt) /// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt)
/// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB. /// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB.
static CHECK_SAVE_STATE_EVERY_IT: i64 = 100; static CHECK_SAVE_STATE_EVERY_IT: i64 = 100;
/// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent) /// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent)
static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60);
/// Recheck for new federation work every n seconds.
///
/// When the queue is processed faster than new activities are added and it reaches the current time with an empty batch,
/// this is the delay the queue waits before it checks if new activities have been added to the sent_activities table.
/// This delay is only applied if no federated activity happens during sending activities of the last batch.
static WORK_FINISHED_RECHECK_DELAY: Lazy<Duration> = Lazy::new(|| {
if *LEMMY_TEST_FAST_FEDERATION {
Duration::from_secs(1)
} else {
Duration::from_secs(30)
}
});
/// interval with which new additions to community_followers are queried. /// interval with which new additions to community_followers are queried.
/// ///
/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears), /// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears),