From a75b6cb5c9571323acfa018be50bb99fffd07714 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Mon, 25 Oct 2021 16:15:03 +0200 Subject: [PATCH] Rewrite community outbox to use new fetcher --- Cargo.lock | 1 + crates/apub/Cargo.toml | 1 + .../src/activities/post/create_or_update.rs | 40 +++--- .../apub/src/collections/community_outbox.rs | 132 ++++++++++++++++++ crates/apub/src/collections/mod.rs | 1 + crates/apub/src/fetcher/community.rs | 52 +------ crates/apub/src/fetcher/mod.rs | 2 +- crates/apub/src/fetcher/object_id.rs | 69 +++++---- crates/apub/src/http/community.rs | 41 +++--- crates/apub/src/lib.rs | 4 + crates/apub/src/objects/comment.rs | 2 + crates/apub/src/objects/community.rs | 23 +-- crates/db_schema/src/impls/community.rs | 10 -- 13 files changed, 243 insertions(+), 135 deletions(-) create mode 100644 crates/apub/src/collections/community_outbox.rs create mode 100644 crates/apub/src/collections/mod.rs diff --git a/Cargo.lock b/Cargo.lock index bcb4585dc..9931aef13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1833,6 +1833,7 @@ dependencies = [ "http", "http-signature-normalization-actix", "itertools", + "lazy_static", "lemmy_api_common", "lemmy_apub_lib", "lemmy_db_schema", diff --git a/crates/apub/Cargo.toml b/crates/apub/Cargo.toml index a3a597928..4b54c87d8 100644 --- a/crates/apub/Cargo.toml +++ b/crates/apub/Cargo.toml @@ -50,6 +50,7 @@ thiserror = "1.0.29" background-jobs = "0.9.0" reqwest = { version = "0.11.4", features = ["json"] } html2md = "0.2.13" +lazy_static = "1.4.0" [dev-dependencies] serial_test = "0.5.1" diff --git a/crates/apub/src/activities/post/create_or_update.rs b/crates/apub/src/activities/post/create_or_update.rs index 1187e770f..4a048a965 100644 --- a/crates/apub/src/activities/post/create_or_update.rs +++ b/crates/apub/src/activities/post/create_or_update.rs @@ -48,6 +48,28 @@ pub struct CreateOrUpdatePost { } impl CreateOrUpdatePost { + pub(crate) async fn new( + post: &ApubPost, + actor: &ApubPerson, + community: &ApubCommunity, + kind: CreateOrUpdateType, + context: &LemmyContext, + ) -> Result { + let id = generate_activity_id( + kind.clone(), + &context.settings().get_protocol_and_hostname(), + )?; + Ok(CreateOrUpdatePost { + actor: ObjectId::new(actor.actor_id()), + to: [PublicUrl::Public], + object: post.to_apub(context).await?, + cc: [ObjectId::new(community.actor_id())], + kind, + id: id.clone(), + context: lemmy_context(), + unparsed: Default::default(), + }) + } pub async fn send( post: &ApubPost, actor: &ApubPerson, @@ -60,22 +82,8 @@ impl CreateOrUpdatePost { }) .await?? .into(); - - let id = generate_activity_id( - kind.clone(), - &context.settings().get_protocol_and_hostname(), - )?; - let create_or_update = CreateOrUpdatePost { - actor: ObjectId::new(actor.actor_id()), - to: [PublicUrl::Public], - object: post.to_apub(context).await?, - cc: [ObjectId::new(community.actor_id())], - kind, - id: id.clone(), - context: lemmy_context(), - unparsed: Default::default(), - }; - + let create_or_update = CreateOrUpdatePost::new(post, actor, &community, kind, context).await?; + let id = create_or_update.id.clone(); let activity = AnnouncableActivities::CreateOrUpdatePost(Box::new(create_or_update)); send_to_community(activity, &id, actor, &community, vec![], context).await } diff --git a/crates/apub/src/collections/community_outbox.rs b/crates/apub/src/collections/community_outbox.rs new file mode 100644 index 000000000..6f5eb1b31 --- /dev/null +++ b/crates/apub/src/collections/community_outbox.rs @@ -0,0 +1,132 @@ +use crate::{ + activities::{post::create_or_update::CreateOrUpdatePost, CreateOrUpdateType}, + context::lemmy_context, + generate_outbox_url, + objects::{community::ApubCommunity, person::ApubPerson, post::ApubPost}, +}; +use activitystreams::{ + base::AnyBase, + chrono::NaiveDateTime, + collection::kind::OrderedCollectionType, + object::Tombstone, + primitives::OneOrMany, + url::Url, +}; +use lemmy_api_common::blocking; +use lemmy_apub_lib::{ + data::Data, + traits::{ActivityHandler, ApubObject}, + verify::verify_domains_match, +}; +use lemmy_db_schema::{ + source::{person::Person, post::Post}, + traits::Crud, +}; +use lemmy_utils::LemmyError; +use lemmy_websocket::LemmyContext; +use serde::{Deserialize, Serialize}; +use serde_with::skip_serializing_none; + +#[skip_serializing_none] +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CommunityOutbox { + #[serde(rename = "@context")] + context: OneOrMany, + r#type: OrderedCollectionType, + id: Url, + ordered_items: Vec, +} + +#[derive(Clone, Debug)] +pub(crate) struct ApubCommunityOutbox(Vec); + +/// Put community in the data, so we dont have to read it again from the database. +pub(crate) struct OutboxData(pub ApubCommunity, pub LemmyContext); + +#[async_trait::async_trait(?Send)] +impl ApubObject for ApubCommunityOutbox { + type DataType = OutboxData; + type TombstoneType = Tombstone; + type ApubType = CommunityOutbox; + + fn last_refreshed_at(&self) -> Option { + None + } + + async fn read_from_apub_id( + _object_id: Url, + data: &Self::DataType, + ) -> Result, LemmyError> { + // Only read from database if its a local community, otherwise fetch over http + if data.0.local { + let community_id = data.0.id; + let post_list: Vec = blocking(data.1.pool(), move |conn| { + Post::list_for_community(conn, community_id) + }) + .await?? + .into_iter() + .map(Into::into) + .collect(); + Ok(Some(ApubCommunityOutbox(post_list))) + } else { + Ok(None) + } + } + + async fn delete(self, _data: &Self::DataType) -> Result<(), LemmyError> { + // do nothing (it gets deleted automatically with the community) + Ok(()) + } + + async fn to_apub(&self, data: &Self::DataType) -> Result { + let mut ordered_items = vec![]; + for post in &self.0 { + let actor = post.creator_id; + let actor: ApubPerson = blocking(data.1.pool(), move |conn| Person::read(conn, actor)) + .await?? + .into(); + let a = + CreateOrUpdatePost::new(post, &actor, &data.0, CreateOrUpdateType::Create, &data.1).await?; + ordered_items.push(a); + } + + Ok(CommunityOutbox { + context: lemmy_context(), + r#type: OrderedCollectionType::OrderedCollection, + id: generate_outbox_url(&data.0.actor_id)?.into(), + ordered_items, + }) + } + + fn to_tombstone(&self) -> Result { + // no tombstone for this, there is only a tombstone for the community + unimplemented!() + } + + async fn from_apub( + apub: &Self::ApubType, + data: &Self::DataType, + expected_domain: &Url, + request_counter: &mut i32, + ) -> Result { + verify_domains_match(expected_domain, &apub.id)?; + let mut outbox_activities = apub.ordered_items.clone(); + if outbox_activities.len() > 20 { + outbox_activities = outbox_activities[0..20].to_vec(); + } + + // We intentionally ignore errors here. This is because the outbox might contain posts from old + // Lemmy versions, or from other software which we cant parse. In that case, we simply skip the + // item and only parse the ones that work. + for activity in outbox_activities { + activity + .receive(&Data::new(data.1.clone()), request_counter) + .await + .ok(); + } + + // This return value is unused, so just set an empty vec + Ok(ApubCommunityOutbox { 0: vec![] }) + } +} diff --git a/crates/apub/src/collections/mod.rs b/crates/apub/src/collections/mod.rs new file mode 100644 index 000000000..43a48af33 --- /dev/null +++ b/crates/apub/src/collections/mod.rs @@ -0,0 +1 @@ +pub(crate) mod community_outbox; diff --git a/crates/apub/src/fetcher/community.rs b/crates/apub/src/fetcher/community.rs index 6044a55b6..1c153b5fa 100644 --- a/crates/apub/src/fetcher/community.rs +++ b/crates/apub/src/fetcher/community.rs @@ -1,15 +1,10 @@ use crate::{ - activities::community::announce::AnnounceActivity, fetcher::{fetch::fetch_remote_object, object_id::ObjectId}, objects::{community::Group, person::ApubPerson}, }; -use activitystreams::{ - base::AnyBase, - collection::{CollectionExt, OrderedCollection}, -}; +use activitystreams::collection::{CollectionExt, OrderedCollection}; use anyhow::Context; use lemmy_api_common::blocking; -use lemmy_apub_lib::{data::Data, traits::ActivityHandler}; use lemmy_db_schema::{ source::community::{Community, CommunityModerator, CommunityModeratorForm}, traits::Joinable, @@ -70,51 +65,6 @@ pub(crate) async fn update_community_mods( Ok(()) } -pub(crate) async fn fetch_community_outbox( - context: &LemmyContext, - outbox: &Url, - recursion_counter: &mut i32, -) -> Result<(), LemmyError> { - let outbox = fetch_remote_object::( - context.client(), - &context.settings(), - outbox, - recursion_counter, - ) - .await?; - let outbox_activities = outbox.items().context(location_info!())?.clone(); - let mut outbox_activities = outbox_activities.many().context(location_info!())?; - if outbox_activities.len() > 20 { - outbox_activities = outbox_activities[0..20].to_vec(); - } - - // We intentionally ignore errors here. This is because the outbox might contain posts from old - // Lemmy versions, or from other software which we cant parse. In that case, we simply skip the - // item and only parse the ones that work. - for activity in outbox_activities { - parse_outbox_item(activity, context, recursion_counter) - .await - .ok(); - } - - Ok(()) -} - -async fn parse_outbox_item( - announce: AnyBase, - context: &LemmyContext, - request_counter: &mut i32, -) -> Result<(), LemmyError> { - // TODO: instead of converting like this, we should create a struct CommunityOutbox with - // AnnounceActivity as inner type, but that gives me stackoverflow - let ser = serde_json::to_string(&announce)?; - let announce: AnnounceActivity = serde_json::from_str(&ser)?; - announce - .receive(&Data::new(context.clone()), request_counter) - .await?; - Ok(()) -} - async fn fetch_community_mods( context: &LemmyContext, group: &Group, diff --git a/crates/apub/src/fetcher/mod.rs b/crates/apub/src/fetcher/mod.rs index fd6b982c3..28488ecb8 100644 --- a/crates/apub/src/fetcher/mod.rs +++ b/crates/apub/src/fetcher/mod.rs @@ -47,7 +47,7 @@ pub(crate) async fn get_or_fetch_and_upsert_actor( /// /// TODO it won't pick up new avatars, summaries etc until a day after. /// Actors need an "update" activity pushed to other servers to fix this. -fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool { +fn should_refetch_object(last_refreshed: NaiveDateTime) -> bool { let update_interval = if cfg!(debug_assertions) { // avoid infinite loop when fetching community outbox chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG) diff --git a/crates/apub/src/fetcher/object_id.rs b/crates/apub/src/fetcher/object_id.rs index fb9107c82..f29bbf6c5 100644 --- a/crates/apub/src/fetcher/object_id.rs +++ b/crates/apub/src/fetcher/object_id.rs @@ -1,11 +1,14 @@ -use crate::fetcher::should_refetch_actor; +use crate::fetcher::should_refetch_object; use anyhow::anyhow; use diesel::NotFound; use lemmy_apub_lib::{traits::ApubObject, APUB_JSON_CONTENT_TYPE}; use lemmy_db_schema::newtypes::DbUrl; -use lemmy_utils::{request::retry, settings::structs::Settings, LemmyError}; -use lemmy_websocket::LemmyContext; -use reqwest::StatusCode; +use lemmy_utils::{ + request::{build_user_agent, retry}, + settings::structs::Settings, + LemmyError, +}; +use reqwest::{Client, StatusCode}; use serde::{Deserialize, Serialize}; use std::{ fmt::{Debug, Display, Formatter}, @@ -18,16 +21,24 @@ use url::Url; /// fetch through the search). This should be configurable. static REQUEST_LIMIT: i32 = 25; +// TODO: after moving this file to library, remove lazy_static dependency from apub crate +lazy_static! { + static ref CLIENT: Client = Client::builder() + .user_agent(build_user_agent(&Settings::get())) + .build() + .unwrap(); +} + #[derive(Clone, PartialEq, Serialize, Deserialize, Debug)] #[serde(transparent)] pub struct ObjectId(Url, #[serde(skip)] PhantomData) where - Kind: ApubObject + Send + 'static, + Kind: ApubObject + Send + 'static, for<'de2> ::ApubType: serde::Deserialize<'de2>; impl ObjectId where - Kind: ApubObject + Send + 'static, + Kind: ApubObject + Send + 'static, for<'de2> ::ApubType: serde::Deserialize<'de2>, { pub fn new(url: T) -> Self @@ -44,10 +55,10 @@ where /// Fetches an activitypub object, either from local database (if possible), or over http. pub async fn dereference( &self, - context: &LemmyContext, + data: &::DataType, request_counter: &mut i32, ) -> Result { - let db_object = self.dereference_from_db(context).await?; + let db_object = self.dereference_from_db(data).await?; // if its a local object, only fetch it from the database and not over http if self.0.domain() == Some(&Settings::get().get_hostname_without_port()?) { @@ -57,39 +68,48 @@ where }; } + // object found in database if let Some(object) = db_object { + // object is old and should be refetched if let Some(last_refreshed_at) = object.last_refreshed_at() { - // TODO: rename to should_refetch_object() - if should_refetch_actor(last_refreshed_at) { + if should_refetch_object(last_refreshed_at) { return self - .dereference_from_http(context, request_counter, Some(object)) + .dereference_from_http(data, request_counter, Some(object)) .await; } } Ok(object) - } else { + } + // object not found, need to fetch over http + else { self - .dereference_from_http(context, request_counter, None) + .dereference_from_http(data, request_counter, None) .await } } /// Fetch an object from the local db. Instead of falling back to http, this throws an error if /// the object is not found in the database. - pub async fn dereference_local(&self, context: &LemmyContext) -> Result { - let object = self.dereference_from_db(context).await?; + pub async fn dereference_local( + &self, + data: &::DataType, + ) -> Result { + let object = self.dereference_from_db(data).await?; object.ok_or_else(|| anyhow!("object not found in database {}", self).into()) } /// returning none means the object was not found in local db - async fn dereference_from_db(&self, context: &LemmyContext) -> Result, LemmyError> { + async fn dereference_from_db( + &self, + data: &::DataType, + ) -> Result, LemmyError> { let id = self.0.clone(); - ApubObject::read_from_apub_id(id, context).await + ApubObject::read_from_apub_id(id, data).await } async fn dereference_from_http( &self, - context: &LemmyContext, + data: &::DataType, request_counter: &mut i32, db_object: Option, ) -> Result { @@ -102,8 +122,7 @@ where } let res = retry(|| { - context - .client() + CLIENT .get(self.0.as_str()) .header("Accept", APUB_JSON_CONTENT_TYPE) .timeout(Duration::from_secs(60)) @@ -113,20 +132,20 @@ where if res.status() == StatusCode::GONE { if let Some(db_object) = db_object { - db_object.delete(context).await?; + db_object.delete(data).await?; } return Err(anyhow!("Fetched remote object {} which was deleted", self).into()); } let res2: Kind::ApubType = res.json().await?; - Ok(Kind::from_apub(&res2, context, self.inner(), request_counter).await?) + Ok(Kind::from_apub(&res2, data, self.inner(), request_counter).await?) } } impl Display for ObjectId where - Kind: ApubObject + Send + 'static, + Kind: ApubObject + Send + 'static, for<'de2> ::ApubType: serde::Deserialize<'de2>, { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { @@ -136,7 +155,7 @@ where impl From> for Url where - Kind: ApubObject + Send + 'static, + Kind: ApubObject + Send + 'static, for<'de2> ::ApubType: serde::Deserialize<'de2>, { fn from(id: ObjectId) -> Self { @@ -146,7 +165,7 @@ where impl From> for DbUrl where - Kind: ApubObject + Send + 'static, + Kind: ApubObject + Send + 'static, for<'de2> ::ApubType: serde::Deserialize<'de2>, { fn from(id: ObjectId) -> Self { diff --git a/crates/apub/src/http/community.rs b/crates/apub/src/http/community.rs index 1b398a56b..14870c888 100644 --- a/crates/apub/src/http/community.rs +++ b/crates/apub/src/http/community.rs @@ -5,24 +5,30 @@ use crate::{ following::{follow::FollowCommunity, undo::UndoFollowCommunity}, report::Report, }, + collections::community_outbox::{ApubCommunityOutbox, OutboxData}, context::lemmy_context, - generate_moderators_url, generate_outbox_url, + fetcher::object_id::ObjectId, + generate_moderators_url, http::{ - create_apub_response, create_apub_tombstone_response, payload_to_string, receive_activity, + create_apub_response, + create_apub_tombstone_response, + payload_to_string, + receive_activity, }, objects::community::ApubCommunity, }; use activitystreams::{ - base::{AnyBase, BaseExt}, + base::BaseExt, collection::{CollectionExt, OrderedCollection, UnorderedCollection}, url::Url, }; use actix_web::{body::Body, web, web::Payload, HttpRequest, HttpResponse}; use lemmy_api_common::blocking; use lemmy_apub_lib::traits::{ActivityFields, ActivityHandler, ApubObject}; -use lemmy_db_schema::source::{activity::Activity, community::Community}; +use lemmy_db_schema::source::community::Community; use lemmy_db_views_actor::{ - community_follower_view::CommunityFollowerView, community_moderator_view::CommunityModeratorView, + community_follower_view::CommunityFollowerView, + community_moderator_view::CommunityModeratorView, }; use lemmy_utils::LemmyError; use lemmy_websocket::LemmyContext; @@ -122,31 +128,18 @@ pub(crate) async fn get_apub_community_followers( /// activites like votes or comments). pub(crate) async fn get_apub_community_outbox( info: web::Path, + req: HttpRequest, context: web::Data, ) -> Result, LemmyError> { let community = blocking(context.pool(), move |conn| { Community::read_from_name(conn, &info.community_name) }) .await??; - - let community_actor_id = community.actor_id.to_owned(); - let activities = blocking(context.pool(), move |conn| { - Activity::read_community_outbox(conn, &community_actor_id) - }) - .await??; - - let activities = activities - .iter() - .map(AnyBase::from_arbitrary_json) - .collect::, serde_json::Error>>()?; - let len = activities.len(); - let mut collection = OrderedCollection::new(); - collection - .set_many_items(activities) - .set_many_contexts(lemmy_context()) - .set_id(generate_outbox_url(&community.actor_id)?.into()) - .set_total_items(len as u64); - Ok(create_apub_response(&collection)) + let outbox_data = OutboxData(community.into(), context.get_ref().clone()); + let url = Url::parse(&req.head().uri.to_string())?; + let id = ObjectId::::new(url); + let outbox = id.dereference(&outbox_data, &mut 0).await?; + Ok(create_apub_response(&outbox.to_apub(&outbox_data).await?)) } pub(crate) async fn get_apub_community_inbox( diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs index cc9d5f487..675798b6f 100644 --- a/crates/apub/src/lib.rs +++ b/crates/apub/src/lib.rs @@ -1,10 +1,14 @@ pub mod activities; +pub(crate) mod collections; mod context; pub mod fetcher; pub mod http; pub mod migrations; pub mod objects; +#[macro_use] +extern crate lazy_static; + use crate::fetcher::post_or_comment::PostOrComment; use anyhow::{anyhow, Context}; use lemmy_api_common::blocking; diff --git a/crates/apub/src/objects/comment.rs b/crates/apub/src/objects/comment.rs index 9ac6797b6..7dd04dd2c 100644 --- a/crates/apub/src/objects/comment.rs +++ b/crates/apub/src/objects/comment.rs @@ -313,6 +313,8 @@ mod tests { #[actix_rt::test] #[serial] async fn test_parse_lemmy_comment() { + // TODO: changed ObjectId::dereference() so that it always fetches if + // last_refreshed_at() == None. But post doesnt store that and expects to never be refetched let context = init_context(); let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap(); let data = prepare_comment_test(&url, &context).await; diff --git a/crates/apub/src/objects/community.rs b/crates/apub/src/objects/community.rs index 3c0fa38c6..ee1f6e80c 100644 --- a/crates/apub/src/objects/community.rs +++ b/crates/apub/src/objects/community.rs @@ -1,7 +1,8 @@ use crate::{ check_is_apub_id_valid, + collections::community_outbox::{ApubCommunityOutbox, OutboxData}, context::lemmy_context, - fetcher::community::{fetch_community_outbox, update_community_mods}, + fetcher::{community::update_community_mods, object_id::ObjectId}, generate_moderators_url, generate_outbox_url, objects::{create_tombstone, get_summary_from_string_or_source, ImageObject, Source}, @@ -65,7 +66,7 @@ pub struct Group { // lemmy extension pub(crate) moderators: Option, inbox: Url, - pub(crate) outbox: Url, + pub(crate) outbox: ObjectId, followers: Url, endpoints: Endpoints, public_key: PublicKey, @@ -193,7 +194,7 @@ impl ApubObject for ApubCommunity { sensitive: Some(self.nsfw), moderators: Some(generate_moderators_url(&self.actor_id)?.into()), inbox: self.inbox_url.clone().into(), - outbox: generate_outbox_url(&self.actor_id)?.into(), + outbox: ObjectId::new(generate_outbox_url(&self.actor_id)?), followers: self.followers_url.clone().into(), endpoints: Endpoints { shared_inbox: self.shared_inbox_url.clone().map(|s| s.into()), @@ -227,19 +228,24 @@ impl ApubObject for ApubCommunity { // Fetching mods and outbox is not necessary for Lemmy to work, so ignore errors. Besides, // we need to ignore these errors so that tests can work entirely offline. - let community = blocking(context.pool(), move |conn| Community::upsert(conn, &form)).await??; + let community: ApubCommunity = + blocking(context.pool(), move |conn| Community::upsert(conn, &form)) + .await?? + .into(); update_community_mods(group, &community, context, request_counter) .await .map_err(|e| debug!("{}", e)) .ok(); - // TODO: doing this unconditionally might cause infinite loop for some reason - fetch_community_outbox(context, &group.outbox, request_counter) + let outbox_data = OutboxData(community.clone(), context.clone()); + group + .outbox + .dereference(&outbox_data, request_counter) .await .map_err(|e| debug!("{}", e)) .ok(); - Ok(community.into()) + Ok(community) } } @@ -318,7 +324,8 @@ mod tests { // change these links so they dont fetch over the network json.moderators = Some(Url::parse("https://enterprise.lemmy.ml/c/tenforward/not_moderators").unwrap()); - json.outbox = Url::parse("https://enterprise.lemmy.ml/c/tenforward/not_outbox").unwrap(); + json.outbox = + ObjectId::new(Url::parse("https://enterprise.lemmy.ml/c/tenforward/not_outbox").unwrap()); let url = Url::parse("https://enterprise.lemmy.ml/c/tenforward").unwrap(); let mut request_counter = 0; diff --git a/crates/db_schema/src/impls/community.rs b/crates/db_schema/src/impls/community.rs index b2ebb3a0a..1fac4a27d 100644 --- a/crates/db_schema/src/impls/community.rs +++ b/crates/db_schema/src/impls/community.rs @@ -126,16 +126,6 @@ impl Community { community.select(actor_id).distinct().load::(conn) } - pub fn read_from_followers_url( - conn: &PgConnection, - followers_url_: &DbUrl, - ) -> Result { - use crate::schema::community::dsl::*; - community - .filter(followers_url.eq(followers_url_)) - .first::(conn) - } - pub fn upsert(conn: &PgConnection, community_form: &CommunityForm) -> Result { use crate::schema::community::dsl::*; insert_into(community)