Avoid duplicate comment send, better activity logging

pull/1177/head
Felix Ableitner 2020-10-06 18:28:31 +02:00
parent ca4868cefd
commit 60730e81d9
5 changed files with 43 additions and 24 deletions

View File

@ -6,7 +6,7 @@ use crate::{
ActorType, ActorType,
}; };
use activitystreams::{ use activitystreams::{
base::{Extends, ExtendsExt}, base::{BaseExt, Extends, ExtendsExt},
object::AsObject, object::AsObject,
}; };
use anyhow::{anyhow, Context, Error}; use anyhow::{anyhow, Context, Error};
@ -23,9 +23,9 @@ use itertools::Itertools;
use lemmy_db::{community::Community, user::User_, DbPool}; use lemmy_db::{community::Community, user::User_, DbPool};
use lemmy_utils::{location_info, settings::Settings, LemmyError}; use lemmy_utils::{location_info, settings::Settings, LemmyError};
use lemmy_websocket::LemmyContext; use lemmy_websocket::LemmyContext;
use log::warn; use log::{debug, warn};
use reqwest::Client; use reqwest::Client;
use serde::{Deserialize, Serialize}; use serde::{export::fmt::Debug, Deserialize, Serialize};
use std::{collections::BTreeMap, future::Future, pin::Pin}; use std::{collections::BTreeMap, future::Future, pin::Pin};
use url::Url; use url::Url;
@ -36,11 +36,12 @@ pub async fn send_activity_single_dest<T, Kind>(
context: &LemmyContext, context: &LemmyContext,
) -> Result<(), LemmyError> ) -> Result<(), LemmyError>
where where
T: AsObject<Kind> + Extends<Kind>, T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
Kind: Serialize, Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static, <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{ {
if check_is_apub_id_valid(&to).is_ok() { if check_is_apub_id_valid(&to).is_ok() {
debug!("Sending activity {:?} to {}", &activity.id_unchecked(), &to);
send_activity_internal( send_activity_internal(
context.activity_queue(), context.activity_queue(),
activity, activity,
@ -62,7 +63,7 @@ pub async fn send_to_community_followers<T, Kind>(
sender_shared_inbox: Option<Url>, sender_shared_inbox: Option<Url>,
) -> Result<(), LemmyError> ) -> Result<(), LemmyError>
where where
T: AsObject<Kind> + Extends<Kind>, T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
Kind: Serialize, Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static, <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{ {
@ -79,6 +80,11 @@ where
.unique() .unique()
.map(|inbox| inbox.to_owned()) .map(|inbox| inbox.to_owned())
.collect(); .collect();
debug!(
"Sending activity {:?} to followers of {}",
&activity.id_unchecked(),
&community.actor_id
);
send_activity_internal( send_activity_internal(
context.activity_queue(), context.activity_queue(),
@ -100,7 +106,7 @@ pub async fn send_to_community<T, Kind>(
context: &LemmyContext, context: &LemmyContext,
) -> Result<(), LemmyError> ) -> Result<(), LemmyError>
where where
T: AsObject<Kind> + Extends<Kind>, T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
Kind: Serialize, Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static, <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{ {
@ -110,6 +116,11 @@ where
} else { } else {
let inbox = community.get_shared_inbox_url()?; let inbox = community.get_shared_inbox_url()?;
check_is_apub_id_valid(&inbox)?; check_is_apub_id_valid(&inbox)?;
debug!(
"Sending activity {:?} to community {}",
&activity.id_unchecked(),
&community.actor_id
);
send_activity_internal( send_activity_internal(
context.activity_queue(), context.activity_queue(),
activity, activity,
@ -131,10 +142,16 @@ pub async fn send_comment_mentions<T, Kind>(
context: &LemmyContext, context: &LemmyContext,
) -> Result<(), LemmyError> ) -> Result<(), LemmyError>
where where
T: AsObject<Kind> + Extends<Kind>, T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
Kind: Serialize, Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static, <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{ {
dbg!(&mentions, &activity.id_unchecked());
debug!(
"Sending mentions activity {:?} to {:?}",
&activity.id_unchecked(),
&mentions
);
let mentions = mentions let mentions = mentions
.iter() .iter()
.filter(|inbox| check_is_apub_id_valid(inbox).is_ok()) .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
@ -165,7 +182,7 @@ async fn send_activity_internal<T, Kind>(
insert_into_db: bool, insert_into_db: bool,
) -> Result<(), LemmyError> ) -> Result<(), LemmyError>
where where
T: AsObject<Kind> + Extends<Kind>, T: AsObject<Kind> + Extends<Kind> + Debug,
Kind: Serialize, Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static, <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{ {

View File

@ -506,12 +506,7 @@ async fn collect_non_local_mentions_and_addresses(
} }
} }
let mut inboxes: Vec<Url> = vec![]; let inboxes = mention_inboxes.into_iter().unique().collect();
if !community.local {
inboxes.push(community.get_shared_inbox_url()?);
}
inboxes.extend(mention_inboxes);
inboxes = inboxes.into_iter().unique().collect();
Ok(MentionsAndAddresses { Ok(MentionsAndAddresses {
addressed_ccs, addressed_ccs,

View File

@ -57,14 +57,16 @@ pub async fn community_inbox(
.into(), .into(),
); );
} }
debug!(
"Community {} received activity {:?}",
&community.name, &activity
);
let user_uri = activity let user_uri = activity
.actor()? .actor()?
.as_single_xsd_any_uri() .as_single_xsd_any_uri()
.context(location_info!())?; .context(location_info!())?;
debug!(
"Community {} inbox received activity {:?} from {}",
community.name,
&activity.id_unchecked(),
&user_uri
);
check_is_apub_id_valid(user_uri)?; check_is_apub_id_valid(user_uri)?;
let user = get_or_fetch_and_upsert_user(&user_uri, &context).await?; let user = get_or_fetch_and_upsert_user(&user_uri, &context).await?;

View File

@ -60,17 +60,17 @@ pub async fn shared_inbox(
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let activity = input.into_inner(); let activity = input.into_inner();
let json = serde_json::to_string_pretty(&activity)?;
debug!("Shared inbox received activity: {}", json);
// TODO: if we already received an activity with identical ID, then ignore this (same in other inboxes)
let sender = &activity let sender = &activity
.actor()? .actor()?
.to_owned() .to_owned()
.single_xsd_any_uri() .single_xsd_any_uri()
.context(location_info!())?; .context(location_info!())?;
let community = get_community_id_from_activity(&activity)?; let community = get_community_id_from_activity(&activity)?;
debug!(
"Shared inbox received activity {:?} from {}",
&activity.id_unchecked(),
&sender
);
check_is_apub_id_valid(sender)?; check_is_apub_id_valid(sender)?;
check_is_apub_id_valid(&community)?; check_is_apub_id_valid(&community)?;

View File

@ -50,12 +50,17 @@ pub async fn user_inbox(
) -> Result<HttpResponse, LemmyError> { ) -> Result<HttpResponse, LemmyError> {
let activity = input.into_inner(); let activity = input.into_inner();
let username = path.into_inner(); let username = path.into_inner();
debug!("User {} received activity: {:?}", &username, &activity);
let actor_uri = activity let actor_uri = activity
.actor()? .actor()?
.as_single_xsd_any_uri() .as_single_xsd_any_uri()
.context(location_info!())?; .context(location_info!())?;
debug!(
"User {} inbox received activity {:?} from {}",
username,
&activity.id_unchecked(),
&actor_uri
);
check_is_apub_id_valid(actor_uri)?; check_is_apub_id_valid(actor_uri)?;