Use new fetcher implementation for post/comment

rewrite-fetcher
Felix Ableitner 2021-09-22 11:56:07 +02:00
parent 1e7f3a4b23
commit 0ed466dfdd
12 changed files with 208 additions and 145 deletions

View File

@ -13,7 +13,7 @@ use lemmy_apub::{
undo_vote::UndoVote,
vote::{Vote, VoteType},
},
PostOrComment,
fetcher::post_or_comment::PostOrComment,
};
use lemmy_db_queries::{source::comment::Comment_, Likeable, Saveable};
use lemmy_db_schema::{source::comment::*, LocalUserId};

View File

@ -19,7 +19,7 @@ use lemmy_apub::{
},
CreateOrUpdateType,
},
PostOrComment,
fetcher::post_or_comment::PostOrComment,
};
use lemmy_db_queries::{source::post::Post_, Crud, Likeable, Saveable};
use lemmy_db_schema::source::{moderator::*, post::*};

View File

@ -15,9 +15,9 @@ use lemmy_apub::{
voting::vote::{Vote, VoteType},
CreateOrUpdateType,
},
fetcher::post_or_comment::PostOrComment,
generate_apub_endpoint,
EndpointType,
PostOrComment,
};
use lemmy_db_queries::{source::comment::Comment_, Crud, Likeable};
use lemmy_db_schema::source::comment::*;

View File

@ -13,9 +13,9 @@ use lemmy_apub::{
voting::vote::{Vote, VoteType},
CreateOrUpdateType,
},
fetcher::post_or_comment::PostOrComment,
generate_apub_endpoint,
EndpointType,
PostOrComment,
};
use lemmy_db_queries::{source::post::Post_, Crud, Likeable};
use lemmy_db_schema::source::post::*;

View File

@ -12,10 +12,7 @@ use crate::{
},
activity_queue::send_to_community_new,
extensions::context::lemmy_context,
fetcher::{
objects::get_or_fetch_and_insert_post_or_comment,
person::get_or_fetch_and_upsert_person,
},
fetcher::{new_fetcher::dereference, person::get_or_fetch_and_upsert_person},
ActorType,
PostOrComment,
};
@ -105,8 +102,7 @@ impl ActivityHandler for UndoVote {
) -> Result<(), LemmyError> {
let actor = get_or_fetch_and_upsert_person(&self.actor, context, request_counter).await?;
let object =
get_or_fetch_and_insert_post_or_comment(&self.object.object, context, request_counter)
.await?;
dereference::<PostOrComment>(&self.object.object, context, request_counter).await?;
match object {
PostOrComment::Post(p) => undo_vote_post(actor, p.deref(), context).await,
PostOrComment::Comment(c) => undo_vote_comment(actor, c.deref(), context).await,

View File

@ -8,10 +8,7 @@ use crate::{
},
activity_queue::send_to_community_new,
extensions::context::lemmy_context,
fetcher::{
objects::get_or_fetch_and_insert_post_or_comment,
person::get_or_fetch_and_upsert_person,
},
fetcher::{new_fetcher::dereference, person::get_or_fetch_and_upsert_person},
ActorType,
PostOrComment,
};
@ -130,8 +127,7 @@ impl ActivityHandler for Vote {
request_counter: &mut i32,
) -> Result<(), LemmyError> {
let actor = get_or_fetch_and_upsert_person(&self.actor, context, request_counter).await?;
let object =
get_or_fetch_and_insert_post_or_comment(&self.object, context, request_counter).await?;
let object = dereference::<PostOrComment>(&self.object, context, request_counter).await?;
match object {
PostOrComment::Post(p) => vote_post(&self.kind, actor, p.deref(), context).await,
PostOrComment::Comment(c) => vote_comment(&self.kind, actor, c.deref(), context).await,

View File

@ -1,7 +1,8 @@
pub mod community;
mod fetch;
pub mod objects;
pub mod new_fetcher;
pub mod person;
pub mod post_or_comment;
pub mod search;
use crate::{

View File

@ -0,0 +1,96 @@
use crate::{objects::FromApub, APUB_JSON_CONTENT_TYPE};
use anyhow::anyhow;
use diesel::NotFound;
use lemmy_api_common::blocking;
use lemmy_db_queries::{ApubObject, DbPool};
use lemmy_utils::{request::retry, settings::structs::Settings, LemmyError};
use lemmy_websocket::LemmyContext;
use reqwest::StatusCode;
use std::time::Duration;
use url::Url;
/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
/// fetch through the search). This should be configurable.
static REQUEST_LIMIT: i32 = 25;
/// Fetches an activitypub object, either from local database (if possible), or over http.
pub(crate) async fn dereference<Kind>(
id: &Url,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<Kind, LemmyError>
where
Kind: FromApub + ApubObject + Send + 'static,
for<'de> <Kind as FromApub>::ApubType: serde::Deserialize<'de>,
{
let local_object = dereference_locally(id.clone(), context.pool()).await?;
if let Some(object) = local_object {
// TODO: for actors, also refetch after 24 hours
Ok(object)
} else {
dereference_remotely(id, context, request_counter).await
}
}
/// returning none means the object was not found in local db
async fn dereference_locally<Kind>(id: Url, pool: &DbPool) -> Result<Option<Kind>, LemmyError>
where
Kind: FromApub + ApubObject + Send + 'static,
for<'de> <Kind as FromApub>::ApubType: serde::Deserialize<'de>,
{
let object = blocking(pool, move |conn| {
ApubObject::read_from_apub_id(conn, &id.into())
})
.await?;
match object {
Ok(o) => Ok(Some(o)),
Err(NotFound {}) => Ok(None),
Err(e) => Err(e.into()),
}
}
async fn dereference_remotely<Kind>(
id: &Url,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<Kind, LemmyError>
where
Kind: FromApub + ApubObject + Send + 'static,
for<'de> <Kind as FromApub>::ApubType: serde::Deserialize<'de>,
{
// dont fetch local objects this way
debug_assert!(id.domain() != Some(&Settings::get().hostname));
*request_counter += 1;
if *request_counter > REQUEST_LIMIT {
return Err(LemmyError::from(anyhow!("Request limit reached")));
}
let res = retry(|| {
context
.client()
.get(id.as_str())
.header("Accept", APUB_JSON_CONTENT_TYPE)
.timeout(Duration::from_secs(60))
.send()
})
.await?;
if res.status() == StatusCode::GONE {
mark_object_deleted::<Kind>(id, context).await?;
return Err(anyhow!("Fetched remote object {} which was deleted", id.to_string()).into());
}
let res2: Kind::ApubType = res.json().await?;
Ok(Kind::from_apub(&res2, context, id, request_counter).await?)
}
async fn mark_object_deleted<Kind>(_id: &Url, _context: &LemmyContext) -> Result<(), LemmyError>
where
Kind: FromApub + ApubObject + Send + 'static,
{
// TODO: need to move methods update_deleted, update_removed etc into a trait to use them here.
// also, how do we know if the object was deleted vs removed?
todo!()
}

View File

@ -1,97 +0,0 @@
use crate::{
fetcher::fetch::fetch_remote_object,
objects::{comment::Note, post::Page, FromApub},
PostOrComment,
};
use anyhow::anyhow;
use diesel::result::Error::NotFound;
use lemmy_api_common::blocking;
use lemmy_db_queries::{ApubObject, Crud};
use lemmy_db_schema::source::{comment::Comment, post::Post};
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use log::debug;
use url::Url;
/// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is
/// pulled from its apub ID, inserted and returned.
///
/// The parent community is also pulled if necessary. Comments are not pulled.
pub(crate) async fn get_or_fetch_and_insert_post(
post_ap_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Post, LemmyError> {
let post_ap_id_owned = post_ap_id.to_owned();
let post = blocking(context.pool(), move |conn| {
Post::read_from_apub_id(conn, &post_ap_id_owned.into())
})
.await?;
match post {
Ok(p) => Ok(p),
Err(NotFound {}) => {
debug!("Fetching and creating remote post: {}", post_ap_id);
let page =
fetch_remote_object::<Page>(context.client(), post_ap_id, recursion_counter).await?;
let post = Post::from_apub(&page, context, post_ap_id, recursion_counter).await?;
Ok(post)
}
Err(e) => Err(e.into()),
}
}
/// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is
/// pulled from its apub ID, inserted and returned.
///
/// The parent community, post and comment are also pulled if necessary.
pub(crate) async fn get_or_fetch_and_insert_comment(
comment_ap_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Comment, LemmyError> {
let comment_ap_id_owned = comment_ap_id.to_owned();
let comment = blocking(context.pool(), move |conn| {
Comment::read_from_apub_id(conn, &comment_ap_id_owned.into())
})
.await?;
match comment {
Ok(p) => Ok(p),
Err(NotFound {}) => {
debug!(
"Fetching and creating remote comment and its parents: {}",
comment_ap_id
);
let comment =
fetch_remote_object::<Note>(context.client(), comment_ap_id, recursion_counter).await?;
let comment = Comment::from_apub(&comment, context, comment_ap_id, recursion_counter).await?;
let post_id = comment.post_id;
let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
if post.locked {
return Err(anyhow!("Post is locked").into());
}
Ok(comment)
}
Err(e) => Err(e.into()),
}
}
pub(crate) async fn get_or_fetch_and_insert_post_or_comment(
ap_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<PostOrComment, LemmyError> {
Ok(
match get_or_fetch_and_insert_post(ap_id, context, recursion_counter).await {
Ok(p) => PostOrComment::Post(Box::new(p)),
Err(_) => {
let c = get_or_fetch_and_insert_comment(ap_id, context, recursion_counter).await?;
PostOrComment::Comment(Box::new(c))
}
},
)
}

View File

@ -0,0 +1,93 @@
use crate::objects::{comment::Note, post::Page, FromApub};
use diesel::{result::Error, PgConnection};
use lemmy_db_queries::ApubObject;
use lemmy_db_schema::{
source::{
comment::{Comment, CommentForm},
post::{Post, PostForm},
},
DbUrl,
};
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use serde::Deserialize;
use url::Url;
pub enum PostOrComment {
Comment(Box<Comment>),
Post(Box<Post>),
}
pub enum PostOrCommentForm {
PostForm(PostForm),
CommentForm(CommentForm),
}
#[derive(Deserialize)]
pub enum PageOrNote {
Page(Page),
Note(Note),
}
#[async_trait::async_trait(?Send)]
impl ApubObject for PostOrComment {
type Form = PostOrCommentForm;
// TODO: this can probably be implemented using a single sql query
fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result<Self, Error>
where
Self: Sized,
{
let post = Post::read_from_apub_id(conn, object_id);
Ok(match post {
Ok(o) => PostOrComment::Post(Box::new(o)),
Err(_) => PostOrComment::Comment(Box::new(Comment::read_from_apub_id(conn, object_id)?)),
})
}
fn upsert(conn: &PgConnection, user_form: &PostOrCommentForm) -> Result<Self, Error>
where
Self: Sized,
{
Ok(match user_form {
PostOrCommentForm::PostForm(f) => PostOrComment::Post(Box::new(Post::upsert(conn, f)?)),
PostOrCommentForm::CommentForm(f) => {
PostOrComment::Comment(Box::new(Comment::upsert(conn, f)?))
}
})
}
}
#[async_trait::async_trait(?Send)]
impl FromApub for PostOrComment {
type ApubType = PageOrNote;
async fn from_apub(
apub: &PageOrNote,
context: &LemmyContext,
expected_domain: &Url,
request_counter: &mut i32,
) -> Result<Self, LemmyError>
where
Self: Sized,
{
Ok(match apub {
PageOrNote::Page(p) => PostOrComment::Post(Box::new(
Post::from_apub(p, context, expected_domain, request_counter).await?,
)),
PageOrNote::Note(n) => PostOrComment::Comment(Box::new(
Comment::from_apub(n, context, expected_domain, request_counter).await?,
)),
})
}
}
impl PostOrComment {
pub(crate) fn ap_id(&self) -> Url {
match self {
PostOrComment::Post(p) => p.ap_id.clone(),
PostOrComment::Comment(c) => c.ap_id.clone(),
}
.into()
}
}

View File

@ -9,7 +9,7 @@ pub mod http;
pub mod migrations;
pub mod objects;
use crate::extensions::signatures::PublicKey;
use crate::{extensions::signatures::PublicKey, fetcher::post_or_comment::PostOrComment};
use anyhow::{anyhow, Context};
use diesel::NotFound;
use lemmy_api_common::blocking;
@ -244,21 +244,6 @@ where
Ok(())
}
pub enum PostOrComment {
Comment(Box<Comment>),
Post(Box<Post>),
}
impl PostOrComment {
pub(crate) fn ap_id(&self) -> Url {
match self {
PostOrComment::Post(p) => p.ap_id.clone(),
PostOrComment::Comment(c) => c.ap_id.clone(),
}
.into()
}
}
/// Tries to find a post or comment in the local database, without any network requests.
/// This is used to handle deletions and removals, because in case we dont have the object, we can
/// simply ignore the activity.

View File

@ -1,11 +1,7 @@
use crate::{
activities::verify_person_in_community,
extensions::context::lemmy_context,
fetcher::objects::{
get_or_fetch_and_insert_comment,
get_or_fetch_and_insert_post,
get_or_fetch_and_insert_post_or_comment,
},
fetcher::new_fetcher::dereference,
migrations::CommentInReplyToMigration,
objects::{create_tombstone, get_or_fetch_and_upsert_person, FromApub, Source, ToApub},
ActorType,
@ -86,18 +82,13 @@ impl Note {
CommentInReplyToMigration::Old(in_reply_to) => {
// This post, or the parent comment might not yet exist on this server yet, fetch them.
let post_id = in_reply_to.get(0).context(location_info!())?;
let post = Box::pin(get_or_fetch_and_insert_post(
post_id,
context,
request_counter,
))
.await?;
let post = Box::pin(dereference::<Post>(post_id, context, request_counter)).await?;
// The 2nd item, if it exists, is the parent comment apub_id
// Nested comments will automatically get fetched recursively
let parent_id: Option<CommentId> = match in_reply_to.get(1) {
Some(parent_comment_uri) => {
let parent_comment = Box::pin(get_or_fetch_and_insert_comment(
let parent_comment = Box::pin(dereference::<Comment>(
parent_comment_uri,
context,
request_counter,
@ -112,9 +103,8 @@ impl Note {
Ok((post, parent_id))
}
CommentInReplyToMigration::New(in_reply_to) => {
let parent = Box::pin(
get_or_fetch_and_insert_post_or_comment(in_reply_to, context, request_counter).await?,
);
let parent =
Box::pin(dereference::<PostOrComment>(in_reply_to, context, request_counter).await?);
match parent.deref() {
PostOrComment::Post(p) => {
// Workaround because I cant figure ut how to get the post out of the box (and we dont
@ -229,6 +219,9 @@ impl FromApub for Comment {
let creator =
get_or_fetch_and_upsert_person(&note.attributed_to, context, request_counter).await?;
let (post, parent_comment_id) = note.get_parents(context, request_counter).await?;
if post.locked {
return Err(anyhow!("Post is locked").into());
}
let content = &note.source.content;
let content_slurs_removed = remove_slurs(content);