From 024ab7d5309ffcc8d9891a92210d3db51d3a9985 Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 3 Jan 2024 19:30:06 +0100 Subject: [PATCH] Fix federate loop (#4330) * make activity channel infallible * clippy * federate: make cancellabletask loop itself --- crates/federate/src/lib.rs | 58 ++++++++++++++++------------------- crates/federate/src/util.rs | 50 +++++++++++++----------------- crates/federate/src/worker.rs | 1 + 3 files changed, 48 insertions(+), 61 deletions(-) diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index 382aa59b8..a4dc49536 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -23,6 +23,7 @@ static INSTANCES_RECHECK_DELAY: Duration = Duration::from_secs(5); #[cfg(not(debug_assertions))] static INSTANCES_RECHECK_DELAY: Duration = Duration::from_secs(60); +#[derive(Clone)] pub struct Opts { /// how many processes you are starting in total pub process_count: i32, @@ -36,7 +37,7 @@ async fn start_stop_federation_workers( federation_config: FederationConfig, cancel: CancellationToken, ) -> anyhow::Result<()> { - let mut workers = HashMap::>::new(); + let mut workers = HashMap::::new(); let (stats_sender, stats_receiver) = unbounded_channel(); let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver)); @@ -66,40 +67,30 @@ async fn start_stop_federation_workers( let should_federate = allowed && !is_dead; if should_federate { if workers.contains_key(&instance.id) { - if workers - .get(&instance.id) - .map(util::CancellableTask::has_ended) - .unwrap_or(false) - { - // task must have errored out, remove and recreated it - let worker = workers - .remove(&instance.id) - .expect("just checked contains_key"); - tracing::error!( - "worker for {} has stopped, recreating: {:?}", - instance.domain, - worker.cancel().await - ); - } else { - continue; - } + // worker already running + continue; } // create new worker + let config = federation_config.clone(); let stats_sender = stats_sender.clone(); - let context = federation_config.to_request_data(); let pool = pool.clone(); workers.insert( instance.id, - CancellableTask::spawn(WORKER_EXIT_TIMEOUT, |stop| async move { - InstanceWorker::init_and_loop( - instance, - context, - &mut DbPool::Pool(&pool), - stop, - stats_sender, - ) - .await?; - Ok(()) + CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| { + let instance = instance.clone(); + let req_data = config.clone().to_request_data(); + let stats_sender = stats_sender.clone(); + let pool = pool.clone(); + async move { + InstanceWorker::init_and_loop( + instance, + req_data, + &mut DbPool::Pool(&pool), + stop, + stats_sender, + ) + .await + } }), ); } else if !should_federate { @@ -135,9 +126,12 @@ pub fn start_stop_federation_workers_cancellable( opts: Opts, pool: ActualDbPool, config: FederationConfig, -) -> CancellableTask<()> { - CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |c| { - start_stop_federation_workers(opts, pool, config, c) +) -> CancellableTask { + CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| { + let opts = opts.clone(); + let pool = pool.clone(); + let config = config.clone(); + async move { start_stop_federation_workers(opts, pool, config, stop).await } }) } diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index 1775e4153..848785836 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -20,12 +20,7 @@ use moka::future::Cache; use once_cell::sync::Lazy; use reqwest::Url; use serde_json::Value; -use std::{ - future::Future, - pin::Pin, - sync::{Arc, RwLock}, - time::Duration, -}; +use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration}; use tokio::{task::JoinHandle, time::sleep}; use tokio_util::sync::CancellationToken; @@ -49,41 +44,41 @@ pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy = Lazy::new(|| { } }); -pub struct CancellableTask { - f: Pin> + Send + 'static>>, - ended: Arc>, +/// A task that will be run in an infinite loop, unless it is cancelled. +/// If the task exits without being cancelled, an error will be logged and the task will be restarted. +pub struct CancellableTask { + f: Pin> + Send + 'static>>, } -impl CancellableTask { +impl CancellableTask { /// spawn a task but with graceful shutdown - pub fn spawn( + pub fn spawn( timeout: Duration, - task: impl FnOnce(CancellationToken) -> F, - ) -> CancellableTask + task: impl Fn(CancellationToken) -> F + Send + 'static, + ) -> CancellableTask where - F: Future> + Send + 'static, + F: Future + Send + 'static, { let stop = CancellationToken::new(); - let task = task(stop.clone()); - let ended = Arc::new(RwLock::new(false)); - let ended_write = ended.clone(); - let task: JoinHandle> = tokio::spawn(async move { - match task.await { - Ok(o) => Ok(o), - Err(e) => { - *ended_write.write().expect("poisoned") = true; - Err(e) + let stop2 = stop.clone(); + let task: JoinHandle<()> = tokio::spawn(async move { + loop { + let res = task(stop2.clone()).await; + if stop2.is_cancelled() { + return; + } else { + tracing::warn!("task exited, restarting: {res:?}"); } } }); let abort = task.abort_handle(); CancellableTask { - ended, f: Box::pin(async move { stop.cancel(); tokio::select! { r = task => { - Ok(r.context("could not join")??) + r.context("could not join")?; + Ok(()) }, _ = sleep(timeout) => { abort.abort(); @@ -96,12 +91,9 @@ impl CancellableTask { } /// cancel the cancel signal, wait for timeout for the task to stop gracefully, otherwise abort it - pub async fn cancel(self) -> Result { + pub async fn cancel(self) -> Result<(), anyhow::Error> { self.f.await } - pub fn has_ended(&self) -> bool { - *self.ended.read().expect("poisoned") - } } /// assuming apub priv key and ids are immutable, then we don't need to have TTL diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 963814ad9..6dff325b6 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -206,6 +206,7 @@ impl InstanceWorker { .await .context("failed figuring out inbox urls")?; if inbox_urls.is_empty() { + tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id); self.state.last_successful_id = Some(activity.id); self.state.last_successful_published_time = Some(activity.published); return Ok(());