From abcfa266af61bc0c9781e8b7156ac19541844fac Mon Sep 17 00:00:00 2001 From: Dessalines Date: Wed, 29 May 2024 17:03:42 -0400 Subject: [PATCH 1/5] Fixing slowness in saved post fetching. #4756 (#4758) * Fixing slowness in saved post fetching. #4756 * Also fix comment_view.rs --- crates/db_views/src/comment_view.rs | 33 +++++++++------------------ crates/db_views/src/post_view.rs | 35 +++++++++-------------------- crates/federate/src/util.rs | 4 ++-- docker/docker-compose.yml | 2 +- 4 files changed, 24 insertions(+), 50 deletions(-) diff --git a/crates/db_views/src/comment_view.rs b/crates/db_views/src/comment_view.rs index 7588943b9..e021578f8 100644 --- a/crates/db_views/src/comment_view.rs +++ b/crates/db_views/src/comment_view.rs @@ -1,5 +1,4 @@ use crate::structs::{CommentView, LocalUserView}; -use chrono::{DateTime, Utc}; use diesel::{ dsl::{exists, not}, pg::Pg, @@ -63,17 +62,6 @@ fn queries<'a>() -> Queries< ) }; - let is_saved = |person_id| { - comment_saved::table - .filter( - comment::id - .eq(comment_saved::comment_id) - .and(comment_saved::person_id.eq(person_id)), - ) - .select(comment_saved::published.nullable()) - .single_value() - }; - let is_community_followed = |person_id| { community_follower::table .filter( @@ -147,14 +135,6 @@ fn queries<'a>() -> Queries< Box::new(None::.into_sql::>()) }; - let is_saved_selection: Box< - dyn BoxableExpression<_, Pg, SqlType = sql_types::Nullable>, - > = if let Some(person_id) = my_person_id { - Box::new(is_saved(person_id)) - } else { - Box::new(None::>.into_sql::>()) - }; - let is_creator_blocked_selection: Box> = if let Some(person_id) = my_person_id { Box::new(is_creator_blocked(person_id)) @@ -167,6 +147,13 @@ fn queries<'a>() -> Queries< .inner_join(post::table) .inner_join(community::table.on(post::community_id.eq(community::id))) .inner_join(comment_aggregates::table) + .left_join( + comment_saved::table.on( + comment::id + .eq(comment_saved::comment_id) + .and(comment_saved::person_id.eq(my_person_id.unwrap_or(PersonId(-1)))), + ), + ) .select(( comment::all_columns, person::all_columns, @@ -178,7 +165,7 @@ fn queries<'a>() -> Queries< creator_is_moderator, creator_is_admin, subscribed_type_selection, - is_saved_selection.is_not_null(), + comment_saved::person_id.nullable().is_not_null(), is_creator_blocked_selection, score_selection, )) @@ -260,8 +247,8 @@ fn queries<'a>() -> Queries< // If its saved only, then filter, and order by the saved time, not the comment creation time. if options.saved_only { query = query - .filter(is_saved(person_id_join).is_not_null()) - .then_order_by(is_saved(person_id_join).desc()); + .filter(comment_saved::person_id.is_not_null()) + .then_order_by(comment_saved::published.desc()); } if let Some(my_id) = my_person_id { diff --git a/crates/db_views/src/post_view.rs b/crates/db_views/src/post_view.rs index afb0f435f..eac44bb39 100644 --- a/crates/db_views/src/post_view.rs +++ b/crates/db_views/src/post_view.rs @@ -1,5 +1,4 @@ use crate::structs::{LocalUserView, PaginationCursor, PostView}; -use chrono::{DateTime, Utc}; use diesel::{ debug_query, dsl::{exists, not, IntervalDsl}, @@ -100,17 +99,6 @@ fn queries<'a>() -> Queries< ), ); - let is_saved = |person_id| { - post_saved::table - .filter( - post_aggregates::post_id - .eq(post_saved::post_id) - .and(post_saved::person_id.eq(person_id)), - ) - .select(post_saved::published.nullable()) - .single_value() - }; - let is_read = |person_id| { exists( post_read::table.filter( @@ -162,14 +150,6 @@ fn queries<'a>() -> Queries< Box::new(false.into_sql::()) }; - let is_saved_selection: Box< - dyn BoxableExpression<_, Pg, SqlType = sql_types::Nullable>, - > = if let Some(person_id) = my_person_id { - Box::new(is_saved(person_id)) - } else { - Box::new(None::>.into_sql::>()) - }; - let is_read_selection: Box> = if let Some(person_id) = my_person_id { Box::new(is_read(person_id)) @@ -237,6 +217,13 @@ fn queries<'a>() -> Queries< .inner_join(person::table) .inner_join(community::table) .inner_join(post::table) + .left_join( + post_saved::table.on( + post_aggregates::post_id + .eq(post_saved::post_id) + .and(post_saved::person_id.eq(my_person_id.unwrap_or(PersonId(-1)))), + ), + ) .select(( post::all_columns, person::all_columns, @@ -247,7 +234,7 @@ fn queries<'a>() -> Queries< creator_is_admin, post_aggregates::all_columns, subscribed_type_selection, - is_saved_selection.is_not_null(), + post_saved::person_id.nullable().is_not_null(), is_read_selection, is_hidden_selection, is_creator_blocked_selection, @@ -426,10 +413,10 @@ fn queries<'a>() -> Queries< }; // If its saved only, then filter, and order by the saved time, not the comment creation time. - if let (true, Some(person_id)) = (options.saved_only, my_person_id) { + if let (true, Some(_person_id)) = (options.saved_only, my_person_id) { query = query - .filter(is_saved(person_id).is_not_null()) - .then_order_by(is_saved(person_id).desc()); + .filter(post_saved::person_id.is_not_null()) + .then_order_by(post_saved::published.desc()); } // Only hide the read posts, if the saved_only is false. Otherwise ppl with the hide_read // setting wont be able to see saved posts. diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index 02a90dee9..818e5b072 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -59,13 +59,13 @@ pub struct CancellableTask { impl CancellableTask { /// spawn a task but with graceful shutdown - pub fn spawn( + pub fn spawn( timeout: Duration, task: impl FnOnce(CancellationToken) -> F + Send + 'static, ) -> CancellableTask where F: Future> + Send + 'static, - R: Send + 'static, + R: Send + Debug + 'static, { let stop = CancellationToken::new(); let stop2 = stop.clone(); diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index c690a5f48..493b9c205 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -53,7 +53,7 @@ services: lemmy-ui: # use "image" to pull down an already compiled lemmy-ui. make sure to comment out "build". - image: dessalines/lemmy-ui:0.19.3 + image: dessalines/lemmy-ui:0.19.4-rc.3 # platform: linux/x86_64 # no arm64 support. uncomment platform if using m1. # use "build" to build your local lemmy ui image for development. make sure to comment out "image". # run: docker compose up --build From 7d80a3c7d6ead28851cae5b9e19021f1bc4706b6 Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 29 May 2024 23:10:25 +0200 Subject: [PATCH 2/5] replace instanceid with domain (#4753) --- crates/federate/src/lib.rs | 10 ++++----- crates/federate/src/stats.rs | 38 +++++++++++------------------------ crates/federate/src/util.rs | 8 ++++++++ crates/federate/src/worker.rs | 12 ++++++----- 4 files changed, 31 insertions(+), 37 deletions(-) diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index d3876226f..0e87b12de 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -1,10 +1,7 @@ use crate::{util::CancellableTask, worker::InstanceWorker}; use activitypub_federation::config::FederationConfig; use lemmy_api_common::context::LemmyContext; -use lemmy_db_schema::{ - newtypes::InstanceId, - source::{federation_queue_state::FederationQueueState, instance::Instance}, -}; +use lemmy_db_schema::{newtypes::InstanceId, source::instance::Instance}; use lemmy_utils::error::LemmyResult; use stats::receive_print_stats; use std::{collections::HashMap, time::Duration}; @@ -15,6 +12,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::info; +use util::FederationQueueStateWithDomain; mod stats; mod util; @@ -38,7 +36,7 @@ pub struct SendManager { opts: Opts, workers: HashMap, context: FederationConfig, - stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, + stats_sender: UnboundedSender, exit_print: JoinHandle<()>, } @@ -171,7 +169,7 @@ mod test { collections::HashSet, sync::{Arc, Mutex}, }; - use tokio::{spawn, time::sleep}; + use tokio::spawn; struct TestData { send_manager: SendManager, diff --git a/crates/federate/src/stats.rs b/crates/federate/src/stats.rs index bb6510263..f927f6ddf 100644 --- a/crates/federate/src/stats.rs +++ b/crates/federate/src/stats.rs @@ -1,15 +1,11 @@ -use crate::util::get_latest_activity_id; +use crate::util::{get_latest_activity_id, FederationQueueStateWithDomain}; use chrono::Local; -use diesel::result::Error::NotFound; use lemmy_api_common::federate_retry_sleep_duration; use lemmy_db_schema::{ newtypes::InstanceId, - source::{federation_queue_state::FederationQueueState, instance::Instance}, utils::{ActualDbPool, DbPool}, }; -use lemmy_utils::{error::LemmyResult, CACHE_DURATION_FEDERATION}; -use moka::future::Cache; -use once_cell::sync::Lazy; +use lemmy_utils::error::LemmyResult; use std::{collections::HashMap, time::Duration}; use tokio::{sync::mpsc::UnboundedReceiver, time::interval}; use tracing::{debug, info, warn}; @@ -18,7 +14,7 @@ use tracing::{debug, info, warn}; /// dropped) pub(crate) async fn receive_print_stats( pool: ActualDbPool, - mut receiver: UnboundedReceiver<(InstanceId, FederationQueueState)>, + mut receiver: UnboundedReceiver, ) { let pool = &mut DbPool::Pool(&pool); let mut printerval = interval(Duration::from_secs(60)); @@ -28,7 +24,7 @@ pub(crate) async fn receive_print_stats( ele = receiver.recv() => { match ele { // update stats for instance - Some((instance_id, ele)) => {stats.insert(instance_id, ele);}, + Some(ele) => {stats.insert(ele.state.instance_id, ele);}, // receiver closed, print stats and exit None => { print_stats(pool, &stats).await; @@ -43,7 +39,10 @@ pub(crate) async fn receive_print_stats( } } -async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap) { +async fn print_stats( + pool: &mut DbPool<'_>, + stats: &HashMap, +) { let res = print_stats_with_error(pool, stats).await; if let Err(e) = res { warn!("Failed to print stats: {e}"); @@ -52,18 +51,8 @@ async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap, - stats: &HashMap, + stats: &HashMap, ) -> LemmyResult<()> { - static INSTANCE_CACHE: Lazy>> = Lazy::new(|| { - Cache::builder() - .max_capacity(1) - .time_to_live(CACHE_DURATION_FEDERATION) - .build() - }); - let instances = INSTANCE_CACHE - .try_get_with((), async { Instance::read_all(pool).await }) - .await?; - let last_id = get_latest_activity_id(pool).await?; // it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be @@ -72,12 +61,9 @@ async fn print_stats_with_error( // todo: more stats (act/sec, avg http req duration) let mut ok_count = 0; let mut behind_count = 0; - for (instance_id, stat) in stats { - let domain = &instances - .iter() - .find(|i| &i.id == instance_id) - .ok_or(NotFound)? - .domain; + for ele in stats.values() { + let stat = &ele.state; + let domain = &ele.domain; let behind = last_id.0 - stat.last_successful_id.map(|e| e.0).unwrap_or(0); if stat.fail_count > 0 { info!( diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index 818e5b072..b3d080385 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -11,6 +11,7 @@ use lemmy_db_schema::{ source::{ activity::{ActorType, SentActivity}, community::Community, + federation_queue_state::FederationQueueState, person::Person, site::Site, }, @@ -183,3 +184,10 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result>, stop: CancellationToken, context: Data, - stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, + stats_sender: UnboundedSender, last_full_communities_fetch: DateTime, last_incremental_communities_fetch: DateTime, state: FederationQueueState, @@ -87,7 +88,7 @@ impl InstanceWorker { instance: Instance, context: Data, stop: CancellationToken, - stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, + stats_sender: UnboundedSender, ) -> Result<(), anyhow::Error> { let mut pool = context.pool(); let state = FederationQueueState::load(&mut pool, instance.id).await?; @@ -350,9 +351,10 @@ impl InstanceWorker { async fn save_and_send_state(&mut self) -> Result<()> { self.last_state_insert = Utc::now(); FederationQueueState::upsert(&mut self.context.pool(), &self.state).await?; - self - .stats_sender - .send((self.instance.id, self.state.clone()))?; + self.stats_sender.send(FederationQueueStateWithDomain { + state: self.state.clone(), + domain: self.instance.domain.clone(), + })?; Ok(()) } } From 91e57ff9541891ea6f7f8e133b1babdfeda1a103 Mon Sep 17 00:00:00 2001 From: Richard Schwab Date: Wed, 29 May 2024 23:55:15 +0200 Subject: [PATCH 3/5] Prevent bot replies from increasing unread reply count when bot accounts are not shown (#4747) * Prevent bot replies from increasing unread reply count when bot accounts are not shown * Pass LocalUser for unread replies count query * Prevent bot mentions from increasing unread reply count when bot accounts are not shown --- Cargo.lock | 1 + api_tests/src/comment.spec.ts | 56 ++++++++++++++- api_tests/src/shared.ts | 7 +- .../local_user/notifications/unread_count.rs | 7 +- crates/db_views_actor/Cargo.toml | 1 + .../db_views_actor/src/comment_reply_view.rs | 71 ++++++++++++++++--- .../db_views_actor/src/person_mention_view.rs | 71 ++++++++++++++++--- 7 files changed, 189 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b358fc82d..d0fc78c43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2946,6 +2946,7 @@ dependencies = [ "diesel", "diesel-async", "lemmy_db_schema", + "lemmy_db_views", "lemmy_utils", "pretty_assertions", "serde", diff --git a/api_tests/src/comment.spec.ts b/api_tests/src/comment.spec.ts index dfab4109c..8c3a23ab5 100644 --- a/api_tests/src/comment.spec.ts +++ b/api_tests/src/comment.spec.ts @@ -37,8 +37,9 @@ import { followCommunity, blockCommunity, delay, + saveUserSettings, } from "./shared"; -import { CommentView, CommunityView } from "lemmy-js-client"; +import { CommentView, CommunityView, SaveUserSettings } from "lemmy-js-client"; let betaCommunity: CommunityView | undefined; let postOnAlphaRes: PostResponse; @@ -443,6 +444,59 @@ test("Reply to a comment from another instance, get notification", async () => { assertCommentFederation(alphaReply, replyRes.comment_view); }); +test("Bot reply notifications are filtered when bots are hidden", async () => { + const newAlphaBot = await registerUser(alpha, alphaUrl); + let form: SaveUserSettings = { + bot_account: true, + }; + await saveUserSettings(newAlphaBot, form); + + const alphaCommunity = ( + await resolveCommunity(alpha, "!main@lemmy-alpha:8541") + ).community; + + if (!alphaCommunity) { + throw "Missing alpha community"; + } + + await alpha.markAllAsRead(); + form = { + show_bot_accounts: false, + }; + await saveUserSettings(alpha, form); + const postOnAlphaRes = await createPost(alpha, alphaCommunity.community.id); + + // Bot reply to alpha's post + let commentRes = await createComment( + newAlphaBot, + postOnAlphaRes.post_view.post.id, + ); + expect(commentRes).toBeDefined(); + + let alphaUnreadCountRes = await getUnreadCount(alpha); + expect(alphaUnreadCountRes.replies).toBe(0); + + let alphaUnreadRepliesRes = await getReplies(alpha, true); + expect(alphaUnreadRepliesRes.replies.length).toBe(0); + + // This both restores the original state that may be expected by other tests + // implicitly and is used by the next steps to ensure replies are still + // returned when a user later decides to show bot accounts again. + form = { + show_bot_accounts: true, + }; + await saveUserSettings(alpha, form); + + alphaUnreadCountRes = await getUnreadCount(alpha); + expect(alphaUnreadCountRes.replies).toBe(1); + + alphaUnreadRepliesRes = await getReplies(alpha, true); + expect(alphaUnreadRepliesRes.replies.length).toBe(1); + expect(alphaUnreadRepliesRes.replies[0].comment.id).toBe( + commentRes.comment_view.comment.id, + ); +}); + test("Mention beta from alpha", async () => { if (!betaCommunity) throw Error("no community"); const postOnAlphaRes = await createPost(alpha, betaCommunity.community.id); diff --git a/api_tests/src/shared.ts b/api_tests/src/shared.ts index 056f25538..2ae3d9e21 100644 --- a/api_tests/src/shared.ts +++ b/api_tests/src/shared.ts @@ -364,10 +364,13 @@ export async function getUnreadCount( return api.getUnreadCount(); } -export async function getReplies(api: LemmyHttp): Promise { +export async function getReplies( + api: LemmyHttp, + unread_only: boolean = false, +): Promise { let form: GetReplies = { sort: "New", - unread_only: false, + unread_only, }; return api.getReplies(form); } diff --git a/crates/api/src/local_user/notifications/unread_count.rs b/crates/api/src/local_user/notifications/unread_count.rs index 9d06f7c62..4c6c65263 100644 --- a/crates/api/src/local_user/notifications/unread_count.rs +++ b/crates/api/src/local_user/notifications/unread_count.rs @@ -11,9 +11,12 @@ pub async fn unread_count( ) -> LemmyResult> { let person_id = local_user_view.person.id; - let replies = CommentReplyView::get_unread_replies(&mut context.pool(), person_id).await?; + let replies = + CommentReplyView::get_unread_replies(&mut context.pool(), &local_user_view.local_user).await?; - let mentions = PersonMentionView::get_unread_mentions(&mut context.pool(), person_id).await?; + let mentions = + PersonMentionView::get_unread_mentions(&mut context.pool(), &local_user_view.local_user) + .await?; let private_messages = PrivateMessageView::get_unread_messages(&mut context.pool(), person_id).await?; diff --git a/crates/db_views_actor/Cargo.toml b/crates/db_views_actor/Cargo.toml index 1892055d1..d9e6a3352 100644 --- a/crates/db_views_actor/Cargo.toml +++ b/crates/db_views_actor/Cargo.toml @@ -40,6 +40,7 @@ serial_test = { workspace = true } tokio = { workspace = true } pretty_assertions = { workspace = true } url.workspace = true +lemmy_db_views.workspace = true lemmy_utils.workspace = true [package.metadata.cargo-machete] diff --git a/crates/db_views_actor/src/comment_reply_view.rs b/crates/db_views_actor/src/comment_reply_view.rs index 547c00e53..a5939d2e9 100644 --- a/crates/db_views_actor/src/comment_reply_view.rs +++ b/crates/db_views_actor/src/comment_reply_view.rs @@ -31,6 +31,7 @@ use lemmy_db_schema::{ person_block, post, }, + source::local_user::LocalUser, utils::{get_conn, limit_and_offset, DbConn, DbPool, ListFn, Queries, ReadFn}, CommentSortType, }; @@ -193,6 +194,8 @@ fn queries<'a>() -> Queries< }; let list = move |mut conn: DbConn<'a>, options: CommentReplyQuery| async move { + // These filters need to be kept in sync with the filters in + // CommentReplyView::get_unread_replies() let mut query = all_joins(comment_reply::table.into_boxed(), options.my_person_id); if let Some(recipient_id) = options.recipient_id { @@ -204,7 +207,7 @@ fn queries<'a>() -> Queries< } if !options.show_bot_accounts { - query = query.filter(person::bot_account.eq(false)); + query = query.filter(not(person::bot_account)); }; query = match options.sort.unwrap_or(CommentSortType::New) { @@ -246,24 +249,33 @@ impl CommentReplyView { /// Gets the number of unread replies pub async fn get_unread_replies( pool: &mut DbPool<'_>, - my_person_id: PersonId, + local_user: &LocalUser, ) -> Result { use diesel::dsl::count; let conn = &mut get_conn(pool).await?; - comment_reply::table + let mut query = comment_reply::table .inner_join(comment::table) .left_join( person_block::table.on( comment::creator_id .eq(person_block::target_id) - .and(person_block::person_id.eq(my_person_id)), + .and(person_block::person_id.eq(local_user.person_id)), ), ) - // Dont count replies from blocked users + .inner_join(person::table.on(comment::creator_id.eq(person::id))) + .into_boxed(); + + // These filters need to be kept in sync with the filters in queries().list() + if !local_user.show_bot_accounts { + query = query.filter(not(person::bot_account)); + } + + query + // Don't count replies from blocked users .filter(person_block::person_id.is_null()) - .filter(comment_reply::recipient_id.eq(my_person_id)) + .filter(comment_reply::recipient_id.eq(local_user.person_id)) .filter(comment_reply::read.eq(false)) .filter(comment::deleted.eq(false)) .filter(comment::removed.eq(false)) @@ -301,13 +313,15 @@ mod tests { comment_reply::{CommentReply, CommentReplyInsertForm, CommentReplyUpdateForm}, community::{Community, CommunityInsertForm}, instance::Instance, - person::{Person, PersonInsertForm}, + local_user::{LocalUser, LocalUserInsertForm, LocalUserUpdateForm}, + person::{Person, PersonInsertForm, PersonUpdateForm}, person_block::{PersonBlock, PersonBlockForm}, post::{Post, PostInsertForm}, }, traits::{Blockable, Crud}, utils::build_db_pool_for_tests, }; + use lemmy_db_views::structs::LocalUserView; use lemmy_utils::{error::LemmyResult, LemmyErrorType}; use pretty_assertions::assert_eq; use serial_test::serial; @@ -331,11 +345,15 @@ mod tests { .name("terrylakes recipient".into()) .public_key("pubkey".to_string()) .instance_id(inserted_instance.id) + .local(Some(true)) .build(); let inserted_recipient = Person::create(pool, &recipient_form).await?; let recipient_id = inserted_recipient.id; + let recipient_local_user = + LocalUser::create(pool, &LocalUserInsertForm::test_form(recipient_id), vec![]).await?; + let new_community = CommunityInsertForm::builder() .name("test community lake".to_string()) .title("nada".to_owned()) @@ -386,7 +404,7 @@ mod tests { CommentReply::update(pool, inserted_reply.id, &comment_reply_update_form).await?; // Test to make sure counts and blocks work correctly - let unread_replies = CommentReplyView::get_unread_replies(pool, recipient_id).await?; + let unread_replies = CommentReplyView::get_unread_replies(pool, &recipient_local_user).await?; let query = CommentReplyQuery { recipient_id: Some(recipient_id), @@ -409,11 +427,44 @@ mod tests { PersonBlock::block(pool, &block_form).await?; let unread_replies_after_block = - CommentReplyView::get_unread_replies(pool, recipient_id).await?; - let replies_after_block = query.list(pool).await?; + CommentReplyView::get_unread_replies(pool, &recipient_local_user).await?; + let replies_after_block = query.clone().list(pool).await?; assert_eq!(0, unread_replies_after_block); assert_eq!(0, replies_after_block.len()); + // Unblock user so we can reuse the same person + PersonBlock::unblock(pool, &block_form).await?; + + // Turn Terry into a bot account + let person_update_form = PersonUpdateForm { + bot_account: Some(true), + ..Default::default() + }; + Person::update(pool, inserted_terry.id, &person_update_form).await?; + + let recipient_local_user_update_form = LocalUserUpdateForm { + show_bot_accounts: Some(false), + ..Default::default() + }; + LocalUser::update( + pool, + recipient_local_user.id, + &recipient_local_user_update_form, + ) + .await?; + let recipient_local_user_view = LocalUserView::read(pool, recipient_local_user.id) + .await? + .ok_or(LemmyErrorType::CouldntFindLocalUser)?; + + let unread_replies_after_hide_bots = + CommentReplyView::get_unread_replies(pool, &recipient_local_user_view.local_user).await?; + + let mut query_without_bots = query.clone(); + query_without_bots.show_bot_accounts = false; + let replies_after_hide_bots = query_without_bots.list(pool).await?; + assert_eq!(0, unread_replies_after_hide_bots); + assert_eq!(0, replies_after_hide_bots.len()); + Comment::delete(pool, inserted_comment.id).await?; Post::delete(pool, inserted_post.id).await?; Community::delete(pool, inserted_community.id).await?; diff --git a/crates/db_views_actor/src/person_mention_view.rs b/crates/db_views_actor/src/person_mention_view.rs index d42987a68..58ddb011b 100644 --- a/crates/db_views_actor/src/person_mention_view.rs +++ b/crates/db_views_actor/src/person_mention_view.rs @@ -31,6 +31,7 @@ use lemmy_db_schema::{ person_mention, post, }, + source::local_user::LocalUser, utils::{get_conn, limit_and_offset, DbConn, DbPool, ListFn, Queries, ReadFn}, CommentSortType, }; @@ -192,6 +193,8 @@ fn queries<'a>() -> Queries< }; let list = move |mut conn: DbConn<'a>, options: PersonMentionQuery| async move { + // These filters need to be kept in sync with the filters in + // PersonMentionView::get_unread_mentions() let mut query = all_joins(person_mention::table.into_boxed(), options.my_person_id); if let Some(recipient_id) = options.recipient_id { @@ -203,7 +206,7 @@ fn queries<'a>() -> Queries< } if !options.show_bot_accounts { - query = query.filter(person::bot_account.eq(false)); + query = query.filter(not(person::bot_account)); }; query = match options.sort.unwrap_or(CommentSortType::Hot) { @@ -247,23 +250,32 @@ impl PersonMentionView { /// Gets the number of unread mentions pub async fn get_unread_mentions( pool: &mut DbPool<'_>, - my_person_id: PersonId, + local_user: &LocalUser, ) -> Result { use diesel::dsl::count; let conn = &mut get_conn(pool).await?; - person_mention::table + let mut query = person_mention::table .inner_join(comment::table) .left_join( person_block::table.on( comment::creator_id .eq(person_block::target_id) - .and(person_block::person_id.eq(my_person_id)), + .and(person_block::person_id.eq(local_user.person_id)), ), ) - // Dont count replies from blocked users + .inner_join(person::table.on(comment::creator_id.eq(person::id))) + .into_boxed(); + + // These filters need to be kept in sync with the filters in queries().list() + if !local_user.show_bot_accounts { + query = query.filter(not(person::bot_account)); + } + + query + // Don't count replies from blocked users .filter(person_block::person_id.is_null()) - .filter(person_mention::recipient_id.eq(my_person_id)) + .filter(person_mention::recipient_id.eq(local_user.person_id)) .filter(person_mention::read.eq(false)) .filter(comment::deleted.eq(false)) .filter(comment::removed.eq(false)) @@ -300,7 +312,8 @@ mod tests { comment::{Comment, CommentInsertForm}, community::{Community, CommunityInsertForm}, instance::Instance, - person::{Person, PersonInsertForm}, + local_user::{LocalUser, LocalUserInsertForm, LocalUserUpdateForm}, + person::{Person, PersonInsertForm, PersonUpdateForm}, person_block::{PersonBlock, PersonBlockForm}, person_mention::{PersonMention, PersonMentionInsertForm, PersonMentionUpdateForm}, post::{Post, PostInsertForm}, @@ -308,6 +321,7 @@ mod tests { traits::{Blockable, Crud}, utils::build_db_pool_for_tests, }; + use lemmy_db_views::structs::LocalUserView; use lemmy_utils::{error::LemmyResult, LemmyErrorType}; use pretty_assertions::assert_eq; use serial_test::serial; @@ -337,6 +351,9 @@ mod tests { let inserted_recipient = Person::create(pool, &recipient_form).await?; let recipient_id = inserted_recipient.id; + let recipient_local_user = + LocalUser::create(pool, &LocalUserInsertForm::test_form(recipient_id), vec![]).await?; + let new_community = CommunityInsertForm::builder() .name("test community lake".to_string()) .title("nada".to_owned()) @@ -387,7 +404,8 @@ mod tests { PersonMention::update(pool, inserted_mention.id, &person_mention_update_form).await?; // Test to make sure counts and blocks work correctly - let unread_mentions = PersonMentionView::get_unread_mentions(pool, recipient_id).await?; + let unread_mentions = + PersonMentionView::get_unread_mentions(pool, &recipient_local_user).await?; let query = PersonMentionQuery { recipient_id: Some(recipient_id), @@ -410,11 +428,44 @@ mod tests { PersonBlock::block(pool, &block_form).await?; let unread_mentions_after_block = - PersonMentionView::get_unread_mentions(pool, recipient_id).await?; - let mentions_after_block = query.list(pool).await?; + PersonMentionView::get_unread_mentions(pool, &recipient_local_user).await?; + let mentions_after_block = query.clone().list(pool).await?; assert_eq!(0, unread_mentions_after_block); assert_eq!(0, mentions_after_block.len()); + // Unblock user so we can reuse the same person + PersonBlock::unblock(pool, &block_form).await?; + + // Turn Terry into a bot account + let person_update_form = PersonUpdateForm { + bot_account: Some(true), + ..Default::default() + }; + Person::update(pool, inserted_person.id, &person_update_form).await?; + + let recipient_local_user_update_form = LocalUserUpdateForm { + show_bot_accounts: Some(false), + ..Default::default() + }; + LocalUser::update( + pool, + recipient_local_user.id, + &recipient_local_user_update_form, + ) + .await?; + let recipient_local_user_view = LocalUserView::read(pool, recipient_local_user.id) + .await? + .ok_or(LemmyErrorType::CouldntFindLocalUser)?; + + let unread_mentions_after_hide_bots = + PersonMentionView::get_unread_mentions(pool, &recipient_local_user_view.local_user).await?; + + let mut query_without_bots = query.clone(); + query_without_bots.show_bot_accounts = false; + let replies_after_hide_bots = query_without_bots.list(pool).await?; + assert_eq!(0, unread_mentions_after_hide_bots); + assert_eq!(0, replies_after_hide_bots.len()); + Comment::delete(pool, inserted_comment.id).await?; Post::delete(pool, inserted_post.id).await?; Community::delete(pool, inserted_community.id).await?; From e8a7bb07a30a95b922548e69847103b42c18263d Mon Sep 17 00:00:00 2001 From: phiresky Date: Thu, 30 May 2024 11:08:27 +0200 Subject: [PATCH 4/5] fix both permanent stopping of federation queues and multiple creation of the same federation queues (#4754) Co-authored-by: Nutomic --- crates/federate/src/lib.rs | 48 ++++++++++++++++++++++++++++--------- crates/federate/src/util.rs | 21 +++++++++------- src/lib.rs | 5 ++-- 3 files changed, 52 insertions(+), 22 deletions(-) diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index 0e87b12de..21b9229b5 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -41,7 +41,7 @@ pub struct SendManager { } impl SendManager { - pub fn new(opts: Opts, context: FederationConfig) -> Self { + fn new(opts: Opts, context: FederationConfig) -> Self { assert!(opts.process_count > 0); assert!(opts.process_index > 0); assert!(opts.process_index <= opts.process_count); @@ -59,11 +59,27 @@ impl SendManager { } } - pub fn run(mut self) -> CancellableTask { - CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| async move { - self.do_loop(cancel).await?; - self.cancel().await?; - Ok(()) + pub fn run(opts: Opts, context: FederationConfig) -> CancellableTask { + CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| { + let opts = opts.clone(); + let context = context.clone(); + let mut manager = Self::new(opts, context); + async move { + let result = manager.do_loop(cancel).await; + // the loop function will only return if there is (a) an internal error (e.g. db connection + // failure) or (b) it was cancelled from outside. + if let Err(e) = result { + // don't let this error bubble up, just log it, so the below cancel function will run + // regardless + tracing::error!("SendManager failed: {e}"); + } + // cancel all the dependent workers as well to ensure they don't get orphaned and keep + // running. + manager.cancel().await?; + LemmyResult::Ok(()) + // if the task was not intentionally cancelled, then this whole lambda will be run again by + // CancellableTask after this + } }) } @@ -102,14 +118,24 @@ impl SendManager { continue; } // create new worker - let instance = instance.clone(); - let req_data = self.context.to_request_data(); + let context = self.context.clone(); let stats_sender = self.stats_sender.clone(); self.workers.insert( instance.id, - CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| async move { - InstanceWorker::init_and_loop(instance, req_data, stop, stats_sender).await?; - Ok(()) + CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| { + // if the instance worker ends unexpectedly due to internal/db errors, this lambda is rerun by cancellabletask. + let instance = instance.clone(); + let req_data = context.to_request_data(); + let stats_sender = stats_sender.clone(); + async move { + InstanceWorker::init_and_loop( + instance, + req_data, + stop, + stats_sender, + ) + .await + } }), ); } else if !should_federate { diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index b3d080385..60361c3c9 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -18,7 +18,6 @@ use lemmy_db_schema::{ traits::ApubActor, utils::{get_conn, DbPool}, }; -use lemmy_utils::error::LemmyResult; use moka::future::Cache; use once_cell::sync::Lazy; use reqwest::Url; @@ -26,7 +25,6 @@ use serde_json::Value; use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration}; use tokio::{task::JoinHandle, time::sleep}; use tokio_util::sync::CancellationToken; -use tracing::error; /// Decrease the delays of the federation queue. /// Should only be used for federation tests since it significantly increases CPU and DB load of the @@ -62,24 +60,31 @@ impl CancellableTask { /// spawn a task but with graceful shutdown pub fn spawn( timeout: Duration, - task: impl FnOnce(CancellationToken) -> F + Send + 'static, + task: impl Fn(CancellationToken) -> F + Send + 'static, ) -> CancellableTask where - F: Future> + Send + 'static, + F: Future + Send + 'static, R: Send + Debug + 'static, { let stop = CancellationToken::new(); let stop2 = stop.clone(); - let task: JoinHandle> = tokio::spawn(task(stop2)); + let task: JoinHandle<()> = tokio::spawn(async move { + loop { + let res = task(stop2.clone()).await; + if stop2.is_cancelled() { + return; + } else { + tracing::warn!("task exited, restarting: {res:?}"); + } + } + }); let abort = task.abort_handle(); CancellableTask { f: Box::pin(async move { stop.cancel(); tokio::select! { r = task => { - if let Err(ref e) = r? { - error!("CancellableTask threw error: {e}"); - } + r.context("CancellableTask failed to cancel cleanly, returned error")?; Ok(()) }, _ = sleep(timeout) => { diff --git a/src/lib.rs b/src/lib.rs index c2b5e57c2..26740a444 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -210,14 +210,13 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> { None }; let federate = (!args.disable_activity_sending).then(|| { - let task = SendManager::new( + SendManager::run( Opts { process_index: args.federate_process_index, process_count: args.federate_process_count, }, federation_config, - ); - task.run() + ) }); let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?; From d2083f79d9a9b33a29139f82a638e61e76bb05f0 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Thu, 30 May 2024 11:55:34 +0200 Subject: [PATCH 5/5] Version 0.19.4-rc.4 --- Cargo.lock | 26 +++++++++++++------------- Cargo.toml | 24 ++++++++++++------------ 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d0fc78c43..72bdab6a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2735,7 +2735,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "lemmy_api" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "activitypub_federation", "actix-web", @@ -2764,7 +2764,7 @@ dependencies = [ [[package]] name = "lemmy_api_common" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "activitypub_federation", "actix-web", @@ -2802,7 +2802,7 @@ dependencies = [ [[package]] name = "lemmy_api_crud" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "accept-language", "activitypub_federation", @@ -2825,7 +2825,7 @@ dependencies = [ [[package]] name = "lemmy_apub" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "activitypub_federation", "actix-web", @@ -2863,7 +2863,7 @@ dependencies = [ [[package]] name = "lemmy_db_perf" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "anyhow", "clap", @@ -2878,7 +2878,7 @@ dependencies = [ [[package]] name = "lemmy_db_schema" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "activitypub_federation", "anyhow", @@ -2918,7 +2918,7 @@ dependencies = [ [[package]] name = "lemmy_db_views" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "actix-web", "chrono", @@ -2940,7 +2940,7 @@ dependencies = [ [[package]] name = "lemmy_db_views_actor" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "chrono", "diesel", @@ -2961,7 +2961,7 @@ dependencies = [ [[package]] name = "lemmy_db_views_moderator" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "diesel", "diesel-async", @@ -2973,7 +2973,7 @@ dependencies = [ [[package]] name = "lemmy_federate" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "activitypub_federation", "anyhow", @@ -2998,7 +2998,7 @@ dependencies = [ [[package]] name = "lemmy_routes" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "activitypub_federation", "actix-web", @@ -3023,7 +3023,7 @@ dependencies = [ [[package]] name = "lemmy_server" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "activitypub_federation", "actix-cors", @@ -3066,7 +3066,7 @@ dependencies = [ [[package]] name = "lemmy_utils" -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" dependencies = [ "actix-web", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index d03d195cb..16940d3ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace.package] -version = "0.19.4-rc.3" +version = "0.19.4-rc.4" edition = "2021" description = "A link aggregator for the fediverse" license = "AGPL-3.0" @@ -88,17 +88,17 @@ unused_self = "deny" unwrap_used = "deny" [workspace.dependencies] -lemmy_api = { version = "=0.19.4-rc.3", path = "./crates/api" } -lemmy_api_crud = { version = "=0.19.4-rc.3", path = "./crates/api_crud" } -lemmy_apub = { version = "=0.19.4-rc.3", path = "./crates/apub" } -lemmy_utils = { version = "=0.19.4-rc.3", path = "./crates/utils", default-features = false } -lemmy_db_schema = { version = "=0.19.4-rc.3", path = "./crates/db_schema" } -lemmy_api_common = { version = "=0.19.4-rc.3", path = "./crates/api_common" } -lemmy_routes = { version = "=0.19.4-rc.3", path = "./crates/routes" } -lemmy_db_views = { version = "=0.19.4-rc.3", path = "./crates/db_views" } -lemmy_db_views_actor = { version = "=0.19.4-rc.3", path = "./crates/db_views_actor" } -lemmy_db_views_moderator = { version = "=0.19.4-rc.3", path = "./crates/db_views_moderator" } -lemmy_federate = { version = "=0.19.4-rc.3", path = "./crates/federate" } +lemmy_api = { version = "=0.19.4-rc.4", path = "./crates/api" } +lemmy_api_crud = { version = "=0.19.4-rc.4", path = "./crates/api_crud" } +lemmy_apub = { version = "=0.19.4-rc.4", path = "./crates/apub" } +lemmy_utils = { version = "=0.19.4-rc.4", path = "./crates/utils", default-features = false } +lemmy_db_schema = { version = "=0.19.4-rc.4", path = "./crates/db_schema" } +lemmy_api_common = { version = "=0.19.4-rc.4", path = "./crates/api_common" } +lemmy_routes = { version = "=0.19.4-rc.4", path = "./crates/routes" } +lemmy_db_views = { version = "=0.19.4-rc.4", path = "./crates/db_views" } +lemmy_db_views_actor = { version = "=0.19.4-rc.4", path = "./crates/db_views_actor" } +lemmy_db_views_moderator = { version = "=0.19.4-rc.4", path = "./crates/db_views_moderator" } +lemmy_federate = { version = "=0.19.4-rc.4", path = "./crates/federate" } activitypub_federation = { version = "0.5.6", default-features = false, features = [ "actix-web", ] }