diff --git a/crates/apub/src/activities/block/block_user.rs b/crates/apub/src/activities/block/block_user.rs index f8a1e4b8d..55642f862 100644 --- a/crates/apub/src/activities/block/block_user.rs +++ b/crates/apub/src/activities/block/block_user.rs @@ -9,7 +9,7 @@ use crate::{ verify_person_in_community, }, activity_lists::AnnouncableActivities, - insert_activity, + insert_received_activity, objects::{instance::remote_instance_inboxes, person::ApubPerson}, protocol::activities::block::block_user::BlockUser, }; @@ -124,6 +124,7 @@ impl ActivityHandler for BlockUser { #[tracing::instrument(skip_all)] async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; verify_is_public(&self.to, &self.cc)?; match self.target.dereference(context).await? { SiteOrCommunity::Site(site) => { @@ -147,7 +148,6 @@ impl ActivityHandler for BlockUser { #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, false, context).await?; let expires = self.expires.map(|u| u.naive_local()); let mod_person = self.actor.dereference(context).await?; let blocked_person = self.object.dereference(context).await?; diff --git a/crates/apub/src/activities/block/undo_block_user.rs b/crates/apub/src/activities/block/undo_block_user.rs index b31f8b4b2..f68349794 100644 --- a/crates/apub/src/activities/block/undo_block_user.rs +++ b/crates/apub/src/activities/block/undo_block_user.rs @@ -7,7 +7,7 @@ use crate::{ verify_is_public, }, activity_lists::AnnouncableActivities, - insert_activity, + insert_received_activity, objects::{instance::remote_instance_inboxes, person::ApubPerson}, protocol::activities::block::{block_user::BlockUser, undo_block_user::UndoBlockUser}, }; @@ -88,6 +88,7 @@ impl ActivityHandler for UndoBlockUser { #[tracing::instrument(skip_all)] async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; verify_is_public(&self.to, &self.cc)?; verify_domains_match(self.actor.inner(), self.object.actor.inner())?; self.object.verify(context).await?; @@ -96,7 +97,6 @@ impl ActivityHandler for UndoBlockUser { #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, false, context).await?; let expires = self.object.expires.map(|u| u.naive_local()); let mod_person = self.actor.dereference(context).await?; let blocked_person = self.object.object.dereference(context).await?; diff --git a/crates/apub/src/activities/community/announce.rs b/crates/apub/src/activities/community/announce.rs index e33e9fbf4..ed489158e 100644 --- a/crates/apub/src/activities/community/announce.rs +++ b/crates/apub/src/activities/community/announce.rs @@ -6,7 +6,7 @@ use crate::{ verify_person_in_community, }, activity_lists::AnnouncableActivities, - insert_activity, + insert_received_activity, objects::community::ApubCommunity, protocol::{ activities::community::announce::{AnnounceActivity, RawAnnouncableActivities}, @@ -133,14 +133,14 @@ impl ActivityHandler for AnnounceActivity { } #[tracing::instrument(skip_all)] - async fn verify(&self, _context: &Data) -> Result<(), LemmyError> { + async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; verify_is_public(&self.to, &self.cc)?; Ok(()) } #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, false, context).await?; let object: AnnouncableActivities = self.object.object(context).await?.try_into()?; // This is only for sending, not receiving so we reject it. if let AnnouncableActivities::Page(_) = object { diff --git a/crates/apub/src/activities/community/collection_add.rs b/crates/apub/src/activities/community/collection_add.rs index d08b0cb48..c36a8f0da 100644 --- a/crates/apub/src/activities/community/collection_add.rs +++ b/crates/apub/src/activities/community/collection_add.rs @@ -7,7 +7,7 @@ use crate::{ verify_person_in_community, }, activity_lists::AnnouncableActivities, - insert_activity, + insert_received_activity, objects::{community::ApubCommunity, person::ApubPerson, post::ApubPost}, protocol::{ activities::community::{collection_add::CollectionAdd, collection_remove::CollectionRemove}, @@ -108,6 +108,7 @@ impl ActivityHandler for CollectionAdd { #[tracing::instrument(skip_all)] async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; verify_is_public(&self.to, &self.cc)?; let community = self.community(context).await?; verify_person_in_community(&self.actor, &community, context).await?; @@ -117,7 +118,6 @@ impl ActivityHandler for CollectionAdd { #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, false, context).await?; let (community, collection_type) = Community::get_by_collection_url(&mut context.pool(), &self.target.into()).await?; match collection_type { diff --git a/crates/apub/src/activities/community/collection_remove.rs b/crates/apub/src/activities/community/collection_remove.rs index a1c443ea8..28214284b 100644 --- a/crates/apub/src/activities/community/collection_remove.rs +++ b/crates/apub/src/activities/community/collection_remove.rs @@ -7,7 +7,7 @@ use crate::{ verify_person_in_community, }, activity_lists::AnnouncableActivities, - insert_activity, + insert_received_activity, objects::{community::ApubCommunity, person::ApubPerson, post::ApubPost}, protocol::{activities::community::collection_remove::CollectionRemove, InCommunity}, }; @@ -101,6 +101,7 @@ impl ActivityHandler for CollectionRemove { #[tracing::instrument(skip_all)] async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; verify_is_public(&self.to, &self.cc)?; let community = self.community(context).await?; verify_person_in_community(&self.actor, &community, context).await?; @@ -110,7 +111,6 @@ impl ActivityHandler for CollectionRemove { #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, false, context).await?; let (community, collection_type) = Community::get_by_collection_url(&mut context.pool(), &self.target.into()).await?; match collection_type { diff --git a/crates/apub/src/activities/community/lock_page.rs b/crates/apub/src/activities/community/lock_page.rs index 0416b972a..94135ede9 100644 --- a/crates/apub/src/activities/community/lock_page.rs +++ b/crates/apub/src/activities/community/lock_page.rs @@ -8,7 +8,7 @@ use crate::{ verify_person_in_community, }, activity_lists::AnnouncableActivities, - insert_activity, + insert_received_activity, protocol::{ activities::community::lock_page::{LockPage, LockType, UndoLockPage}, InCommunity, @@ -79,6 +79,7 @@ impl ActivityHandler for UndoLockPage { } async fn verify(&self, context: &Data) -> Result<(), Self::Error> { + insert_received_activity(&self.id, context).await?; verify_is_public(&self.to, &self.cc)?; let community = self.community(context).await?; verify_person_in_community(&self.actor, &community, context).await?; @@ -94,7 +95,6 @@ impl ActivityHandler for UndoLockPage { } async fn receive(self, context: &Data) -> Result<(), Self::Error> { - insert_activity(&self.id, &self, false, false, context).await?; let form = PostUpdateForm::builder().locked(Some(false)).build(); let post = self.object.object.dereference(context).await?; Post::update(&mut context.pool(), post.id, &form).await?; diff --git a/crates/apub/src/activities/community/report.rs b/crates/apub/src/activities/community/report.rs index 1dffacc39..67b84644e 100644 --- a/crates/apub/src/activities/community/report.rs +++ b/crates/apub/src/activities/community/report.rs @@ -1,6 +1,6 @@ use crate::{ activities::{generate_activity_id, send_lemmy_activity, verify_person_in_community}, - insert_activity, + insert_received_activity, objects::{community::ApubCommunity, person::ApubPerson}, protocol::{activities::community::report::Report, InCommunity}, PostOrComment, @@ -115,6 +115,7 @@ impl ActivityHandler for Report { #[tracing::instrument(skip_all)] async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; let community = self.community(context).await?; verify_person_in_community(&self.actor, &community, context).await?; Ok(()) @@ -122,7 +123,6 @@ impl ActivityHandler for Report { #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, true, context).await?; let actor = self.actor.dereference(context).await?; match self.object.dereference(context).await? { PostOrComment::Post(post) => { diff --git a/crates/apub/src/activities/community/update.rs b/crates/apub/src/activities/community/update.rs index 3e697fddc..fe2477d6e 100644 --- a/crates/apub/src/activities/community/update.rs +++ b/crates/apub/src/activities/community/update.rs @@ -7,7 +7,7 @@ use crate::{ verify_person_in_community, }, activity_lists::AnnouncableActivities, - insert_activity, + insert_received_activity, objects::{community::ApubCommunity, person::ApubPerson}, protocol::{activities::community::update::UpdateCommunity, InCommunity}, SendActivity, @@ -82,6 +82,7 @@ impl ActivityHandler for UpdateCommunity { #[tracing::instrument(skip_all)] async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; verify_is_public(&self.to, &self.cc)?; let community = self.community(context).await?; verify_person_in_community(&self.actor, &community, context).await?; @@ -92,7 +93,6 @@ impl ActivityHandler for UpdateCommunity { #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, false, context).await?; let community = self.community(context).await?; let community_update_form = self.object.into_update_form(); diff --git a/crates/apub/src/activities/create_or_update/comment.rs b/crates/apub/src/activities/create_or_update/comment.rs index 804f1827b..51b87ed27 100644 --- a/crates/apub/src/activities/create_or_update/comment.rs +++ b/crates/apub/src/activities/create_or_update/comment.rs @@ -7,7 +7,7 @@ use crate::{ verify_person_in_community, }, activity_lists::AnnouncableActivities, - insert_activity, + insert_received_activity, mentions::MentionOrValue, objects::{comment::ApubComment, community::ApubCommunity, person::ApubPerson}, protocol::{ @@ -154,6 +154,7 @@ impl ActivityHandler for CreateOrUpdateNote { #[tracing::instrument(skip_all)] async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; verify_is_public(&self.to, &self.cc)?; let post = self.object.get_parents(context).await?.0; let community = self.community(context).await?; @@ -169,7 +170,6 @@ impl ActivityHandler for CreateOrUpdateNote { #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, false, context).await?; // Need to do this check here instead of Note::from_json because we need the person who // send the activity, not the comment author. let existing_comment = self.object.id.dereference_local(context).await.ok(); diff --git a/crates/apub/src/activities/create_or_update/post.rs b/crates/apub/src/activities/create_or_update/post.rs index e0ce0fec4..77199056d 100644 --- a/crates/apub/src/activities/create_or_update/post.rs +++ b/crates/apub/src/activities/create_or_update/post.rs @@ -8,7 +8,7 @@ use crate::{ verify_person_in_community, }, activity_lists::AnnouncableActivities, - insert_activity, + insert_received_activity, objects::{community::ApubCommunity, person::ApubPerson, post::ApubPost}, protocol::{ activities::{create_or_update::page::CreateOrUpdatePage, CreateOrUpdateType}, @@ -146,6 +146,7 @@ impl ActivityHandler for CreateOrUpdatePage { #[tracing::instrument(skip_all)] async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; verify_is_public(&self.to, &self.cc)?; let community = self.community(context).await?; verify_person_in_community(&self.actor, &community, context).await?; @@ -180,7 +181,6 @@ impl ActivityHandler for CreateOrUpdatePage { #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, false, context).await?; let post = ApubPost::from_json(self.object, context).await?; // author likes their own post by default diff --git a/crates/apub/src/activities/create_or_update/private_message.rs b/crates/apub/src/activities/create_or_update/private_message.rs index 36c9785da..3eaad2f71 100644 --- a/crates/apub/src/activities/create_or_update/private_message.rs +++ b/crates/apub/src/activities/create_or_update/private_message.rs @@ -1,6 +1,6 @@ use crate::{ activities::{generate_activity_id, send_lemmy_activity, verify_person}, - insert_activity, + insert_received_activity, objects::{person::ApubPerson, private_message::ApubPrivateMessage}, protocol::activities::{ create_or_update::chat_message::CreateOrUpdateChatMessage, @@ -109,6 +109,7 @@ impl ActivityHandler for CreateOrUpdateChatMessage { #[tracing::instrument(skip_all)] async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; verify_person(&self.actor, context).await?; verify_domains_match(self.actor.inner(), self.object.id.inner())?; verify_domains_match(self.to[0].inner(), self.object.to[0].inner())?; @@ -118,7 +119,6 @@ impl ActivityHandler for CreateOrUpdateChatMessage { #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, true, context).await?; ApubPrivateMessage::from_json(self.object, context).await?; Ok(()) } diff --git a/crates/apub/src/activities/deletion/delete.rs b/crates/apub/src/activities/deletion/delete.rs index 8ad104173..fcdede8d7 100644 --- a/crates/apub/src/activities/deletion/delete.rs +++ b/crates/apub/src/activities/deletion/delete.rs @@ -3,7 +3,7 @@ use crate::{ deletion::{receive_delete_action, verify_delete_activity, DeletableObjects}, generate_activity_id, }, - insert_activity, + insert_received_activity, objects::person::ApubPerson, protocol::{activities::deletion::delete::Delete, IdOrNestedObject}, }; @@ -43,13 +43,13 @@ impl ActivityHandler for Delete { #[tracing::instrument(skip_all)] async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; verify_delete_activity(self, self.summary.is_some(), context).await?; Ok(()) } #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, false, context).await?; if let Some(reason) = self.summary { // We set reason to empty string if it doesn't exist, to distinguish between delete and // remove. Here we change it back to option, so we don't write it to db. diff --git a/crates/apub/src/activities/deletion/delete_user.rs b/crates/apub/src/activities/deletion/delete_user.rs index d74a3c8aa..b388ed9e1 100644 --- a/crates/apub/src/activities/deletion/delete_user.rs +++ b/crates/apub/src/activities/deletion/delete_user.rs @@ -1,6 +1,6 @@ use crate::{ activities::{generate_activity_id, send_lemmy_activity, verify_is_public, verify_person}, - insert_activity, + insert_received_activity, objects::{instance::remote_instance_inboxes, person::ApubPerson}, protocol::activities::deletion::delete_user::DeleteUser, SendActivity, @@ -73,6 +73,7 @@ impl ActivityHandler for DeleteUser { } async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; verify_is_public(&self.to, &[])?; verify_person(&self.actor, context).await?; verify_urls_match(self.actor.inner(), self.object.inner())?; @@ -80,7 +81,6 @@ impl ActivityHandler for DeleteUser { } async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, false, context).await?; let actor = self.actor.dereference(context).await?; delete_user_account( actor.id, diff --git a/crates/apub/src/activities/deletion/undo_delete.rs b/crates/apub/src/activities/deletion/undo_delete.rs index e10bd0660..541a7455f 100644 --- a/crates/apub/src/activities/deletion/undo_delete.rs +++ b/crates/apub/src/activities/deletion/undo_delete.rs @@ -3,7 +3,7 @@ use crate::{ deletion::{receive_delete_action, verify_delete_activity, DeletableObjects}, generate_activity_id, }, - insert_activity, + insert_received_activity, objects::person::ApubPerson, protocol::activities::deletion::{delete::Delete, undo_delete::UndoDelete}, }; @@ -42,6 +42,7 @@ impl ActivityHandler for UndoDelete { } async fn verify(&self, data: &Data) -> Result<(), Self::Error> { + insert_received_activity(&self.id, data).await?; self.object.verify(data).await?; verify_delete_activity(&self.object, self.object.summary.is_some(), data).await?; Ok(()) @@ -49,7 +50,6 @@ impl ActivityHandler for UndoDelete { #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, false, context).await?; if self.object.summary.is_some() { UndoDelete::receive_undo_remove_action( &self.actor.dereference(context).await?, diff --git a/crates/apub/src/activities/following/accept.rs b/crates/apub/src/activities/following/accept.rs index af7d63725..adaad51d1 100644 --- a/crates/apub/src/activities/following/accept.rs +++ b/crates/apub/src/activities/following/accept.rs @@ -1,6 +1,6 @@ use crate::{ activities::{generate_activity_id, send_lemmy_activity}, - insert_activity, + insert_received_activity, protocol::activities::following::{accept::AcceptFollow, follow::Follow}, }; use activitypub_federation::{ @@ -50,6 +50,7 @@ impl ActivityHandler for AcceptFollow { #[tracing::instrument(skip_all)] async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; verify_urls_match(self.actor.inner(), self.object.object.inner())?; self.object.verify(context).await?; if let Some(to) = &self.to { @@ -60,7 +61,6 @@ impl ActivityHandler for AcceptFollow { #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, true, context).await?; let community = self.actor.dereference(context).await?; let person = self.object.actor.dereference(context).await?; // This will throw an error if no follow was requested diff --git a/crates/apub/src/activities/following/follow.rs b/crates/apub/src/activities/following/follow.rs index 073784da1..2f0f5037a 100644 --- a/crates/apub/src/activities/following/follow.rs +++ b/crates/apub/src/activities/following/follow.rs @@ -6,7 +6,7 @@ use crate::{ verify_person_in_community, }, fetcher::user_or_community::UserOrCommunity, - insert_activity, + insert_received_activity, objects::{community::ApubCommunity, person::ApubPerson}, protocol::activities::following::{ accept::AcceptFollow, @@ -90,6 +90,7 @@ impl ActivityHandler for Follow { #[tracing::instrument(skip_all)] async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; verify_person(&self.actor, context).await?; let object = self.object.dereference(context).await?; if let UserOrCommunity::Community(c) = object { @@ -103,7 +104,6 @@ impl ActivityHandler for Follow { #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, true, context).await?; let actor = self.actor.dereference(context).await?; let object = self.object.dereference(context).await?; match object { diff --git a/crates/apub/src/activities/following/undo_follow.rs b/crates/apub/src/activities/following/undo_follow.rs index 9f18ccfbc..c36b36df8 100644 --- a/crates/apub/src/activities/following/undo_follow.rs +++ b/crates/apub/src/activities/following/undo_follow.rs @@ -1,7 +1,7 @@ use crate::{ activities::{generate_activity_id, send_lemmy_activity, verify_person}, fetcher::user_or_community::UserOrCommunity, - insert_activity, + insert_received_activity, objects::{community::ApubCommunity, person::ApubPerson}, protocol::activities::following::{follow::Follow, undo_follow::UndoFollow}, }; @@ -60,6 +60,7 @@ impl ActivityHandler for UndoFollow { #[tracing::instrument(skip_all)] async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; verify_urls_match(self.actor.inner(), self.object.actor.inner())?; verify_person(&self.actor, context).await?; self.object.verify(context).await?; @@ -71,7 +72,6 @@ impl ActivityHandler for UndoFollow { #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, true, context).await?; let person = self.actor.dereference(context).await?; let object = self.object.object.dereference(context).await?; diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index e0b46e0e7..4fd8da536 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -1,5 +1,4 @@ use crate::{ - insert_activity, objects::{community::ApubCommunity, person::ApubPerson}, CONTEXT, }; @@ -15,7 +14,11 @@ use anyhow::anyhow; use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ newtypes::CommunityId, - source::{community::Community, instance::Instance}, + source::{ + activity::{SentActivity, SentActivityForm}, + community::Community, + instance::Instance, + }, }; use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView}; use lemmy_utils::error::{LemmyError, LemmyErrorExt, LemmyErrorType}; @@ -184,7 +187,12 @@ where info!("Sending activity {}", activity.id().to_string()); let activity = WithContext::new(activity, CONTEXT.deref().clone()); - insert_activity(activity.id(), &activity, true, sensitive, data).await?; + let form = SentActivityForm { + ap_id: activity.id().clone().into(), + data: serde_json::to_value(activity.clone())?, + sensitive, + }; + SentActivity::create(&mut data.pool(), form).await?; send_activity(activity, actor, inbox, data).await?; Ok(()) diff --git a/crates/apub/src/activities/voting/undo_vote.rs b/crates/apub/src/activities/voting/undo_vote.rs index bcb8ee406..9616c651f 100644 --- a/crates/apub/src/activities/voting/undo_vote.rs +++ b/crates/apub/src/activities/voting/undo_vote.rs @@ -4,7 +4,7 @@ use crate::{ verify_person_in_community, voting::{undo_vote_comment, undo_vote_post}, }, - insert_activity, + insert_received_activity, objects::{community::ApubCommunity, person::ApubPerson}, protocol::{ activities::voting::{undo_vote::UndoVote, vote::Vote}, @@ -57,6 +57,7 @@ impl ActivityHandler for UndoVote { #[tracing::instrument(skip_all)] async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; let community = self.community(context).await?; verify_person_in_community(&self.actor, &community, context).await?; verify_urls_match(self.actor.inner(), self.object.actor.inner())?; @@ -66,7 +67,6 @@ impl ActivityHandler for UndoVote { #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, true, context).await?; let actor = self.actor.dereference(context).await?; let object = self.object.object.dereference(context).await?; match object { diff --git a/crates/apub/src/activities/voting/vote.rs b/crates/apub/src/activities/voting/vote.rs index 4de9a8c17..ef4572986 100644 --- a/crates/apub/src/activities/voting/vote.rs +++ b/crates/apub/src/activities/voting/vote.rs @@ -4,7 +4,7 @@ use crate::{ verify_person_in_community, voting::{vote_comment, vote_post}, }, - insert_activity, + insert_received_activity, objects::{community::ApubCommunity, person::ApubPerson}, protocol::{ activities::voting::vote::{Vote, VoteType}, @@ -56,6 +56,7 @@ impl ActivityHandler for Vote { #[tracing::instrument(skip_all)] async fn verify(&self, context: &Data) -> Result<(), LemmyError> { + insert_received_activity(&self.id, context).await?; let community = self.community(context).await?; verify_person_in_community(&self.actor, &community, context).await?; let enable_downvotes = LocalSite::read(&mut context.pool()) @@ -70,7 +71,6 @@ impl ActivityHandler for Vote { #[tracing::instrument(skip_all)] async fn receive(self, context: &Data) -> Result<(), LemmyError> { - insert_activity(&self.id, &self, false, true, context).await?; let actor = self.actor.dereference(context).await?; let object = self.object.dereference(context).await?; match object { diff --git a/crates/apub/src/http/mod.rs b/crates/apub/src/http/mod.rs index 52a014434..c261d9e49 100644 --- a/crates/apub/src/http/mod.rs +++ b/crates/apub/src/http/mod.rs @@ -13,7 +13,7 @@ use activitypub_federation::{ use actix_web::{web, web::Bytes, HttpRequest, HttpResponse}; use http::StatusCode; use lemmy_api_common::context::LemmyContext; -use lemmy_db_schema::source::activity::Activity; +use lemmy_db_schema::source::activity::SentActivity; use lemmy_utils::error::{LemmyError, LemmyErrorType, LemmyResult}; use serde::{Deserialize, Serialize}; use std::ops::Deref; @@ -88,12 +88,10 @@ pub(crate) async fn get_activity( info.id ))? .into(); - let activity = Activity::read_from_apub_id(&mut context.pool(), &activity_id).await?; + let activity = SentActivity::read_from_apub_id(&mut context.pool(), &activity_id).await?; let sensitive = activity.sensitive; - if !activity.local { - Err(err_object_not_local()) - } else if sensitive { + if sensitive { Ok(HttpResponse::Forbidden().finish()) } else { create_apub_response(&activity.data) diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs index 8d8186022..9a45284f2 100644 --- a/crates/apub/src/lib.rs +++ b/crates/apub/src/lib.rs @@ -3,18 +3,12 @@ use activitypub_federation::config::{Data, UrlVerifier}; use async_trait::async_trait; use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ - source::{ - activity::{Activity, ActivityInsertForm}, - instance::Instance, - local_site::LocalSite, - }, - traits::Crud, + source::{activity::ReceivedActivity, instance::Instance, local_site::LocalSite}, utils::{ActualDbPool, DbPool}, }; use lemmy_utils::error::{LemmyError, LemmyErrorType, LemmyResult}; use moka::future::Cache; use once_cell::sync::Lazy; -use serde::Serialize; use std::{sync::Arc, time::Duration}; use url::Url; @@ -178,30 +172,16 @@ pub(crate) async fn check_apub_id_valid_with_strictness( Ok(()) } -/// Store a sent or received activity in the database. +/// Store received activities in the database. /// -/// Stored activities are served over the HTTP endpoint `GET /activities/{type_}/{id}`. This also -/// ensures that the same activity cannot be received more than once. -#[tracing::instrument(skip(data, activity))] -async fn insert_activity( +/// This ensures that the same activity doesnt get received and processed more than once, which +/// would be a waste of resources. +#[tracing::instrument(skip(data))] +async fn insert_received_activity( ap_id: &Url, - activity: &T, - local: bool, - sensitive: bool, data: &Data, -) -> Result<(), LemmyError> -where - T: Serialize, -{ - let ap_id = ap_id.clone().into(); - let form = ActivityInsertForm { - ap_id, - data: serde_json::to_value(activity)?, - local: Some(local), - sensitive: Some(sensitive), - updated: None, - }; - Activity::create(&mut data.pool(), &form).await?; +) -> Result<(), LemmyError> { + ReceivedActivity::create(&mut data.pool(), &ap_id.clone().into()).await?; Ok(()) } diff --git a/crates/db_schema/src/impls/activity.rs b/crates/db_schema/src/impls/activity.rs index 4e581f95c..adda4fc76 100644 --- a/crates/db_schema/src/impls/activity.rs +++ b/crates/db_schema/src/impls/activity.rs @@ -1,143 +1,111 @@ use crate::{ + diesel::OptionalExtension, newtypes::DbUrl, - schema::activity::dsl::{activity, ap_id}, - source::activity::{Activity, ActivityInsertForm, ActivityUpdateForm}, - traits::Crud, + source::activity::{ReceivedActivity, SentActivity, SentActivityForm}, utils::{get_conn, DbPool}, }; -use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl}; +use diesel::{ + dsl::insert_into, + result::{DatabaseErrorKind, Error, Error::DatabaseError}, + ExpressionMethods, + QueryDsl, +}; use diesel_async::RunQueryDsl; -#[async_trait] -impl Crud for Activity { - type InsertForm = ActivityInsertForm; - type UpdateForm = ActivityUpdateForm; - type IdType = i32; - async fn read(pool: &mut DbPool<'_>, activity_id: i32) -> Result { +impl SentActivity { + pub async fn create(pool: &mut DbPool<'_>, form: SentActivityForm) -> Result { + use crate::schema::sent_activity::dsl::sent_activity; let conn = &mut get_conn(pool).await?; - activity.find(activity_id).first::(conn).await - } - - async fn create(pool: &mut DbPool<'_>, new_activity: &Self::InsertForm) -> Result { - let conn = &mut get_conn(pool).await?; - insert_into(activity) - .values(new_activity) + insert_into(sent_activity) + .values(form) .get_result::(conn) .await } - async fn update( - pool: &mut DbPool<'_>, - activity_id: i32, - new_activity: &Self::UpdateForm, - ) -> Result { + pub async fn read_from_apub_id(pool: &mut DbPool<'_>, object_id: &DbUrl) -> Result { + use crate::schema::sent_activity::dsl::{ap_id, sent_activity}; let conn = &mut get_conn(pool).await?; - diesel::update(activity.find(activity_id)) - .set(new_activity) - .get_result::(conn) - .await - } - async fn delete(pool: &mut DbPool<'_>, activity_id: i32) -> Result { - let conn = &mut get_conn(pool).await?; - diesel::delete(activity.find(activity_id)) - .execute(conn) - .await - } -} - -impl Activity { - pub async fn read_from_apub_id( - pool: &mut DbPool<'_>, - object_id: &DbUrl, - ) -> Result { - let conn = &mut get_conn(pool).await?; - activity + sent_activity .filter(ap_id.eq(object_id)) .first::(conn) .await } } +impl ReceivedActivity { + pub async fn create(pool: &mut DbPool<'_>, ap_id_: &DbUrl) -> Result<(), Error> { + use crate::schema::received_activity::dsl::{ap_id, id, received_activity}; + let conn = &mut get_conn(pool).await?; + let res = insert_into(received_activity) + .values(ap_id.eq(ap_id_)) + .on_conflict_do_nothing() + .returning(id) + .get_result::(conn) + .await + .optional()?; + if res.is_some() { + // new activity inserted successfully + Ok(()) + } else { + // duplicate activity + Err(DatabaseError( + DatabaseErrorKind::UniqueViolation, + Box::::default(), + )) + } + } +} + #[cfg(test)] mod tests { use super::*; - use crate::{ - newtypes::DbUrl, - source::{ - activity::{Activity, ActivityInsertForm}, - instance::Instance, - person::{Person, PersonInsertForm}, - }, - utils::build_db_pool_for_tests, - }; - use serde_json::Value; + use crate::utils::build_db_pool_for_tests; + use serde_json::json; use serial_test::serial; use url::Url; #[tokio::test] #[serial] - async fn test_crud() { + async fn receive_activity_duplicate() { let pool = &build_db_pool_for_tests().await; let pool = &mut pool.into(); + let ap_id: DbUrl = Url::parse("http://example.com/activity/531") + .unwrap() + .into(); - let inserted_instance = Instance::read_or_create(pool, "my_domain.tld".to_string()) - .await - .unwrap(); + // inserting activity for first time + let res = ReceivedActivity::create(pool, &ap_id).await; + assert!(res.is_ok()); - let creator_form = PersonInsertForm::builder() - .name("activity_creator_ pm".into()) - .public_key("pubkey".to_string()) - .instance_id(inserted_instance.id) - .build(); + let res = ReceivedActivity::create(pool, &ap_id).await; + assert!(res.is_err()); + } - let inserted_creator = Person::create(pool, &creator_form).await.unwrap(); + #[tokio::test] + #[serial] + async fn sent_activity_write_read() { + let pool = &build_db_pool_for_tests().await; + let pool = &mut pool.into(); + let ap_id: DbUrl = Url::parse("http://example.com/activity/412") + .unwrap() + .into(); + let data = json!({ + "key1": "0xF9BA143B95FF6D82", + "key2": "42", + }); + let sensitive = false; - let ap_id_: DbUrl = Url::parse( - "https://enterprise.lemmy.ml/activities/delete/f1b5d57c-80f8-4e03-a615-688d552e946c", - ) - .unwrap() - .into(); - let test_json: Value = serde_json::from_str( - r#"{ - "@context": "https://www.w3.org/ns/activitystreams", - "id": "https://enterprise.lemmy.ml/activities/delete/f1b5d57c-80f8-4e03-a615-688d552e946c", - "type": "Delete", - "actor": "https://enterprise.lemmy.ml/u/riker", - "to": "https://www.w3.org/ns/activitystreams#Public", - "cc": [ - "https://enterprise.lemmy.ml/c/main/" - ], - "object": "https://enterprise.lemmy.ml/post/32" - }"#, - ) - .unwrap(); - let activity_form = ActivityInsertForm { - ap_id: ap_id_.clone(), - data: test_json.clone(), - local: Some(true), - sensitive: Some(false), - updated: None, + let form = SentActivityForm { + ap_id: ap_id.clone(), + data: data.clone(), + sensitive, }; - let inserted_activity = Activity::create(pool, &activity_form).await.unwrap(); + SentActivity::create(pool, form).await.unwrap(); - let expected_activity = Activity { - ap_id: ap_id_.clone(), - id: inserted_activity.id, - data: test_json, - local: true, - sensitive: false, - published: inserted_activity.published, - updated: None, - }; - - let read_activity = Activity::read(pool, inserted_activity.id).await.unwrap(); - let read_activity_by_apub_id = Activity::read_from_apub_id(pool, &ap_id_).await.unwrap(); - Person::delete(pool, inserted_creator.id).await.unwrap(); - Activity::delete(pool, inserted_activity.id).await.unwrap(); - - assert_eq!(expected_activity, read_activity); - assert_eq!(expected_activity, read_activity_by_apub_id); - assert_eq!(expected_activity, inserted_activity); + let res = SentActivity::read_from_apub_id(pool, &ap_id).await.unwrap(); + assert_eq!(res.ap_id, ap_id); + assert_eq!(res.data, data); + assert_eq!(res.sensitive, sensitive); } } diff --git a/crates/db_schema/src/schema.rs b/crates/db_schema/src/schema.rs index e503a8274..ae75c31d8 100644 --- a/crates/db_schema/src/schema.rs +++ b/crates/db_schema/src/schema.rs @@ -14,18 +14,6 @@ pub mod sql_types { pub struct SortTypeEnum; } -diesel::table! { - activity (id) { - id -> Int4, - data -> Jsonb, - local -> Bool, - published -> Timestamp, - updated -> Nullable, - ap_id -> Text, - sensitive -> Bool, - } -} - diesel::table! { admin_purge_comment (id) { id -> Int4, @@ -762,6 +750,14 @@ diesel::table! { } } +diesel::table! { + received_activity (id) { + id -> Int8, + ap_id -> Text, + published -> Timestamp, + } +} + diesel::table! { registration_application (id) { id -> Int4, @@ -780,6 +776,16 @@ diesel::table! { } } +diesel::table! { + sent_activity (id) { + id -> Int8, + ap_id -> Text, + data -> Json, + sensitive -> Bool, + published -> Timestamp, + } +} + diesel::table! { site (id) { id -> Int4, @@ -920,7 +926,6 @@ diesel::joinable!(site_language -> site (site_id)); diesel::joinable!(tagline -> local_site (local_site_id)); diesel::allow_tables_to_appear_in_same_query!( - activity, admin_purge_comment, admin_purge_community, admin_purge_person, @@ -977,8 +982,10 @@ diesel::allow_tables_to_appear_in_same_query!( post_saved, private_message, private_message_report, + received_activity, registration_application, secret, + sent_activity, site, site_aggregates, site_language, diff --git a/crates/db_schema/src/source/activity.rs b/crates/db_schema/src/source/activity.rs index c5c8dd359..85b193f51 100644 --- a/crates/db_schema/src/source/activity.rs +++ b/crates/db_schema/src/source/activity.rs @@ -1,34 +1,28 @@ -use crate::{newtypes::DbUrl, schema::activity}; +use crate::{newtypes::DbUrl, schema::sent_activity}; use serde_json::Value; use std::fmt::Debug; -#[derive(PartialEq, Eq, Debug, Queryable, Identifiable)] -#[diesel(table_name = activity)] -pub struct Activity { - pub id: i32, - pub data: Value, - pub local: bool, - pub published: chrono::NaiveDateTime, - pub updated: Option, +#[derive(PartialEq, Eq, Debug, Queryable)] +#[diesel(table_name = sent_activity)] +pub struct SentActivity { + pub id: i64, pub ap_id: DbUrl, + pub data: Value, + pub sensitive: bool, + pub published: chrono::NaiveDateTime, +} +#[derive(Insertable)] +#[diesel(table_name = sent_activity)] +pub struct SentActivityForm { + pub ap_id: DbUrl, + pub data: Value, pub sensitive: bool, } -#[derive(Insertable)] -#[diesel(table_name = activity)] -pub struct ActivityInsertForm { - pub data: Value, - pub local: Option, - pub updated: Option, +#[derive(PartialEq, Eq, Debug, Queryable)] +#[diesel(table_name = received_activity)] +pub struct ReceivedActivity { + pub id: i64, pub ap_id: DbUrl, - pub sensitive: Option, -} - -#[derive(AsChangeset)] -#[diesel(table_name = activity)] -pub struct ActivityUpdateForm { - pub data: Option, - pub local: Option, - pub updated: Option>, - pub sensitive: Option, + pub published: chrono::NaiveDateTime, } diff --git a/migrations/2023-07-11-084714_receive_activity_table/down.sql b/migrations/2023-07-11-084714_receive_activity_table/down.sql new file mode 100644 index 000000000..ea4f4d4a3 --- /dev/null +++ b/migrations/2023-07-11-084714_receive_activity_table/down.sql @@ -0,0 +1,21 @@ +create table activity ( + id serial primary key, + data jsonb not null, + local boolean not null default true, + published timestamp not null default now(), + updated timestamp, + ap_id text not null, + sensitive boolean not null default true +); + +insert into activity(ap_id, data, sensitive, published) + select ap_id, data, sensitive, published + from sent_activity + order by id desc + limit 100000; + +-- We cant copy received_activity entries back into activities table because we dont have data +-- which is mandatory. + +drop table sent_activity; +drop table received_activity; \ No newline at end of file diff --git a/migrations/2023-07-11-084714_receive_activity_table/up.sql b/migrations/2023-07-11-084714_receive_activity_table/up.sql new file mode 100644 index 000000000..c6b30b7b7 --- /dev/null +++ b/migrations/2023-07-11-084714_receive_activity_table/up.sql @@ -0,0 +1,35 @@ +-- outgoing activities, need to be stored to be later server over http +-- we change data column from jsonb to json for decreased size +-- https://stackoverflow.com/a/22910602 +create table sent_activity ( + id bigserial primary key, + ap_id text unique not null, + data json not null, + sensitive boolean not null, + published timestamp not null default now() +); + +-- incoming activities, we only need the id to avoid processing the same activity multiple times +create table received_activity ( + id bigserial primary key, + ap_id text unique not null, + published timestamp not null default now() +); + +-- copy sent activities to new table. only copy last 100k for faster migration +insert into sent_activity(ap_id, data, sensitive, published) + select ap_id, data, sensitive, published + from activity + where local = true + order by id desc + limit 100000; + +-- copy received activities to new table. only last 1m for faster migration +insert into received_activity(ap_id, published) + select ap_id, published + from activity + where local = false + order by id desc + limit 1000000; + +drop table activity; diff --git a/src/scheduled_tasks.rs b/src/scheduled_tasks.rs index f20e61e12..ad97d1934 100644 --- a/src/scheduled_tasks.rs +++ b/src/scheduled_tasks.rs @@ -13,7 +13,16 @@ use diesel::{ use diesel::{sql_query, PgConnection, RunQueryDsl}; use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ - schema::{activity, captcha_answer, comment, community_person_ban, instance, person, post}, + schema::{ + captcha_answer, + comment, + community_person_ban, + instance, + person, + post, + received_activity, + sent_activity, + }, source::instance::{Instance, InstanceForm}, utils::{naive_now, DELETED_REPLACEMENT_TEXT}, }; @@ -211,16 +220,17 @@ fn delete_expired_captcha_answers(conn: &mut PgConnection) { /// Clear old activities (this table gets very large) fn clear_old_activities(conn: &mut PgConnection) { info!("Clearing old activities..."); - match diesel::delete(activity::table.filter(activity::published.lt(now - 6.months()))) + diesel::delete(sent_activity::table.filter(sent_activity::published.lt(now - 3.months()))) .execute(conn) - { - Ok(_) => { - info!("Done."); - } - Err(e) => { - error!("Failed to clear old activities: {}", e) - } - } + .map_err(|e| error!("Failed to clear old sent activities: {}", e)) + .ok(); + + diesel::delete( + received_activity::table.filter(received_activity::published.lt(now - 3.months())), + ) + .execute(conn) + .map_err(|e| error!("Failed to clear old received activities: {}", e)) + .ok(); } /// overwrite posts and comments 30d after deletion