From 7902138df2c1959cf3775fe3f242b163d3d74448 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Wed, 8 May 2024 12:40:14 +0200 Subject: [PATCH] extract helper fn --- crates/apub/src/api/user_settings_backup.rs | 284 ++++++++------------ 1 file changed, 118 insertions(+), 166 deletions(-) diff --git a/crates/apub/src/api/user_settings_backup.rs b/crates/apub/src/api/user_settings_backup.rs index 18f28e12d..dbf74e4ed 100644 --- a/crates/apub/src/api/user_settings_backup.rs +++ b/crates/apub/src/api/user_settings_backup.rs @@ -3,8 +3,8 @@ use crate::objects::{ community::ApubCommunity, person::ApubPerson, post::ApubPost, -}; -use activitypub_federation::{config::Data, fetch::object_id::ObjectId}; +};use itertools::Itertools; +use activitypub_federation::{config::Data, fetch::object_id::ObjectId, traits::Object}; use actix_web::web::Json; use futures::{future::try_join_all, StreamExt}; use lemmy_api_common::{context::LemmyContext, SuccessResponse}; @@ -30,8 +30,11 @@ use lemmy_utils::{ spawn_try_task, }; use serde::{Deserialize, Serialize}; +use std::future::Future; use tracing::info; +const PARALLELISM: usize = 10; + /// Backup of user data. This struct should never be changed so that the data can be used as a /// long-term backup in case the instance goes down unexpectedly. All fields are optional to allow /// importing partial backups. @@ -168,170 +171,88 @@ pub async fn import_settings( } spawn_try_task(async move { - const PARALLELISM: usize = 10; let person_id = local_user_view.person.id; - // These tasks fetch objects from remote instances which might be down. - // TODO: Would be nice if we could send a list of failed items with api response, but then - // the request would likely timeout. - let mut failed_items = vec![]; - info!( - "Starting settings backup for {}", + "Starting settings import for {}", local_user_view.person.name ); - futures::stream::iter( - data - .followed_communities - .clone() - .into_iter() - // reset_request_count works like clone, and is necessary to avoid running into request limit - .map(|f| (f, context.reset_request_count())) - .map(|(followed, context)| async move { - // need to reset outgoing request count to avoid running into limit - let community = followed.dereference(&context).await?; - let form = CommunityFollowerForm { - person_id, - community_id: community.id, - pending: true, - }; - CommunityFollower::follow(&mut context.pool(), &form).await?; - LemmyResult::Ok(()) - }), - ) - .buffer_unordered(PARALLELISM) - .collect::>() - .await - .into_iter() - .enumerate() - .for_each(|(i, r)| { - if let Err(e) = r { - failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone())); - info!("Failed to import followed community: {e}"); - } - }); - - futures::stream::iter( - data - .saved_posts - .clone() - .into_iter() - .map(|s| (s, context.reset_request_count())) - .map(|(saved, context)| async move { - let post = saved.dereference(&context).await?; - let form = PostSavedForm { - person_id, - post_id: post.id, - }; - PostSaved::save(&mut context.pool(), &form).await?; - LemmyResult::Ok(()) - }), - ) - .buffer_unordered(PARALLELISM) - .collect::>() - .await - .into_iter() - .enumerate() - .for_each(|(i, r)| { - if let Err(e) = r { - failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone())); - info!("Failed to import saved post community: {e}"); - } - }); - - futures::stream::iter( - data - .saved_comments - .clone() - .into_iter() - .map(|s| (s, context.reset_request_count())) - .map(|(saved, context)| async move { - let comment = saved.dereference(&context).await?; - let form = CommentSavedForm { - person_id, - comment_id: comment.id, - }; - CommentSaved::save(&mut context.pool(), &form).await?; - LemmyResult::Ok(()) - }), - ) - .buffer_unordered(PARALLELISM) - .collect::>() - .await - .into_iter() - .enumerate() - .for_each(|(i, r)| { - if let Err(e) = r { - failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone())); - info!("Failed to import saved comment community: {e}"); - } - }); - - let failed_items: Vec<_> = failed_items.into_iter().flatten().collect(); - info!( - "Finished settings backup for {}, failed items: {:#?}", - local_user_view.person.name, failed_items - ); - - // These tasks don't connect to any remote instances but only insert directly in the database. - // That means the only error condition are db connection failures, so no extra error handling is - // needed. - futures::stream::iter( - data - .blocked_communities - .clone() - .into_iter() - .map(|s| (s, context.reset_request_count())) - .map(|(blocked, context)| async move { - let community = blocked.dereference(&context).await?; - let form = CommunityBlockForm { - person_id, - community_id: community.id, - }; - CommunityBlock::block(&mut context.pool(), &form).await?; - LemmyResult::Ok(()) - }), - ) - .buffer_unordered(PARALLELISM) - .collect::>() - .await - .into_iter() - .enumerate() - .for_each(|(i, r)| { - if let Err(e) = r { - //failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone())); - //info!("Failed to import saved post community: {e}"); - } - }); - /* - try_join_all(data.blocked_communities.iter().map(|blocked| async { - let context = context.reset_request_count(); - // Ignore fetch errors - let community = blocked.dereference(&context).await?; - let form = CommunityBlockForm { - person_id, - community_id: community.id, - }; - CommunityBlock::block(&mut context.pool(), &form).await?; - LemmyResult::Ok(()) - })) - .await?; - */ - - try_join_all(data.blocked_users.iter().map(|blocked| async { - let context = context.reset_request_count(); - // Ignore fetch errors - let target = blocked.dereference(&context).await.ok(); - if let Some(target) = target { - let form = PersonBlockForm { + let failed_followed_communities = fetch_and_import( + data.followed_communities.clone(), + &context, + |(followed, context)| async move { + let community = followed.dereference(&context).await?; + let form = CommunityFollowerForm { person_id, - target_id: target.id, + community_id: community.id, + pending: true, }; - PersonBlock::block(&mut context.pool(), &form).await?; - } - LemmyResult::Ok(()) - })) + CommunityFollower::follow(&mut context.pool(), &form).await?; + LemmyResult::Ok(()) + }, + ) + .await?; + + let failed_saved_posts = fetch_and_import( + data.saved_posts.clone(), + &context, + |(saved, context)| async move { + let post = saved.dereference(&context).await?; + let form = PostSavedForm { + person_id, + post_id: post.id, + }; + PostSaved::save(&mut context.pool(), &form).await?; + LemmyResult::Ok(()) + }, + ) + .await?; + + let failed_saved_comments = fetch_and_import( + data.saved_comments.clone(), + &context, + |(saved, context)| async move { + let comment = saved.dereference(&context).await?; + let form = CommentSavedForm { + person_id, + comment_id: comment.id, + }; + CommentSaved::save(&mut context.pool(), &form).await?; + LemmyResult::Ok(()) + }, + ) + .await?; + + let failed_community_blocks = fetch_and_import( + data.blocked_communities.clone(), + &context, + |(blocked, context)| async move { + let community = blocked.dereference(&context).await?; + let form = CommunityBlockForm { + person_id, + community_id: community.id, + }; + CommunityBlock::block(&mut context.pool(), &form).await?; + LemmyResult::Ok(()) + }, + ) + .await?; + + let failed_user_blocks = fetch_and_import( + data.blocked_users.clone(), + &context, + |(blocked, context)| async move { + let context = context.reset_request_count(); + let target = blocked.dereference(&context).await?; + let form = PersonBlockForm { + person_id, + target_id: target.id, + }; + PersonBlock::block(&mut context.pool(), &form).await?; + LemmyResult::Ok(()) + }, + ) .await?; try_join_all(data.blocked_instances.iter().map(|domain| async { @@ -345,18 +266,54 @@ pub async fn import_settings( })) .await?; + info!("Settings import completed for {}, the following items failed: {failed_followed_communities}, {failed_saved_posts}, {failed_saved_comments}, {failed_community_blocks}, {failed_user_blocks}", + local_user_view.person.name); + Ok(()) }); Ok(Json(Default::default())) } +async fn fetch_and_import( + objects: Vec>, + context: &Data, + import_fn: impl FnMut((ObjectId, Data)) -> Fut, +) -> LemmyResult +where + Kind: Object + Send + 'static, + for<'de2> ::Kind: Deserialize<'de2>, + Fut: Future>, +{ + let mut failed_items = vec![]; + futures::stream::iter( + objects + .clone() + .into_iter() + // need to reset outgoing request count to avoid running into limit + .map(|s| (s, context.reset_request_count())) + .map(import_fn), + ) + .buffer_unordered(PARALLELISM) + .collect::>() + .await + .into_iter() + .enumerate() + .for_each(|(i, r): (usize, LemmyResult<()>)| { + if r.is_err() { + if let Some(object) = objects.get(i) { + failed_items.push(object.inner().clone()); + } + } + }); + Ok(failed_items.into_iter().join(",")) +} #[cfg(test)] #[allow(clippy::indexing_slicing)] mod tests { use crate::api::user_settings_backup::{export_settings, import_settings, UserSettingsBackup}; - use activitypub_federation::config::Data;use lemmy_db_views_actor::structs::CommunityBlockView; + use activitypub_federation::config::Data; use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ source::{ @@ -368,7 +325,7 @@ mod tests { traits::{Crud, Followable}, }; use lemmy_db_views::structs::LocalUserView; - use lemmy_db_views_actor::structs::CommunityFollowerView; + use lemmy_db_views_actor::structs::{CommunityBlockView, CommunityFollowerView}; use lemmy_utils::error::{LemmyErrorType, LemmyResult}; use pretty_assertions::assert_eq; use serial_test::serial; @@ -527,7 +484,6 @@ mod tests { Ok(()) } - #[tokio::test] #[serial] async fn test_settings_fetch_and_import() -> LemmyResult<()> { @@ -554,11 +510,7 @@ mod tests { // wait for background task to finish sleep(Duration::from_millis(1000)).await; - let blocks = CommunityBlockView::for_person( - &mut context.pool(), - import_user.person.id, - ) - .await?; + let blocks = CommunityBlockView::for_person(&mut context.pool(), import_user.person.id).await?; assert_eq!(blocks.len(), 3); LocalUser::delete(&mut context.pool(), import_user.local_user.id).await?; Ok(())