From c09c462a6e224ef678ff4842d085610f50c5ad03 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Wed, 27 Jan 2021 15:02:28 +0100 Subject: [PATCH] Serve activities in community outbox (fixes #1216) --- Cargo.lock | 12 ++++++ crates/apub/src/fetcher/community.rs | 48 ++++++++++++------------ crates/apub/src/http/community.rs | 25 ++++++------ crates/apub/src/inbox/user_inbox.rs | 2 +- crates/db_queries/Cargo.toml | 1 + crates/db_queries/src/source/activity.rs | 31 ++++++++++++++- 6 files changed, 79 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e43b9219c..f15cf8e88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1023,6 +1023,17 @@ dependencies = [ "syn", ] +[[package]] +name = "diesel_json" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2812f0f63b6d3508fb7bfdb872c2dc2321ba938f5e0f4cb9751ec899e8b297c9" +dependencies = [ + "diesel", + "serde 1.0.118", + "serde_json", +] + [[package]] name = "diesel_migrations" version = "1.4.0" @@ -1794,6 +1805,7 @@ dependencies = [ "bcrypt", "chrono", "diesel", + "diesel_json", "diesel_migrations", "lazy_static", "lemmy_db_schema", diff --git a/crates/apub/src/fetcher/community.rs b/crates/apub/src/fetcher/community.rs index 0249dee1e..7547e0dbd 100644 --- a/crates/apub/src/fetcher/community.rs +++ b/crates/apub/src/fetcher/community.rs @@ -1,28 +1,23 @@ use crate::{ - check_is_apub_id_valid, fetcher::{ fetch::fetch_remote_object, get_or_fetch_and_upsert_user, is_deleted, should_refetch_actor, }, + inbox::user_inbox::receive_announce, objects::FromApub, ActorType, GroupExt, - PageExt, }; use activitystreams::{ - base::{BaseExt, ExtendsExt}, collection::{CollectionExt, OrderedCollection}, object::ObjectExt, }; use anyhow::Context; use diesel::result::Error::NotFound; use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable}; -use lemmy_db_schema::source::{ - community::{Community, CommunityModerator, CommunityModeratorForm}, - post::Post, -}; +use lemmy_db_schema::source::community::{Community, CommunityModerator, CommunityModeratorForm}; use lemmy_structs::blocking; use lemmy_utils::{location_info, LemmyError}; use lemmy_websocket::LemmyContext; @@ -119,29 +114,34 @@ async fn fetch_remote_community( .await??; } - // fetch outbox (maybe make this conditional) + // only fetch outbox for new communities, otherwise this can create an infinite loop + if old_community.is_none() { + fetch_community_outbox(context, &community, recursion_counter).await? + } + + Ok(community) +} + +async fn fetch_community_outbox( + context: &LemmyContext, + community: &Community, + recursion_counter: &mut i32, +) -> Result<(), LemmyError> { let outbox = fetch_remote_object::( context.client(), &community.get_outbox_url()?, recursion_counter, ) .await?; - let outbox_items = outbox.items().context(location_info!())?.clone(); - let mut outbox_items = outbox_items.many().context(location_info!())?; - if outbox_items.len() > 20 { - outbox_items = outbox_items[0..20].to_vec(); - } - for o in outbox_items { - let page = PageExt::from_any_base(o)?.context(location_info!())?; - let page_id = page.id_unchecked().context(location_info!())?; - - // The post creator may be from a blocked instance, if it errors, then skip it - if check_is_apub_id_valid(page_id).is_err() { - continue; - } - Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?; - // TODO: we need to send a websocket update here + let outbox_activities = outbox.items().context(location_info!())?.clone(); + let mut outbox_activities = outbox_activities.many().context(location_info!())?; + if outbox_activities.len() > 20 { + outbox_activities = outbox_activities[0..20].to_vec(); } - Ok(community) + for activity in outbox_activities { + receive_announce(context, activity, community, recursion_counter).await?; + } + + Ok(()) } diff --git a/crates/apub/src/http/community.rs b/crates/apub/src/http/community.rs index a0ec7518b..8d6549ad6 100644 --- a/crates/apub/src/http/community.rs +++ b/crates/apub/src/http/community.rs @@ -5,12 +5,12 @@ use crate::{ ActorType, }; use activitystreams::{ - base::{AnyBase, BaseExt, ExtendsExt}, + base::{AnyBase, BaseExt}, collection::{CollectionExt, OrderedCollection, UnorderedCollection}, }; use actix_web::{body::Body, web, HttpResponse}; -use lemmy_db_queries::source::{community::Community_, post::Post_}; -use lemmy_db_schema::source::{community::Community, post::Post}; +use lemmy_db_queries::source::{activity::Activity_, community::Community_}; +use lemmy_db_schema::source::{activity::Activity, community::Community}; use lemmy_db_views_actor::community_follower_view::CommunityFollowerView; use lemmy_structs::blocking; use lemmy_utils::LemmyError; @@ -76,21 +76,20 @@ pub async fn get_apub_community_outbox( }) .await??; - let community_id = community.id; - let posts = blocking(context.pool(), move |conn| { - Post::list_for_community(conn, community_id) + let community_actor_id = community.actor_id.to_owned(); + let activities = blocking(context.pool(), move |conn| { + Activity::read_community_outbox(conn, &community_actor_id) }) .await??; - let mut pages: Vec = vec![]; - for p in posts { - pages.push(p.to_apub(context.pool()).await?.into_any_base()?); - } - - let len = pages.len(); + let activities = activities + .iter() + .map(AnyBase::from_arbitrary_json) + .collect::, serde_json::Error>>()?; + let len = activities.len(); let mut collection = OrderedCollection::new(); collection - .set_many_items(pages) + .set_many_items(activities) .set_many_contexts(lemmy_context()?) .set_id(community.get_outbox_url()?) .set_total_items(len as u64); diff --git a/crates/apub/src/inbox/user_inbox.rs b/crates/apub/src/inbox/user_inbox.rs index 6496a60a0..7b90fafad 100644 --- a/crates/apub/src/inbox/user_inbox.rs +++ b/crates/apub/src/inbox/user_inbox.rs @@ -236,7 +236,7 @@ async fn receive_accept( } /// Takes an announce and passes the inner activity to the appropriate handler. -async fn receive_announce( +pub async fn receive_announce( context: &LemmyContext, activity: AnyBase, actor: &dyn ActorType, diff --git a/crates/db_queries/Cargo.toml b/crates/db_queries/Cargo.toml index 5385854cb..42e159fba 100644 --- a/crates/db_queries/Cargo.toml +++ b/crates/db_queries/Cargo.toml @@ -23,3 +23,4 @@ url = { version = "2.2.0", features = ["serde"] } lazy_static = "1.4.0" regex = "1.4.2" bcrypt = "0.9.0" +diesel_json = "0.1.1" diff --git a/crates/db_queries/src/source/activity.rs b/crates/db_queries/src/source/activity.rs index 662db3aed..964e50424 100644 --- a/crates/db_queries/src/source/activity.rs +++ b/crates/db_queries/src/source/activity.rs @@ -1,8 +1,9 @@ use crate::Crud; -use diesel::{dsl::*, result::Error, *}; -use lemmy_db_schema::source::activity::*; +use diesel::{dsl::*, result::Error, sql_types::Text, *}; +use lemmy_db_schema::{source::activity::*, Url}; use log::debug; use serde::Serialize; +use serde_json::Value; use std::{ fmt::Debug, io::{Error as IoError, ErrorKind}, @@ -47,7 +48,14 @@ pub trait Activity_ { ) -> Result where T: Serialize + Debug; + fn read_from_apub_id(conn: &PgConnection, object_id: &str) -> Result; + + /// Returns up to 20 activities of type `Announce/Create/Page` from the community + fn read_community_outbox( + conn: &PgConnection, + community_actor_id: &Url, + ) -> Result, Error>; } impl Activity_ for Activity { @@ -83,6 +91,25 @@ impl Activity_ for Activity { use lemmy_db_schema::schema::activity::dsl::*; activity.filter(ap_id.eq(object_id)).first::(conn) } + + fn read_community_outbox( + conn: &PgConnection, + community_actor_id: &Url, + ) -> Result, Error> { + use lemmy_db_schema::schema::activity::dsl::*; + let res: Vec = activity + .select(data) + .filter( + sql("activity.data ->> 'type' = 'Announce'") + .sql(" AND activity.data -> 'object' ->> 'type' = 'Create'") + .sql(" AND activity.data -> 'object' -> 'object' ->> 'type' = 'Page'") + .sql(" AND activity.data ->> 'actor' = ") + .bind::(community_actor_id), + ) + .limit(20) + .get_results(conn)?; + Ok(res) + } } #[cfg(test)]