fix both permanent stopping of federation queues and multiple creation of the same federation queues

fix-dupe-activity-sending
phiresky 2024-05-29 15:11:29 +02:00
parent 51970ffc81
commit 13ff059f83
3 changed files with 52 additions and 22 deletions

View File

@ -43,7 +43,7 @@ pub struct SendManager {
}
impl SendManager {
pub fn new(opts: Opts, context: FederationConfig<LemmyContext>) -> Self {
fn new(opts: Opts, context: FederationConfig<LemmyContext>) -> Self {
assert!(opts.process_count > 0);
assert!(opts.process_index > 0);
assert!(opts.process_index <= opts.process_count);
@ -61,11 +61,27 @@ impl SendManager {
}
}
pub fn run(mut self) -> CancellableTask {
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| async move {
self.do_loop(cancel).await?;
self.cancel().await?;
Ok(())
pub fn run(opts: Opts, context: FederationConfig<LemmyContext>) -> CancellableTask {
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| {
let opts = opts.clone();
let context = context.clone();
let mut manager = Self::new(opts, context);
async move {
let result = manager.do_loop(cancel).await;
// the loop function will only return if there is (a) an internal error (e.g. db connection
// failure) or (b) it was cancelled from outside.
if let Err(e) = result {
// don't let this error bubble up, just log it, so the below cancel function will run
// regardless
tracing::error!("SendManager failed: {e}");
}
// cancel all the dependent workers as well to ensure they don't get orphaned and keep
// running.
manager.cancel().await?;
LemmyResult::Ok(())
// if the task was not intentionally cancelled, then this whole lambda will be run again by
// CancellableTask after this
}
})
}
@ -104,14 +120,24 @@ impl SendManager {
continue;
}
// create new worker
let instance = instance.clone();
let req_data = self.context.to_request_data();
let context = self.context.clone();
let stats_sender = self.stats_sender.clone();
self.workers.insert(
instance.id,
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| async move {
InstanceWorker::init_and_loop(instance, req_data, stop, stats_sender).await?;
Ok(())
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| {
// if the instance worker ends unexpectedly due to internal/db errors, this lambda is rerun by cancellabletask.
let instance = instance.clone();
let req_data = context.to_request_data();
let stats_sender = stats_sender.clone();
async move {
InstanceWorker::init_and_loop(
instance,
req_data,
stop,
stats_sender,
)
.await
}
}),
);
} else if !should_federate {

View File

@ -17,7 +17,6 @@ use lemmy_db_schema::{
traits::ApubActor,
utils::{get_conn, DbPool},
};
use lemmy_utils::error::LemmyResult;
use moka::future::Cache;
use once_cell::sync::Lazy;
use reqwest::Url;
@ -25,7 +24,6 @@ use serde_json::Value;
use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration};
use tokio::{task::JoinHandle, time::sleep};
use tokio_util::sync::CancellationToken;
use tracing::error;
/// Decrease the delays of the federation queue.
/// Should only be used for federation tests since it significantly increases CPU and DB load of the
@ -61,24 +59,31 @@ impl CancellableTask {
/// spawn a task but with graceful shutdown
pub fn spawn<F, R: Debug>(
timeout: Duration,
task: impl FnOnce(CancellationToken) -> F + Send + 'static,
task: impl Fn(CancellationToken) -> F + Send + 'static,
) -> CancellableTask
where
F: Future<Output = LemmyResult<R>> + Send + 'static,
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let stop = CancellationToken::new();
let stop2 = stop.clone();
let task: JoinHandle<LemmyResult<R>> = tokio::spawn(task(stop2));
let task: JoinHandle<()> = tokio::spawn(async move {
loop {
let res = task(stop2.clone()).await;
if stop2.is_cancelled() {
return;
} else {
tracing::warn!("task exited, restarting: {res:?}");
}
}
});
let abort = task.abort_handle();
CancellableTask {
f: Box::pin(async move {
stop.cancel();
tokio::select! {
r = task => {
if let Err(ref e) = r? {
error!("CancellableTask threw error: {e}");
}
r.context("CancellableTask failed to cancel cleanly, returned error")?;
Ok(())
},
_ = sleep(timeout) => {

View File

@ -210,14 +210,13 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> {
None
};
let federate = (!args.disable_activity_sending).then(|| {
let task = SendManager::new(
SendManager::run(
Opts {
process_index: args.federate_process_index,
process_count: args.federate_process_count,
},
federation_config,
);
task.run()
)
});
let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;
let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?;