mirror of https://github.com/LemmyNet/lemmy.git
Merge branch 'main' into markdown-link-rule
commit
6257469e51
|
@ -48,8 +48,8 @@
|
||||||
## About The Project
|
## About The Project
|
||||||
|
|
||||||
| Desktop | Mobile |
|
| Desktop | Mobile |
|
||||||
| ---------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------- |
|
| --------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------- |
|
||||||
| ![desktop](https://raw.githubusercontent.com/LemmyNet/joinlemmy-site/main/src/assets/images/main_img.webp) | ![mobile](https://raw.githubusercontent.com/LemmyNet/joinlemmy-site/main/src/assets/images/mobile_pic.webp) |
|
| ![desktop](https://raw.githubusercontent.com/LemmyNet/joinlemmy-site/main/src/assets/images/main_screen_2.webp) | ![mobile](https://raw.githubusercontent.com/LemmyNet/joinlemmy-site/main/src/assets/images/mobile_pic.webp) |
|
||||||
|
|
||||||
[Lemmy](https://github.com/LemmyNet/lemmy) is similar to sites like [Reddit](https://reddit.com), [Lobste.rs](https://lobste.rs), or [Hacker News](https://news.ycombinator.com/): you subscribe to forums you're interested in, post links and discussions, then vote, and comment on them. Behind the scenes, it is very different; anyone can easily run a server, and all these servers are federated (think email), and connected to the same universe, called the [Fediverse](https://en.wikipedia.org/wiki/Fediverse).
|
[Lemmy](https://github.com/LemmyNet/lemmy) is similar to sites like [Reddit](https://reddit.com), [Lobste.rs](https://lobste.rs), or [Hacker News](https://news.ycombinator.com/): you subscribe to forums you're interested in, post links and discussions, then vote, and comment on them. Behind the scenes, it is very different; anyone can easily run a server, and all these servers are federated (think email), and connected to the same universe, called the [Fediverse](https://en.wikipedia.org/wiki/Fediverse).
|
||||||
|
|
||||||
|
|
|
@ -85,7 +85,9 @@ pub fn read_auth_token(req: &HttpRequest) -> Result<Option<String>, LemmyError>
|
||||||
// ensure that its marked as httponly and secure
|
// ensure that its marked as httponly and secure
|
||||||
let secure = cookie.secure().unwrap_or_default();
|
let secure = cookie.secure().unwrap_or_default();
|
||||||
let http_only = cookie.http_only().unwrap_or_default();
|
let http_only = cookie.http_only().unwrap_or_default();
|
||||||
if !secure || !http_only {
|
let is_debug_mode = cfg!(debug_assertions);
|
||||||
|
|
||||||
|
if !is_debug_mode && (!secure || !http_only) {
|
||||||
Err(LemmyError::from(LemmyErrorType::AuthCookieInsecure))
|
Err(LemmyError::from(LemmyErrorType::AuthCookieInsecure))
|
||||||
} else {
|
} else {
|
||||||
Ok(Some(cookie.value().to_string()))
|
Ok(Some(cookie.value().to_string()))
|
||||||
|
|
|
@ -127,6 +127,7 @@ pub async fn save_user_settings(
|
||||||
post_listing_mode: data.post_listing_mode,
|
post_listing_mode: data.post_listing_mode,
|
||||||
enable_keyboard_navigation: data.enable_keyboard_navigation,
|
enable_keyboard_navigation: data.enable_keyboard_navigation,
|
||||||
enable_animated_images: data.enable_animated_images,
|
enable_animated_images: data.enable_animated_images,
|
||||||
|
collapse_bot_comments: data.collapse_bot_comments,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,6 @@ doctest = false
|
||||||
full = [
|
full = [
|
||||||
"tracing",
|
"tracing",
|
||||||
"rosetta-i18n",
|
"rosetta-i18n",
|
||||||
"chrono",
|
|
||||||
"lemmy_utils",
|
"lemmy_utils",
|
||||||
"lemmy_db_views/full",
|
"lemmy_db_views/full",
|
||||||
"lemmy_db_views_actor/full",
|
"lemmy_db_views_actor/full",
|
||||||
|
@ -47,7 +46,7 @@ activitypub_federation = { workspace = true, optional = true }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
serde_with = { workspace = true }
|
serde_with = { workspace = true }
|
||||||
url = { workspace = true }
|
url = { workspace = true }
|
||||||
chrono = { workspace = true, optional = true }
|
chrono = { workspace = true }
|
||||||
tracing = { workspace = true, optional = true }
|
tracing = { workspace = true, optional = true }
|
||||||
reqwest-middleware = { workspace = true, optional = true }
|
reqwest-middleware = { workspace = true, optional = true }
|
||||||
regex = { workspace = true }
|
regex = { workspace = true }
|
||||||
|
|
|
@ -25,6 +25,7 @@ pub extern crate lemmy_db_views_actor;
|
||||||
pub extern crate lemmy_db_views_moderator;
|
pub extern crate lemmy_db_views_moderator;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
#[cfg_attr(feature = "full", derive(ts_rs::TS))]
|
#[cfg_attr(feature = "full", derive(ts_rs::TS))]
|
||||||
|
@ -39,3 +40,8 @@ impl Default for SuccessResponse {
|
||||||
SuccessResponse { success: true }
|
SuccessResponse { success: true }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// how long to sleep based on how many retries have already happened
|
||||||
|
pub fn federate_retry_sleep_duration(retry_count: i32) -> Duration {
|
||||||
|
Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count)))
|
||||||
|
}
|
||||||
|
|
|
@ -129,6 +129,8 @@ pub struct SaveUserSettings {
|
||||||
pub enable_keyboard_navigation: Option<bool>,
|
pub enable_keyboard_navigation: Option<bool>,
|
||||||
/// Whether user avatars or inline images in the UI that are gifs should be allowed to play or should be paused
|
/// Whether user avatars or inline images in the UI that are gifs should be allowed to play or should be paused
|
||||||
pub enable_animated_images: Option<bool>,
|
pub enable_animated_images: Option<bool>,
|
||||||
|
/// Whether to auto-collapse bot comments.
|
||||||
|
pub collapse_bot_comments: Option<bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
|
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
|
||||||
|
|
|
@ -1,6 +1,13 @@
|
||||||
|
use crate::federate_retry_sleep_duration;
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
use lemmy_db_schema::{
|
use lemmy_db_schema::{
|
||||||
newtypes::{CommentId, CommunityId, InstanceId, LanguageId, PersonId, PostId},
|
newtypes::{CommentId, CommunityId, InstanceId, LanguageId, PersonId, PostId},
|
||||||
source::{instance::Instance, language::Language, tagline::Tagline},
|
source::{
|
||||||
|
federation_queue_state::FederationQueueState,
|
||||||
|
instance::Instance,
|
||||||
|
language::Language,
|
||||||
|
tagline::Tagline,
|
||||||
|
},
|
||||||
ListingType,
|
ListingType,
|
||||||
ModlogActionType,
|
ModlogActionType,
|
||||||
RegistrationMode,
|
RegistrationMode,
|
||||||
|
@ -316,9 +323,41 @@ pub struct MyUserInfo {
|
||||||
#[cfg_attr(feature = "full", ts(export))]
|
#[cfg_attr(feature = "full", ts(export))]
|
||||||
/// A list of federated instances.
|
/// A list of federated instances.
|
||||||
pub struct FederatedInstances {
|
pub struct FederatedInstances {
|
||||||
pub linked: Vec<Instance>,
|
pub linked: Vec<InstanceWithFederationState>,
|
||||||
pub allowed: Vec<Instance>,
|
pub allowed: Vec<InstanceWithFederationState>,
|
||||||
pub blocked: Vec<Instance>,
|
pub blocked: Vec<InstanceWithFederationState>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
#[cfg_attr(feature = "full", derive(TS))]
|
||||||
|
#[cfg_attr(feature = "full", ts(export))]
|
||||||
|
pub struct ReadableFederationState {
|
||||||
|
#[serde(flatten)]
|
||||||
|
internal_state: FederationQueueState,
|
||||||
|
/// timestamp of the next retry attempt (null if fail count is 0)
|
||||||
|
next_retry: Option<DateTime<Utc>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<FederationQueueState> for ReadableFederationState {
|
||||||
|
fn from(internal_state: FederationQueueState) -> Self {
|
||||||
|
ReadableFederationState {
|
||||||
|
next_retry: internal_state.last_retry.map(|r| {
|
||||||
|
r + chrono::Duration::from_std(federate_retry_sleep_duration(internal_state.fail_count))
|
||||||
|
.expect("sleep duration longer than 2**63 ms (262 million years)")
|
||||||
|
}),
|
||||||
|
internal_state,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
#[cfg_attr(feature = "full", derive(TS))]
|
||||||
|
#[cfg_attr(feature = "full", ts(export))]
|
||||||
|
pub struct InstanceWithFederationState {
|
||||||
|
#[serde(flatten)]
|
||||||
|
pub instance: Instance,
|
||||||
|
/// if federation to this instance is or was active, show state of outgoing federation to this instance
|
||||||
|
pub federation_state: Option<ReadableFederationState>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[skip_serializing_none]
|
#[skip_serializing_none]
|
||||||
|
|
|
@ -2,7 +2,7 @@ use crate::{
|
||||||
context::LemmyContext,
|
context::LemmyContext,
|
||||||
request::purge_image_from_pictrs,
|
request::purge_image_from_pictrs,
|
||||||
sensitive::Sensitive,
|
sensitive::Sensitive,
|
||||||
site::FederatedInstances,
|
site::{FederatedInstances, InstanceWithFederationState},
|
||||||
};
|
};
|
||||||
use actix_web::cookie::{Cookie, SameSite};
|
use actix_web::cookie::{Cookie, SameSite};
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
|
@ -280,12 +280,27 @@ pub async fn build_federated_instances(
|
||||||
pool: &mut DbPool<'_>,
|
pool: &mut DbPool<'_>,
|
||||||
) -> Result<Option<FederatedInstances>, LemmyError> {
|
) -> Result<Option<FederatedInstances>, LemmyError> {
|
||||||
if local_site.federation_enabled {
|
if local_site.federation_enabled {
|
||||||
// TODO I hate that this requires 3 queries
|
let mut linked = Vec::new();
|
||||||
let (linked, allowed, blocked) = lemmy_db_schema::try_join_with_pool!(pool => (
|
let mut allowed = Vec::new();
|
||||||
Instance::linked,
|
let mut blocked = Vec::new();
|
||||||
Instance::allowlist,
|
|
||||||
Instance::blocklist
|
let all = Instance::read_all_with_fed_state(pool).await?;
|
||||||
))?;
|
for (instance, federation_state, is_blocked, is_allowed) in all {
|
||||||
|
let i = InstanceWithFederationState {
|
||||||
|
instance,
|
||||||
|
federation_state: federation_state.map(std::convert::Into::into),
|
||||||
|
};
|
||||||
|
if is_blocked {
|
||||||
|
// blocked instances will only have an entry here if they had been federated with in the past.
|
||||||
|
blocked.push(i);
|
||||||
|
} else if is_allowed {
|
||||||
|
allowed.push(i.clone());
|
||||||
|
linked.push(i);
|
||||||
|
} else {
|
||||||
|
// not explicitly allowed but implicitly linked
|
||||||
|
linked.push(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(Some(FederatedInstances {
|
Ok(Some(FederatedInstances {
|
||||||
linked,
|
linked,
|
||||||
|
|
|
@ -504,29 +504,13 @@ mod tests {
|
||||||
site_registration_mode: RegistrationMode,
|
site_registration_mode: RegistrationMode,
|
||||||
) -> LocalSite {
|
) -> LocalSite {
|
||||||
LocalSite {
|
LocalSite {
|
||||||
id: Default::default(),
|
|
||||||
site_id: Default::default(),
|
|
||||||
site_setup,
|
site_setup,
|
||||||
enable_downvotes: false,
|
|
||||||
enable_nsfw: false,
|
|
||||||
community_creation_admin_only: false,
|
|
||||||
require_email_verification: false,
|
|
||||||
application_question: site_application_question,
|
application_question: site_application_question,
|
||||||
private_instance: site_is_private,
|
private_instance: site_is_private,
|
||||||
default_theme: String::new(),
|
|
||||||
default_post_listing_type: ListingType::All,
|
|
||||||
legal_information: None,
|
|
||||||
hide_modlog_mod_names: false,
|
|
||||||
application_email_admins: false,
|
|
||||||
slur_filter_regex: site_slur_filter_regex,
|
slur_filter_regex: site_slur_filter_regex,
|
||||||
actor_name_max_length: 0,
|
|
||||||
federation_enabled: site_is_federated,
|
federation_enabled: site_is_federated,
|
||||||
captcha_enabled: false,
|
|
||||||
captcha_difficulty: String::new(),
|
|
||||||
published: Default::default(),
|
|
||||||
updated: None,
|
|
||||||
registration_mode: site_registration_mode,
|
registration_mode: site_registration_mode,
|
||||||
reports_email_admins: false,
|
..Default::default()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -502,29 +502,12 @@ mod tests {
|
||||||
site_registration_mode: RegistrationMode,
|
site_registration_mode: RegistrationMode,
|
||||||
) -> LocalSite {
|
) -> LocalSite {
|
||||||
LocalSite {
|
LocalSite {
|
||||||
id: Default::default(),
|
|
||||||
site_id: Default::default(),
|
|
||||||
site_setup: true,
|
|
||||||
enable_downvotes: false,
|
|
||||||
enable_nsfw: false,
|
|
||||||
community_creation_admin_only: false,
|
|
||||||
require_email_verification: false,
|
|
||||||
application_question: site_application_question,
|
application_question: site_application_question,
|
||||||
private_instance: site_is_private,
|
private_instance: site_is_private,
|
||||||
default_theme: String::new(),
|
|
||||||
default_post_listing_type: ListingType::All,
|
|
||||||
legal_information: None,
|
|
||||||
hide_modlog_mod_names: false,
|
|
||||||
application_email_admins: false,
|
|
||||||
slur_filter_regex: site_slur_filter_regex,
|
slur_filter_regex: site_slur_filter_regex,
|
||||||
actor_name_max_length: 0,
|
|
||||||
federation_enabled: site_is_federated,
|
federation_enabled: site_is_federated,
|
||||||
captcha_enabled: false,
|
|
||||||
captcha_difficulty: String::new(),
|
|
||||||
published: Default::default(),
|
|
||||||
updated: None,
|
|
||||||
registration_mode: site_registration_mode,
|
registration_mode: site_registration_mode,
|
||||||
reports_email_admins: false,
|
..Default::default()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,7 @@ pub(crate) async fn send_like_activity(
|
||||||
score: i16,
|
score: i16,
|
||||||
context: Data<LemmyContext>,
|
context: Data<LemmyContext>,
|
||||||
) -> Result<(), LemmyError> {
|
) -> Result<(), LemmyError> {
|
||||||
let object_id: ObjectId<PostOrComment> = object_id.try_into()?;
|
let object_id: ObjectId<PostOrComment> = object_id.into();
|
||||||
let actor: ApubPerson = actor.into();
|
let actor: ApubPerson = actor.into();
|
||||||
let community: ApubCommunity = community.into();
|
let community: ApubCommunity = community.into();
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
diesel::OptionalExtension,
|
diesel::OptionalExtension,
|
||||||
newtypes::DbUrl,
|
newtypes::{ActivityId, DbUrl},
|
||||||
source::activity::{ReceivedActivity, SentActivity, SentActivityForm},
|
source::activity::{ReceivedActivity, SentActivity, SentActivityForm},
|
||||||
utils::{get_conn, DbPool},
|
utils::{get_conn, DbPool},
|
||||||
};
|
};
|
||||||
|
@ -30,7 +30,7 @@ impl SentActivity {
|
||||||
.first::<Self>(conn)
|
.first::<Self>(conn)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
pub async fn read(pool: &mut DbPool<'_>, object_id: i64) -> Result<Self, Error> {
|
pub async fn read(pool: &mut DbPool<'_>, object_id: ActivityId) -> Result<Self, Error> {
|
||||||
use crate::schema::sent_activity::dsl::sent_activity;
|
use crate::schema::sent_activity::dsl::sent_activity;
|
||||||
let conn = &mut get_conn(pool).await?;
|
let conn = &mut get_conn(pool).await?;
|
||||||
sent_activity.find(object_id).first::<Self>(conn).await
|
sent_activity.find(object_id).first::<Self>(conn).await
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
use crate::{
|
||||||
|
newtypes::InstanceId,
|
||||||
|
source::federation_queue_state::FederationQueueState,
|
||||||
|
utils::{get_conn, DbPool},
|
||||||
|
};
|
||||||
|
use diesel::{prelude::*, result::Error};
|
||||||
|
use diesel_async::RunQueryDsl;
|
||||||
|
|
||||||
|
impl FederationQueueState {
|
||||||
|
/// load state or return a default empty value
|
||||||
|
pub async fn load(
|
||||||
|
pool: &mut DbPool<'_>,
|
||||||
|
instance_id_: InstanceId,
|
||||||
|
) -> Result<FederationQueueState, Error> {
|
||||||
|
use crate::schema::federation_queue_state::dsl::{federation_queue_state, instance_id};
|
||||||
|
let conn = &mut get_conn(pool).await?;
|
||||||
|
Ok(
|
||||||
|
federation_queue_state
|
||||||
|
.filter(instance_id.eq(&instance_id_))
|
||||||
|
.select(FederationQueueState::as_select())
|
||||||
|
.get_result(conn)
|
||||||
|
.await
|
||||||
|
.optional()?
|
||||||
|
.unwrap_or(FederationQueueState {
|
||||||
|
instance_id: instance_id_,
|
||||||
|
fail_count: 0,
|
||||||
|
last_retry: None,
|
||||||
|
last_successful_id: None, // this value is set to the most current id for new instances
|
||||||
|
last_successful_published_time: None,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
pub async fn upsert(pool: &mut DbPool<'_>, state: &FederationQueueState) -> Result<(), Error> {
|
||||||
|
use crate::schema::federation_queue_state::dsl::{federation_queue_state, instance_id};
|
||||||
|
let conn = &mut get_conn(pool).await?;
|
||||||
|
|
||||||
|
state
|
||||||
|
.insert_into(federation_queue_state)
|
||||||
|
.on_conflict(instance_id)
|
||||||
|
.do_update()
|
||||||
|
.set(state)
|
||||||
|
.execute(conn)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,8 +1,18 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
diesel::dsl::IntervalDsl,
|
diesel::dsl::IntervalDsl,
|
||||||
newtypes::InstanceId,
|
newtypes::InstanceId,
|
||||||
schema::{federation_allowlist, federation_blocklist, instance, local_site, site},
|
schema::{
|
||||||
source::instance::{Instance, InstanceForm},
|
federation_allowlist,
|
||||||
|
federation_blocklist,
|
||||||
|
federation_queue_state,
|
||||||
|
instance,
|
||||||
|
local_site,
|
||||||
|
site,
|
||||||
|
},
|
||||||
|
source::{
|
||||||
|
federation_queue_state::FederationQueueState,
|
||||||
|
instance::{Instance, InstanceForm},
|
||||||
|
},
|
||||||
utils::{functions::lower, get_conn, naive_now, now, DbPool},
|
utils::{functions::lower, get_conn, naive_now, now, DbPool},
|
||||||
};
|
};
|
||||||
use diesel::{
|
use diesel::{
|
||||||
|
@ -59,7 +69,7 @@ impl Instance {
|
||||||
pub async fn read_all(pool: &mut DbPool<'_>) -> Result<Vec<Instance>, Error> {
|
pub async fn read_all(pool: &mut DbPool<'_>) -> Result<Vec<Instance>, Error> {
|
||||||
let conn = &mut get_conn(pool).await?;
|
let conn = &mut get_conn(pool).await?;
|
||||||
instance::table
|
instance::table
|
||||||
.select(instance::all_columns)
|
.select(Self::as_select())
|
||||||
.get_results(conn)
|
.get_results(conn)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -73,7 +83,7 @@ impl Instance {
|
||||||
let conn = &mut get_conn(pool).await?;
|
let conn = &mut get_conn(pool).await?;
|
||||||
instance::table
|
instance::table
|
||||||
.inner_join(federation_allowlist::table)
|
.inner_join(federation_allowlist::table)
|
||||||
.select(instance::all_columns)
|
.select(Self::as_select())
|
||||||
.get_results(conn)
|
.get_results(conn)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -82,14 +92,14 @@ impl Instance {
|
||||||
let conn = &mut get_conn(pool).await?;
|
let conn = &mut get_conn(pool).await?;
|
||||||
instance::table
|
instance::table
|
||||||
.inner_join(federation_blocklist::table)
|
.inner_join(federation_blocklist::table)
|
||||||
.select(instance::all_columns)
|
.select(Self::as_select())
|
||||||
.get_results(conn)
|
.get_results(conn)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// returns a list of all instances, each with a flag of whether the instance is allowed or not and dead or not
|
/// returns a list of all instances, each with a flag of whether the instance is allowed or not and dead or not
|
||||||
/// ordered by id
|
/// ordered by id
|
||||||
pub async fn read_all_with_blocked_and_dead(
|
pub async fn read_federated_with_blocked_and_dead(
|
||||||
pool: &mut DbPool<'_>,
|
pool: &mut DbPool<'_>,
|
||||||
) -> Result<Vec<(Self, bool, bool)>, Error> {
|
) -> Result<Vec<(Self, bool, bool)>, Error> {
|
||||||
let conn = &mut get_conn(pool).await?;
|
let conn = &mut get_conn(pool).await?;
|
||||||
|
@ -125,16 +135,24 @@ impl Instance {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn linked(pool: &mut DbPool<'_>) -> Result<Vec<Self>, Error> {
|
/// returns (instance, blocked, allowed, fed queue state) tuples
|
||||||
|
pub async fn read_all_with_fed_state(
|
||||||
|
pool: &mut DbPool<'_>,
|
||||||
|
) -> Result<Vec<(Self, Option<FederationQueueState>, bool, bool)>, Error> {
|
||||||
let conn = &mut get_conn(pool).await?;
|
let conn = &mut get_conn(pool).await?;
|
||||||
instance::table
|
instance::table
|
||||||
// omit instance representing the local site
|
// omit instance representing the local site
|
||||||
.left_join(site::table.inner_join(local_site::table))
|
.left_join(site::table.inner_join(local_site::table))
|
||||||
.filter(local_site::id.is_null())
|
.filter(local_site::id.is_null())
|
||||||
// omit instances in the blocklist
|
|
||||||
.left_join(federation_blocklist::table)
|
.left_join(federation_blocklist::table)
|
||||||
.filter(federation_blocklist::id.is_null())
|
.left_join(federation_allowlist::table)
|
||||||
.select(instance::all_columns)
|
.left_join(federation_queue_state::table)
|
||||||
|
.select((
|
||||||
|
Self::as_select(),
|
||||||
|
Option::<FederationQueueState>::as_select(),
|
||||||
|
federation_blocklist::id.nullable().is_not_null(),
|
||||||
|
federation_allowlist::id.nullable().is_not_null(),
|
||||||
|
))
|
||||||
.get_results(conn)
|
.get_results(conn)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@ pub mod custom_emoji;
|
||||||
pub mod email_verification;
|
pub mod email_verification;
|
||||||
pub mod federation_allowlist;
|
pub mod federation_allowlist;
|
||||||
pub mod federation_blocklist;
|
pub mod federation_blocklist;
|
||||||
|
pub mod federation_queue_state;
|
||||||
pub mod images;
|
pub mod images;
|
||||||
pub mod instance;
|
pub mod instance;
|
||||||
pub mod instance_block;
|
pub mod instance_block;
|
||||||
|
|
|
@ -114,7 +114,9 @@ pub enum ListingType {
|
||||||
ModeratorView,
|
ModeratorView,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(EnumString, Display, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
|
#[derive(
|
||||||
|
EnumString, Display, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default,
|
||||||
|
)]
|
||||||
#[cfg_attr(feature = "full", derive(DbEnum, TS))]
|
#[cfg_attr(feature = "full", derive(DbEnum, TS))]
|
||||||
#[cfg_attr(
|
#[cfg_attr(
|
||||||
feature = "full",
|
feature = "full",
|
||||||
|
@ -129,6 +131,7 @@ pub enum RegistrationMode {
|
||||||
/// Open, but pending approval of a registration application.
|
/// Open, but pending approval of a registration application.
|
||||||
RequireApplication,
|
RequireApplication,
|
||||||
/// Open to all.
|
/// Open to all.
|
||||||
|
#[default]
|
||||||
Open,
|
Open,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -149,6 +149,13 @@ pub struct LocalImageId(i32);
|
||||||
/// The instance id.
|
/// The instance id.
|
||||||
pub struct InstanceId(i32);
|
pub struct InstanceId(i32);
|
||||||
|
|
||||||
|
#[derive(
|
||||||
|
Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Default, PartialOrd, Ord,
|
||||||
|
)]
|
||||||
|
#[cfg_attr(feature = "full", derive(DieselNewType, TS))]
|
||||||
|
#[cfg_attr(feature = "full", ts(export))]
|
||||||
|
pub struct ActivityId(pub i64);
|
||||||
|
|
||||||
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Default)]
|
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Default)]
|
||||||
#[cfg_attr(feature = "full", derive(DieselNewType, TS))]
|
#[cfg_attr(feature = "full", derive(DieselNewType, TS))]
|
||||||
#[cfg_attr(feature = "full", ts(export))]
|
#[cfg_attr(feature = "full", ts(export))]
|
||||||
|
|
|
@ -307,9 +307,10 @@ diesel::table! {
|
||||||
federation_queue_state (id) {
|
federation_queue_state (id) {
|
||||||
id -> Int4,
|
id -> Int4,
|
||||||
instance_id -> Int4,
|
instance_id -> Int4,
|
||||||
last_successful_id -> Int8,
|
last_successful_id -> Nullable<Int8>,
|
||||||
fail_count -> Int4,
|
fail_count -> Int4,
|
||||||
last_retry -> Timestamptz,
|
last_retry -> Nullable<Timestamptz>,
|
||||||
|
last_successful_published_time -> Nullable<Timestamptz>,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -385,6 +386,7 @@ diesel::table! {
|
||||||
updated -> Nullable<Timestamptz>,
|
updated -> Nullable<Timestamptz>,
|
||||||
registration_mode -> RegistrationModeEnum,
|
registration_mode -> RegistrationModeEnum,
|
||||||
reports_email_admins -> Bool,
|
reports_email_admins -> Bool,
|
||||||
|
federation_signed_fetch -> Bool,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -445,6 +447,7 @@ diesel::table! {
|
||||||
totp_2fa_enabled -> Bool,
|
totp_2fa_enabled -> Bool,
|
||||||
enable_keyboard_navigation -> Bool,
|
enable_keyboard_navigation -> Bool,
|
||||||
enable_animated_images -> Bool,
|
enable_animated_images -> Bool,
|
||||||
|
collapse_bot_comments -> Bool,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
newtypes::{CommunityId, DbUrl},
|
newtypes::{ActivityId, CommunityId, DbUrl},
|
||||||
schema::sent_activity,
|
schema::sent_activity,
|
||||||
};
|
};
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
|
@ -54,7 +54,7 @@ impl ActivitySendTargets {
|
||||||
#[derive(PartialEq, Eq, Debug, Queryable)]
|
#[derive(PartialEq, Eq, Debug, Queryable)]
|
||||||
#[diesel(table_name = sent_activity)]
|
#[diesel(table_name = sent_activity)]
|
||||||
pub struct SentActivity {
|
pub struct SentActivity {
|
||||||
pub id: i64,
|
pub id: ActivityId,
|
||||||
pub ap_id: DbUrl,
|
pub ap_id: DbUrl,
|
||||||
pub data: Value,
|
pub data: Value,
|
||||||
pub sensitive: bool,
|
pub sensitive: bool,
|
||||||
|
|
|
@ -0,0 +1,26 @@
|
||||||
|
use crate::newtypes::{ActivityId, InstanceId};
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
#[cfg(feature = "full")]
|
||||||
|
use diesel::prelude::*;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
#[cfg(feature = "full")]
|
||||||
|
use ts_rs::TS;
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
#[cfg_attr(
|
||||||
|
feature = "full",
|
||||||
|
derive(Queryable, Selectable, Insertable, AsChangeset)
|
||||||
|
)]
|
||||||
|
#[cfg_attr(feature = "full", derive(TS))]
|
||||||
|
#[cfg_attr(feature = "full", diesel(table_name = crate::schema::federation_queue_state))]
|
||||||
|
#[cfg_attr(feature = "full", diesel(check_for_backend(diesel::pg::Pg)))]
|
||||||
|
pub struct FederationQueueState {
|
||||||
|
pub instance_id: InstanceId,
|
||||||
|
/// the last successfully sent activity id
|
||||||
|
pub last_successful_id: Option<ActivityId>,
|
||||||
|
pub last_successful_published_time: Option<DateTime<Utc>>,
|
||||||
|
/// how many failed attempts have been made to send the next activity
|
||||||
|
pub fail_count: i32,
|
||||||
|
/// timestamp of the last retry attempt (when the last failing activity was resent)
|
||||||
|
pub last_retry: Option<DateTime<Utc>>,
|
||||||
|
}
|
|
@ -13,7 +13,7 @@ use ts_rs::TS;
|
||||||
use typed_builder::TypedBuilder;
|
use typed_builder::TypedBuilder;
|
||||||
|
|
||||||
#[skip_serializing_none]
|
#[skip_serializing_none]
|
||||||
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
|
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize, Default)]
|
||||||
#[cfg_attr(feature = "full", derive(Queryable, Identifiable, TS))]
|
#[cfg_attr(feature = "full", derive(Queryable, Identifiable, TS))]
|
||||||
#[cfg_attr(feature = "full", diesel(table_name = local_site))]
|
#[cfg_attr(feature = "full", diesel(table_name = local_site))]
|
||||||
#[cfg_attr(feature = "full", diesel(belongs_to(crate::source::site::Site)))]
|
#[cfg_attr(feature = "full", diesel(belongs_to(crate::source::site::Site)))]
|
||||||
|
@ -60,6 +60,9 @@ pub struct LocalSite {
|
||||||
pub registration_mode: RegistrationMode,
|
pub registration_mode: RegistrationMode,
|
||||||
/// Whether to email admins on new reports.
|
/// Whether to email admins on new reports.
|
||||||
pub reports_email_admins: bool,
|
pub reports_email_admins: bool,
|
||||||
|
/// Whether to sign outgoing Activitypub fetches with private key of local instance. Some
|
||||||
|
/// Fediverse instances and platforms require this.
|
||||||
|
pub federation_signed_fetch: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, TypedBuilder)]
|
#[derive(Clone, TypedBuilder)]
|
||||||
|
@ -88,6 +91,7 @@ pub struct LocalSiteInsertForm {
|
||||||
pub captcha_difficulty: Option<String>,
|
pub captcha_difficulty: Option<String>,
|
||||||
pub registration_mode: Option<RegistrationMode>,
|
pub registration_mode: Option<RegistrationMode>,
|
||||||
pub reports_email_admins: Option<bool>,
|
pub reports_email_admins: Option<bool>,
|
||||||
|
pub federation_signed_fetch: Option<bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
#[derive(Clone, Default)]
|
||||||
|
@ -114,4 +118,5 @@ pub struct LocalSiteUpdateForm {
|
||||||
pub registration_mode: Option<RegistrationMode>,
|
pub registration_mode: Option<RegistrationMode>,
|
||||||
pub reports_email_admins: Option<bool>,
|
pub reports_email_admins: Option<bool>,
|
||||||
pub updated: Option<Option<DateTime<Utc>>>,
|
pub updated: Option<Option<DateTime<Utc>>>,
|
||||||
|
pub federation_signed_fetch: Option<bool>,
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,6 +60,8 @@ pub struct LocalUser {
|
||||||
pub enable_keyboard_navigation: bool,
|
pub enable_keyboard_navigation: bool,
|
||||||
/// Whether user avatars and inline images in the UI that are gifs should be allowed to play or should be paused
|
/// Whether user avatars and inline images in the UI that are gifs should be allowed to play or should be paused
|
||||||
pub enable_animated_images: bool,
|
pub enable_animated_images: bool,
|
||||||
|
/// Whether to auto-collapse bot comments.
|
||||||
|
pub collapse_bot_comments: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, TypedBuilder)]
|
#[derive(Clone, TypedBuilder)]
|
||||||
|
@ -94,6 +96,7 @@ pub struct LocalUserInsertForm {
|
||||||
pub totp_2fa_enabled: Option<bool>,
|
pub totp_2fa_enabled: Option<bool>,
|
||||||
pub enable_keyboard_navigation: Option<bool>,
|
pub enable_keyboard_navigation: Option<bool>,
|
||||||
pub enable_animated_images: Option<bool>,
|
pub enable_animated_images: Option<bool>,
|
||||||
|
pub collapse_bot_comments: Option<bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
#[derive(Clone, Default)]
|
||||||
|
@ -124,4 +127,5 @@ pub struct LocalUserUpdateForm {
|
||||||
pub totp_2fa_enabled: Option<bool>,
|
pub totp_2fa_enabled: Option<bool>,
|
||||||
pub enable_keyboard_navigation: Option<bool>,
|
pub enable_keyboard_navigation: Option<bool>,
|
||||||
pub enable_animated_images: Option<bool>,
|
pub enable_animated_images: Option<bool>,
|
||||||
|
pub collapse_bot_comments: Option<bool>,
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@ pub mod custom_emoji_keyword;
|
||||||
pub mod email_verification;
|
pub mod email_verification;
|
||||||
pub mod federation_allowlist;
|
pub mod federation_allowlist;
|
||||||
pub mod federation_blocklist;
|
pub mod federation_blocklist;
|
||||||
|
pub mod federation_queue_state;
|
||||||
pub mod images;
|
pub mod images;
|
||||||
pub mod instance;
|
pub mod instance;
|
||||||
pub mod instance_block;
|
pub mod instance_block;
|
||||||
|
|
|
@ -112,7 +112,7 @@ fn queries<'a>() -> Queries<
|
||||||
let (limit, offset) = limit_and_offset(options.page, options.limit)?;
|
let (limit, offset) = limit_and_offset(options.page, options.limit)?;
|
||||||
|
|
||||||
query = query
|
query = query
|
||||||
.order_by(comment_report::published.desc())
|
.order_by(comment_report::published.asc())
|
||||||
.limit(limit)
|
.limit(limit)
|
||||||
.offset(offset);
|
.offset(offset);
|
||||||
|
|
||||||
|
@ -476,8 +476,8 @@ mod tests {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
reports,
|
reports,
|
||||||
[
|
[
|
||||||
|
expected_sara_report_view.clone(),
|
||||||
expected_jessica_report_view.clone(),
|
expected_jessica_report_view.clone(),
|
||||||
expected_sara_report_view.clone()
|
|
||||||
]
|
]
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -90,7 +90,7 @@ fn queries<'a>() -> Queries<
|
||||||
let (limit, offset) = limit_and_offset(options.page, options.limit)?;
|
let (limit, offset) = limit_and_offset(options.page, options.limit)?;
|
||||||
|
|
||||||
query = query
|
query = query
|
||||||
.order_by(post_report::published.desc())
|
.order_by(post_report::published.asc())
|
||||||
.limit(limit)
|
.limit(limit)
|
||||||
.offset(offset);
|
.offset(offset);
|
||||||
|
|
||||||
|
@ -337,8 +337,8 @@ mod tests {
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
assert_eq!(reports[0].creator.id, inserted_jessica.id);
|
assert_eq!(reports[0].creator.id, inserted_sara.id);
|
||||||
assert_eq!(reports[1].creator.id, inserted_sara.id);
|
assert_eq!(reports[1].creator.id, inserted_jessica.id);
|
||||||
|
|
||||||
// Make sure the counts are correct
|
// Make sure the counts are correct
|
||||||
let report_count = PostReportView::get_report_count(pool, inserted_timmy.id, false, None)
|
let report_count = PostReportView::get_report_count(pool, inserted_timmy.id, false, None)
|
||||||
|
|
|
@ -56,7 +56,7 @@ fn queries<'a>() -> Queries<
|
||||||
let (limit, offset) = limit_and_offset(options.page, options.limit)?;
|
let (limit, offset) = limit_and_offset(options.page, options.limit)?;
|
||||||
|
|
||||||
query
|
query
|
||||||
.order_by(private_message::published.desc())
|
.order_by(private_message::published.asc())
|
||||||
.limit(limit)
|
.limit(limit)
|
||||||
.offset(offset)
|
.offset(offset)
|
||||||
.load::<PrivateMessageReportView>(&mut conn)
|
.load::<PrivateMessageReportView>(&mut conn)
|
||||||
|
|
|
@ -268,6 +268,7 @@ mod tests {
|
||||||
totp_2fa_enabled: inserted_sara_local_user.totp_2fa_enabled,
|
totp_2fa_enabled: inserted_sara_local_user.totp_2fa_enabled,
|
||||||
enable_keyboard_navigation: inserted_sara_local_user.enable_keyboard_navigation,
|
enable_keyboard_navigation: inserted_sara_local_user.enable_keyboard_navigation,
|
||||||
enable_animated_images: inserted_sara_local_user.enable_animated_images,
|
enable_animated_images: inserted_sara_local_user.enable_animated_images,
|
||||||
|
collapse_bot_comments: inserted_sara_local_user.collapse_bot_comments,
|
||||||
},
|
},
|
||||||
creator: Person {
|
creator: Person {
|
||||||
id: inserted_sara_person.id,
|
id: inserted_sara_person.id,
|
||||||
|
|
|
@ -1,63 +0,0 @@
|
||||||
use crate::util::ActivityId;
|
|
||||||
use anyhow::Result;
|
|
||||||
use chrono::{DateTime, TimeZone, Utc};
|
|
||||||
use diesel::prelude::*;
|
|
||||||
use diesel_async::RunQueryDsl;
|
|
||||||
use lemmy_db_schema::{
|
|
||||||
newtypes::InstanceId,
|
|
||||||
utils::{get_conn, DbPool},
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Queryable, Selectable, Insertable, AsChangeset, Clone)]
|
|
||||||
#[diesel(table_name = lemmy_db_schema::schema::federation_queue_state)]
|
|
||||||
#[diesel(check_for_backend(diesel::pg::Pg))]
|
|
||||||
pub struct FederationQueueState {
|
|
||||||
pub instance_id: InstanceId,
|
|
||||||
pub last_successful_id: ActivityId, // todo: i64
|
|
||||||
pub fail_count: i32,
|
|
||||||
pub last_retry: DateTime<Utc>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FederationQueueState {
|
|
||||||
/// load state or return a default empty value
|
|
||||||
pub async fn load(
|
|
||||||
pool: &mut DbPool<'_>,
|
|
||||||
instance_id_: InstanceId,
|
|
||||||
) -> Result<FederationQueueState> {
|
|
||||||
use lemmy_db_schema::schema::federation_queue_state::dsl::{
|
|
||||||
federation_queue_state,
|
|
||||||
instance_id,
|
|
||||||
};
|
|
||||||
let conn = &mut get_conn(pool).await?;
|
|
||||||
Ok(
|
|
||||||
federation_queue_state
|
|
||||||
.filter(instance_id.eq(&instance_id_))
|
|
||||||
.select(FederationQueueState::as_select())
|
|
||||||
.get_result(conn)
|
|
||||||
.await
|
|
||||||
.optional()?
|
|
||||||
.unwrap_or(FederationQueueState {
|
|
||||||
instance_id: instance_id_,
|
|
||||||
fail_count: 0,
|
|
||||||
last_retry: Utc.timestamp_nanos(0),
|
|
||||||
last_successful_id: -1, // this value is set to the most current id for new instances
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
pub async fn upsert(pool: &mut DbPool<'_>, state: &FederationQueueState) -> Result<()> {
|
|
||||||
use lemmy_db_schema::schema::federation_queue_state::dsl::{
|
|
||||||
federation_queue_state,
|
|
||||||
instance_id,
|
|
||||||
};
|
|
||||||
let conn = &mut get_conn(pool).await?;
|
|
||||||
|
|
||||||
state
|
|
||||||
.insert_into(federation_queue_state)
|
|
||||||
.on_conflict(instance_id)
|
|
||||||
.do_update()
|
|
||||||
.set(state)
|
|
||||||
.execute(conn)
|
|
||||||
.await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,14 +1,10 @@
|
||||||
use crate::{
|
use crate::{util::CancellableTask, worker::InstanceWorker};
|
||||||
util::{retry_sleep_duration, CancellableTask},
|
|
||||||
worker::InstanceWorker,
|
|
||||||
};
|
|
||||||
use activitypub_federation::config::FederationConfig;
|
use activitypub_federation::config::FederationConfig;
|
||||||
use chrono::{Local, Timelike};
|
use chrono::{Local, Timelike};
|
||||||
use federation_queue_state::FederationQueueState;
|
use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration};
|
||||||
use lemmy_api_common::context::LemmyContext;
|
|
||||||
use lemmy_db_schema::{
|
use lemmy_db_schema::{
|
||||||
newtypes::InstanceId,
|
newtypes::InstanceId,
|
||||||
source::instance::Instance,
|
source::{federation_queue_state::FederationQueueState, instance::Instance},
|
||||||
utils::{ActualDbPool, DbPool},
|
utils::{ActualDbPool, DbPool},
|
||||||
};
|
};
|
||||||
use std::{collections::HashMap, time::Duration};
|
use std::{collections::HashMap, time::Duration};
|
||||||
|
@ -18,7 +14,6 @@ use tokio::{
|
||||||
};
|
};
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
mod federation_queue_state;
|
|
||||||
mod util;
|
mod util;
|
||||||
mod worker;
|
mod worker;
|
||||||
|
|
||||||
|
@ -52,7 +47,9 @@ async fn start_stop_federation_workers(
|
||||||
let mut total_count = 0;
|
let mut total_count = 0;
|
||||||
let mut dead_count = 0;
|
let mut dead_count = 0;
|
||||||
let mut disallowed_count = 0;
|
let mut disallowed_count = 0;
|
||||||
for (instance, allowed, is_dead) in Instance::read_all_with_blocked_and_dead(pool2).await? {
|
for (instance, allowed, is_dead) in
|
||||||
|
Instance::read_federated_with_blocked_and_dead(pool2).await?
|
||||||
|
{
|
||||||
if instance.domain == local_domain {
|
if instance.domain == local_domain {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -188,14 +185,14 @@ async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap<String, FederationQu
|
||||||
// todo: more stats (act/sec, avg http req duration)
|
// todo: more stats (act/sec, avg http req duration)
|
||||||
let mut ok_count = 0;
|
let mut ok_count = 0;
|
||||||
for (domain, stat) in stats {
|
for (domain, stat) in stats {
|
||||||
let behind = last_id - stat.last_successful_id;
|
let behind = last_id.0 - stat.last_successful_id.map(|e| e.0).unwrap_or(0);
|
||||||
if stat.fail_count > 0 {
|
if stat.fail_count > 0 {
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"{}: Warning. {} behind, {} consecutive fails, current retry delay {:.2?}",
|
"{}: Warning. {} behind, {} consecutive fails, current retry delay {:.2?}",
|
||||||
domain,
|
domain,
|
||||||
behind,
|
behind,
|
||||||
stat.fail_count,
|
stat.fail_count,
|
||||||
retry_sleep_duration(stat.fail_count)
|
federate_retry_sleep_duration(stat.fail_count)
|
||||||
);
|
);
|
||||||
} else if behind > 0 {
|
} else if behind > 0 {
|
||||||
tracing::info!("{}: Ok. {} behind", domain, behind);
|
tracing::info!("{}: Ok. {} behind", domain, behind);
|
||||||
|
|
|
@ -6,6 +6,7 @@ use lemmy_apub::{
|
||||||
fetcher::{site_or_community_or_user::SiteOrCommunityOrUser, user_or_community::UserOrCommunity},
|
fetcher::{site_or_community_or_user::SiteOrCommunityOrUser, user_or_community::UserOrCommunity},
|
||||||
};
|
};
|
||||||
use lemmy_db_schema::{
|
use lemmy_db_schema::{
|
||||||
|
newtypes::ActivityId,
|
||||||
source::{
|
source::{
|
||||||
activity::{ActorType, SentActivity},
|
activity::{ActorType, SentActivity},
|
||||||
community::Community,
|
community::Community,
|
||||||
|
@ -141,9 +142,6 @@ pub(crate) async fn get_actor_cached(
|
||||||
.map_err(|e| anyhow::anyhow!("err getting actor {actor_type:?} {actor_apub_id}: {e:?}"))
|
.map_err(|e| anyhow::anyhow!("err getting actor {actor_type:?} {actor_apub_id}: {e:?}"))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// this should maybe be a newtype like all the other PersonId CommunityId etc.
|
|
||||||
pub(crate) type ActivityId = i64;
|
|
||||||
|
|
||||||
type CachedActivityInfo = Option<Arc<(SentActivity, SharedInboxActivities)>>;
|
type CachedActivityInfo = Option<Arc<(SentActivity, SharedInboxActivities)>>;
|
||||||
/// activities are immutable so cache does not need to have TTL
|
/// activities are immutable so cache does not need to have TTL
|
||||||
/// May return None if the corresponding id does not exist or is a received activity.
|
/// May return None if the corresponding id does not exist or is a received activity.
|
||||||
|
@ -192,14 +190,9 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<Acti
|
||||||
use lemmy_db_schema::schema::sent_activity::dsl::{id, sent_activity};
|
use lemmy_db_schema::schema::sent_activity::dsl::{id, sent_activity};
|
||||||
let conn = &mut get_conn(pool).await?;
|
let conn = &mut get_conn(pool).await?;
|
||||||
let seq: Option<ActivityId> = sent_activity.select(max(id)).get_result(conn).await?;
|
let seq: Option<ActivityId> = sent_activity.select(max(id)).get_result(conn).await?;
|
||||||
let latest_id = seq.unwrap_or(0);
|
let latest_id = seq.unwrap_or(ActivityId(0));
|
||||||
anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId)
|
anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.map_err(|e| anyhow::anyhow!("err getting id: {e:?}"))
|
.map_err(|e| anyhow::anyhow!("err getting id: {e:?}"))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// how long to sleep based on how many retries have already happened
|
|
||||||
pub(crate) fn retry_sleep_duration(retry_count: i32) -> Duration {
|
|
||||||
Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count)))
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,22 +1,23 @@
|
||||||
use crate::{
|
use crate::util::{
|
||||||
federation_queue_state::FederationQueueState,
|
|
||||||
util::{
|
|
||||||
get_activity_cached,
|
get_activity_cached,
|
||||||
get_actor_cached,
|
get_actor_cached,
|
||||||
get_latest_activity_id,
|
get_latest_activity_id,
|
||||||
retry_sleep_duration,
|
|
||||||
LEMMY_TEST_FAST_FEDERATION,
|
LEMMY_TEST_FAST_FEDERATION,
|
||||||
WORK_FINISHED_RECHECK_DELAY,
|
WORK_FINISHED_RECHECK_DELAY,
|
||||||
},
|
|
||||||
};
|
};
|
||||||
use activitypub_federation::{activity_sending::SendActivityTask, config::Data};
|
use activitypub_federation::{activity_sending::SendActivityTask, config::Data};
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use chrono::{DateTime, TimeZone, Utc};
|
use chrono::{DateTime, TimeZone, Utc};
|
||||||
use lemmy_api_common::context::LemmyContext;
|
use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration};
|
||||||
use lemmy_apub::activity_lists::SharedInboxActivities;
|
use lemmy_apub::activity_lists::SharedInboxActivities;
|
||||||
use lemmy_db_schema::{
|
use lemmy_db_schema::{
|
||||||
newtypes::{CommunityId, InstanceId},
|
newtypes::{ActivityId, CommunityId, InstanceId},
|
||||||
source::{activity::SentActivity, instance::Instance, site::Site},
|
source::{
|
||||||
|
activity::SentActivity,
|
||||||
|
federation_queue_state::FederationQueueState,
|
||||||
|
instance::Instance,
|
||||||
|
site::Site,
|
||||||
|
},
|
||||||
utils::DbPool,
|
utils::DbPool,
|
||||||
};
|
};
|
||||||
use lemmy_db_views_actor::structs::CommunityFollowerView;
|
use lemmy_db_views_actor::structs::CommunityFollowerView;
|
||||||
|
@ -122,8 +123,12 @@ impl InstanceWorker {
|
||||||
async fn initial_fail_sleep(&mut self) -> Result<()> {
|
async fn initial_fail_sleep(&mut self) -> Result<()> {
|
||||||
// before starting queue, sleep remaining duration if last request failed
|
// before starting queue, sleep remaining duration if last request failed
|
||||||
if self.state.fail_count > 0 {
|
if self.state.fail_count > 0 {
|
||||||
let elapsed = (Utc::now() - self.state.last_retry).to_std()?;
|
let last_retry = self
|
||||||
let required = retry_sleep_duration(self.state.fail_count);
|
.state
|
||||||
|
.last_retry
|
||||||
|
.context("impossible: if fail count set last retry also set")?;
|
||||||
|
let elapsed = (Utc::now() - last_retry).to_std()?;
|
||||||
|
let required = federate_retry_sleep_duration(self.state.fail_count);
|
||||||
if elapsed >= required {
|
if elapsed >= required {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
@ -138,14 +143,16 @@ impl InstanceWorker {
|
||||||
/// send out a batch of CHECK_SAVE_STATE_EVERY_IT activities
|
/// send out a batch of CHECK_SAVE_STATE_EVERY_IT activities
|
||||||
async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> {
|
async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> {
|
||||||
let latest_id = get_latest_activity_id(pool).await?;
|
let latest_id = get_latest_activity_id(pool).await?;
|
||||||
if self.state.last_successful_id == -1 {
|
let mut id = if let Some(id) = self.state.last_successful_id {
|
||||||
|
id
|
||||||
|
} else {
|
||||||
// this is the initial creation (instance first seen) of the federation queue for this instance
|
// this is the initial creation (instance first seen) of the federation queue for this instance
|
||||||
// skip all past activities:
|
// skip all past activities:
|
||||||
self.state.last_successful_id = latest_id;
|
self.state.last_successful_id = Some(latest_id);
|
||||||
// save here to ensure it's not read as 0 again later if no activities have happened
|
// save here to ensure it's not read as 0 again later if no activities have happened
|
||||||
self.save_and_send_state(pool).await?;
|
self.save_and_send_state(pool).await?;
|
||||||
}
|
latest_id
|
||||||
let mut id = self.state.last_successful_id;
|
};
|
||||||
if id == latest_id {
|
if id == latest_id {
|
||||||
// no more work to be done, wait before rechecking
|
// no more work to be done, wait before rechecking
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
@ -159,13 +166,13 @@ impl InstanceWorker {
|
||||||
&& processed_activities < CHECK_SAVE_STATE_EVERY_IT
|
&& processed_activities < CHECK_SAVE_STATE_EVERY_IT
|
||||||
&& !self.stop.is_cancelled()
|
&& !self.stop.is_cancelled()
|
||||||
{
|
{
|
||||||
id += 1;
|
id = ActivityId(id.0 + 1);
|
||||||
processed_activities += 1;
|
processed_activities += 1;
|
||||||
let Some(ele) = get_activity_cached(pool, id)
|
let Some(ele) = get_activity_cached(pool, id)
|
||||||
.await
|
.await
|
||||||
.context("failed reading activity from db")?
|
.context("failed reading activity from db")?
|
||||||
else {
|
else {
|
||||||
self.state.last_successful_id = id;
|
self.state.last_successful_id = Some(id);
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await {
|
if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await {
|
||||||
|
@ -179,7 +186,8 @@ impl InstanceWorker {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
// send success!
|
// send success!
|
||||||
self.state.last_successful_id = id;
|
self.state.last_successful_id = Some(id);
|
||||||
|
self.state.last_successful_published_time = Some(ele.0.published);
|
||||||
self.state.fail_count = 0;
|
self.state.fail_count = 0;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -198,7 +206,8 @@ impl InstanceWorker {
|
||||||
.await
|
.await
|
||||||
.context("failed figuring out inbox urls")?;
|
.context("failed figuring out inbox urls")?;
|
||||||
if inbox_urls.is_empty() {
|
if inbox_urls.is_empty() {
|
||||||
self.state.last_successful_id = activity.id;
|
self.state.last_successful_id = Some(activity.id);
|
||||||
|
self.state.last_successful_published_time = Some(activity.published);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
let Some(actor_apub_id) = &activity.actor_apub_id else {
|
let Some(actor_apub_id) = &activity.actor_apub_id else {
|
||||||
|
@ -217,10 +226,10 @@ impl InstanceWorker {
|
||||||
tracing::info!("sending out {}", task);
|
tracing::info!("sending out {}", task);
|
||||||
while let Err(e) = task.sign_and_send(&self.context).await {
|
while let Err(e) = task.sign_and_send(&self.context).await {
|
||||||
self.state.fail_count += 1;
|
self.state.fail_count += 1;
|
||||||
self.state.last_retry = Utc::now();
|
self.state.last_retry = Some(Utc::now());
|
||||||
let retry_delay: Duration = retry_sleep_duration(self.state.fail_count);
|
let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count);
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"{}: retrying {} attempt {} with delay {retry_delay:.2?}. ({e})",
|
"{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})",
|
||||||
self.instance.domain,
|
self.instance.domain,
|
||||||
activity.id,
|
activity.id,
|
||||||
self.state.fail_count
|
self.state.fail_count
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
ALTER TABLE local_user
|
||||||
|
DROP COLUMN collapse_bot_comments;
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
ALTER TABLE local_user
|
||||||
|
ADD COLUMN collapse_bot_comments boolean DEFAULT FALSE NOT NULL;
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
ALTER TABLE federation_queue_state
|
||||||
|
DROP COLUMN last_successful_published_time,
|
||||||
|
ALTER COLUMN last_successful_id SET NOT NULL,
|
||||||
|
ALTER COLUMN last_retry SET NOT NULL;
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
ALTER TABLE federation_queue_state
|
||||||
|
ADD COLUMN last_successful_published_time timestamptz NULL,
|
||||||
|
ALTER COLUMN last_successful_id DROP NOT NULL,
|
||||||
|
ALTER COLUMN last_retry DROP NOT NULL;
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
ALTER TABLE local_site
|
||||||
|
DROP COLUMN federation_signed_fetch;
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
ALTER TABLE local_site
|
||||||
|
ADD COLUMN federation_signed_fetch boolean NOT NULL DEFAULT FALSE;
|
||||||
|
|
13
src/lib.rs
13
src/lib.rs
|
@ -36,6 +36,7 @@ use lemmy_api_common::{
|
||||||
};
|
};
|
||||||
use lemmy_apub::{
|
use lemmy_apub::{
|
||||||
activities::{handle_outgoing_activities, match_outgoing_activities},
|
activities::{handle_outgoing_activities, match_outgoing_activities},
|
||||||
|
objects::instance::ApubSite,
|
||||||
VerifyUrlData,
|
VerifyUrlData,
|
||||||
FEDERATION_HTTP_FETCH_LIMIT,
|
FEDERATION_HTTP_FETCH_LIMIT,
|
||||||
};
|
};
|
||||||
|
@ -164,16 +165,20 @@ pub async fn start_lemmy_server(args: CmdArgs) -> Result<(), LemmyError> {
|
||||||
serve_prometheus(prometheus, context.clone())?;
|
serve_prometheus(prometheus, context.clone())?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let federation_config = FederationConfig::builder()
|
let mut federation_config = FederationConfig::builder();
|
||||||
|
federation_config
|
||||||
.domain(SETTINGS.hostname.clone())
|
.domain(SETTINGS.hostname.clone())
|
||||||
.app_data(context.clone())
|
.app_data(context.clone())
|
||||||
.client(client.clone())
|
.client(client.clone())
|
||||||
.http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT)
|
.http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT)
|
||||||
.debug(cfg!(debug_assertions))
|
.debug(cfg!(debug_assertions))
|
||||||
.http_signature_compat(true)
|
.http_signature_compat(true)
|
||||||
.url_verifier(Box::new(VerifyUrlData(context.inner_pool().clone())))
|
.url_verifier(Box::new(VerifyUrlData(context.inner_pool().clone())));
|
||||||
.build()
|
if local_site.federation_signed_fetch {
|
||||||
.await?;
|
let site: ApubSite = site_view.site.into();
|
||||||
|
federation_config.signed_fetch_actor(&site);
|
||||||
|
}
|
||||||
|
let federation_config = federation_config.build().await?;
|
||||||
|
|
||||||
MATCH_OUTGOING_ACTIVITIES
|
MATCH_OUTGOING_ACTIVITIES
|
||||||
.set(Box::new(move |d, c| {
|
.set(Box::new(move |d, c| {
|
||||||
|
|
|
@ -295,14 +295,17 @@ async fn clear_old_activities(pool: &mut DbPool<'_>) {
|
||||||
|
|
||||||
match conn {
|
match conn {
|
||||||
Ok(mut conn) => {
|
Ok(mut conn) => {
|
||||||
diesel::delete(sent_activity::table.filter(sent_activity::published.lt(now() - 3.months())))
|
diesel::delete(
|
||||||
|
sent_activity::table.filter(sent_activity::published.lt(now() - IntervalDsl::days(7))),
|
||||||
|
)
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| error!("Failed to clear old sent activities: {e}"))
|
.map_err(|e| error!("Failed to clear old sent activities: {e}"))
|
||||||
.ok();
|
.ok();
|
||||||
|
|
||||||
diesel::delete(
|
diesel::delete(
|
||||||
received_activity::table.filter(received_activity::published.lt(now() - 3.months())),
|
received_activity::table
|
||||||
|
.filter(received_activity::published.lt(now() - IntervalDsl::days(7))),
|
||||||
)
|
)
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
.await
|
.await
|
||||||
|
|
Loading…
Reference in New Issue