mirror of https://github.com/LemmyNet/lemmy.git
use max(id) instead of seq max value to prevent uncommitted transactions from causing skipped activities
parent
3d649e1d3e
commit
a808d3208a
|
@ -1,8 +1,5 @@
|
||||||
use anyhow::{anyhow, Context, Result};
|
use anyhow::{anyhow, Context, Result};
|
||||||
use diesel::{
|
use diesel::prelude::*;
|
||||||
prelude::*,
|
|
||||||
sql_types::{Bool, Int8},
|
|
||||||
};
|
|
||||||
use diesel_async::RunQueryDsl;
|
use diesel_async::RunQueryDsl;
|
||||||
use lemmy_apub::{
|
use lemmy_apub::{
|
||||||
activity_lists::SharedInboxActivities,
|
activity_lists::SharedInboxActivities,
|
||||||
|
@ -191,17 +188,11 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<Acti
|
||||||
});
|
});
|
||||||
CACHE
|
CACHE
|
||||||
.try_get_with((), async {
|
.try_get_with((), async {
|
||||||
|
use diesel::dsl::max;
|
||||||
|
use lemmy_db_schema::schema::sent_activity::dsl::{id, sent_activity};
|
||||||
let conn = &mut get_conn(pool).await?;
|
let conn = &mut get_conn(pool).await?;
|
||||||
let seq: Sequence =
|
let seq: Option<ActivityId> = sent_activity.select(max(id)).get_result(conn).await?;
|
||||||
diesel::sql_query("select last_value, is_called from sent_activity_id_seq")
|
let latest_id = seq.unwrap_or(0);
|
||||||
.get_result(conn)
|
|
||||||
.await?;
|
|
||||||
let latest_id = if seq.is_called {
|
|
||||||
seq.last_value as ActivityId
|
|
||||||
} else {
|
|
||||||
// if a PG sequence has never been used, last_value will actually be next_value
|
|
||||||
(seq.last_value - 1) as ActivityId
|
|
||||||
};
|
|
||||||
anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId)
|
anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
|
@ -212,11 +203,3 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<Acti
|
||||||
pub(crate) fn retry_sleep_duration(retry_count: i32) -> Duration {
|
pub(crate) fn retry_sleep_duration(retry_count: i32) -> Duration {
|
||||||
Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count)))
|
Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count)))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(QueryableByName)]
|
|
||||||
struct Sequence {
|
|
||||||
#[diesel(sql_type = Int8)]
|
|
||||||
last_value: i64, // this value is bigint for some reason even if sequence is int4
|
|
||||||
#[diesel(sql_type = Bool)]
|
|
||||||
is_called: bool,
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue