Compare commits

...

9 Commits

Author SHA1 Message Date
Nutomic ee33eedc63
Merge 23e51f58ae into c4fc3a8ede 2024-05-09 09:50:09 -04:00
dullbananas c4fc3a8ede
Optimize stuff in attempt to fix high amount of locks, and fix comment_aggregates.child_count (#4696)
* separate triggers

* auto_explain.log_triggers=on

* Revert "auto_explain.log_triggers=on"

This reverts commit 078b2dbb9b.

* Revert "separate triggers"

This reverts commit 95600da4af.

* bring back migration

* re-order statements

* add comment about statement ordering

* no redundant updates

* optimize post_aggregates update in comment trigger

* set comment path in trigger

* update comment_aggregates.child_count using trigger

* move `LEFT JOIN post` to inner query

* clean up newest_comment_time_necro

* add down.sql
2024-05-09 08:18:55 -04:00
Felix Ableitner 23e51f58ae fmt 2024-05-08 12:46:15 +02:00
Felix Ableitner fb1fc7cb3a remove test 2024-05-08 12:45:38 +02:00
Felix Ableitner d4cc99f16b cleanup 2024-05-08 12:42:24 +02:00
Felix Ableitner 992ff352b7 add comment 2024-05-08 12:41:28 +02:00
Felix Ableitner 7902138df2 extract helper fn 2024-05-08 12:40:14 +02:00
Felix Ableitner eab6dbbe06 Fetch blocked objects if not known locally (fixes #4669) 2024-05-08 12:10:44 +02:00
Felix Ableitner 6cefdaee49 Allow importing partial backup (fixes #4672) 2024-05-06 11:20:27 +02:00
6 changed files with 338 additions and 269 deletions

View File

@@ -4,9 +4,10 @@ use crate::objects::{
   person::ApubPerson,
   post::ApubPost,
 };
-use activitypub_federation::{config::Data, fetch::object_id::ObjectId};
+use activitypub_federation::{config::Data, fetch::object_id::ObjectId, traits::Object};
 use actix_web::web::Json;
 use futures::{future::try_join_all, StreamExt};
+use itertools::Itertools;
 use lemmy_api_common::{context::LemmyContext, SuccessResponse};
 use lemmy_db_schema::{
   newtypes::DbUrl,
@@ -30,8 +31,11 @@ use lemmy_utils::{
   spawn_try_task,
 };
 use serde::{Deserialize, Serialize};
+use std::future::Future;
 use tracing::info;
 
+const PARALLELISM: usize = 10;
+
 /// Backup of user data. This struct should never be changed so that the data can be used as a
 /// long-term backup in case the instance goes down unexpectedly. All fields are optional to allow
 /// importing partial backups.
@@ -40,7 +44,7 @@ use tracing::info;
 ///
 /// Be careful with any changes to this struct, to avoid breaking changes which could prevent
 /// importing older backups.
-#[derive(Debug, Serialize, Deserialize, Clone)]
+#[derive(Debug, Serialize, Deserialize, Clone, Default)]
 pub struct UserSettingsBackup {
   pub display_name: Option<String>,
   pub bio: Option<String>,
@@ -133,7 +137,8 @@ pub async fn import_settings(
     local_user_view.local_user.id,
     &local_user_form,
   )
-  .await?;
+  .await
+  .ok();
 
   // Update the vote display mode settings
   let vote_display_mode_form = LocalUserVoteDisplayModeUpdateForm {
@@ -167,141 +172,91 @@ pub async fn import_settings(
   }
 
   spawn_try_task(async move {
-    const PARALLELISM: usize = 10;
     let person_id = local_user_view.person.id;
 
-    // These tasks fetch objects from remote instances which might be down.
-    // TODO: Would be nice if we could send a list of failed items with api response, but then
-    // the request would likely timeout.
-    let mut failed_items = vec![];
-
     info!(
-      "Starting settings backup for {}",
+      "Starting settings import for {}",
       local_user_view.person.name
     );
 
-    futures::stream::iter(
-      data
-        .followed_communities
-        .clone()
-        .into_iter()
-        // reset_request_count works like clone, and is necessary to avoid running into request limit
-        .map(|f| (f, context.reset_request_count()))
-        .map(|(followed, context)| async move {
-          // need to reset outgoing request count to avoid running into limit
-          let community = followed.dereference(&context).await?;
-          let form = CommunityFollowerForm {
-            person_id,
-            community_id: community.id,
-            pending: true,
-          };
-          CommunityFollower::follow(&mut context.pool(), &form).await?;
-          LemmyResult::Ok(())
-        }),
-    )
-    .buffer_unordered(PARALLELISM)
-    .collect::<Vec<_>>()
-    .await
-    .into_iter()
-    .enumerate()
-    .for_each(|(i, r)| {
-      if let Err(e) = r {
-        failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone()));
-        info!("Failed to import followed community: {e}");
-      }
-    });
-
-    futures::stream::iter(
-      data
-        .saved_posts
-        .clone()
-        .into_iter()
-        .map(|s| (s, context.reset_request_count()))
-        .map(|(saved, context)| async move {
-          let post = saved.dereference(&context).await?;
-          let form = PostSavedForm {
-            person_id,
-            post_id: post.id,
-          };
-          PostSaved::save(&mut context.pool(), &form).await?;
-          LemmyResult::Ok(())
-        }),
-    )
-    .buffer_unordered(PARALLELISM)
-    .collect::<Vec<_>>()
-    .await
-    .into_iter()
-    .enumerate()
-    .for_each(|(i, r)| {
-      if let Err(e) = r {
-        failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone()));
-        info!("Failed to import saved post community: {e}");
-      }
-    });
-
-    futures::stream::iter(
-      data
-        .saved_comments
-        .clone()
-        .into_iter()
-        .map(|s| (s, context.reset_request_count()))
-        .map(|(saved, context)| async move {
-          let comment = saved.dereference(&context).await?;
-          let form = CommentSavedForm {
-            person_id,
-            comment_id: comment.id,
-          };
-          CommentSaved::save(&mut context.pool(), &form).await?;
-          LemmyResult::Ok(())
-        }),
-    )
-    .buffer_unordered(PARALLELISM)
-    .collect::<Vec<_>>()
-    .await
-    .into_iter()
-    .enumerate()
-    .for_each(|(i, r)| {
-      if let Err(e) = r {
-        failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone()));
-        info!("Failed to import saved comment community: {e}");
-      }
-    });
-
-    let failed_items: Vec<_> = failed_items.into_iter().flatten().collect();
-    info!(
-      "Finished settings backup for {}, failed items: {:#?}",
-      local_user_view.person.name, failed_items
-    );
-
-    // These tasks don't connect to any remote instances but only insert directly in the database.
-    // That means the only error condition are db connection failures, so no extra error handling is
-    // needed.
-    try_join_all(data.blocked_communities.iter().map(|blocked| async {
-      // dont fetch unknown blocked objects from home server
-      let community = blocked.dereference_local(&context).await?;
-      let form = CommunityBlockForm {
-        person_id,
-        community_id: community.id,
-      };
-      CommunityBlock::block(&mut context.pool(), &form).await?;
-      LemmyResult::Ok(())
-    }))
+    let failed_followed_communities = fetch_and_import(
+      data.followed_communities.clone(),
+      &context,
+      |(followed, context)| async move {
+        let community = followed.dereference(&context).await?;
+        let form = CommunityFollowerForm {
+          person_id,
+          community_id: community.id,
+          pending: true,
+        };
+        CommunityFollower::follow(&mut context.pool(), &form).await?;
+        LemmyResult::Ok(())
+      },
+    )
     .await?;
 
-    try_join_all(data.blocked_users.iter().map(|blocked| async {
-      // dont fetch unknown blocked objects from home server
-      let target = blocked.dereference_local(&context).await?;
-      let form = PersonBlockForm {
-        person_id,
-        target_id: target.id,
-      };
-      PersonBlock::block(&mut context.pool(), &form).await?;
-      LemmyResult::Ok(())
-    }))
+    let failed_saved_posts = fetch_and_import(
+      data.saved_posts.clone(),
+      &context,
+      |(saved, context)| async move {
+        let post = saved.dereference(&context).await?;
+        let form = PostSavedForm {
+          person_id,
+          post_id: post.id,
+        };
+        PostSaved::save(&mut context.pool(), &form).await?;
+        LemmyResult::Ok(())
+      },
+    )
+    .await?;
+
+    let failed_saved_comments = fetch_and_import(
+      data.saved_comments.clone(),
+      &context,
+      |(saved, context)| async move {
+        let comment = saved.dereference(&context).await?;
+        let form = CommentSavedForm {
+          person_id,
+          comment_id: comment.id,
+        };
+        CommentSaved::save(&mut context.pool(), &form).await?;
+        LemmyResult::Ok(())
+      },
+    )
+    .await?;
+
+    let failed_community_blocks = fetch_and_import(
+      data.blocked_communities.clone(),
+      &context,
+      |(blocked, context)| async move {
+        let community = blocked.dereference(&context).await?;
+        let form = CommunityBlockForm {
+          person_id,
+          community_id: community.id,
+        };
+        CommunityBlock::block(&mut context.pool(), &form).await?;
+        LemmyResult::Ok(())
+      },
+    )
+    .await?;
+
+    let failed_user_blocks = fetch_and_import(
+      data.blocked_users.clone(),
+      &context,
+      |(blocked, context)| async move {
+        let context = context.reset_request_count();
+        let target = blocked.dereference(&context).await?;
+        let form = PersonBlockForm {
+          person_id,
+          target_id: target.id,
+        };
+        PersonBlock::block(&mut context.pool(), &form).await?;
+        LemmyResult::Ok(())
+      },
+    )
     .await?;
 
     try_join_all(data.blocked_instances.iter().map(|domain| async {
-      // dont fetch unknown blocked objects from home server
       let instance = Instance::read_or_create(&mut context.pool(), domain.clone()).await?;
       let form = InstanceBlockForm {
         person_id,
@@ -312,17 +267,53 @@ pub async fn import_settings(
     }))
     .await?;
 
+    info!("Settings import completed for {}, the following items failed: {failed_followed_communities}, {failed_saved_posts}, {failed_saved_comments}, {failed_community_blocks}, {failed_user_blocks}",
+    local_user_view.person.name);
+
     Ok(())
   });
 
   Ok(Json(Default::default()))
 }
 
+async fn fetch_and_import<Kind, Fut>(
+  objects: Vec<ObjectId<Kind>>,
+  context: &Data<LemmyContext>,
+  import_fn: impl FnMut((ObjectId<Kind>, Data<LemmyContext>)) -> Fut,
+) -> LemmyResult<String>
+where
+  Kind: Object + Send + 'static,
+  for<'de2> <Kind as Object>::Kind: Deserialize<'de2>,
+  Fut: Future<Output = LemmyResult<()>>,
+{
+  let mut failed_items = vec![];
+  futures::stream::iter(
+    objects
+      .clone()
+      .into_iter()
+      // need to reset outgoing request count to avoid running into limit
+      .map(|s| (s, context.reset_request_count()))
+      .map(import_fn),
+  )
+  .buffer_unordered(PARALLELISM)
+  .collect::<Vec<_>>()
+  .await
+  .into_iter()
+  .enumerate()
+  .for_each(|(i, r): (usize, LemmyResult<()>)| {
+    if r.is_err() {
+      if let Some(object) = objects.get(i) {
+        failed_items.push(object.inner().clone());
+      }
+    }
+  });
+  Ok(failed_items.into_iter().join(","))
+}
+
 #[cfg(test)]
 #[allow(clippy::indexing_slicing)]
 mod tests {
-  use crate::api::user_settings_backup::{export_settings, import_settings};
+  use crate::api::user_settings_backup::{export_settings, import_settings, UserSettingsBackup};
   use activitypub_federation::config::Data;
   use lemmy_api_common::context::LemmyContext;
   use lemmy_db_schema::{
@@ -420,6 +411,44 @@ mod tests {
     Ok(())
   }
 
+  #[tokio::test]
+  #[serial]
+  async fn test_settings_partial_import() -> LemmyResult<()> {
+    let context = LemmyContext::init_test_context().await;
+
+    let export_user =
+      create_user("hanna".to_string(), Some("my bio".to_string()), &context).await?;
+
+    let community_form = CommunityInsertForm::builder()
+      .name("testcom".to_string())
+      .title("testcom".to_string())
+      .instance_id(export_user.person.instance_id)
+      .build();
+    let community = Community::create(&mut context.pool(), &community_form).await?;
+    let follower_form = CommunityFollowerForm {
+      community_id: community.id,
+      person_id: export_user.person.id,
+      pending: false,
+    };
+    CommunityFollower::follow(&mut context.pool(), &follower_form).await?;
+
+    let backup = export_settings(export_user.clone(), context.reset_request_count()).await?;
+
+    let import_user = create_user("charles".to_string(), None, &context).await?;
+
+    let backup2 = UserSettingsBackup {
+      followed_communities: backup.followed_communities.clone(),
+      ..Default::default()
+    };
+    import_settings(
+      actix_web::web::Json(backup2),
+      import_user.clone(),
+      context.reset_request_count(),
+    )
+    .await?;
+
+    Ok(())
+  }
+
   #[tokio::test]
   #[serial]
   async fn disallow_large_backup() -> LemmyResult<()> {
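The import task in this file fetches remote objects with bounded concurrency and records which items failed instead of aborting on the first error. Below is a minimal, self-contained sketch of that futures::stream::iter + buffer_unordered pattern; it is not code from this PR, and the integer items, the every-7th-item failure condition, and the tokio/futures crate setup are assumptions made purely for illustration.

use futures::StreamExt;

#[tokio::main]
async fn main() {
  // Same bounded concurrency limit as the PR's PARALLELISM constant.
  const PARALLELISM: usize = 10;
  let items: Vec<u32> = (0..25).collect();

  let failed: Vec<u32> = futures::stream::iter(items.into_iter().map(|item| async move {
    // Stand-in for "dereference the remote object, then insert it into the DB".
    // Every 7th item fails so there is something to report.
    if item % 7 == 0 {
      Err(item)
    } else {
      Ok(())
    }
  }))
  // At most 10 of these futures run at once; results arrive in completion order.
  .buffer_unordered(PARALLELISM)
  // Keep only the failures, carrying the failed item itself.
  .filter_map(|result| async move { result.err() })
  .collect()
  .await;

  println!("failed items: {failed:?}");
}

Because the real task runs inside spawn_try_task, the PR can only report these failures through the "Settings import completed ..." log line added above rather than in the API response.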

View File

@@ -29,7 +29,9 @@ pub(crate) mod mentions;
 pub mod objects;
 pub mod protocol;
 
-pub const FEDERATION_HTTP_FETCH_LIMIT: u32 = 50;
+/// Maximum number of outgoing HTTP requests to fetch a single object. Needs to be high enough
+/// to fetch a new community with posts, moderators and featured posts.
+pub const FEDERATION_HTTP_FETCH_LIMIT: u32 = 100;
 
 /// Only include a basic context to save space and bandwidth. The main context is hosted statically
 /// on join-lemmy.org. Include activitystreams explicitly for better compat, but this could

View File

@@ -5,6 +5,12 @@
 -- (even if only other columns are updated) because triggers can run after the deletion of referenced rows and
 -- before the automatic deletion of the row that references it. This is not a problem for insert or delete.
 --
+-- After a row update begins, a concurrent update on the same row can't begin until the whole
+-- transaction that contains the first update is finished. To reduce this locking, statements in
+-- triggers should be ordered based on the likelihood of concurrent writers. For example, updating
+-- site_aggregates should be done last because the same row is updated for all local stuff. If
+-- it were not last, then the locking period for concurrent writers would extend to include the
+-- time consumed by statements that come after.
+--
 --
 --
 -- Create triggers for both post and comments
@@ -38,16 +44,18 @@ BEGIN
         (thing_like).thing_id, coalesce(sum(count_diff) FILTER (WHERE (thing_like).score = 1), 0) AS upvotes, coalesce(sum(count_diff) FILTER (WHERE (thing_like).score != 1), 0) AS downvotes FROM select_old_and_new_rows AS old_and_new_rows GROUP BY (thing_like).thing_id) AS diff
     WHERE
         a.thing_id = diff.thing_id
+        AND (diff.upvotes, diff.downvotes) != (0, 0)
     RETURNING
         r.creator_id_from_thing_aggregates (a.*) AS creator_id, diff.upvotes - diff.downvotes AS score)
 UPDATE
     person_aggregates AS a
 SET
     thing_score = a.thing_score + diff.score FROM (
     SELECT
         creator_id, sum(score) AS score FROM thing_diff GROUP BY creator_id) AS diff
 WHERE
-    a.person_id = diff.creator_id;
+    a.person_id = diff.creator_id
+    AND diff.score != 0;
 RETURN NULL;
 END;
 $$);
@@ -62,6 +70,21 @@ CALL r.post_or_comment ('post');
 CALL r.post_or_comment ('comment');
 
 -- Create triggers that update counts in parent aggregates
+CREATE FUNCTION r.parent_comment_ids (path ltree)
+    RETURNS SETOF int
+    LANGUAGE sql
+    IMMUTABLE parallel safe
+BEGIN
+    ATOMIC
+    SELECT
+        comment_id::int
+    FROM
+        string_to_table (ltree2text (path), '.') AS comment_id
+    -- Skip first and last
+    LIMIT (nlevel (path) - 2) OFFSET 1;
+END;
+
 CALL r.create_triggers ('comment', $$
 BEGIN
     UPDATE
@@ -76,60 +99,84 @@ BEGIN
             r.is_counted (comment)
         GROUP BY (comment).creator_id) AS diff
 WHERE
-    a.person_id = diff.creator_id;
+    a.person_id = diff.creator_id
+    AND diff.comment_count != 0;
 UPDATE
-    site_aggregates AS a
+    comment_aggregates AS a
 SET
-    comments = a.comments + diff.comments
+    child_count = a.child_count + diff.child_count
 FROM (
     SELECT
-        coalesce(sum(count_diff), 0) AS comments
-    FROM
-        select_old_and_new_rows AS old_and_new_rows
-    WHERE
-        r.is_counted (comment)
-        AND (comment).local) AS diff;
+        parent_id,
+        coalesce(sum(count_diff), 0) AS child_count
+    FROM (
+        -- For each inserted or deleted comment, this outputs 1 row for each parent comment.
+        -- For example, this:
+        --
+        --  count_diff | (comment).path
+        -- ------------+----------------
+        --           1 | 0.5.6.7
+        --           1 | 0.5.6.7.8
+        --
+        -- becomes this:
+        --
+        --  count_diff | parent_id
+        -- ------------+-----------
+        --           1 | 5
+        --           1 | 6
+        --           1 | 5
+        --           1 | 6
+        --           1 | 7
+        SELECT
+            count_diff,
+            parent_id
+        FROM
+            select_old_and_new_rows AS old_and_new_rows,
+            LATERAL r.parent_comment_ids ((comment).path) AS parent_id) AS expanded_old_and_new_rows
+    GROUP BY
+        parent_id) AS diff
+WHERE
+    a.comment_id = diff.parent_id
+    AND diff.child_count != 0;
 WITH post_diff AS (
     UPDATE
         post_aggregates AS a
     SET
         comments = a.comments + diff.comments,
-        newest_comment_time = GREATEST (a.newest_comment_time, (
-            SELECT
-                published
-            FROM select_new_rows AS new_comment
-            WHERE
-                a.post_id = new_comment.post_id ORDER BY published DESC LIMIT 1)),
-        newest_comment_time_necro = GREATEST (a.newest_comment_time_necro, (
-            SELECT
-                published
-            FROM select_new_rows AS new_comment
-            WHERE
-                a.post_id = new_comment.post_id
-                -- Ignore comments from the post's creator
-                AND a.creator_id != new_comment.creator_id
-                -- Ignore comments on old posts
-                AND a.published > (new_comment.published - '2 days'::interval)
-            ORDER BY published DESC LIMIT 1))
+        newest_comment_time = GREATEST (a.newest_comment_time, diff.newest_comment_time),
+        newest_comment_time_necro = GREATEST (a.newest_comment_time_necro, diff.newest_comment_time_necro)
     FROM (
         SELECT
-            (comment).post_id,
-            coalesce(sum(count_diff), 0) AS comments
+            post.id AS post_id,
+            coalesce(sum(count_diff), 0) AS comments,
+            -- Old rows are excluded using `count_diff = 1`
+            max((comment).published) FILTER (WHERE count_diff = 1) AS newest_comment_time,
+            max((comment).published) FILTER (WHERE count_diff = 1
+                -- Ignore comments from the post's creator
+                AND post.creator_id != (comment).creator_id
+                -- Ignore comments on old posts
+                AND post.published > ((comment).published - '2 days'::interval)) AS newest_comment_time_necro,
+            r.is_counted (post.*) AS include_in_community_aggregates
         FROM
             select_old_and_new_rows AS old_and_new_rows
+        LEFT JOIN post ON post.id = (comment).post_id
         WHERE
             r.is_counted (comment)
         GROUP BY
-            (comment).post_id) AS diff
-    LEFT JOIN post ON post.id = diff.post_id
+            post.id) AS diff
     WHERE
         a.post_id = diff.post_id
+        AND (diff.comments,
+            GREATEST (a.newest_comment_time, diff.newest_comment_time),
+            GREATEST (a.newest_comment_time_necro, diff.newest_comment_time_necro)) != (0,
+            a.newest_comment_time,
+            a.newest_comment_time_necro)
     RETURNING
         a.community_id,
         diff.comments,
-        r.is_counted (post.*) AS include_in_community_aggregates)
+        diff.include_in_community_aggregates)
 UPDATE
     community_aggregates AS a
 SET
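The r.parent_comment_ids() function added in this file turns a comment's ltree path into the ids of its ancestor comments, skipping the leading "0" root marker and the comment's own trailing id; the trigger above then sums count_diff per parent to maintain comment_aggregates.child_count. A small Rust sketch of the same extraction logic, purely illustrative and not part of this PR (the function name and the plain string handling are assumptions):

// Given a comment path like "0.5.6.7.8", return the ancestor comment ids,
// skipping the first element ("0") and the last (the comment itself) - the
// same rows that `LIMIT (nlevel(path) - 2) OFFSET 1` keeps in the SQL version.
fn parent_comment_ids(path: &str) -> Vec<i32> {
  let parts: Vec<i32> = path.split('.').filter_map(|p| p.parse().ok()).collect();
  if parts.len() <= 2 {
    // A top-level comment ("0.<id>") has no parent comments.
    return Vec::new();
  }
  parts[1..parts.len() - 1].to_vec()
}

fn main() {
  assert_eq!(parent_comment_ids("0.5.6.7.8"), vec![5, 6, 7]);
  assert_eq!(parent_comment_ids("0.9"), Vec::<i32>::new());
}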
@@ -145,7 +192,23 @@ FROM (
     GROUP BY
         community_id) AS diff
 WHERE
-    a.community_id = diff.community_id;
+    a.community_id = diff.community_id
+    AND diff.comments != 0;
+UPDATE
+    site_aggregates AS a
+SET
+    comments = a.comments + diff.comments
+FROM (
+    SELECT
+        coalesce(sum(count_diff), 0) AS comments
+    FROM
+        select_old_and_new_rows AS old_and_new_rows
+    WHERE
+        r.is_counted (comment)
+        AND (comment).local) AS diff
+WHERE
+    diff.comments != 0;
 RETURN NULL;
@@ -167,20 +230,8 @@ BEGIN
             r.is_counted (post)
         GROUP BY (post).creator_id) AS diff
 WHERE
-    a.person_id = diff.creator_id;
-UPDATE
-    site_aggregates AS a
-SET
-    posts = a.posts + diff.posts
-FROM (
-    SELECT
-        coalesce(sum(count_diff), 0) AS posts
-    FROM
-        select_old_and_new_rows AS old_and_new_rows
-    WHERE
-        r.is_counted (post)
-        AND (post).local) AS diff;
+    a.person_id = diff.creator_id
+    AND diff.post_count != 0;
 UPDATE
     community_aggregates AS a
@@ -197,7 +248,23 @@ FROM (
     GROUP BY
         (post).community_id) AS diff
 WHERE
-    a.community_id = diff.community_id;
+    a.community_id = diff.community_id
+    AND diff.posts != 0;
+UPDATE
+    site_aggregates AS a
+SET
+    posts = a.posts + diff.posts
+FROM (
+    SELECT
+        coalesce(sum(count_diff), 0) AS posts
+    FROM
+        select_old_and_new_rows AS old_and_new_rows
+    WHERE
+        r.is_counted (post)
+        AND (post).local) AS diff
+WHERE
+    diff.posts != 0;
 RETURN NULL;
@@ -217,7 +284,9 @@ BEGIN
         FROM select_old_and_new_rows AS old_and_new_rows
         WHERE
             r.is_counted (community)
-            AND (community).local) AS diff;
+            AND (community).local) AS diff
+WHERE
+    diff.communities != 0;
 RETURN NULL;
@@ -235,7 +304,9 @@ BEGIN
     SELECT
         coalesce(sum(count_diff), 0) AS users
     FROM select_old_and_new_rows AS old_and_new_rows
-    WHERE (person).local) AS diff;
+    WHERE (person).local) AS diff
+WHERE
+    diff.users != 0;
 RETURN NULL;
@@ -270,7 +341,8 @@ BEGIN
     GROUP BY
         old_post.community_id) AS diff
 WHERE
-    a.community_id = diff.community_id;
+    a.community_id = diff.community_id
+    AND diff.comments != 0;
 RETURN NULL;
 END;
 $$;
@@ -296,7 +368,8 @@ BEGIN
         LEFT JOIN community ON community.id = (community_follower).community_id
         LEFT JOIN person ON person.id = (community_follower).person_id GROUP BY (community_follower).community_id) AS diff
 WHERE
-    a.community_id = diff.community_id;
+    a.community_id = diff.community_id
+    AND (diff.subscribers, diff.subscribers_local) != (0, 0);
 RETURN NULL;
@@ -474,3 +547,24 @@ CREATE TRIGGER delete_follow
     FOR EACH ROW
     EXECUTE FUNCTION r.delete_follow_before_person ();
 
+-- Triggers that change values before insert or update
+CREATE FUNCTION r.comment_change_values ()
+    RETURNS TRIGGER
+    LANGUAGE plpgsql
+    AS $$
+DECLARE
+    id text = NEW.id::text;
+BEGIN
+    -- Make `path` end with `id` if it doesn't already
+    IF NOT (NEW.path ~ ('*.' || id)::lquery) THEN
+        NEW.path = NEW.path || id;
+    END IF;
+    RETURN NEW;
+END
+$$;
+
+CREATE TRIGGER change_values
+    BEFORE INSERT OR UPDATE ON comment
+    FOR EACH ROW
+    EXECUTE FUNCTION r.comment_change_values ();

View File

@@ -15,12 +15,7 @@ use crate::{
   utils::{functions::coalesce, get_conn, naive_now, DbPool, DELETED_REPLACEMENT_TEXT},
 };
 use chrono::{DateTime, Utc};
-use diesel::{
-  dsl::{insert_into, sql_query},
-  result::Error,
-  ExpressionMethods,
-  QueryDsl,
-};
+use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl};
 use diesel_async::RunQueryDsl;
 use diesel_ltree::Ltree;
 use url::Url;
@@ -72,81 +67,23 @@ impl Comment {
     parent_path: Option<&Ltree>,
   ) -> Result<Comment, Error> {
     let conn = &mut get_conn(pool).await?;
-    conn
-      .build_transaction()
-      .run(|conn| {
-        Box::pin(async move {
-          // Insert, to get the id
-          let inserted_comment = if let Some(timestamp) = timestamp {
-            insert_into(comment::table)
-              .values(comment_form)
-              .on_conflict(comment::ap_id)
-              .filter_target(coalesce(comment::updated, comment::published).lt(timestamp))
-              .do_update()
-              .set(comment_form)
-              .get_result::<Self>(conn)
-              .await?
-          } else {
-            insert_into(comment::table)
-              .values(comment_form)
-              .get_result::<Self>(conn)
-              .await?
-          };
-
-          let comment_id = inserted_comment.id;
-
-          // You need to update the ltree column
-          let ltree = Ltree(if let Some(parent_path) = parent_path {
-            // The previous parent will already have 0 in it
-            // Append this comment id
-            format!("{}.{}", parent_path.0, comment_id)
-          } else {
-            // '0' is always the first path, append to that
-            format!("{}.{}", 0, comment_id)
-          });
-
-          let updated_comment = diesel::update(comment::table.find(comment_id))
-            .set(comment::path.eq(ltree))
-            .get_result::<Self>(conn)
-            .await?;
-
-          // Update the child count for the parent comment_aggregates
-          // You could do this with a trigger, but since you have to do this manually anyway,
-          // you can just have it here
-          if let Some(parent_path) = parent_path {
-            // You have to update counts for all parents, not just the immediate one
-            // TODO if the performance of this is terrible, it might be better to do this as part of a
-            // scheduled query... although the counts would often be wrong.
-            //
-            // The child_count query for reference:
-            // select c.id, c.path, count(c2.id) as child_count from comment c
-            // left join comment c2 on c2.path <@ c.path and c2.path != c.path
-            // group by c.id
-            let parent_id = parent_path.0.split('.').nth(1);
-
-            if let Some(parent_id) = parent_id {
-              let top_parent = format!("0.{}", parent_id);
-              let update_child_count_stmt = format!(
-                "
-update comment_aggregates ca set child_count = c.child_count
-from (
-  select c.id, c.path, count(c2.id) as child_count from comment c
-  join comment c2 on c2.path <@ c.path and c2.path != c.path
-  and c.path <@ '{top_parent}'
-  group by c.id
-) as c
-where ca.comment_id = c.id"
-              );
-
-              sql_query(update_child_count_stmt).execute(conn).await?;
-            }
-          }
-          Ok(updated_comment)
-        }) as _
-      })
-      .await
+    let comment_form = (comment_form, parent_path.map(|p| comment::path.eq(p)));
+    if let Some(timestamp) = timestamp {
+      insert_into(comment::table)
+        .values(comment_form)
+        .on_conflict(comment::ap_id)
+        .filter_target(coalesce(comment::updated, comment::published).lt(timestamp))
+        .do_update()
+        .set(comment_form)
+        .get_result::<Self>(conn)
+        .await
+    } else {
+      insert_into(comment::table)
+        .values(comment_form)
+        .get_result::<Self>(conn)
+        .await
+    }
   }
 
   pub async fn read_from_apub_id(

View File

@@ -0,0 +1,3 @@
+SELECT
+    1;

View File

@@ -0,0 +1,4 @@
+-- This migration exists to trigger re-execution of replaceable_schema
+SELECT
+    1;