2023-04-25 23:28:06 +00:00
use clokwerk ::{ Scheduler , TimeUnits as CTimeUnits } ;
use diesel ::{
dsl ::{ now , IntervalDsl } ,
Connection ,
ExpressionMethods ,
2023-06-20 06:17:54 +00:00
NullableExpressionMethods ,
2023-04-25 23:28:06 +00:00
QueryDsl ,
} ;
2021-01-29 16:38:27 +00:00
// Raw SQL queries and the Postgres connection used by the scheduled jobs
use diesel ::{ sql_query , PgConnection , RunQueryDsl } ;
2023-06-21 08:28:20 +00:00
use lemmy_api_common ::context ::LemmyContext ;
2023-02-18 14:36:12 +00:00
use lemmy_db_schema ::{
2023-06-08 20:15:15 +00:00
schema ::{
activity ,
2023-06-20 06:17:54 +00:00
comment ,
2023-06-08 20:15:15 +00:00
comment_aggregates ,
community_aggregates ,
community_person_ban ,
instance ,
person ,
2023-06-20 06:17:54 +00:00
post ,
2023-06-08 20:15:15 +00:00
post_aggregates ,
} ,
2023-02-18 14:36:12 +00:00
source ::instance ::{ Instance , InstanceForm } ,
2023-06-20 06:17:54 +00:00
utils ::{ functions ::hot_rank , naive_now , DELETED_REPLACEMENT_TEXT } ,
2023-02-18 14:36:12 +00:00
} ;
use lemmy_routes ::nodeinfo ::NodeInfo ;
use lemmy_utils ::{ error ::LemmyError , REQWEST_TIMEOUT } ;
use reqwest ::blocking ::Client ;
2021-01-29 16:38:27 +00:00
use std ::{ thread , time ::Duration } ;
2023-06-15 09:29:12 +00:00
use tracing ::{ error , info } ;
2021-01-29 16:38:27 +00:00
/// Schedules various cleanup tasks for lemmy in a background thread
2023-06-21 08:28:20 +00:00
pub fn setup (
db_url : String ,
user_agent : String ,
context_1 : LemmyContext ,
) -> Result < ( ) , LemmyError > {
2022-11-09 10:05:00 +00:00
// Setup the connections
2021-01-29 16:38:27 +00:00
let mut scheduler = Scheduler ::new ( ) ;
2023-06-20 09:33:03 +00:00
startup_jobs ( & db_url ) ;
2021-08-26 11:49:16 +00:00
2023-06-08 20:15:15 +00:00
// Update active counts every hour
2023-06-20 09:33:03 +00:00
let url = db_url . clone ( ) ;
2023-04-25 23:28:06 +00:00
scheduler . every ( CTimeUnits ::hour ( 1 ) ) . run ( move | | {
2023-06-20 09:33:03 +00:00
let mut conn = PgConnection ::establish ( & url ) . expect ( " could not establish connection " ) ;
active_counts ( & mut conn ) ;
update_banned_when_expired ( & mut conn ) ;
2023-06-08 20:15:15 +00:00
} ) ;
// Update hot ranks every 5 minutes
2023-06-20 09:33:03 +00:00
let url = db_url . clone ( ) ;
2023-06-08 20:15:15 +00:00
scheduler . every ( CTimeUnits ::minutes ( 5 ) ) . run ( move | | {
2023-06-20 09:33:03 +00:00
let mut conn = PgConnection ::establish ( & url ) . expect ( " could not establish connection " ) ;
update_hot_ranks ( & mut conn , true ) ;
2021-01-29 16:38:27 +00:00
} ) ;
2023-06-08 20:15:15 +00:00
// Clear old activities every week
2023-06-20 09:33:03 +00:00
let url = db_url . clone ( ) ;
2023-04-25 23:28:06 +00:00
scheduler . every ( CTimeUnits ::weeks ( 1 ) ) . run ( move | | {
2023-06-20 09:33:03 +00:00
let mut conn = PgConnection ::establish ( & url ) . expect ( " could not establish connection " ) ;
clear_old_activities ( & mut conn ) ;
2021-01-29 16:38:27 +00:00
} ) ;
2023-06-21 08:28:20 +00:00
// Remove old rate limit buckets after 1 to 2 hours of inactivity
scheduler . every ( CTimeUnits ::hour ( 1 ) ) . run ( move | | {
let hour = Duration ::from_secs ( 3600 ) ;
context_1 . settings_updated_channel ( ) . remove_older_than ( hour ) ;
} ) ;
2023-06-20 06:17:54 +00:00
// Overwrite deleted & removed posts and comments every day
let url = db_url . clone ( ) ;
scheduler . every ( CTimeUnits ::days ( 1 ) ) . run ( move | | {
let mut conn = PgConnection ::establish ( & url ) . expect ( " could not establish connection " ) ;
overwrite_deleted_posts_and_comments ( & mut conn ) ;
} ) ;
2023-06-20 09:33:03 +00:00
// Update the Instance Software
2023-04-25 23:28:06 +00:00
scheduler . every ( CTimeUnits ::days ( 1 ) ) . run ( move | | {
2023-06-20 09:33:03 +00:00
let mut conn = PgConnection ::establish ( & db_url ) . expect ( " could not establish connection " ) ;
update_instance_software ( & mut conn , & user_agent ) ;
2023-02-18 14:36:12 +00:00
} ) ;
2021-01-29 16:38:27 +00:00
// Manually run the scheduler in an event loop
loop {
scheduler . run_pending ( ) ;
thread ::sleep ( Duration ::from_millis ( 1000 ) ) ;
}
}
2023-06-20 09:33:03 +00:00
/// Run these on server startup
fn startup_jobs ( db_url : & str ) {
let mut conn = PgConnection ::establish ( db_url ) . expect ( " could not establish connection " ) ;
active_counts ( & mut conn ) ;
update_hot_ranks ( & mut conn , false ) ;
update_banned_when_expired ( & mut conn ) ;
clear_old_activities ( & mut conn ) ;
2023-06-20 06:17:54 +00:00
overwrite_deleted_posts_and_comments ( & mut conn ) ;
2023-06-20 09:33:03 +00:00
}
2023-06-08 20:15:15 +00:00
/// Update the hot_rank columns for the aggregates tables
fn update_hot_ranks ( conn : & mut PgConnection , last_week_only : bool ) {
let mut post_update = diesel ::update ( post_aggregates ::table ) . into_boxed ( ) ;
let mut comment_update = diesel ::update ( comment_aggregates ::table ) . into_boxed ( ) ;
let mut community_update = diesel ::update ( community_aggregates ::table ) . into_boxed ( ) ;
// Only update for the last week of content
if last_week_only {
info! ( " Updating hot ranks for last week... " ) ;
let last_week = now - diesel ::dsl ::IntervalDsl ::weeks ( 1 ) ;
post_update = post_update . filter ( post_aggregates ::published . gt ( last_week ) ) ;
comment_update = comment_update . filter ( comment_aggregates ::published . gt ( last_week ) ) ;
community_update = community_update . filter ( community_aggregates ::published . gt ( last_week ) ) ;
} else {
info! ( " Updating hot ranks for all history... " ) ;
2021-01-29 16:38:27 +00:00
}
2023-06-15 09:29:12 +00:00
match post_update
2023-06-08 20:15:15 +00:00
. set ( (
post_aggregates ::hot_rank . eq ( hot_rank ( post_aggregates ::score , post_aggregates ::published ) ) ,
post_aggregates ::hot_rank_active . eq ( hot_rank (
post_aggregates ::score ,
post_aggregates ::newest_comment_time_necro ,
) ) ,
) )
. execute ( conn )
2023-06-15 09:29:12 +00:00
{
Ok ( _ ) = > { }
Err ( e ) = > {
error! ( " Failed to update post_aggregates hot_ranks: {} " , e )
}
}
2023-06-08 20:15:15 +00:00
2023-06-15 09:29:12 +00:00
match comment_update
2023-06-08 20:15:15 +00:00
. set ( comment_aggregates ::hot_rank . eq ( hot_rank (
comment_aggregates ::score ,
comment_aggregates ::published ,
) ) )
. execute ( conn )
2023-06-15 09:29:12 +00:00
{
Ok ( _ ) = > { }
Err ( e ) = > {
error! ( " Failed to update comment_aggregates hot_ranks: {} " , e )
}
}
2023-06-08 20:15:15 +00:00
2023-06-15 09:29:12 +00:00
match community_update
2023-06-08 20:15:15 +00:00
. set ( community_aggregates ::hot_rank . eq ( hot_rank (
community_aggregates ::subscribers ,
community_aggregates ::published ,
) ) )
. execute ( conn )
2023-06-15 09:29:12 +00:00
{
Ok ( _ ) = > {
info! ( " Done. " ) ;
}
Err ( e ) = > {
error! ( " Failed to update community_aggregates hot_ranks: {} " , e )
}
}
2021-01-29 16:38:27 +00:00
}
/// Clear old activities (this table gets very large)
2022-09-26 14:09:32 +00:00
fn clear_old_activities ( conn : & mut PgConnection ) {
2021-01-29 16:38:27 +00:00
info! ( " Clearing old activities... " ) ;
2023-06-15 09:29:12 +00:00
match diesel ::delete ( activity ::table . filter ( activity ::published . lt ( now - 6. months ( ) ) ) )
2022-11-09 10:05:00 +00:00
. execute ( conn )
2023-06-15 09:29:12 +00:00
{
Ok ( _ ) = > {
info! ( " Done. " ) ;
}
Err ( e ) = > {
error! ( " Failed to clear old activities: {} " , e )
}
}
2021-01-29 16:38:27 +00:00
}
2023-06-20 06:17:54 +00:00
/// overwrite posts and comments 30d after deletion
fn overwrite_deleted_posts_and_comments ( conn : & mut PgConnection ) {
info! ( " Overwriting deleted posts... " ) ;
match diesel ::update (
post ::table
. filter ( post ::deleted . eq ( true ) )
. filter ( post ::updated . lt ( now . nullable ( ) - 1. months ( ) ) )
. filter ( post ::body . ne ( DELETED_REPLACEMENT_TEXT ) ) ,
)
. set ( (
post ::body . eq ( DELETED_REPLACEMENT_TEXT ) ,
post ::name . eq ( DELETED_REPLACEMENT_TEXT ) ,
) )
. execute ( conn )
{
Ok ( _ ) = > {
info! ( " Done. " ) ;
}
Err ( e ) = > {
error! ( " Failed to overwrite deleted posts: {} " , e )
}
}
info! ( " Overwriting deleted comments... " ) ;
match diesel ::update (
comment ::table
. filter ( comment ::deleted . eq ( true ) )
. filter ( comment ::updated . lt ( now . nullable ( ) - 1. months ( ) ) )
. filter ( comment ::content . ne ( DELETED_REPLACEMENT_TEXT ) ) ,
)
. set ( comment ::content . eq ( DELETED_REPLACEMENT_TEXT ) )
. execute ( conn )
{
Ok ( _ ) = > {
info! ( " Done. " ) ;
}
Err ( e ) = > {
error! ( " Failed to overwrite deleted comments: {} " , e )
}
}
}
2021-01-29 16:38:27 +00:00
/// Re-calculate the site and community active counts every 12 hours
2022-09-26 14:09:32 +00:00
fn active_counts ( conn : & mut PgConnection ) {
2021-01-29 16:38:27 +00:00
info! ( " Updating active site and community aggregates ... " ) ;
let intervals = vec! [
( " 1 day " , " day " ) ,
( " 1 week " , " week " ) ,
( " 1 month " , " month " ) ,
( " 6 months " , " half_year " ) ,
] ;
for i in & intervals {
let update_site_stmt = format! (
" update site_aggregates set users_active_{} = (select * from site_aggregates_activity('{}')) " ,
i . 1 , i . 0
) ;
2023-06-15 09:29:12 +00:00
match sql_query ( update_site_stmt ) . execute ( conn ) {
Ok ( _ ) = > { }
Err ( e ) = > {
error! ( " Failed to update site stats: {} " , e )
}
}
2021-01-29 16:38:27 +00:00
let update_community_stmt = format! ( " update community_aggregates ca set users_active_ {} = mv.count_ from community_aggregates_activity(' {} ') mv where ca.community_id = mv.community_id_ " , i . 1 , i . 0 ) ;
2023-06-15 09:29:12 +00:00
match sql_query ( update_community_stmt ) . execute ( conn ) {
Ok ( _ ) = > { }
Err ( e ) = > {
error! ( " Failed to update community stats: {} " , e )
}
}
2021-01-29 16:38:27 +00:00
}
info! ( " Done. " ) ;
}
2022-03-30 13:56:23 +00:00
/// Set banned to false after ban expires
2022-09-26 14:09:32 +00:00
fn update_banned_when_expired ( conn : & mut PgConnection ) {
2022-03-30 13:56:23 +00:00
info! ( " Updating banned column if it expires ... " ) ;
2023-04-25 23:28:06 +00:00
2023-06-15 09:29:12 +00:00
match diesel ::update (
2023-04-25 23:28:06 +00:00
person ::table
. filter ( person ::banned . eq ( true ) )
. filter ( person ::ban_expires . lt ( now ) ) ,
)
. set ( person ::banned . eq ( false ) )
. execute ( conn )
2023-06-15 09:29:12 +00:00
{
Ok ( _ ) = > { }
Err ( e ) = > {
error! ( " Failed to update person.banned when expires: {} " , e )
}
}
match diesel ::delete ( community_person_ban ::table . filter ( community_person_ban ::expires . lt ( now ) ) )
2022-03-30 13:56:23 +00:00
. execute ( conn )
2023-06-15 09:29:12 +00:00
{
Ok ( _ ) = > { }
Err ( e ) = > {
error! ( " Failed to remove community_ban expired rows: {} " , e )
}
}
2022-03-30 13:56:23 +00:00
}
2022-09-07 12:12:51 +00:00
2023-02-18 14:36:12 +00:00
/// Updates the instance software and version
fn update_instance_software ( conn : & mut PgConnection , user_agent : & str ) {
info! ( " Updating instances software and versions... " ) ;
2023-06-15 09:29:12 +00:00
let client = match Client ::builder ( )
2023-02-18 14:36:12 +00:00
. user_agent ( user_agent )
. timeout ( REQWEST_TIMEOUT )
. build ( )
2023-06-15 09:29:12 +00:00
{
Ok ( client ) = > client ,
Err ( e ) = > {
error! ( " Failed to build reqwest client: {} " , e ) ;
return ;
}
} ;
2023-02-18 14:36:12 +00:00
2023-06-15 09:29:12 +00:00
let instances = match instance ::table . get_results ::< Instance > ( conn ) {
Ok ( instances ) = > instances ,
Err ( e ) = > {
error! ( " Failed to get instances: {} " , e ) ;
return ;
}
} ;
2023-02-18 14:36:12 +00:00
for instance in instances {
let node_info_url = format! ( " https:// {} /nodeinfo/2.0.json " , instance . domain ) ;
// Skip it if it can't connect
let res = client
. get ( & node_info_url )
. send ( )
. ok ( )
. and_then ( | t | t . json ::< NodeInfo > ( ) . ok ( ) ) ;
if let Some ( node_info ) = res {
let software = node_info . software . as_ref ( ) ;
let form = InstanceForm ::builder ( )
. domain ( instance . domain )
. software ( software . and_then ( | s | s . name . clone ( ) ) )
. version ( software . and_then ( | s | s . version . clone ( ) ) )
. updated ( Some ( naive_now ( ) ) )
. build ( ) ;
2023-06-15 09:29:12 +00:00
match diesel ::update ( instance ::table . find ( instance . id ) )
2023-02-18 14:36:12 +00:00
. set ( form )
. execute ( conn )
2023-06-15 09:29:12 +00:00
{
Ok ( _ ) = > {
info! ( " Done. " ) ;
}
Err ( e ) = > {
error! ( " Failed to update site instance software: {} " , e ) ;
return ;
}
}
2023-02-18 14:36:12 +00:00
}
}
}
#[cfg(test)]
mod tests {
  use lemmy_routes::nodeinfo::NodeInfo;
  use reqwest::Client;

  /// Fetches the nodeinfo document of lemmy.ml and checks the reported software name.
  ///
  /// Marked `#[ignore]`, presumably because it depends on a live external server; run it
  /// explicitly with `cargo test -- --ignored`.
  #[tokio::test]
  #[ignore]
  async fn test_nodeinfo() {
    let client = Client::builder().build().unwrap();
    let lemmy_ml_nodeinfo = client
      .get("https://lemmy.ml/nodeinfo/2.0.json")
      .send()
      .await
      .unwrap()
      .json::<NodeInfo>()
      .await
      .unwrap();

    // The expected name must be given without surrounding whitespace, otherwise the
    // comparison can never succeed.
    assert_eq!(lemmy_ml_nodeinfo.software.unwrap().name.unwrap(), "lemmy");
  }
}