diff --git a/crates/apub/src/api/user_settings_backup.rs b/crates/apub/src/api/user_settings_backup.rs
index 57e1d0f97..8b4202258 100644
--- a/crates/apub/src/api/user_settings_backup.rs
+++ b/crates/apub/src/api/user_settings_backup.rs
@@ -4,9 +4,10 @@ use crate::objects::{
   person::ApubPerson,
   post::ApubPost,
 };
-use activitypub_federation::{config::Data, fetch::object_id::ObjectId};
+use activitypub_federation::{config::Data, fetch::object_id::ObjectId, traits::Object};
 use actix_web::web::Json;
 use futures::{future::try_join_all, StreamExt};
+use itertools::Itertools;
 use lemmy_api_common::{context::LemmyContext, SuccessResponse};
 use lemmy_db_schema::{
   newtypes::DbUrl,
@@ -30,8 +31,11 @@ use lemmy_utils::{
   spawn_try_task,
 };
 use serde::{Deserialize, Serialize};
+use std::future::Future;
 use tracing::info;
 
+const PARALLELISM: usize = 10;
+
 /// Backup of user data. This struct should never be changed so that the data can be used as a
 /// long-term backup in case the instance goes down unexpectedly. All fields are optional to allow
 /// importing partial backups.
@@ -40,7 +44,7 @@
 ///
 /// Be careful with any changes to this struct, to avoid breaking changes which could prevent
 /// importing older backups.
-#[derive(Debug, Serialize, Deserialize, Clone)]
+#[derive(Debug, Serialize, Deserialize, Clone, Default)]
 pub struct UserSettingsBackup {
   pub display_name: Option<String>,
   pub bio: Option<String>,
@@ -133,7 +137,8 @@ pub async fn import_settings(
     local_user_view.local_user.id,
     &local_user_form,
   )
-  .await?;
+  .await
+  .ok();
 
   // Update the vote display mode settings
   let vote_display_mode_form = LocalUserVoteDisplayModeUpdateForm {
@@ -167,141 +172,91 @@ pub async fn import_settings(
   }
 
   spawn_try_task(async move {
-    const PARALLELISM: usize = 10;
     let person_id = local_user_view.person.id;
 
-    // These tasks fetch objects from remote instances which might be down.
-    // TODO: Would be nice if we could send a list of failed items with api response, but then
-    // the request would likely timeout.
-    let mut failed_items = vec![];
 
     info!(
-      "Starting settings backup for {}",
+      "Starting settings import for {}",
       local_user_view.person.name
     );
 
-    futures::stream::iter(
-      data
-        .followed_communities
-        .clone()
-        .into_iter()
-        // reset_request_count works like clone, and is necessary to avoid running into request limit
-        .map(|f| (f, context.reset_request_count()))
-        .map(|(followed, context)| async move {
-          // need to reset outgoing request count to avoid running into limit
-          let community = followed.dereference(&context).await?;
-          let form = CommunityFollowerForm {
-            person_id,
-            community_id: community.id,
-            pending: true,
-          };
-          CommunityFollower::follow(&mut context.pool(), &form).await?;
-          LemmyResult::Ok(())
-        }),
+    let failed_followed_communities = fetch_and_import(
+      data.followed_communities.clone(),
+      &context,
+      |(followed, context)| async move {
+        let community = followed.dereference(&context).await?;
+        let form = CommunityFollowerForm {
+          person_id,
+          community_id: community.id,
+          pending: true,
+        };
+        CommunityFollower::follow(&mut context.pool(), &form).await?;
+        LemmyResult::Ok(())
+      },
     )
-    .buffer_unordered(PARALLELISM)
-    .collect::<Vec<_>>()
-    .await
-    .into_iter()
-    .enumerate()
-    .for_each(|(i, r)| {
-      if let Err(e) = r {
-        failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone()));
-        info!("Failed to import followed community: {e}");
-      }
-    });
-
-    futures::stream::iter(
-      data
-        .saved_posts
-        .clone()
-        .into_iter()
-        .map(|s| (s, context.reset_request_count()))
-        .map(|(saved, context)| async move {
-          let post = saved.dereference(&context).await?;
-          let form = PostSavedForm {
-            person_id,
-            post_id: post.id,
-          };
-          PostSaved::save(&mut context.pool(), &form).await?;
-          LemmyResult::Ok(())
-        }),
-    )
-    .buffer_unordered(PARALLELISM)
-    .collect::<Vec<_>>()
-    .await
-    .into_iter()
-    .enumerate()
-    .for_each(|(i, r)| {
-      if let Err(e) = r {
-        failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone()));
-        info!("Failed to import saved post community: {e}");
-      }
-    });
-
-    futures::stream::iter(
-      data
-        .saved_comments
-        .clone()
-        .into_iter()
-        .map(|s| (s, context.reset_request_count()))
-        .map(|(saved, context)| async move {
-          let comment = saved.dereference(&context).await?;
-          let form = CommentSavedForm {
-            person_id,
-            comment_id: comment.id,
-          };
-          CommentSaved::save(&mut context.pool(), &form).await?;
-          LemmyResult::Ok(())
-        }),
-    )
-    .buffer_unordered(PARALLELISM)
-    .collect::<Vec<_>>()
-    .await
-    .into_iter()
-    .enumerate()
-    .for_each(|(i, r)| {
-      if let Err(e) = r {
-        failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone()));
-        info!("Failed to import saved comment community: {e}");
-      }
-    });
-
-    let failed_items: Vec<_> = failed_items.into_iter().flatten().collect();
-    info!(
-      "Finished settings backup for {}, failed items: {:#?}",
-      local_user_view.person.name, failed_items
-    );
-
-    // These tasks don't connect to any remote instances but only insert directly in the database.
-    // That means the only error condition are db connection failures, so no extra error handling is
-    // needed.
-    try_join_all(data.blocked_communities.iter().map(|blocked| async {
-      // dont fetch unknown blocked objects from home server
-      let community = blocked.dereference_local(&context).await?;
-      let form = CommunityBlockForm {
-        person_id,
-        community_id: community.id,
-      };
-      CommunityBlock::block(&mut context.pool(), &form).await?;
-      LemmyResult::Ok(())
-    }))
     .await?;
 
-    try_join_all(data.blocked_users.iter().map(|blocked| async {
-      // dont fetch unknown blocked objects from home server
-      let target = blocked.dereference_local(&context).await?;
-      let form = PersonBlockForm {
-        person_id,
-        target_id: target.id,
-      };
-      PersonBlock::block(&mut context.pool(), &form).await?;
-      LemmyResult::Ok(())
-    }))
+    let failed_saved_posts = fetch_and_import(
+      data.saved_posts.clone(),
+      &context,
+      |(saved, context)| async move {
+        let post = saved.dereference(&context).await?;
+        let form = PostSavedForm {
+          person_id,
+          post_id: post.id,
+        };
+        PostSaved::save(&mut context.pool(), &form).await?;
+        LemmyResult::Ok(())
+      },
+    )
+    .await?;
+
+    let failed_saved_comments = fetch_and_import(
+      data.saved_comments.clone(),
+      &context,
+      |(saved, context)| async move {
+        let comment = saved.dereference(&context).await?;
+        let form = CommentSavedForm {
+          person_id,
+          comment_id: comment.id,
+        };
+        CommentSaved::save(&mut context.pool(), &form).await?;
+        LemmyResult::Ok(())
+      },
+    )
+    .await?;
+
+    let failed_community_blocks = fetch_and_import(
+      data.blocked_communities.clone(),
+      &context,
+      |(blocked, context)| async move {
+        let community = blocked.dereference(&context).await?;
+        let form = CommunityBlockForm {
+          person_id,
+          community_id: community.id,
+        };
+        CommunityBlock::block(&mut context.pool(), &form).await?;
+        LemmyResult::Ok(())
+      },
+    )
+    .await?;
+
+    let failed_user_blocks = fetch_and_import(
+      data.blocked_users.clone(),
+      &context,
+      |(blocked, context)| async move {
+        let context = context.reset_request_count();
+        let target = blocked.dereference(&context).await?;
+        let form = PersonBlockForm {
+          person_id,
+          target_id: target.id,
+        };
+        PersonBlock::block(&mut context.pool(), &form).await?;
+        LemmyResult::Ok(())
+      },
+    )
     .await?;
 
     try_join_all(data.blocked_instances.iter().map(|domain| async {
-      // dont fetch unknown blocked objects from home server
       let instance = Instance::read_or_create(&mut context.pool(), domain.clone()).await?;
       let form = InstanceBlockForm {
         person_id,
@@ -312,17 +267,53 @@ pub async fn import_settings(
     }))
     .await?;
 
+    info!("Settings import completed for {}, the following items failed: {failed_followed_communities}, {failed_saved_posts}, {failed_saved_comments}, {failed_community_blocks}, {failed_user_blocks}",
+    local_user_view.person.name);
+
     Ok(())
   });
 
   Ok(Json(Default::default()))
 }
 
+async fn fetch_and_import<Kind, Fut>(
+  objects: Vec<ObjectId<Kind>>,
+  context: &Data<LemmyContext>,
+  import_fn: impl FnMut((ObjectId<Kind>, Data<LemmyContext>)) -> Fut,
+) -> LemmyResult<String>
+where
+  Kind: Object + Send + 'static,
+  for<'de2> <Kind as Object>::Kind: Deserialize<'de2>,
+  Fut: Future<Output = LemmyResult<()>>,
+{
+  let mut failed_items = vec![];
+  futures::stream::iter(
+    objects
+      .clone()
+      .into_iter()
+      // need to reset outgoing request count to avoid running into limit
+      .map(|s| (s, context.reset_request_count()))
+      .map(import_fn),
+  )
+  .buffer_unordered(PARALLELISM)
+  .collect::<Vec<_>>()
+  .await
+  .into_iter()
+  .enumerate()
+  .for_each(|(i, r): (usize, LemmyResult<()>)| {
+    if r.is_err() {
+      if let Some(object) = objects.get(i) {
+        failed_items.push(object.inner().clone());
+      }
+    }
+  });
+  Ok(failed_items.into_iter().join(","))
+}
+
 #[cfg(test)]
 #[allow(clippy::indexing_slicing)]
 mod tests {
-  use crate::api::user_settings_backup::{export_settings, import_settings};
+  use crate::api::user_settings_backup::{export_settings, import_settings, UserSettingsBackup};
   use activitypub_federation::config::Data;
   use lemmy_api_common::context::LemmyContext;
   use lemmy_db_schema::{
@@ -420,6 +411,44 @@ mod tests {
     Ok(())
   }
 
+  #[tokio::test]
+  #[serial]
+  async fn test_settings_partial_import() -> LemmyResult<()> {
+    let context = LemmyContext::init_test_context().await;
+
+    let export_user =
+      create_user("hanna".to_string(), Some("my bio".to_string()), &context).await?;
+
+    let community_form = CommunityInsertForm::builder()
+      .name("testcom".to_string())
+      .title("testcom".to_string())
+      .instance_id(export_user.person.instance_id)
+      .build();
+    let community = Community::create(&mut context.pool(), &community_form).await?;
+    let follower_form = CommunityFollowerForm {
+      community_id: community.id,
+      person_id: export_user.person.id,
+      pending: false,
+    };
+    CommunityFollower::follow(&mut context.pool(), &follower_form).await?;
+
+    let backup = export_settings(export_user.clone(), context.reset_request_count()).await?;
+
+    let import_user = create_user("charles".to_string(), None, &context).await?;
+
+    let backup2 = UserSettingsBackup {
+      followed_communities: backup.followed_communities.clone(),
+      ..Default::default()
+    };
+    import_settings(
+      actix_web::web::Json(backup2),
+      import_user.clone(),
+      context.reset_request_count(),
+    )
+    .await?;
+    Ok(())
+  }
+
   #[tokio::test]
   #[serial]
   async fn disallow_large_backup() -> LemmyResult<()> {
diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs
index a5643b95c..c8960b008 100644
--- a/crates/apub/src/lib.rs
+++ b/crates/apub/src/lib.rs
@@ -29,7 +29,9 @@ pub(crate) mod mentions;
 pub mod objects;
 pub mod protocol;
 
-pub const FEDERATION_HTTP_FETCH_LIMIT: u32 = 50;
+/// Maximum number of outgoing HTTP requests to fetch a single object. Needs to be high enough
+/// to fetch a new community with posts, moderators and featured posts.
+pub const FEDERATION_HTTP_FETCH_LIMIT: u32 = 100;
 
 /// Only include a basic context to save space and bandwidth. The main context is hosted statically
 /// on join-lemmy.org. Include activitystreams explicitly for better compat, but this could