rewrite apub object search to be generic

rewrite-fetcher
Felix Ableitner 2021-09-24 14:11:12 +02:00
parent 64619d7eb1
commit d4330dfea5
8 changed files with 191 additions and 321 deletions

View File

@ -135,7 +135,8 @@ test('Update a post', async () => {
test('Sticky a post', async () => {
let postRes = await createPost(alpha, betaCommunity.community.id);
let stickiedPostRes = await stickyPost(alpha, true, postRes.post_view.post);
let betaPost1 = (await resolvePost(beta, postRes.post_view.post)).post;
let stickiedPostRes = await stickyPost(beta, true, betaPost1.post);
expect(stickiedPostRes.post_view.post.stickied).toBe(true);
// Make sure that post is stickied on beta
@ -145,7 +146,7 @@ test('Sticky a post', async () => {
expect(betaPost.post.stickied).toBe(true);
// Unsticky a post
let unstickiedPost = await stickyPost(alpha, false, postRes.post_view.post);
let unstickiedPost = await stickyPost(beta, false, betaPost1.post);
expect(unstickiedPost.post_view.post.stickied).toBe(false);
// Make sure that post is unstickied on beta

View File

@ -1,6 +1,7 @@
use crate::Perform;
use actix_web::web::Data;
use anyhow::Context;
use diesel::NotFound;
use lemmy_api_common::{
blocking,
build_federated_instances,
@ -9,24 +10,32 @@ use lemmy_api_common::{
is_admin,
site::*,
};
use lemmy_apub::{build_actor_id_from_shortname, fetcher::search::search_by_apub_id, EndpointType};
use lemmy_apub::{
build_actor_id_from_shortname,
fetcher::search::{search_by_apub_id, SearchableObjects},
EndpointType,
};
use lemmy_db_queries::{
from_opt_str_to_opt_enum,
source::site::Site_,
Crud,
DbPool,
DeleteableOrRemoveable,
ListingType,
SearchType,
SortType,
};
use lemmy_db_schema::source::{moderator::*, site::Site};
use lemmy_db_schema::{
source::{moderator::*, site::Site},
PersonId,
};
use lemmy_db_views::{
comment_view::CommentQueryBuilder,
post_view::PostQueryBuilder,
comment_view::{CommentQueryBuilder, CommentView},
post_view::{PostQueryBuilder, PostView},
site_view::SiteView,
};
use lemmy_db_views_actor::{
community_view::CommunityQueryBuilder,
community_view::{CommunityQueryBuilder, CommunityView},
person_view::{PersonQueryBuilder, PersonViewSafe},
};
use lemmy_db_views_moderator::{
@ -376,13 +385,54 @@ impl Perform for ResolveObject {
_websocket_id: Option<ConnectionId>,
) -> Result<ResolveObjectResponse, LemmyError> {
let local_user_view = get_local_user_view_from_jwt_opt(&self.auth, context.pool()).await?;
let res = search_by_apub_id(&self.q, local_user_view, context)
let res = search_by_apub_id(&self.q, context)
.await
.map_err(|_| ApiError::err("couldnt_find_object"))?;
Ok(res)
convert_response(res, local_user_view.map(|l| l.person.id), context.pool())
.await
.map_err(|_| ApiError::err("couldnt_find_object").into())
}
}
/// Builds the API response for an object returned by `search_by_apub_id`.
///
/// Exactly one field of the returned `ResolveObjectResponse` is filled in, selected by the
/// variant of `object`. `user_id` is forwarded to the community, post and comment view lookups;
/// the person view lookup does not take it.
///
/// # Errors
/// Returns `NotFound` when the object is flagged as deleted (or, for everything except persons,
/// removed). Errors from `blocking` and from the view queries are propagated.
async fn convert_response(
  object: SearchableObjects,
  user_id: Option<PersonId>,
  pool: &DbPool,
) -> Result<ResolveObjectResponse, LemmyError> {
  use SearchableObjects::*;

  // Inspect the flags before touching the database, so that no view query is spent on an object
  // which would be rejected anyway.
  let removed_or_deleted = match &object {
    Person(p) => p.deleted,
    Community(c) => c.deleted || c.removed,
    Post(p) => p.deleted || p.removed,
    Comment(c) => c.deleted || c.removed,
  };
  // if the object was deleted from database, dont return it
  if removed_or_deleted {
    return Err(NotFound {}.into());
  }

  let mut res = ResolveObjectResponse {
    comment: None,
    post: None,
    community: None,
    person: None,
  };
  match object {
    Person(p) => {
      res.person = Some(blocking(pool, move |conn| PersonViewSafe::read(conn, p.id)).await??)
    }
    Community(c) => {
      res.community =
        Some(blocking(pool, move |conn| CommunityView::read(conn, c.id, user_id)).await??)
    }
    Post(p) => {
      res.post = Some(blocking(pool, move |conn| PostView::read(conn, p.id, user_id)).await??)
    }
    Comment(c) => {
      res.comment = Some(blocking(pool, move |conn| CommentView::read(conn, c.id, user_id)).await??)
    }
  };
  Ok(res)
}
#[async_trait::async_trait(?Send)]
impl Perform for TransferSite {
type Response = GetSiteResponse;

View File

@ -49,6 +49,7 @@ use lemmy_websocket::{
UserOperationCrud,
};
use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;
use url::Url;
/// This is very confusing, because there are four distinct cases to handle:
@ -59,6 +60,7 @@ use url::Url;
///
/// TODO: we should probably change how community deletions work to simplify this. Probably by
/// wrapping it in an announce just like other activities, instead of having the community send it.
#[skip_serializing_none]
#[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)]
#[serde(rename_all = "camelCase")]
pub struct Delete {

View File

@ -2,10 +2,9 @@ use crate::{check_is_apub_id_valid, APUB_JSON_CONTENT_TYPE};
use anyhow::anyhow;
use lemmy_utils::{request::retry, LemmyError};
use log::info;
use reqwest::{Client, StatusCode};
use reqwest::Client;
use serde::Deserialize;
use std::time::Duration;
use thiserror::Error;
use url::Url;
/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
@ -15,50 +14,19 @@ use url::Url;
/// So we are looking at a maximum of 22 requests (rounded up just to be safe).
static MAX_REQUEST_NUMBER: i32 = 25;
/// Error type used inside the fetcher: an underlying error plus an optional HTTP status code.
#[derive(Debug, Error)]
pub(in crate::fetcher) struct FetchError {
/// The wrapped error.
pub inner: anyhow::Error,
/// HTTP status code attached to the error, if there is one.
pub status_code: Option<StatusCode>,
}
impl From<LemmyError> for FetchError {
fn from(t: LemmyError) -> Self {
FetchError {
inner: t.inner,
status_code: None,
}
}
}
impl From<reqwest::Error> for FetchError {
fn from(t: reqwest::Error) -> Self {
let status = t.status();
FetchError {
inner: t.into(),
status_code: status,
}
}
}
impl std::fmt::Display for FetchError {
  /// Formats the error by delegating to the wrapped `inner` error.
  ///
  /// Formatting `&self` here instead would go through the blanket `Display` impl for references
  /// and land back in this method, recursing until the stack overflows.
  fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
    std::fmt::Display::fmt(&self.inner, f)
  }
}
/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
/// timeouts etc.
pub(in crate::fetcher) async fn fetch_remote_object<Response>(
client: &Client,
url: &Url,
recursion_counter: &mut i32,
) -> Result<Response, FetchError>
) -> Result<Response, LemmyError>
where
Response: for<'de> Deserialize<'de> + std::fmt::Debug,
{
*recursion_counter += 1;
if *recursion_counter > MAX_REQUEST_NUMBER {
return Err(LemmyError::from(anyhow!("Maximum recursion depth reached")).into());
return Err(anyhow!("Maximum recursion depth reached").into());
}
check_is_apub_id_valid(url, false)?;
@ -73,14 +41,6 @@ where
})
.await?;
if res.status() == StatusCode::GONE {
info!("Fetched remote object {} which was deleted", url);
return Err(FetchError {
inner: anyhow!("Fetched remote object {} which was deleted", url),
status_code: Some(res.status()),
});
}
let object = res.json().await?;
info!("Fetched remote object {}", url);
Ok(object)

View File

@ -5,38 +5,19 @@ pub mod object_id;
pub mod post_or_comment;
pub mod search;
use crate::{
fetcher::{fetch::FetchError, object_id::ObjectId},
ActorType,
};
use crate::{fetcher::object_id::ObjectId, ActorType};
use chrono::NaiveDateTime;
use http::StatusCode;
use lemmy_db_schema::{
naive_now,
source::{community::Community, person::Person},
};
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use serde::Deserialize;
use url::Url;
static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
/// Returns `true` only if `fetch_response` is an error carrying the HTTP status `410 Gone`.
///
/// Successful responses, and errors with no status code or a different one, yield `false`.
fn is_deleted<Response>(fetch_response: &Result<Response, FetchError>) -> bool
where
  Response: for<'de> Deserialize<'de>,
{
  matches!(
    fetch_response,
    Err(e) if e.status_code == Some(StatusCode::GONE)
  )
}
/// Get a remote actor from its apub ID (either a person or a community). Thin wrapper around
/// `get_or_fetch_and_upsert_person()` and `get_or_fetch_and_upsert_community()`.
///

View File

@ -14,7 +14,7 @@ use log::debug;
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use std::{
fmt::{Display, Formatter},
fmt::{Debug, Display, Formatter},
marker::PhantomData,
time::Duration,
};
@ -54,7 +54,9 @@ where
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<Kind, LemmyError> {
debug!("dereference {}", self.to_string());
let db_object = self.dereference_locally(context.pool()).await?;
// if its a local object, only fetch it from the database and not over http
if self.0.domain() == Some(&Settings::get().get_hostname_without_port()?) {
return match db_object {
@ -84,6 +86,7 @@ where
/// returning none means the object was not found in local db
async fn dereference_locally(&self, pool: &DbPool) -> Result<Option<Kind>, LemmyError> {
debug!("dereference_locally {}", self.to_string());
let id: DbUrl = self.0.clone().into();
let object = blocking(pool, move |conn| ApubObject::read_from_apub_id(conn, &id)).await?;
match object {
@ -99,6 +102,7 @@ where
request_counter: &mut i32,
db_object: Option<Kind>,
) -> Result<Kind, LemmyError> {
debug!("dereference_remotely {}", self.to_string());
// dont fetch local objects this way
debug_assert!(self.0.domain() != Some(&Settings::get().hostname));
@ -118,6 +122,7 @@ where
.await?;
if res.status() == StatusCode::GONE {
debug!("is deleted {}", self.to_string());
if let Some(db_object) = db_object {
db_object.delete(context).await?;
}

View File

@ -1,47 +1,27 @@
use crate::{
fetcher::{fetch::fetch_remote_object, is_deleted, object_id::ObjectId},
find_object_by_id,
fetcher::{deletable_apub_object::DeletableApubObject, object_id::ObjectId},
objects::{comment::Note, community::Group, person::Person as ApubPerson, post::Page, FromApub},
Object,
};
use activitystreams::chrono::NaiveDateTime;
use anyhow::anyhow;
use diesel::{result::Error, PgConnection};
use itertools::Itertools;
use lemmy_api_common::{blocking, site::ResolveObjectResponse};
use lemmy_api_common::blocking;
use lemmy_apub_lib::webfinger::{webfinger_resolve_actor, WebfingerType};
use lemmy_db_queries::source::{
comment::Comment_,
community::Community_,
person::Person_,
post::Post_,
private_message::PrivateMessage_,
use lemmy_db_queries::{
source::{community::Community_, person::Person_},
ApubObject,
DbPool,
};
use lemmy_db_schema::source::{
comment::Comment,
community::Community,
person::Person,
post::Post,
private_message::PrivateMessage,
use lemmy_db_schema::{
source::{comment::Comment, community::Community, person::Person, post::Post},
DbUrl,
};
use lemmy_db_views::{
comment_view::CommentView,
local_user_view::LocalUserView,
post_view::PostView,
};
use lemmy_db_views_actor::{community_view::CommunityView, person_view::PersonViewSafe};
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use serde::Deserialize;
use url::Url;
/// The types of ActivityPub objects that can be fetched directly by searching for their ID.
///
/// Deserialized with `#[serde(untagged)]`: the input carries no tag, and serde uses the first
/// variant, in declaration order, whose payload deserializes successfully.
#[derive(serde::Deserialize, Debug)]
#[serde(untagged)]
enum SearchAcceptedObjects {
/// Payload that deserializes as an `ApubPerson`.
Person(Box<ApubPerson>),
/// Payload that deserializes as a `Group`.
Group(Box<Group>),
/// Payload that deserializes as a `Page`.
Page(Box<Page>),
/// Payload that deserializes as a `Note`.
Comment(Box<Note>),
}
/// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
///
/// Some working examples for use with the `docker/federation/` setup:
@ -51,9 +31,8 @@ enum SearchAcceptedObjects {
/// http://lemmy_delta:8571/comment/2
pub async fn search_by_apub_id(
query: &str,
local_user_view: Option<LocalUserView>,
context: &LemmyContext,
) -> Result<ResolveObjectResponse, LemmyError> {
) -> Result<SearchableObjects, LemmyError> {
let query_url = match Url::parse(query) {
Ok(u) => u,
Err(_) => {
@ -70,144 +49,120 @@ pub async fn search_by_apub_id(
}
// local actor, read from database and return
else {
let name: String = name.into();
return match kind {
WebfingerType::Group => {
let res = blocking(context.pool(), move |conn| {
let community = Community::read_from_name(conn, &name)?;
CommunityView::read(conn, community.id, local_user_view.map(|l| l.person.id))
})
.await??;
Ok(ResolveObjectResponse {
community: Some(res),
..ResolveObjectResponse::default()
})
}
WebfingerType::Person => {
let res = blocking(context.pool(), move |conn| {
let person = Person::find_by_name(conn, &name)?;
PersonViewSafe::read(conn, person.id)
})
.await??;
Ok(ResolveObjectResponse {
person: Some(res),
..ResolveObjectResponse::default()
})
}
};
return find_local_actor_by_name(name, kind, context.pool()).await;
}
}
};
let request_counter = &mut 0;
// this does a fetch (even for local objects), just to determine its type and fetch it again
// below. we need to fix this when rewriting the fetcher.
let fetch_response =
fetch_remote_object::<SearchAcceptedObjects>(context.client(), &query_url, request_counter)
.await;
if is_deleted(&fetch_response) {
delete_object_locally(&query_url, context).await?;
return Err(anyhow!("Object was deleted").into());
}
// Necessary because we get a stack overflow using FetchError
let fet_res = fetch_response.map_err(|e| LemmyError::from(e.inner))?;
build_response(fet_res, query_url, request_counter, context).await
ObjectId::<SearchableObjects>::new(query_url)
.dereference(context, request_counter)
.await
}
async fn build_response(
fetch_response: SearchAcceptedObjects,
query_url: Url,
recursion_counter: &mut i32,
/// Reads an actor from the database by name and wraps it in the matching `SearchableObjects`
/// variant: `WebfingerType::Group` yields a community, `WebfingerType::Person` a person.
///
/// Errors from `blocking` and from the database lookup are propagated.
async fn find_local_actor_by_name(
  name: &str,
  kind: WebfingerType,
  pool: &DbPool,
) -> Result<SearchableObjects, LemmyError> {
  // The closure handed to `blocking` is `move`, so it needs its own copy of the name.
  let owned_name = name.to_owned();
  let actor = match kind {
    WebfingerType::Group => {
      let community =
        blocking(pool, move |conn| Community::read_from_name(conn, &owned_name)).await??;
      SearchableObjects::Community(community)
    }
    WebfingerType::Person => {
      let person = blocking(pool, move |conn| Person::find_by_name(conn, &owned_name)).await??;
      SearchableObjects::Person(person)
    }
  };
  Ok(actor)
}
/// The types of ActivityPub objects that can be fetched directly by searching for their ID.
///
/// Each variant wraps the corresponding type from `lemmy_db_schema::source`.
#[derive(Debug)]
pub enum SearchableObjects {
/// A `Person`.
Person(Person),
/// A `Community`.
Community(Community),
/// A `Post`.
Post(Post),
/// A `Comment`.
Comment(Comment),
}
/// The ActivityPub-side counterpart of `SearchableObjects`, used as its `FromApub::ApubType`.
///
/// Deserialized with `#[serde(untagged)]`: the input carries no tag, and serde uses the first
/// variant, in declaration order, whose payload deserializes successfully.
#[derive(Deserialize)]
#[serde(untagged)]
pub enum SearchableApubTypes {
/// Payload that deserializes as a `Group`.
Group(Group),
/// Payload that deserializes as an `ApubPerson`.
Person(ApubPerson),
/// Payload that deserializes as a `Page`.
Page(Page),
/// Payload that deserializes as a `Note`.
Note(Note),
}
impl ApubObject for SearchableObjects {
  type Form = ();

  /// Forwards to `last_refreshed_at` of whichever object is wrapped.
  fn last_refreshed_at(&self) -> Option<NaiveDateTime> {
    match self {
      Self::Person(person) => person.last_refreshed_at(),
      Self::Community(community) => community.last_refreshed_at(),
      Self::Post(post) => post.last_refreshed_at(),
      Self::Comment(comment) => comment.last_refreshed_at(),
    }
  }

  /// Looks the id up as a community, then a person, then a post, then a comment, and returns the
  /// first hit. If none matches, the error of the final (comment) lookup is returned.
  ///
  // TODO: this is inefficient, because an object that is not in the local db costs 4 db queries
  //       before an error is finally returned. it would be nice to check all 4 tables in a
  //       single query.
  //       we could skip this and always return an error, but then objects deleted by the remote
  //       server could no longer be marked as deleted.
  fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result<Self, Error> {
    // `or_else` is lazy, so each later lookup only runs when all earlier ones failed.
    Community::read_from_apub_id(conn, object_id)
      .map(SearchableObjects::Community)
      .or_else(|_| Person::read_from_apub_id(conn, object_id).map(SearchableObjects::Person))
      .or_else(|_| Post::read_from_apub_id(conn, object_id).map(SearchableObjects::Post))
      .or_else(|_| Comment::read_from_apub_id(conn, object_id).map(SearchableObjects::Comment))
  }

  // TODO: move this (and the Form type) elsewhere, as it isnt really needed on this trait
  /// Not implemented for this wrapper type; calling it panics.
  fn upsert(_conn: &PgConnection, _user_form: &Self::Form) -> Result<Self, Error> {
    unimplemented!()
  }
}
#[async_trait::async_trait(?Send)]
impl FromApub for SearchableObjects {
type ApubType = SearchableApubTypes;
async fn from_apub(
apub: &Self::ApubType,
context: &LemmyContext,
) -> Result<ResolveObjectResponse, LemmyError> {
use ResolveObjectResponse as ROR;
Ok(match fetch_response {
SearchAcceptedObjects::Person(p) => {
let person_uri: ObjectId<Person> = ObjectId::<Person>::new(p.id(&query_url)?.clone());
let person = person_uri.dereference(context, recursion_counter).await?;
ROR {
person: blocking(context.pool(), move |conn| {
PersonViewSafe::read(conn, person.id)
ed: &Url,
rc: &mut i32,
) -> Result<Self, LemmyError> {
use SearchableApubTypes as SAT;
use SearchableObjects as SO;
Ok(match apub {
SAT::Group(g) => SO::Community(Community::from_apub(g, context, ed, rc).await?),
SAT::Person(p) => SO::Person(Person::from_apub(p, context, ed, rc).await?),
SAT::Page(p) => SO::Post(Post::from_apub(p, context, ed, rc).await?),
SAT::Note(n) => SO::Comment(Comment::from_apub(n, context, ed, rc).await?),
})
.await?
.ok(),
..ROR::default()
}
}
SearchAcceptedObjects::Group(g) => {
let community_uri: ObjectId<Community> =
ObjectId::<Community>::new(g.id(&query_url)?.clone());
let community = community_uri
.dereference(context, recursion_counter)
.await?;
ROR {
community: blocking(context.pool(), move |conn| {
CommunityView::read(conn, community.id, None)
})
.await?
.ok(),
..ROR::default()
}
}
SearchAcceptedObjects::Page(p) => {
let p = Post::from_apub(&p, context, &query_url, recursion_counter).await?;
ROR {
post: blocking(context.pool(), move |conn| PostView::read(conn, p.id, None))
.await?
.ok(),
..ROR::default()
}
}
SearchAcceptedObjects::Comment(c) => {
let c = Comment::from_apub(&c, context, &query_url, recursion_counter).await?;
ROR {
comment: blocking(context.pool(), move |conn| {
CommentView::read(conn, c.id, None)
})
.await?
.ok(),
..ROR::default()
}
}
})
}
async fn delete_object_locally(query_url: &Url, context: &LemmyContext) -> Result<(), LemmyError> {
let res = find_object_by_id(context, query_url.to_owned()).await?;
match res {
Object::Comment(c) => {
blocking(context.pool(), move |conn| {
Comment::update_deleted(conn, c.id, true)
})
.await??;
}
Object::Post(p) => {
blocking(context.pool(), move |conn| {
Post::update_deleted(conn, p.id, true)
})
.await??;
}
Object::Person(u) => {
// TODO: implement update_deleted() for user, move it to ApubObject trait
blocking(context.pool(), move |conn| {
Person::delete_account(conn, u.id)
})
.await??;
}
Object::Community(c) => {
blocking(context.pool(), move |conn| {
Community::update_deleted(conn, c.id, true)
})
.await??;
}
Object::PrivateMessage(pm) => {
blocking(context.pool(), move |conn| {
PrivateMessage::update_deleted(conn, pm.id, true)
})
.await??;
#[async_trait::async_trait(?Send)]
impl DeletableApubObject for SearchableObjects {
async fn delete(self, context: &LemmyContext) -> Result<(), LemmyError> {
match self {
SearchableObjects::Person(p) => p.delete(context).await,
SearchableObjects::Community(c) => c.delete(context).await,
SearchableObjects::Post(p) => p.delete(context).await,
SearchableObjects::Comment(c) => c.delete(context).await,
}
}
Ok(())
}

View File

@ -11,24 +11,15 @@ pub mod objects;
use crate::{extensions::signatures::PublicKey, fetcher::post_or_comment::PostOrComment};
use anyhow::{anyhow, Context};
use diesel::NotFound;
use lemmy_api_common::blocking;
use lemmy_db_queries::{source::activity::Activity_, ApubObject, DbPool};
use lemmy_db_queries::{source::activity::Activity_, DbPool};
use lemmy_db_schema::{
source::{
activity::Activity,
comment::Comment,
community::Community,
person::{Person as DbPerson, Person},
post::Post,
private_message::PrivateMessage,
},
source::{activity::Activity, person::Person},
CommunityId,
DbUrl,
};
use lemmy_db_views_actor::community_person_ban_view::CommunityPersonBanView;
use lemmy_utils::{location_info, settings::structs::Settings, LemmyError};
use lemmy_websocket::LemmyContext;
use serde::Serialize;
use std::net::IpAddr;
use url::{ParseError, Url};
@ -244,81 +235,6 @@ where
Ok(())
}
/// Looks for a post, and failing that a comment, with the given apub id in the local database.
/// No network requests are made.
/// Deletions and removals rely on this: if we dont have the object locally, the activity can
/// simply be ignored.
///
/// Returns `NotFound` when neither lookup succeeds.
pub(crate) async fn find_post_or_comment_by_id(
  context: &LemmyContext,
  apub_id: Url,
) -> Result<PostOrComment, LemmyError> {
  // The post lookup gets a copy of the id; the comment lookup below consumes the original.
  let post_id = apub_id.clone();
  let post_lookup = blocking(context.pool(), move |conn| {
    Post::read_from_apub_id(conn, &post_id.into())
  })
  .await?;
  if let Ok(post) = post_lookup {
    return Ok(PostOrComment::Post(Box::new(post)));
  }

  let comment_lookup = blocking(context.pool(), move |conn| {
    Comment::read_from_apub_id(conn, &apub_id.into())
  })
  .await?;
  match comment_lookup {
    Ok(comment) => Ok(PostOrComment::Comment(Box::new(comment))),
    Err(_) => Err(NotFound.into()),
  }
}
/// The kinds of object that `find_object_by_id` can return, each boxed.
#[derive(Debug)]
enum Object {
/// A `Comment`.
Comment(Box<Comment>),
/// A `Post`.
Post(Box<Post>),
/// A `Community`.
Community(Box<Community>),
/// A `DbPerson`.
Person(Box<DbPerson>),
/// A `PrivateMessage`.
PrivateMessage(Box<PrivateMessage>),
}
/// Looks the given apub id up in the local database, trying in order: post or comment (via
/// `find_post_or_comment_by_id`), person, community, private message. The first hit is returned.
///
/// # Errors
/// Returns `NotFound` if no lookup succeeds. Errors from `blocking` itself are propagated for the
/// person, community and private message lookups; any error from the post/comment lookup is
/// ignored and the search simply continues.
async fn find_object_by_id(context: &LemmyContext, apub_id: Url) -> Result<Object, LemmyError> {
  if let Ok(pc) = find_post_or_comment_by_id(context, apub_id.clone()).await {
    // The payloads are already boxed, so they are handed over as they are.
    return Ok(match pc {
      PostOrComment::Post(p) => Object::Post(p),
      PostOrComment::Comment(c) => Object::Comment(c),
    });
  }

  let ap_id = apub_id.clone();
  let person = blocking(context.pool(), move |conn| {
    DbPerson::read_from_apub_id(conn, &ap_id.into())
  })
  .await?;
  if let Ok(u) = person {
    return Ok(Object::Person(Box::new(u)));
  }

  let ap_id = apub_id.clone();
  let community = blocking(context.pool(), move |conn| {
    Community::read_from_apub_id(conn, &ap_id.into())
  })
  .await?;
  if let Ok(c) = community {
    return Ok(Object::Community(Box::new(c)));
  }

  // Last lookup: the id itself can be moved into the closure.
  let private_message = blocking(context.pool(), move |conn| {
    PrivateMessage::read_from_apub_id(conn, &apub_id.into())
  })
  .await?;
  if let Ok(pm) = private_message {
    return Ok(Object::PrivateMessage(Box::new(pm)));
  }

  Err(NotFound.into())
}
async fn check_community_or_site_ban(
person: &Person,
community_id: CommunityId,