rDrama/files/helpers/actions.py

612 lines
18 KiB
Python

import random
import time
from urllib.parse import quote
from sqlalchemy.sql import func
import gevent
import requests
import math
from flask import g
from files.classes.reports import Report
from files.classes.mod_logs import ModAction
from files.classes.notifications import Notification
from files.classes.polls import CommentOption, PostOption
from files.classes.award import AwardRelationship
from files.classes.hole_relationship import Exile
from files.helpers.alerts import send_repeatable_notification, push_notif
from files.helpers.config.const import *
from files.helpers.config.awards import AWARDS
from files.helpers.const_stateful import *
from files.helpers.get import *
from files.helpers.logging import log_file
from files.helpers.sanitize import *
from files.helpers.settings import get_setting
from files.helpers.slots import check_slots_command
from files.helpers.useractions import badge_grant
from files.routes.routehelpers import check_for_alts
def _archiveorg(url):
try:
requests.post('https://ghostarchive.org/archive2', data={"archive": url}, headers=HEADERS, timeout=10)
except: pass
try:
requests.get(f'https://web.archive.org/save/{url}', headers=HEADERS, timeout=10)
except: pass
def archive_url(url):
gevent.spawn(_archiveorg, url)
if url.startswith('https://instagram.com/'):
url = url.replace('https://instagram.com/', 'https://imginn.com/')
gevent.spawn(_archiveorg, url)
def snappy_report(post, reason):
report = Report(post_id=post.id, user_id=SNAPPY_ID, reason=reason)
g.db.add(report)
message = f'@Snappy reported {post.textlink})\n\n> {reason}'
send_repeatable_notification(post.author_id, message)
def execute_snappy(post, v):
if post.hole and g.db.query(Exile.user_id).filter_by(user_id=SNAPPY_ID, hole=post.hole).one_or_none():
return
ghost = post.ghost
snappy = get_account(SNAPPY_ID)
ping_cost = 0
post_ping_group_count = len(list(group_mention_regex.finditer(post.body)))
if post.author.snappy_quotes and post.author.patron >= 5:
quotes = post.author.snappy_quotes.split("[para]")
body = random.choice(quotes).strip()
elif SITE_NAME == 'rDrama' and (v.post_count <= 3 or v.new_user):
body = "Hey, I saw that you just joined rdrama and a group of people who hate me are trying to smear my name by calling me a date male feminist. I just wanted to let you know that it's not true. If you want all the receipts proving it they're right here.\nThe TL;DR is that I hooked up with one of the hot girls on rdrama, and since I was twice her age, a lot of the guys here who wanted to bang her got jealous and assumed that the only way I could have managed that was by raping her, even though she herself said both before and afterwards that it was all completely consensual. So now that you're the new hot girl on rdrama, I guess they think that I'm going to somehow seduce you into flying all the way across the country (or some crazy shit like that) to hook up with me and they're trying to talk shit about me to make sure that doesnt happen. I don't understand it either, but I just wanted to let you know that I'm a nice guy and I hope you'll look at the evidence and make up your own mind instead of listening to all the jealous haters.\nAnyway, with that little intro out of the way, welcome to rdrama! The culture's a little rough here and you're probably going to get tons of requests to •show boobsU but if you talk to some of the women here first before engaging with all the creepy dudes, they'll give you some good advice to avoid stalkers and predators. Or (If you haven't been scared off by the rumor mill) you're welcome to ask me and I'm happy to help.\nJust out of curiosity, how did you find this place?"
elif post.hole and post.hole_obj.snappy_quotes:
quotes = post.hole_obj.snappy_quotes.split("[para]")
body = random.choice(quotes).strip()
elif SITE_NAME == 'WPD' and ('killing myself' in post.title.lower() or (post.hole != 'suicide' and 'suicide' in post.title.lower())):
body = "https://i.watchpeopledie.tv/images/1697382435294321.webp"
elif post_ping_group_count > 3:
body = "Unnecessary and uncalled for ping :marseydownvotemad: two more strikes and you're getting blocked + megadownvoted buddy, don't test your luck"
vote = Vote(user_id=SNAPPY_ID,
vote_type=-1,
post_id=post.id,
real = True
)
g.db.add(vote)
post.downvotes += 1
else:
if IS_DKD():
SNAPPY_CHOICES = SNAPPY_KONGS
elif IS_FISTMAS():
SNAPPY_CHOICES = SNAPPY_QUOTES_FISTMAS
elif IS_HOMOWEEN():
SNAPPY_CHOICES = SNAPPY_QUOTES_HOMOWEEN
elif SNAPPY_MARSEYS and SNAPPY_QUOTES:
if random.random() > 0.5:
SNAPPY_CHOICES = SNAPPY_QUOTES
else:
SNAPPY_CHOICES = SNAPPY_MARSEYS
elif SNAPPY_MARSEYS:
SNAPPY_CHOICES = SNAPPY_MARSEYS
elif SNAPPY_QUOTES:
SNAPPY_CHOICES = SNAPPY_QUOTES
else:
SNAPPY_CHOICES = [""]
body = random.choice(SNAPPY_CHOICES).strip()
body = body.replace('%OP%', f'@{post.author_name}')
if body.startswith('â–¼') or body.startswith(':#marseydownvote'):
if body.startswith('â–¼'): body = body[1:]
vote = Vote(user_id=SNAPPY_ID,
vote_type=-1,
post_id=post.id,
real = True
)
g.db.add(vote)
post.downvotes += 1
if body.startswith('OP is a Trump supporter'):
snappy_report(post, 'Trump supporter')
elif body.startswith('You had your chance. Downvoted and reported'):
snappy_report(post, 'Retard')
elif body.startswith('â–²') or body.startswith(':#marseyupvote'):
if body.startswith('â–²'): body = body[1:]
vote = Vote(user_id=SNAPPY_ID,
vote_type=1,
post_id=post.id,
real = True
)
g.db.add(vote)
post.upvotes += 1
elif ':#marseyghost' in body:
ghost = True
elif body.startswith(':#marseyreport') or body.startswith(':#marseyreportmaxxer2') or body.startswith(':#marseyreportmaxxer3'):
snappy_report(post, 'Retard')
elif body == '!slots':
body = f'!slots{snappy.coins}'
elif body == '!pinggroup':
group = g.db.query(Group).filter(Group.name != 'focusgroup').order_by(func.random()).first()
cost = len(group.member_ids) * 5
snappy.charge_account('coins', cost)
body = f'!{group.name}'
ping_cost = cost
elif body.startswith(':#marseyglow'):
award_object = AwardRelationship(
user_id=snappy.id,
kind="glowie",
post_id=post.id,
awarded_utc=time.time(),
)
g.db.add(award_object)
awarded_coins = int(AWARDS["glowie"]['price'] * COSMETIC_AWARD_COIN_AWARD_PCT)
post.author.pay_account('coins', awarded_coins, f"Glowie award on {post.textlink}")
msg = f"@Snappy has given {post.textlink} the Glowie Award and you have received {awarded_coins} coins as a result!"
send_repeatable_notification(post.author.id, msg)
elif body.startswith("You're a chud, CHUD I tell you"):
award_object = AwardRelationship(
user_id=snappy.id,
kind="chud",
post_id=post.id,
awarded_utc=time.time(),
note="Trans lives matter",
)
g.db.add(award_object)
msg = f"@Snappy has given {post.textlink} the Chud Award\n\n**You now have to say this phrase in all posts and comments you make for 24 hours:**\n\n> Trans lives matter"
send_repeatable_notification(post.author.id, msg)
if v.chud != 1:
if v.chud and time.time() < v.chud: v.chud += 86400
else: v.chud = int(time.time()) + 86400
v.chud_phrase = 'trans lives matter'
badge_grant(user=v, badge_id=58)
post.chudded = True
complies_with_chud(post)
body += "\n\n"
captured = []
if post.url:
captured.append((post.url, post.url))
for i in list(snappy_url_regex.finditer(post.body_html.replace(' data-src="', ' src="'))):
href = i.group(1)
if href in [x[0] for x in captured]: continue
title = i.group(2)
captured.append((href, title))
captured2 = []
for href, title in captured:
if not any((href.startswith(x) for x in ('/', f'{SITE_FULL}/', f'{SITE_FULL_IMAGES}/', f'{SITE_FULL_VIDEOS}/'))):
captured2.append((href, title))
if captured2:
body += "**Snapshots:**\n\n"
for href, title in captured2:
if f'**[{title}]({href})**:\n\n' not in body:
addition = f'**[{title}]({href})**:\n\n'
if href.startswith('https://old.reddit.com/r/'):
rev = href.replace('https://old.reddit.com/', '')
addition += f'* [undelete.pullpush.io](https://undelete.pullpush.io/{rev})\n\n'
elif href.startswith('https://old.reddit.com/user/'):
rev = href.replace('https://old.reddit.com/user/', '')
addition += f"* [search-new.pullpush.io](https://search-new.pullpush.io/?author={rev}&type=submission)\n\n"
elif href.startswith('https://boards.4chan.org/'):
rev = href.replace('https://boards.4chan.org/', '')
addition += f'* [archived.moe](https://archived.moe/{rev})\n\n'
elif href.startswith('https://scored.co/') and href != 'https://scored.co/':
rev = href.replace('https://scored.co/', '')
addition += f'* [unscored.arete.network](https://unscored.arete.network/{rev})\n\n'
addition += f'* [ghostarchive.org](https://ghostarchive.org/search?term={quote(href)})\n\n'
addition += f'* [archive.org](https://web.archive.org/{href})\n\n'
addition += f'* [archive.ph](https://archive.ph/?url={quote(href)}&run=1) (click to archive)\n\n'
if len(f'{body}{addition}') > COMMENT_BODY_LENGTH_LIMIT: break
body += addition
archive_url(href)
body = body[:COMMENT_BODY_LENGTH_LIMIT].strip()
body_html = sanitize(body, snappy=True, showmore=True)
if len(body_html) == 0:
return
if len(body_html) < COMMENT_BODY_HTML_LENGTH_LIMIT:
c = Comment(author_id=SNAPPY_ID,
distinguished=True,
parent_post=post.id,
level=1,
nsfw=False,
is_bot=True,
app_id=None,
body=body,
body_html=body_html,
ghost=ghost,
ping_cost=ping_cost,
)
g.db.add(c)
check_slots_command(c, v, snappy)
snappy.comment_count += 1
snappy.pay_account('coins', 1)
g.db.add(snappy)
if FEATURES['PINS'] and (body.startswith(':#marseypin:') or body.startswith(':#marseypin2:')):
post.pinned = "Snappy"
post.pinned_utc = int(time.time()) + 3600
elif SITE_NAME == 'rDrama' and body.startswith(':#marseyban:'):
days = 0.01
reason = f'<a href="/post/{post.id}">/post/{post.id}</a>'
v.ban(admin=snappy, reason=reason, days=days)
text = f"@Snappy has banned you for **{days}** days for the following reason:\n\n> {reason}"
send_repeatable_notification(v.id, text)
duration = f"for {days} days"
ma = ModAction(
kind="ban_user",
user_id=snappy.id,
target_user_id=v.id,
_note=f'duration: {duration}, reason: "{reason}"'
)
g.db.add(ma)
post.bannedfor = f'{duration} by @Snappy'
g.db.flush()
if c.ping_cost:
for x in group.member_ids:
n = Notification(comment_id=c.id, user_id=x)
g.db.add(n)
push_notif({x}, f'New mention of you by @Snappy', c.body, c)
c.top_comment_id = c.id
post.comment_count += 1
post.replies = [c]
def execute_zozbot(c, level, post, v):
if not ZOZBOT_ID: return
if random.random() >= 0.001: return
posting_to_post = isinstance(post, Post)
if posting_to_post and post.hole and g.db.query(Exile.user_id).filter_by(user_id=ZOZBOT_ID, hole=post.hole).one_or_none():
return
c2 = Comment(author_id=ZOZBOT_ID,
parent_post=post.id if posting_to_post else None,
wall_user_id=post.id if not posting_to_post else None,
parent_comment_id=c.id,
level=level+1,
is_bot=True,
body="zoz",
body_html='<p>zoz</p>',
top_comment_id=c.top_comment_id,
ghost=c.ghost,
distinguished=True
)
g.db.add(c2)
g.db.flush()
n = Notification(comment_id=c2.id, user_id=v.id)
g.db.add(n)
c3 = Comment(author_id=ZOZBOT_ID,
parent_post=post.id if posting_to_post else None,
wall_user_id=post.id if not posting_to_post else None,
parent_comment_id=c2.id,
level=level+2,
is_bot=True,
body="zle",
body_html='<p>zle</p>',
top_comment_id=c.top_comment_id,
ghost=c.ghost,
distinguished=True
)
g.db.add(c3)
g.db.flush()
c4 = Comment(author_id=ZOZBOT_ID,
parent_post=post.id if posting_to_post else None,
wall_user_id=post.id if not posting_to_post else None,
parent_comment_id=c3.id,
level=level+3,
is_bot=True,
body="zozzle",
body_html='<p>zozzle</p>',
top_comment_id=c.top_comment_id,
ghost=c.ghost,
distinguished=True
)
g.db.add(c4)
zozbot = get_account(ZOZBOT_ID)
zozbot.comment_count += 3
zozbot.pay_account('coins', 1)
g.db.add(zozbot)
if posting_to_post:
post.comment_count += 3
g.db.add(post)
push_notif({v.id}, f'New reply by @{c2.author_name}', "zoz", c2)
def execute_longpostbot(c, level, body, body_html, post, v):
if not LONGPOSTBOT_ID: return
if not len(c.body.split()) >= 200: return
if "</blockquote>" in body_html: return
posting_to_post = isinstance(post, Post)
if posting_to_post and post.hole and g.db.query(Exile.user_id).filter_by(user_id=LONGPOSTBOT_ID, hole=post.hole).one_or_none():
return
body = random.choice(LONGPOSTBOT_REPLIES)
if body.startswith('â–¼'):
body = body[1:]
vote = CommentVote(user_id=LONGPOSTBOT_ID,
vote_type=-1,
comment_id=c.id,
real = True
)
g.db.add(vote)
c.downvotes = 1
body_html = sanitize(body)
c2 = Comment(author_id=LONGPOSTBOT_ID,
parent_post=post.id if posting_to_post else None,
wall_user_id=post.id if not posting_to_post else None,
parent_comment_id=c.id,
level=level+1,
is_bot=True,
body=body,
body_html=body_html,
top_comment_id=c.top_comment_id,
ghost=c.ghost
)
g.db.add(c2)
longpostbot = get_account(LONGPOSTBOT_ID)
longpostbot.comment_count += 1
longpostbot.pay_account('coins', 1)
g.db.add(longpostbot)
g.db.flush()
n = Notification(comment_id=c2.id, user_id=v.id)
g.db.add(n)
if posting_to_post:
post.comment_count += 1
g.db.add(post)
push_notif({v.id}, f'New reply by @{c2.author_name}', c2.body, c2)
def tempban_for_spam(v):
text = "Your account has been banned for **1 day** for the following reason:\n\n> Too much spam!"
send_repeatable_notification(v.id, text)
v.ban(reason="Spam", days=1)
ma = ModAction(
kind="ban_user",
user_id=AUTOJANNY_ID,
target_user_id=v.id,
_note=f'duration: for 1 day, reason: "Spam"'
)
g.db.add(ma)
def execute_antispam_post_check(title, v, url):
if v.admin_level >= PERMS['BYPASS_ANTISPAM_CHECKS']:
return True
now = int(time.time())
cutoff = now - 60 * 60 * 24
similar_posts = g.db.query(Post).filter(
Post.author_id == v.id,
Post.title.op('<->')(title) < SPAM_SIMILARITY_THRESHOLD,
Post.created_utc > cutoff
).all()
if url:
similar_urls = g.db.query(Post).filter(
Post.author_id == v.id,
Post.url.op('<->')(url) < SPAM_URL_SIMILARITY_THRESHOLD,
Post.created_utc > cutoff
).all()
else: similar_urls = []
threshold = SPAM_SIMILAR_COUNT_THRESHOLD
if v.age >= (60 * 60 * 24 * 7): threshold *= 3
elif v.age >= (60 * 60 * 24): threshold *= 2
if max(len(similar_urls), len(similar_posts)) >= threshold:
tempban_for_spam(v)
for post in similar_posts + similar_urls:
post.is_banned = True
post.profile_pinned = False
post.ban_reason = "AutoJanny"
g.db.add(post)
ma = ModAction(
user_id=AUTOJANNY_ID,
target_post_id=post.id,
kind="ban_post",
_note="Spam"
)
g.db.add(ma)
return False
return True
def execute_antispam_duplicate_comment_check(v, body_html):
if v.admin_level >= PERMS['BYPASS_ANTISPAM_CHECKS']:
return
if v.id in ANTISPAM_BYPASS_IDS:
return
if v.age >= NOTIFICATION_SPAM_AGE_THRESHOLD:
return
if len(body_html) < 16:
return
ANTISPAM_DUPLICATE_THRESHOLD = 3
compare_time = int(time.time()) - 60 * 60 * 24
count = g.db.query(Comment.id).filter(Comment.body_html == body_html,
Comment.created_utc >= compare_time).count()
if count <= ANTISPAM_DUPLICATE_THRESHOLD: return
tempban_for_spam(v)
g.db.commit()
abort(403, "Too much spam!")
def execute_antispam_comment_check(body, v):
if v.admin_level >= PERMS['BYPASS_ANTISPAM_CHECKS']:
return
if v.id in ANTISPAM_BYPASS_IDS: return
if len(body) <= COMMENT_SPAM_LENGTH_THRESHOLD: return
now = int(time.time())
cutoff = now - 60 * 60 * 24
similar_comments = g.db.query(Comment).filter(
Comment.author_id == v.id,
Comment.body.op('<->')(body) < COMMENT_SPAM_SIMILAR_THRESHOLD,
Comment.created_utc > cutoff
).all()
threshold = COMMENT_SPAM_COUNT_THRESHOLD
if v.age >= (60 * 60 * 24 * 7):
threshold *= 3
elif v.age >= (60 * 60 * 24):
threshold *= 2
if len(similar_comments) <= threshold: return
tempban_for_spam(v)
for comment in similar_comments:
comment.is_banned = True
comment.ban_reason = "AutoJanny"
g.db.add(comment)
ma = ModAction(
user_id=AUTOJANNY_ID,
target_comment_id=comment.id,
kind="ban_comment",
_note="Spam"
)
g.db.add(ma)
g.db.commit()
abort(403, "Too much spam!")
def execute_under_siege(v, target, body, kind):
if v.shadowbanned: return
if v.admin_level >= PERMS['BYPASS_UNDER_SIEGE_MODE']: return
thresholds = cache.get("under_siege_thresholds")
if not thresholds:
thresholds = DEFAULT_UNDER_SIEGE_THRESHOLDS
cache.set("under_siege_thresholds", thresholds)
if v.age > thresholds[kind] * 60:
return
unshadowbannedcels = [x[0] for x in g.db.query(ModAction.target_user_id).filter_by(kind='unshadowban')]
if v.id in unshadowbannedcels: return
check_for_alts(v)
if v.shadowbanned: return
v.shadowbanned = AUTOJANNY_ID
v.shadowban_reason = "Under Siege"
g.db.add(v)
if kind == "report":
if isinstance(target, Post):
reason = f'report on <a href="{target.permalink}">post</a>'
else:
reason = f'report on <a href="{target.permalink}">comment</a>'
elif hasattr(target, 'permalink'):
reason = f'<a href="{target.permalink}">{kind}</a>'
else:
reason = kind
minutes = math.ceil(v.age / 60)
time_taken = f'{minutes} minute'
if minutes > 1:
time_taken += 's'
ma = ModAction(
kind="shadowban",
user_id=AUTOJANNY_ID,
target_user_id=v.id,
_note=f'reason: "Under Siege ({reason}, {time_taken})"'
)
g.db.add(ma)
def process_options(v, target):
patterns = [(poll_regex, 0), (choice_regex, 1)]
if v.admin_level >= PERMS['POST_BETS']:
patterns.append((bet_regex, 2))
option_count = 0
option_objects = []
for pattern, exclusive in patterns:
body_html = target.body_html.replace('&amp;', '&')
for i in pattern.finditer(body_html):
option_count += 1
if option_count > POLL_MAX_OPTIONS:
abort(400, f"Max number of poll options is {POLL_MAX_OPTIONS}")
body = i.group(2)
if len(body) > 500:
abort(400, f"Poll option body is too long (Max 500 characters)")
if isinstance(target, Post):
cls = PostOption
else:
cls = CommentOption
existing = g.db.query(cls).filter_by(
parent_id=target.id,
body_html=body,
exclusive=exclusive,
).first()
if not existing:
option = cls(
parent_id=target.id,
body_html=body,
exclusive=exclusive,
)
option_objects.append(option) #shitty hack to bypass autoflush
g.db.add_all(option_objects)