diff --git a/Cargo.lock b/Cargo.lock index aa8d170b8..e93bf0ae0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -423,9 +423,9 @@ dependencies = [ [[package]] name = "background-jobs" -version = "0.9.1" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb7df0fd6abf9d55139d4c9e569c0a8cd271ec265862c41bd215b46b36c52397" +checksum = "77f4508c6c5b5cfc6c18d43d0ba6ecda339710206854da9e1c9ac9dfb7e3eb6f" dependencies = [ "background-jobs-actix", "background-jobs-core", @@ -433,9 +433,9 @@ dependencies = [ [[package]] name = "background-jobs-actix" -version = "0.9.6" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38aebb545b0fac45046421993890eb49cc04896a93b85bbfb1b9017decc413f9" +checksum = "5dabf6a2204fe034db7910a38f8e2d183fe24eb92abd4c0aaca59f8cacf4e48b" dependencies = [ "actix-rt", "anyhow", @@ -443,31 +443,32 @@ dependencies = [ "async-trait", "background-jobs-core", "chrono", - "log", "num_cpus", "serde", "serde_json", "thiserror", "tokio", + "tracing", + "tracing-futures", "uuid", ] [[package]] name = "background-jobs-core" -version = "0.9.5" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee8604ff89c62ca8eefc1ea2c3f359a53b7930e640fb22bf7890eab13b4640d2" +checksum = "174d36b170699ecc13b7513bda9eff6f12cc889eae5d16b792daa3f7b21be452" dependencies = [ "actix-rt", "anyhow", "async-mutex", "async-trait", "chrono", - "log", "serde", "serde_json", "thiserror", - "tokio", + "tracing", + "tracing-futures", "uuid", ] diff --git a/crates/api/Cargo.toml b/crates/api/Cargo.toml index 7d53cb330..8dbc254a0 100644 --- a/crates/api/Cargo.toml +++ b/crates/api/Cargo.toml @@ -49,5 +49,5 @@ async-trait = "0.1.51" captcha = "0.0.8" anyhow = "1.0.44" thiserror = "1.0.29" -background-jobs = "0.9.1" +background-jobs = "0.11.0" reqwest = { version = "0.11.4", features = ["json"] } diff --git a/crates/api_crud/Cargo.toml b/crates/api_crud/Cargo.toml index 6372ebf4f..6843b56ad 100644 --- a/crates/api_crud/Cargo.toml +++ b/crates/api_crud/Cargo.toml @@ -43,6 +43,6 @@ sha2 = "0.9.8" async-trait = "0.1.51" anyhow = "1.0.44" thiserror = "1.0.29" -background-jobs = "0.9.1" +background-jobs = "0.11.0" reqwest = { version = "0.11.4", features = ["json"] } webmention = "0.4.0" diff --git a/crates/apub/Cargo.toml b/crates/apub/Cargo.toml index 8e5c312df..74c146793 100644 --- a/crates/apub/Cargo.toml +++ b/crates/apub/Cargo.toml @@ -47,7 +47,7 @@ sha2 = "0.9.8" async-trait = "0.1.51" anyhow = "1.0.44" thiserror = "1.0.29" -background-jobs = "0.9.1" +background-jobs = "0.11.0" reqwest = { version = "0.11.4", features = ["json"] } html2md = "0.2.13" once_cell = "1.8.0" diff --git a/crates/apub/src/collections/community_moderators.rs b/crates/apub/src/collections/community_moderators.rs index 1b0911ab7..9b8d26d9f 100644 --- a/crates/apub/src/collections/community_moderators.rs +++ b/crates/apub/src/collections/community_moderators.rs @@ -136,6 +136,7 @@ mod tests { person::tests::parse_lemmy_person, tests::{file_to_json_object, init_context}, }; + use lemmy_apub_lib::activity_queue::create_activity_queue; use lemmy_db_schema::{ source::{ community::Community, @@ -148,7 +149,8 @@ mod tests { #[actix_rt::test] #[serial] async fn test_parse_lemmy_community_moderators() { - let context = init_context(); + let manager = create_activity_queue(); + let context = init_context(manager.queue_handle().clone()); let community = parse_lemmy_community(&context).await; let community_id = community.id; diff --git a/crates/apub/src/objects/comment.rs b/crates/apub/src/objects/comment.rs index 007d3def4..41ea12b9a 100644 --- a/crates/apub/src/objects/comment.rs +++ b/crates/apub/src/objects/comment.rs @@ -214,6 +214,7 @@ pub(crate) mod tests { tests::{file_to_json_object, init_context}, }; use assert_json_diff::assert_json_include; + use lemmy_apub_lib::activity_queue::create_activity_queue; use serial_test::serial; async fn prepare_comment_test( @@ -241,7 +242,8 @@ pub(crate) mod tests { #[actix_rt::test] #[serial] pub(crate) async fn test_parse_lemmy_comment() { - let context = init_context(); + let manager = create_activity_queue(); + let context = init_context(manager.queue_handle().clone()); let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap(); let data = prepare_comment_test(&url, &context).await; @@ -270,7 +272,8 @@ pub(crate) mod tests { #[actix_rt::test] #[serial] async fn test_parse_pleroma_comment() { - let context = init_context(); + let manager = create_activity_queue(); + let context = init_context(manager.queue_handle().clone()); let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap(); let data = prepare_comment_test(&url, &context).await; diff --git a/crates/apub/src/objects/community.rs b/crates/apub/src/objects/community.rs index 300ad2f2c..e2c7f0af8 100644 --- a/crates/apub/src/objects/community.rs +++ b/crates/apub/src/objects/community.rs @@ -214,6 +214,7 @@ impl ApubCommunity { pub(crate) mod tests { use super::*; use crate::objects::tests::{file_to_json_object, init_context}; + use lemmy_apub_lib::activity_queue::create_activity_queue; use lemmy_db_schema::traits::Crud; use serial_test::serial; @@ -240,7 +241,8 @@ pub(crate) mod tests { #[actix_rt::test] #[serial] async fn test_parse_lemmy_community() { - let context = init_context(); + let manager = create_activity_queue(); + let context = init_context(manager.queue_handle().clone()); let community = parse_lemmy_community(&context).await; assert_eq!(community.title, "Ten Forward"); diff --git a/crates/apub/src/objects/mod.rs b/crates/apub/src/objects/mod.rs index b577dabef..5e0a08096 100644 --- a/crates/apub/src/objects/mod.rs +++ b/crates/apub/src/objects/mod.rs @@ -21,11 +21,11 @@ pub(crate) fn get_summary_from_string_or_source( #[cfg(test)] pub(crate) mod tests { use actix::Actor; + use background_jobs::QueueHandle; use diesel::{ r2d2::{ConnectionManager, Pool}, PgConnection, }; - use lemmy_apub_lib::activity_queue::create_activity_queue; use lemmy_db_schema::{ establish_unpooled_connection, get_database_url_from_env, @@ -45,7 +45,7 @@ pub(crate) mod tests { // TODO: would be nice if we didnt have to use a full context for tests. // or at least write a helper function so this code is shared with main.rs - pub(crate) fn init_context() -> LemmyContext { + pub(crate) fn init_context(activity_queue: QueueHandle) -> LemmyContext { // call this to run migrations establish_unpooled_connection(); let settings = Settings::init().unwrap(); @@ -57,7 +57,6 @@ pub(crate) mod tests { .user_agent(build_user_agent(&settings)) .build() .unwrap(); - let activity_queue = create_activity_queue(); let secret = Secret { id: 0, jwt_secret: "".to_string(), diff --git a/crates/apub/src/objects/person.rs b/crates/apub/src/objects/person.rs index 8c0587ddb..9c93e76d7 100644 --- a/crates/apub/src/objects/person.rs +++ b/crates/apub/src/objects/person.rs @@ -198,6 +198,7 @@ impl ActorType for ApubPerson { pub(crate) mod tests { use super::*; use crate::objects::tests::{file_to_json_object, init_context}; + use lemmy_apub_lib::activity_queue::create_activity_queue; use lemmy_db_schema::traits::Crud; use serial_test::serial; @@ -218,7 +219,8 @@ pub(crate) mod tests { #[actix_rt::test] #[serial] async fn test_parse_lemmy_person() { - let context = init_context(); + let manager = create_activity_queue(); + let context = init_context(manager.queue_handle().clone()); let person = parse_lemmy_person(&context).await; assert_eq!(person.display_name, Some("Jean-Luc Picard".to_string())); @@ -231,7 +233,8 @@ pub(crate) mod tests { #[actix_rt::test] #[serial] async fn test_parse_pleroma_person() { - let context = init_context(); + let manager = create_activity_queue(); + let context = init_context(manager.queue_handle().clone()); let json = file_to_json_object("assets/pleroma/objects/person.json"); let url = Url::parse("https://queer.hacktivis.me/users/lanodan").unwrap(); let mut request_counter = 0; diff --git a/crates/apub/src/objects/post.rs b/crates/apub/src/objects/post.rs index 1d4eddb9c..cb86b97df 100644 --- a/crates/apub/src/objects/post.rs +++ b/crates/apub/src/objects/post.rs @@ -205,12 +205,14 @@ mod tests { post::ApubPost, tests::{file_to_json_object, init_context}, }; + use lemmy_apub_lib::activity_queue::create_activity_queue; use serial_test::serial; #[actix_rt::test] #[serial] async fn test_parse_lemmy_post() { - let context = init_context(); + let manager = create_activity_queue(); + let context = init_context(manager.queue_handle().clone()); let community = parse_lemmy_community(&context).await; let person = parse_lemmy_person(&context).await; diff --git a/crates/apub/src/objects/private_message.rs b/crates/apub/src/objects/private_message.rs index 30c8e4dc5..d624e133e 100644 --- a/crates/apub/src/objects/private_message.rs +++ b/crates/apub/src/objects/private_message.rs @@ -162,6 +162,7 @@ mod tests { tests::{file_to_json_object, init_context}, }; use assert_json_diff::assert_json_include; + use lemmy_apub_lib::activity_queue::create_activity_queue; use serial_test::serial; async fn prepare_comment_test(url: &Url, context: &LemmyContext) -> (ApubPerson, ApubPerson) { @@ -191,7 +192,8 @@ mod tests { #[actix_rt::test] #[serial] async fn test_parse_lemmy_pm() { - let context = init_context(); + let manager = create_activity_queue(); + let context = init_context(manager.queue_handle().clone()); let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap(); let data = prepare_comment_test(&url, &context).await; let json: ChatMessage = file_to_json_object("assets/lemmy/objects/chat_message.json"); @@ -218,7 +220,8 @@ mod tests { #[actix_rt::test] #[serial] async fn test_parse_pleroma_pm() { - let context = init_context(); + let manager = create_activity_queue(); + let context = init_context(manager.queue_handle().clone()); let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap(); let data = prepare_comment_test(&url, &context).await; let pleroma_url = Url::parse("https://queer.hacktivis.me/objects/2").unwrap(); diff --git a/crates/apub_lib/Cargo.toml b/crates/apub_lib/Cargo.toml index 1e30cd8d6..502e5dea3 100644 --- a/crates/apub_lib/Cargo.toml +++ b/crates/apub_lib/Cargo.toml @@ -26,5 +26,5 @@ sha2 = "0.9.8" actix-web = { version = "4.0.0-beta.9", default-features = false } http-signature-normalization-actix = { version = "0.5.0-beta.10", default-features = false, features = ["server", "sha-2"] } http-signature-normalization-reqwest = { version = "0.2.0", default-features = false, features = ["sha-2"] } -background-jobs = "0.9.1" +background-jobs = "0.11.0" diesel = "1.4.8" diff --git a/crates/apub_lib/src/activity_queue.rs b/crates/apub_lib/src/activity_queue.rs index 31b18f7c0..3c7c321e6 100644 --- a/crates/apub_lib/src/activity_queue.rs +++ b/crates/apub_lib/src/activity_queue.rs @@ -1,10 +1,10 @@ use crate::{signatures::sign_and_send, traits::ActorType}; use anyhow::{anyhow, Context, Error}; use background_jobs::{ - create_server, memory_storage::Storage, ActixJob, Backoff, + Manager, MaxRetries, QueueHandle, WorkerConfig, @@ -35,7 +35,7 @@ pub async fn send_activity( if env::var("APUB_TESTING_SEND_SYNC").is_ok() { do_send(message, client).await?; } else { - activity_queue.queue::(message)?; + activity_queue.queue::(message).await?; } } @@ -101,19 +101,13 @@ async fn do_send(task: SendActivityTask, client: &Client) -> Result<(), Error> { Ok(()) } -pub fn create_activity_queue() -> QueueHandle { - // Start the application server. This guards access to to the jobs store - let queue_handle = create_server(Storage::new()); - let arbiter = actix_web::rt::Arbiter::new(); - +pub fn create_activity_queue() -> Manager { // Configure and start our workers - WorkerConfig::new(|| MyState { + WorkerConfig::new_managed(Storage::new(), |_| MyState { client: Client::default(), }) .register::() - .start_in_arbiter(&arbiter, queue_handle.clone()); - - queue_handle + .start() } #[derive(Clone)] diff --git a/crates/websocket/Cargo.toml b/crates/websocket/Cargo.toml index 8d4f192de..1f50393ad 100644 --- a/crates/websocket/Cargo.toml +++ b/crates/websocket/Cargo.toml @@ -26,7 +26,7 @@ serde_json = { version = "1.0.68", features = ["preserve_order"] } actix = "0.12.0" anyhow = "1.0.44" diesel = "1.4.8" -background-jobs = "0.9.1" +background-jobs = "0.11.0" tokio = "1.12.0" strum = "0.21.0" strum_macros = "0.21.1" diff --git a/src/main.rs b/src/main.rs index 059262a03..468419c84 100644 --- a/src/main.rs +++ b/src/main.rs @@ -87,7 +87,9 @@ async fn main() -> Result<(), LemmyError> { .user_agent(build_user_agent(&settings)) .build()?; - let activity_queue = create_activity_queue(); + let queue_manager = create_activity_queue(); + + let activity_queue = queue_manager.queue_handle().clone(); let chat_server = ChatServer::startup( pool.clone(), @@ -128,5 +130,7 @@ async fn main() -> Result<(), LemmyError> { .run() .await?; + drop(queue_manager); + Ok(()) }