rewrite community to use new fetcher

rewrite-fetcher
Felix Ableitner 2021-09-22 16:15:20 +02:00
parent 27e409442a
commit 1013d1a46d
13 changed files with 37 additions and 117 deletions

View File

@ -9,7 +9,7 @@ use crate::{
}, },
activity_queue::send_to_community_new, activity_queue::send_to_community_new,
extensions::context::lemmy_context, extensions::context::lemmy_context,
fetcher::{community::get_or_fetch_and_upsert_community, new_fetcher::dereference}, fetcher::new_fetcher::dereference,
generate_moderators_url, generate_moderators_url,
ActorType, ActorType,
}; };
@ -93,8 +93,7 @@ impl ActivityHandler for AddMod {
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32, request_counter: &mut i32,
) -> Result<(), LemmyError> { ) -> Result<(), LemmyError> {
let community = let community = dereference::<Community>(&self.cc[0], context, request_counter).await?;
get_or_fetch_and_upsert_community(&self.cc[0], context, request_counter).await?;
let new_mod = dereference::<Person>(&self.object, context, request_counter).await?; let new_mod = dereference::<Person>(&self.object, context, request_counter).await?;
// If we had to refetch the community while parsing the activity, then the new mod has already // If we had to refetch the community while parsing the activity, then the new mod has already

View File

@ -8,7 +8,7 @@ use crate::{
}, },
activity_queue::send_to_community_new, activity_queue::send_to_community_new,
extensions::context::lemmy_context, extensions::context::lemmy_context,
fetcher::{community::get_or_fetch_and_upsert_community, new_fetcher::dereference}, fetcher::new_fetcher::dereference,
ActorType, ActorType,
}; };
use activitystreams::{ use activitystreams::{
@ -102,8 +102,7 @@ impl ActivityHandler for BlockUserFromCommunity {
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32, request_counter: &mut i32,
) -> Result<(), LemmyError> { ) -> Result<(), LemmyError> {
let community = let community = dereference::<Community>(&self.cc[0], context, request_counter).await?;
get_or_fetch_and_upsert_community(&self.cc[0], context, request_counter).await?;
let blocked_user = dereference::<Person>(&self.object, context, request_counter).await?; let blocked_user = dereference::<Person>(&self.object, context, request_counter).await?;
let community_user_ban_form = CommunityPersonBanForm { let community_user_ban_form = CommunityPersonBanForm {

View File

@ -10,7 +10,7 @@ use crate::{
}, },
activity_queue::send_to_community_new, activity_queue::send_to_community_new,
extensions::context::lemmy_context, extensions::context::lemmy_context,
fetcher::{community::get_or_fetch_and_upsert_community, new_fetcher::dereference}, fetcher::new_fetcher::dereference,
generate_moderators_url, generate_moderators_url,
ActorType, ActorType,
}; };
@ -108,8 +108,7 @@ impl ActivityHandler for RemoveMod {
request_counter: &mut i32, request_counter: &mut i32,
) -> Result<(), LemmyError> { ) -> Result<(), LemmyError> {
if self.target.is_some() { if self.target.is_some() {
let community = let community = dereference::<Community>(&self.cc[0], context, request_counter).await?;
get_or_fetch_and_upsert_community(&self.cc[0], context, request_counter).await?;
let remove_mod = dereference::<Person>(&self.object, context, request_counter).await?; let remove_mod = dereference::<Person>(&self.object, context, request_counter).await?;
let form = CommunityModeratorForm { let form = CommunityModeratorForm {

View File

@ -8,7 +8,7 @@ use crate::{
}, },
activity_queue::send_to_community_new, activity_queue::send_to_community_new,
extensions::context::lemmy_context, extensions::context::lemmy_context,
fetcher::{community::get_or_fetch_and_upsert_community, new_fetcher::dereference}, fetcher::new_fetcher::dereference,
ActorType, ActorType,
}; };
use activitystreams::{ use activitystreams::{
@ -91,8 +91,7 @@ impl ActivityHandler for UndoBlockUserFromCommunity {
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32, request_counter: &mut i32,
) -> Result<(), LemmyError> { ) -> Result<(), LemmyError> {
let community = let community = dereference::<Community>(&self.cc[0], context, request_counter).await?;
get_or_fetch_and_upsert_community(&self.cc[0], context, request_counter).await?;
let blocked_user = dereference::<Person>(&self.object.object, context, request_counter).await?; let blocked_user = dereference::<Person>(&self.object.object, context, request_counter).await?;
let community_user_ban_form = CommunityPersonBanForm { let community_user_ban_form = CommunityPersonBanForm {

View File

@ -7,7 +7,7 @@ use crate::{
}, },
activity_queue::send_activity_new, activity_queue::send_activity_new,
extensions::context::lemmy_context, extensions::context::lemmy_context,
fetcher::{community::get_or_fetch_and_upsert_community, new_fetcher::dereference}, fetcher::new_fetcher::dereference,
ActorType, ActorType,
}; };
use activitystreams::{ use activitystreams::{
@ -90,7 +90,7 @@ impl ActivityHandler for AcceptFollowCommunity {
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32, request_counter: &mut i32,
) -> Result<(), LemmyError> { ) -> Result<(), LemmyError> {
let actor = get_or_fetch_and_upsert_community(&self.actor, context, request_counter).await?; let actor = dereference::<Community>(&self.actor, context, request_counter).await?;
let to = dereference::<Person>(&self.to, context, request_counter).await?; let to = dereference::<Person>(&self.to, context, request_counter).await?;
// This will throw an error if no follow was requested // This will throw an error if no follow was requested
blocking(context.pool(), move |conn| { blocking(context.pool(), move |conn| {

View File

@ -7,7 +7,7 @@ use crate::{
}, },
activity_queue::send_activity_new, activity_queue::send_activity_new,
extensions::context::lemmy_context, extensions::context::lemmy_context,
fetcher::{community::get_or_fetch_and_upsert_community, new_fetcher::dereference}, fetcher::new_fetcher::dereference,
ActorType, ActorType,
}; };
use activitystreams::{ use activitystreams::{
@ -98,8 +98,7 @@ impl ActivityHandler for FollowCommunity {
request_counter: &mut i32, request_counter: &mut i32,
) -> Result<(), LemmyError> { ) -> Result<(), LemmyError> {
let actor = dereference::<Person>(&self.actor, context, request_counter).await?; let actor = dereference::<Person>(&self.actor, context, request_counter).await?;
let community = let community = dereference::<Community>(&self.object, context, request_counter).await?;
get_or_fetch_and_upsert_community(&self.object, context, request_counter).await?;
let community_follower_form = CommunityFollowerForm { let community_follower_form = CommunityFollowerForm {
community_id: community.id, community_id: community.id,
person_id: actor.id, person_id: actor.id,

View File

@ -7,7 +7,7 @@ use crate::{
}, },
activity_queue::send_activity_new, activity_queue::send_activity_new,
extensions::context::lemmy_context, extensions::context::lemmy_context,
fetcher::{community::get_or_fetch_and_upsert_community, new_fetcher::dereference}, fetcher::new_fetcher::dereference,
ActorType, ActorType,
}; };
use activitystreams::{ use activitystreams::{
@ -85,7 +85,7 @@ impl ActivityHandler for UndoFollowCommunity {
request_counter: &mut i32, request_counter: &mut i32,
) -> Result<(), LemmyError> { ) -> Result<(), LemmyError> {
let actor = dereference::<Person>(&self.actor, context, request_counter).await?; let actor = dereference::<Person>(&self.actor, context, request_counter).await?;
let community = get_or_fetch_and_upsert_community(&self.to, context, request_counter).await?; let community = dereference::<Community>(&self.to, context, request_counter).await?;
let community_follower_form = CommunityFollowerForm { let community_follower_form = CommunityFollowerForm {
community_id: community.id, community_id: community.id,

View File

@ -1,7 +1,7 @@
use crate::{ use crate::{
check_community_or_site_ban, check_community_or_site_ban,
check_is_apub_id_valid, check_is_apub_id_valid,
fetcher::{community::get_or_fetch_and_upsert_community, new_fetcher::dereference}, fetcher::new_fetcher::dereference,
generate_moderators_url, generate_moderators_url,
}; };
use anyhow::anyhow; use anyhow::anyhow;
@ -58,7 +58,7 @@ pub(crate) async fn extract_community(
let mut cc_iter = cc.iter(); let mut cc_iter = cc.iter();
loop { loop {
if let Some(cid) = cc_iter.next() { if let Some(cid) = cc_iter.next() {
if let Ok(c) = get_or_fetch_and_upsert_community(cid, context, request_counter).await { if let Ok(c) = dereference(cid, context, request_counter).await {
break Ok(c); break Ok(c);
} }
} else { } else {
@ -75,7 +75,7 @@ pub(crate) async fn verify_person_in_community(
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32, request_counter: &mut i32,
) -> Result<(), LemmyError> { ) -> Result<(), LemmyError> {
let community = get_or_fetch_and_upsert_community(community_id, context, request_counter).await?; let community = dereference::<Community>(community_id, context, request_counter).await?;
let person = dereference::<Person>(person_id, context, request_counter).await?; let person = dereference::<Person>(person_id, context, request_counter).await?;
check_community_or_site_ban(&person, community.id, context.pool()).await check_community_or_site_ban(&person, community.id, context.pool()).await
} }
@ -86,7 +86,7 @@ async fn verify_community(
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32, request_counter: &mut i32,
) -> Result<(), LemmyError> { ) -> Result<(), LemmyError> {
get_or_fetch_and_upsert_community(community_id, context, request_counter).await?; dereference::<Community>(community_id, context, request_counter).await?;
Ok(()) Ok(())
} }

View File

@ -1,19 +1,13 @@
use crate::{ use crate::{
activities::community::announce::AnnounceActivity, activities::community::announce::AnnounceActivity,
fetcher::{ fetcher::{fetch::fetch_remote_object, new_fetcher::dereference},
fetch::fetch_remote_object, objects::community::Group,
is_deleted,
new_fetcher::dereference,
should_refetch_actor,
},
objects::{community::Group, FromApub},
}; };
use activitystreams::collection::{CollectionExt, OrderedCollection}; use activitystreams::collection::{CollectionExt, OrderedCollection};
use anyhow::Context; use anyhow::Context;
use diesel::result::Error::NotFound;
use lemmy_api_common::blocking; use lemmy_api_common::blocking;
use lemmy_apub_lib::ActivityHandler; use lemmy_apub_lib::ActivityHandler;
use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable}; use lemmy_db_queries::Joinable;
use lemmy_db_schema::source::{ use lemmy_db_schema::source::{
community::{Community, CommunityModerator, CommunityModeratorForm}, community::{Community, CommunityModerator, CommunityModeratorForm},
person::Person, person::Person,
@ -21,75 +15,9 @@ use lemmy_db_schema::source::{
use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView; use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView;
use lemmy_utils::{location_info, LemmyError}; use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::LemmyContext; use lemmy_websocket::LemmyContext;
use log::debug;
use url::Url; use url::Url;
/// Get a community from its apub ID. pub(crate) async fn update_community_mods(
///
/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
/// Otherwise it is fetched from the remote instance, stored and returned.
pub(crate) async fn get_or_fetch_and_upsert_community(
apub_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Community, LemmyError> {
let apub_id_owned = apub_id.to_owned();
let community = blocking(context.pool(), move |conn| {
Community::read_from_apub_id(conn, &apub_id_owned.into())
})
.await?;
match community {
Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
debug!("Fetching and updating from remote community: {}", apub_id);
fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
}
Ok(c) => Ok(c),
Err(NotFound {}) => {
debug!("Fetching and creating remote community: {}", apub_id);
fetch_remote_community(apub_id, context, None, recursion_counter).await
}
Err(e) => Err(e.into()),
}
}
/// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
/// is set, this is an update for a community which is already known locally. If not, we don't know
/// the community yet and also pull the outbox, to get some initial posts.
async fn fetch_remote_community(
apub_id: &Url,
context: &LemmyContext,
old_community: Option<Community>,
request_counter: &mut i32,
) -> Result<Community, LemmyError> {
let group = fetch_remote_object::<Group>(context.client(), apub_id, request_counter).await;
if let Some(c) = old_community.to_owned() {
if is_deleted(&group) {
blocking(context.pool(), move |conn| {
Community::update_deleted(conn, c.id, true)
})
.await??;
} else if group.is_err() {
// If fetching failed, return the existing data.
return Ok(c);
}
}
let group = group?;
let community = Community::from_apub(&group, context, apub_id, request_counter).await?;
update_community_mods(&group, &community, context, request_counter).await?;
// only fetch outbox for new communities, otherwise this can create an infinite loop
if old_community.is_none() {
fetch_community_outbox(context, &group.outbox, request_counter).await?
}
Ok(community)
}
async fn update_community_mods(
group: &Group, group: &Group,
community: &Community, community: &Community,
context: &LemmyContext, context: &LemmyContext,
@ -139,7 +67,7 @@ async fn update_community_mods(
Ok(()) Ok(())
} }
async fn fetch_community_outbox( pub(crate) async fn fetch_community_outbox(
context: &LemmyContext, context: &LemmyContext,
outbox: &Url, outbox: &Url,
recursion_counter: &mut i32, recursion_counter: &mut i32,

View File

@ -5,16 +5,15 @@ pub mod post_or_comment;
pub mod search; pub mod search;
use crate::{ use crate::{
fetcher::{ fetcher::{fetch::FetchError, new_fetcher::dereference},
community::get_or_fetch_and_upsert_community,
fetch::FetchError,
new_fetcher::dereference,
},
ActorType, ActorType,
}; };
use chrono::NaiveDateTime; use chrono::NaiveDateTime;
use http::StatusCode; use http::StatusCode;
use lemmy_db_schema::{naive_now, source::person::Person}; use lemmy_db_schema::{
naive_now,
source::{community::Community, person::Person},
};
use lemmy_utils::LemmyError; use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext; use lemmy_websocket::LemmyContext;
use serde::Deserialize; use serde::Deserialize;
@ -47,7 +46,7 @@ pub(crate) async fn get_or_fetch_and_upsert_actor(
context: &LemmyContext, context: &LemmyContext,
recursion_counter: &mut i32, recursion_counter: &mut i32,
) -> Result<Box<dyn ActorType>, LemmyError> { ) -> Result<Box<dyn ActorType>, LemmyError> {
let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await; let community = dereference::<Community>(apub_id, context, recursion_counter).await;
let actor: Box<dyn ActorType> = match community { let actor: Box<dyn ActorType> = match community {
Ok(c) => Box::new(c), Ok(c) => Box::new(c),
Err(_) => Box::new(dereference::<Person>(apub_id, context, recursion_counter).await?), Err(_) => Box::new(dereference::<Person>(apub_id, context, recursion_counter).await?),

View File

@ -27,7 +27,6 @@ where
let db_object = dereference_locally::<Kind>(id.clone(), context.pool()).await?; let db_object = dereference_locally::<Kind>(id.clone(), context.pool()).await?;
// if its a local object, only fetch it from the database and not over http // if its a local object, only fetch it from the database and not over http
if id.domain() == Some(&Settings::get().get_hostname_without_port()?) { if id.domain() == Some(&Settings::get().get_hostname_without_port()?) {
dbg!("is local object", db_object.is_some());
return match db_object { return match db_object {
None => Err(NotFound {}.into()), None => Err(NotFound {}.into()),
Some(o) => Ok(o), Some(o) => Ok(o),

View File

@ -1,10 +1,5 @@
use crate::{ use crate::{
fetcher::{ fetcher::{fetch::fetch_remote_object, is_deleted, new_fetcher::dereference},
community::get_or_fetch_and_upsert_community,
fetch::fetch_remote_object,
is_deleted,
new_fetcher::dereference,
},
find_object_by_id, find_object_by_id,
objects::{comment::Note, community::Group, person::Person as ApubPerson, post::Page, FromApub}, objects::{comment::Note, community::Group, person::Person as ApubPerson, post::Page, FromApub},
Object, Object,
@ -143,8 +138,7 @@ async fn build_response(
} }
SearchAcceptedObjects::Group(g) => { SearchAcceptedObjects::Group(g) => {
let community_uri = g.id(&query_url)?; let community_uri = g.id(&query_url)?;
let community = let community = dereference::<Community>(community_uri, context, recursion_counter).await?;
get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
ROR { ROR {
community: blocking(context.pool(), move |conn| { community: blocking(context.pool(), move |conn| {
CommunityView::read(conn, community.id, None) CommunityView::read(conn, community.id, None)

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
extensions::{context::lemmy_context, signatures::PublicKey}, extensions::{context::lemmy_context, signatures::PublicKey},
fetcher::community::fetch_community_mods, fetcher::community::{fetch_community_mods, fetch_community_outbox, update_community_mods},
generate_moderators_url, generate_moderators_url,
objects::{create_tombstone, FromApub, ImageObject, Source, ToApub}, objects::{create_tombstone, FromApub, ImageObject, Source, ToApub},
ActorType, ActorType,
@ -179,6 +179,11 @@ impl FromApub for Community {
let form = Group::from_apub_to_form(group, expected_domain).await?; let form = Group::from_apub_to_form(group, expected_domain).await?;
let community = blocking(context.pool(), move |conn| Community::upsert(conn, &form)).await??; let community = blocking(context.pool(), move |conn| Community::upsert(conn, &form)).await??;
update_community_mods(group, &community, context, request_counter).await?;
// TODO: doing this unconditionally might cause infinite loop for some reason
fetch_community_outbox(context, &group.outbox, request_counter).await?;
Ok(community) Ok(community)
} }
} }