import time from urllib.parse import quote, urlencode from math import floor import os from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import load_only from psycopg2.errors import UniqueViolation from files.__main__ import app, cache, limiter from files.classes import * from files.helpers.actions import * from files.helpers.alerts import * from files.helpers.cloudflare import * from files.helpers.config.const import * from files.helpers.get import * from files.helpers.media import * from files.helpers.sanitize import * from files.helpers.security import * from files.helpers.settings import get_settings, toggle_setting from files.helpers.useractions import * from files.routes.routehelpers import check_for_alts from files.routes.wrappers import * from files.routes.routehelpers import get_alt_graph, get_alt_graph_ids from .front import frontlist, comment_idlist mylist2 = [ (User, User.bio, "bio"), (Submission, Submission.body, "body"), (Comment, Comment.body, "body"), ] SITES = '(i.imgur.com|i.imgur.io|i.ibb.co|files.catbox.moe|pomf2.lain.la\/f)' images4_regex = re.compile(f'https:\/\/{SITES}\/([a-zA-Z0-9/]*?)(_d)?\.(webp|png|jpg|jpeg|gif)[a-z0-9=&?;]*', flags=re.A) @app.get('/admin/images') @admin_level_required(5) def images(v): for cls, attr, attrname in mylist2: print(f'{cls.__name__}.{attrname}\n---------------------------', flush=True) items = g.db.query(cls).options(load_only(cls.id, attr)).filter( attr.op('similar to')(f'%https://{SITES}/%.(webp|png|jpg|jpeg|gif)%'), ).order_by(func.random()).all() if not items: continue total = len(items) for x, i in enumerate(items): attribute = getattr(i, attrname) if attribute.startswith(f'https://{SITE_IMAGES}/images/'): continue if not len(list(images4_regex.finditer(attribute))): continue print(f'{x+1}/{total}: {i.id}', flush=True) captured = set() for y in images4_regex.finditer(attribute): site = y.group(1) id = y.group(2) if id in captured: continue captured.add(id) ext = y.group(4) url = f'https://{site}/{id}.{ext}' print(url, flush=True) try: image_req = requests.get(url, headers=HEADERS, timeout=5) except: continue if image_req.status_code >= 400: print(f"ERROR CODE: {image_req.status_code}", flush=True) continue if not image_req.headers.get("Content-Type","").startswith("image/"): print("NOT IMAGE/", flush=True) continue if image_req.headers.get("Content-Type","").startswith("image/svg"): print("IMAGE/SVG", flush=True) continue name = f'/images/{time.time()}'.replace('.','') + '.webp' with open(name, "wb") as file: for chunk in image_req.iter_content(1024): file.write(chunk) if ext != 'webp': try: process_image(name, v) except: continue if not os.path.exists(name): continue size = os.stat(name).st_size if not size: continue new_url = f"https://{SITE_IMAGES}{name}" attribute = attribute.replace(y.group(0), new_url) setattr(i, attrname, attribute) try: setattr(i, f'{attrname}_html', sanitize(attribute)) except: g.db.rollback() pass g.db.add(i) g.db.commit() print('done!!!!!', flush=True) time.sleep(5) return "success" @app.get('/admin/loggedin') @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['VIEW_ACTIVE_USERS']) def loggedin_list(v): ids = [x for x,val in cache.get('loggedin').items() if time.time()-val < LOGGEDIN_ACTIVE_TIME] users = g.db.query(User).filter(User.id.in_(ids)).order_by(User.admin_level.desc(), User.truescore.desc()).all() return render_template("admin/loggedin.html", v=v, users=users) @app.get('/admin/loggedout') @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['VIEW_ACTIVE_USERS']) def loggedout_list(v): users = sorted([val[1] for x,val in cache.get('loggedout').items() if time.time()-val[0] < LOGGEDIN_ACTIVE_TIME]) return render_template("admin/loggedout.html", v=v, users=users) @app.get('/admin/dm_media') @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['ENABLE_DM_MEDIA']) def dm_media(v): with open(f"{LOG_DIRECTORY}/dm_media.log", "r", encoding="utf-8") as f: items=f.read().split("\n")[:-1] total = len(items) items = [x.split(", ") for x in items] items.reverse() try: page = int(request.values.get('page', 1)) except: page = 1 firstrange = PAGE_SIZE * (page - 1) secondrange = firstrange + PAGE_SIZE items = items[firstrange:secondrange] return render_template("admin/dm_media.html", v=v, items=items, total=total, page=page) @app.get('/admin/edit_rules') @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['EDIT_RULES']) def edit_rules_get(v): try: with open(f'files/templates/rules_{SITE_NAME}.html', 'r', encoding="utf-8") as f: rules = f.read() except: rules = None return render_template('admin/edit_rules.html', v=v, rules=rules) @app.post('/admin/edit_rules') @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit("30/minute;200/hour;1000/day") @limiter.limit("30/minute;200/hour;1000/day", key_func=get_ID) @admin_level_required(PERMS['EDIT_RULES']) def edit_rules_post(v): rules = request.values.get('rules', '').strip() rules = sanitize(rules, blackjack="rules", showmore=False) with open(f'files/templates/rules_{SITE_NAME}.html', 'w+', encoding="utf-8") as f: f.write(rules) # ma = ModAction( # kind="edit_rules", # user_id=v.id, # ) # g.db.add(ma) return render_template('admin/edit_rules.html', v=v, rules=rules, msg='Rules edited successfully!') @app.post("/@/make_admin") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['ADMIN_ADD']) def make_admin(v:User, username): user = get_user(username) user.admin_level = 1 g.db.add(user) ma = ModAction( kind="make_admin", user_id=v.id, target_user_id=user.id ) g.db.add(ma) send_repeatable_notification(user.id, f"@{v.username} added you as an admin!") return {"message": f"@{user.username} has been made admin!"} @app.post("/@/remove_admin") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['ADMIN_REMOVE']) def remove_admin(v:User, username): if SITE == 'devrama.net': abort(403, "You can't remove admins on devrama!") user = get_user(username) if user.admin_level > v.admin_level: abort(403) if user.admin_level: user.admin_level = 0 g.db.add(user) ma = ModAction( kind="remove_admin", user_id=v.id, target_user_id=user.id ) g.db.add(ma) send_repeatable_notification(user.id, f"@{v.username} removed you as an admin!") return {"message": f"@{user.username} has been removed as admin!"} @app.post("/distribute//") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_BETS_DISTRIBUTE']) def distribute(v:User, kind, option_id): autojanny = get_account(AUTOJANNY_ID) if autojanny.coins == 0: abort(400, "@AutoJanny has 0 coins") try: option_id = int(option_id) except: abort(400) if kind == 'post': cls = SubmissionOption else: cls = CommentOption option = g.db.get(cls, option_id) if option.exclusive != 2: abort(403) option.exclusive = 3 g.db.add(option) parent = option.parent pool = 0 for o in parent.options: if o.exclusive >= 2: pool += o.upvotes pool *= POLL_BET_COINS autojanny.charge_account('coins', pool) if autojanny.coins < 0: autojanny.coins = 0 g.db.add(autojanny) votes = option.votes coinsperperson = int(pool / len(votes)) text = f"You won {coinsperperson} coins betting on {parent.permalink} :marseyparty:" cid = notif_comment(text) for vote in votes: u = vote.user u.pay_account('coins', coinsperperson) add_notif(cid, u.id, text) text = f"You lost the {POLL_BET_COINS} coins you bet on {parent.permalink} :marseylaugh:" cid = notif_comment(text) losing_voters = [] for o in parent.options: if o.exclusive == 2: losing_voters.extend([x.user_id for x in o.votes]) for uid in losing_voters: add_notif(cid, uid, text) if isinstance(parent, Submission): ma = ModAction( kind="distribute", user_id=v.id, target_submission_id=parent.id ) else: ma = ModAction( kind="distribute", user_id=v.id, target_comment_id=parent.id ) g.db.add(ma) return {"message": f"Each winner has received {coinsperperson} coins!"} @app.post("/@/revert_actions") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['ADMIN_ACTIONS_REVERT']) def revert_actions(v:User, username): revertee = get_user(username) if revertee.admin_level > v.admin_level: abort(403) ma = ModAction( kind="revert", user_id=v.id, target_user_id=revertee.id ) g.db.add(ma) cutoff = int(time.time()) - 86400 posts = [x[0] for x in g.db.query(ModAction.target_submission_id).filter(ModAction.user_id == revertee.id, ModAction.created_utc > cutoff, ModAction.kind == 'ban_post').all()] posts = g.db.query(Submission).filter(Submission.id.in_(posts)).all() comments = [x[0] for x in g.db.query(ModAction.target_comment_id).filter(ModAction.user_id == revertee.id, ModAction.created_utc > cutoff, ModAction.kind == 'ban_comment').all()] comments = g.db.query(Comment).filter(Comment.id.in_(comments)).all() for item in posts + comments: item.is_banned = False item.ban_reason = None item.is_approved = v.id g.db.add(item) users = (x[0] for x in g.db.query(ModAction.target_user_id).filter(ModAction.user_id == revertee.id, ModAction.created_utc > cutoff, ModAction.kind.in_(('shadowban', 'ban_user'))).all()) users = g.db.query(User).filter(User.id.in_(users)).all() for user in users: user.shadowbanned = None user.unban_utc = 0 user.ban_reason = None if user.is_banned: user.is_banned = None send_repeatable_notification(user.id, f"@{v.username} (a site admin) has unbanned you!") g.db.add(user) for u in get_alt_graph(user.id): u.shadowbanned = None u.unban_utc = 0 u.ban_reason = None if u.is_banned: u.is_banned = None send_repeatable_notification(u.id, f"@{v.username} (a site admin) has unbanned you!") g.db.add(u) return {"message": f"@{revertee.username}'s admin actions have been reverted!"} @app.get("/admin/shadowbanned") @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_SHADOWBAN']) def shadowbanned(v): users = g.db.query(User) \ .filter( User.shadowbanned != None, ) \ .order_by(User.ban_reason).all() return render_template("admin/shadowbanned.html", v=v, users=users) @app.get("/admin/image_posts") @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_MODERATION']) def image_posts_listing(v): try: page = int(request.values.get('page', 1)) except: page = 1 posts = g.db.query(Submission).options( load_only(Submission.id, Submission.url) ).order_by(Submission.id.desc()) posts = [x.id for x in posts if x.is_image] total = len(posts) firstrange = PAGE_SIZE * (page - 1) secondrange = firstrange + PAGE_SIZE posts = posts[firstrange:secondrange] posts = get_posts(posts, v=v) return render_template("admin/image_posts.html", v=v, listing=posts, total=total, page=page, sort="new") @app.get("/admin/reported/posts") @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_MODERATION']) def reported_posts(v): page = get_page() listing = g.db.query(Submission).options(load_only(Submission.id)).filter_by( is_approved=None, is_banned=False, deleted_utc=0 ).join(Submission.flags) total = listing.count() listing = listing.order_by(Submission.id.desc()).offset(PAGE_SIZE * (page - 1)).limit(PAGE_SIZE) listing = [p.id for p in listing] listing = get_posts(listing, v=v) return render_template("admin/reported_posts.html", total=total, listing=listing, page=page, v=v) @app.get("/admin/reported/comments") @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_MODERATION']) def reported_comments(v): page = get_page() listing = g.db.query(Comment).options(load_only(Comment.id)).filter_by( is_approved=None, is_banned=False, deleted_utc=0 ).join(Comment.flags) total = listing.count() listing = listing.order_by(Comment.id.desc()).offset(PAGE_SIZE * (page - 1)).limit(PAGE_SIZE) listing = [c.id for c in listing] listing = get_comments(listing, v=v) return render_template("admin/reported_comments.html", total=total, listing=listing, page=page, v=v, standalone=True) @app.get("/admin") @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['ADMIN_HOME_VISIBLE']) def admin_home(v): return render_template("admin/admin_home.html", v=v) @app.post("/admin/site_settings/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['SITE_SETTINGS']) def change_settings(v:User, setting): if setting not in get_settings().keys(): abort(404, f"Setting '{setting}' not found") if setting == "offline_mode" and v.admin_level < PERMS["SITE_OFFLINE_MODE"]: abort(403, "You can't change this setting!") val = toggle_setting(setting) if val: word = 'enable' else: word = 'disable' if setting == "under_attack": new_security_level = 'under_attack' if val else 'high' if not set_security_level(new_security_level): abort(400, f'Failed to {word} under attack mode') ma = ModAction( kind=f"{word}_{setting}", user_id=v.id, ) g.db.add(ma) return {'message': f"{setting.replace('_', ' ').title()} {word}d successfully!"} @app.post("/admin/clear_cloudflare_cache") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['SITE_CACHE_PURGE_CDN']) def clear_cloudflare_cache(v): if not clear_entire_cache(): abort(400, 'Failed to clear cloudflare cache!') ma = ModAction( kind="clear_cloudflare_cache", user_id=v.id ) g.db.add(ma) return {"message": "Cloudflare cache cleared!"} def admin_badges_grantable_list(v): query = g.db.query(BadgeDef) if BADGE_BLACKLIST and v.admin_level < PERMS['IGNORE_BADGE_BLACKLIST']: query = query.filter(BadgeDef.id.notin_(BADGE_BLACKLIST)) badge_types = query.order_by(BadgeDef.id).all() return badge_types @app.get("/admin/badge_grant") @app.get("/admin/badge_remove") @feature_required('BADGES') @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_BADGES']) def badge_grant_get(v): grant = request.url.endswith("grant") badge_types = admin_badges_grantable_list(v) return render_template("admin/badge_admin.html", v=v, badge_types=badge_types, grant=grant) @app.post("/admin/badge_grant") @feature_required('BADGES') @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_BADGES']) def badge_grant_post(v): badges = admin_badges_grantable_list(v) user = get_user(request.values.get("username"), graceful=True) if not user: error = "User not found!" if v.client: return {"error": error} return render_template("admin/badge_admin.html", v=v, badge_types=badges, grant=True, error=error) try: badge_id = int(request.values.get("badge_id")) except: abort(400) if badge_id not in [b.id for b in badges]: abort(403, "You can't grant this badge!") description = request.values.get("description") url = request.values.get("url") if url: if '\\' in url: abort(400) if url.startswith(SITE_FULL): url = url.split(SITE_FULL, 1)[1] existing = user.has_badge(badge_id) if existing: if url or description: existing.url = url existing.description = description g.db.add(existing) msg = "Badge attributes edited successfully!" if v.client: return {"message": msg} return render_template("admin/badge_admin.html", v=v, badge_types=badges, grant=True, msg=msg) error = "User already has that badge!" if v.client: return {"error": error} return render_template("admin/badge_admin.html", v=v, badge_types=badges, grant=True, error=error) new_badge = Badge( badge_id=badge_id, user_id=user.id, url=url, description=description ) g.db.add(new_badge) g.db.flush() if v.id != user.id: text = f"@{v.username} (a site admin) has given you the following profile badge:\n\n{new_badge.path}\n\n**{new_badge.name}**\n\n{new_badge.badge.description}" send_repeatable_notification(user.id, text) ma = ModAction( kind="badge_grant", user_id=v.id, target_user_id=user.id, _note=new_badge.name ) g.db.add(ma) msg = f"{new_badge.name} Badge granted to @{user.username} successfully!" if v.client: return {"message": msg} return render_template("admin/badge_admin.html", v=v, badge_types=badges, grant=True, msg=msg) @app.post("/admin/badge_remove") @feature_required('BADGES') @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_BADGES']) def badge_remove_post(v): badges = admin_badges_grantable_list(v) user = get_user(request.values.get("username"), graceful=True) if not user: return render_template("admin/badge_admin.html", v=v, badge_types=badges, grant=False, error="User not found!") try: badge_id = int(request.values.get("badge_id")) except: abort(400) if badge_id not in [b.id for b in badges]: abort(403) badge = user.has_badge(badge_id) if not badge: return render_template("admin/badge_admin.html", v=v, badge_types=badges, grant=False, error="User doesn't have that badge!") if v.id != user.id: text = f"@{v.username} (a site admin) has removed the following profile badge from you:\n\n{badge.path}\n\n**{badge.name}**\n\n{badge.badge.description}" send_repeatable_notification(user.id, text) ma = ModAction( kind="badge_remove", user_id=v.id, target_user_id=user.id, _note=badge.name ) g.db.add(ma) g.db.delete(badge) return render_template("admin/badge_admin.html", v=v, badge_types=badges, grant=False, msg=f"{badge.name} Badge removed from @{user.username} successfully!") @app.get("/admin/alt_votes") @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['VIEW_ALT_VOTES']) def alt_votes_get(v): u1 = request.values.get("u1") u2 = request.values.get("u2") if not u1 or not u2: return render_template("admin/alt_votes.html", v=v) u1 = get_user(u1) u2 = get_user(u2) u1_post_ups = g.db.query( Vote.submission_id).filter_by( user_id=u1.id, vote_type=1).all() u1_post_downs = g.db.query( Vote.submission_id).filter_by( user_id=u1.id, vote_type=-1).all() u1_comment_ups = g.db.query( CommentVote.comment_id).filter_by( user_id=u1.id, vote_type=1).all() u1_comment_downs = g.db.query( CommentVote.comment_id).filter_by( user_id=u1.id, vote_type=-1).all() u2_post_ups = g.db.query( Vote.submission_id).filter_by( user_id=u2.id, vote_type=1).all() u2_post_downs = g.db.query( Vote.submission_id).filter_by( user_id=u2.id, vote_type=-1).all() u2_comment_ups = g.db.query( CommentVote.comment_id).filter_by( user_id=u2.id, vote_type=1).all() u2_comment_downs = g.db.query( CommentVote.comment_id).filter_by( user_id=u2.id, vote_type=-1).all() data = {} data['u1_only_post_ups'] = len( [x for x in u1_post_ups if x not in u2_post_ups]) data['u2_only_post_ups'] = len( [x for x in u2_post_ups if x not in u1_post_ups]) data['both_post_ups'] = len(list(set(u1_post_ups) & set(u2_post_ups))) data['u1_only_post_downs'] = len( [x for x in u1_post_downs if x not in u2_post_downs]) data['u2_only_post_downs'] = len( [x for x in u2_post_downs if x not in u1_post_downs]) data['both_post_downs'] = len( list(set(u1_post_downs) & set(u2_post_downs))) data['u1_only_comment_ups'] = len( [x for x in u1_comment_ups if x not in u2_comment_ups]) data['u2_only_comment_ups'] = len( [x for x in u2_comment_ups if x not in u1_comment_ups]) data['both_comment_ups'] = len( list(set(u1_comment_ups) & set(u2_comment_ups))) data['u1_only_comment_downs'] = len( [x for x in u1_comment_downs if x not in u2_comment_downs]) data['u2_only_comment_downs'] = len( [x for x in u2_comment_downs if x not in u1_comment_downs]) data['both_comment_downs'] = len( list(set(u1_comment_downs) & set(u2_comment_downs))) data['u1_post_ups_unique'] = 100 * \ data['u1_only_post_ups'] // len(u1_post_ups) if u1_post_ups else 0 data['u2_post_ups_unique'] = 100 * \ data['u2_only_post_ups'] // len(u2_post_ups) if u2_post_ups else 0 data['u1_post_downs_unique'] = 100 * \ data['u1_only_post_downs'] // len( u1_post_downs) if u1_post_downs else 0 data['u2_post_downs_unique'] = 100 * \ data['u2_only_post_downs'] // len( u2_post_downs) if u2_post_downs else 0 data['u1_comment_ups_unique'] = 100 * \ data['u1_only_comment_ups'] // len( u1_comment_ups) if u1_comment_ups else 0 data['u2_comment_ups_unique'] = 100 * \ data['u2_only_comment_ups'] // len( u2_comment_ups) if u2_comment_ups else 0 data['u1_comment_downs_unique'] = 100 * \ data['u1_only_comment_downs'] // len( u1_comment_downs) if u1_comment_downs else 0 data['u2_comment_downs_unique'] = 100 * \ data['u2_only_comment_downs'] // len( u2_comment_downs) if u2_comment_downs else 0 return render_template("admin/alt_votes.html", u1=u1, u2=u2, v=v, data=data ) @app.get("/admin/alts/") @app.get("/@/alts/") @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_LINK']) def admin_view_alts(v:User, username=None): u = get_user(username or request.values.get('username'), graceful=True) return render_template('admin/alts.html', v=v, u=u, alts=u.alts if u else None) @app.post('/@/alts/') @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_LINK']) def admin_add_alt(v:User, username): user1 = get_user(username) user2 = get_user(request.values.get('other_username')) if user1.id == user2.id: abort(400, "Can't add the same account as alts of each other") ids = [user1.id, user2.id] a = g.db.query(Alt).filter(Alt.user1.in_(ids), Alt.user2.in_(ids)).one_or_none() if a: abort(409, f"@{user1.username} and @{user2.username} are already known alts!") a = Alt( user1=user1.id, user2=user2.id, is_manual=True, ) g.db.add(a) g.db.flush() cache.delete_memoized(get_alt_graph_ids, user1.id) cache.delete_memoized(get_alt_graph_ids, user2.id) check_for_alts(user1) check_for_alts(user2) ma = ModAction( kind=f"link_accounts", user_id=v.id, target_user_id=user1.id, _note=f'with @{user2.username}' ) g.db.add(ma) return {"message": f"Linked @{user1.username} and @{user2.username} successfully!"} @app.post('/@/alts//deleted') @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_LINK']) def admin_delink_relink_alt(v:User, username, other): user1 = get_user(username) user2 = get_account(other) ids = [user1.id, user2.id] a = g.db.query(Alt).filter(Alt.user1.in_(ids), Alt.user2.in_(ids)).one_or_none() if not a: abort(404) g.db.delete(a) ma = ModAction( kind=f"delink_accounts", user_id=v.id, target_user_id=user1.id, _note=f'from @{user2.username}' ) g.db.add(ma) return {"message": f"Delinked @{user1.username} and @{user2.username} successfully!"} @app.get("/admin/removed/posts") @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_MODERATION']) def admin_removed(v): page = get_page() listing = g.db.query(Submission).options(load_only(Submission.id)).join(Submission.author).filter( or_(Submission.is_banned==True, User.shadowbanned != None)) total = listing.count() listing = listing.order_by(Submission.id.desc()).offset(PAGE_SIZE * (page - 1)).limit(PAGE_SIZE).all() listing = [x.id for x in listing] posts = get_posts(listing, v=v) return render_template("admin/removed_posts.html", v=v, listing=posts, page=page, total=total ) @app.get("/admin/removed/comments") @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_MODERATION']) def admin_removed_comments(v): page = get_page() listing = g.db.query(Comment).options(load_only(Comment.id)).join(Comment.author).filter( or_(Comment.is_banned==True, User.shadowbanned != None)) total = listing.count() listing = listing.order_by(Comment.id.desc()).offset(PAGE_SIZE * (page - 1)).limit(PAGE_SIZE).all() listing = [x.id for x in listing] comments = get_comments(listing, v=v) return render_template("admin/removed_comments.html", v=v, listing=comments, page=page, total=total ) @app.post("/unchud_user/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_AGENDAPOSTER']) def unagendaposter(id, v): if id.startswith('p_'): post_id = id.split('p_')[1] post = g.db.get(Submission, post_id) user = post.author elif id.startswith('c_'): comment_id = id.split('c_')[1] comment = g.db.get(Comment, comment_id) user = comment.author else: user = get_account(id) if not user.chudded_by: abort(403, "Jannies can't undo chud awards anymore!") user.agendaposter = 0 user.agendaposter_phrase = None user.chudded_by = None g.db.add(user) ma = ModAction( kind="unchud", user_id=v.id, target_user_id=user.id ) g.db.add(ma) badge = user.has_badge(58) if badge: g.db.delete(badge) send_repeatable_notification(user.id, f"@{v.username} (a site admin) has unchudded you.") return {"message": f"@{user.username} has been unchudded!"} @app.post("/shadowban/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_SHADOWBAN']) def shadowban(user_id, v): user = get_account(user_id) if user.admin_level > v.admin_level: abort(403) user.shadowbanned = v.id reason = request.values.get("reason", "").strip()[:256] if not reason: abort(400, "You need to submit a reason for shadowbanning!") reason = filter_emojis_only(reason) if len(reason) > 256: abort(400, "Ban reason too long!") user.ban_reason = reason g.db.add(user) check_for_alts(user) ma = ModAction( kind="shadowban", user_id=v.id, target_user_id=user.id, _note=f'reason: "{reason}"' ) g.db.add(ma) cache.delete_memoized(frontlist) return {"message": f"@{user.username} has been shadowbanned!"} @app.post("/unshadowban/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_SHADOWBAN']) def unshadowban(user_id, v): user = get_account(user_id) user.shadowbanned = None if not user.is_banned: user.ban_reason = None g.db.add(user) for alt in get_alt_graph(user.id): alt.shadowbanned = None if not alt.is_banned: alt.ban_reason = None g.db.add(alt) ma = ModAction( kind="unshadowban", user_id=v.id, target_user_id=user.id, ) g.db.add(ma) cache.delete_memoized(frontlist) return {"message": f"@{user.username} has been unshadowbanned!"} @app.post("/admin/title_change/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_TITLE_CHANGE']) def admin_title_change(user_id, v): user = get_account(user_id) new_name=request.values.get("title").strip()[:256] user.customtitleplain=new_name new_name = filter_emojis_only(new_name) new_name = censor_slurs(new_name, None) user=get_account(user.id) user.customtitle=new_name if request.values.get("locked"): user.flairchanged = int(time.time()) + 2629746 else: user.flairchanged = 0 badge = user.has_badge(96) if badge: g.db.delete(badge) g.db.add(user) if user.flairchanged: kind = "set_flair_locked" else: kind = "set_flair_notlocked" ma=ModAction( kind=kind, user_id=v.id, target_user_id=user.id, _note=f'"{new_name}"' ) g.db.add(ma) if user.flairchanged: message = f"@{v.username} (a site admin) has locked your flair to `{user.customtitleplain}`." else: message = f"@{v.username} (a site admin) has changed your flair to `{user.customtitleplain}`. You can change it back in the settings." send_repeatable_notification(user.id, message) return {"message": f"@{user.username}'s flair has been changed!"} @app.post("/ban_user/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_BAN']) def ban_user(id, v): if id.startswith('p_'): post_id = id.split('p_')[1] post = g.db.get(Submission, post_id) user = post.author elif id.startswith('c_'): comment_id = id.split('c_')[1] comment = g.db.get(Comment, comment_id) user = comment.author else: user = get_account(id) if user.admin_level > v.admin_level: abort(403) if user.is_suspended_permanently: abort(403, f"@{user.username} is already banned permanently!") days = 0.0 try: days = float(request.values.get("days")) except: pass if days < 0: abort(400, "You can't bans people for negative days!") reason = request.values.get("reason", "").strip()[:256] if not reason: abort(400, "You need to submit a reason for banning!") reason = filter_emojis_only(reason) if len(reason) > 256: abort(400, "Ban reason too long!") reason = reason_regex_post.sub(r'\1', reason) reason = reason_regex_comment.sub(r'\1', reason) user.ban(admin=v, reason=reason, days=days) if request.values.get("alts"): for x in get_alt_graph(user.id): if x.admin_level > v.admin_level: continue x.ban(admin=v, reason=reason, days=days) duration = "permanently" if days: days_txt = str(days) if days_txt.endswith('.0'): days_txt = days_txt[:-2] duration = f"for {days_txt} day" if days != 1: duration += "s" if reason: text = f"@{v.username} (a site admin) has banned you for **{days_txt}** days for the following reason:\n\n> {reason}" else: text = f"@{v.username} (a site admin) has banned you for **{days_txt}** days." else: if reason: text = f"@{v.username} (a site admin) has banned you permanently for the following reason:\n\n> {reason}" else: text = f"@{v.username} (a site admin) has banned you permanently." send_repeatable_notification(user.id, text) note = f'duration: {duration}, reason: "{reason}"' ma=ModAction( kind="ban_user", user_id=v.id, target_user_id=user.id, _note=note ) g.db.add(ma) if 'reason' in request.values: if request.values["reason"].startswith("/post/"): try: post = int(request.values["reason"].split("/post/")[1].split(None, 1)[0]) except: abort(400) post = get_post(post) if post.sub != 'chudrama': post.bannedfor = f'{duration} by @{v.username}' g.db.add(post) elif request.values["reason"].startswith("/comment/"): try: comment = int(request.values["reason"].split("/comment/")[1].split(None, 1)[0]) except: abort(400) comment = get_comment(comment) comment.bannedfor = f'{duration} by @{v.username}' g.db.add(comment) return {"message": f"@{user.username} has been banned {duration}!"} @app.post("/chud_user/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_AGENDAPOSTER']) def agendaposter(id, v): if id.startswith('p_'): post_id = id.split('p_')[1] post = g.db.get(Submission, post_id) user = post.author elif id.startswith('c_'): comment_id = id.split('c_')[1] comment = g.db.get(Comment, comment_id) user = comment.author else: user = get_account(id) if user.admin_level > v.admin_level: abort(403) if user.agendaposter == 1: abort(403, f"@{user.username} is already chudded permanently!") if user.marsify: abort(403, f"You can't chud someone while they're marsified!") days = 0.0 try: days = float(request.values.get("days")) except: pass if days < 0: abort(400, "You can't chud people for negative days!") reason = request.values.get("reason", "").strip() reason = filter_emojis_only(reason) reason = reason_regex_post.sub(r'\1', reason) reason = reason_regex_comment.sub(r'\1', reason) if days: if user.agendaposter: user.agendaposter += days * 86400 else: user.agendaposter = int(time.time()) + (days * 86400) days_txt = str(days) if days_txt.endswith('.0'): days_txt = days_txt[:-2] duration = f"for {days_txt} day" if days != 1: duration += "s" else: user.agendaposter = 1 duration = "permanently" user.agendaposter_phrase = "trans lives matter" text = f"@{v.username} (a site admin) has chudded you **{duration}**" if reason: text += f" for the following reason:\n\n> {reason}" text += f"\n\n**You now have to say this phrase in all posts and comments you make {duration}:**\n\n> {user.agendaposter_phrase}" user.chudded_by = v.id g.db.add(user) send_repeatable_notification(user.id, text) note = f'duration: {duration}' if reason: note += f', reason: "{reason}"' ma=ModAction( kind="chud", user_id=v.id, target_user_id=user.id, _note=note ) g.db.add(ma) badge_grant(user=user, badge_id=58) if 'reason' in request.values: if request.values["reason"].startswith("/post/"): try: post = int(request.values["reason"].split("/post/")[1].split(None, 1)[0]) except: abort(400) post = get_post(post) if post.sub == 'chudrama': abort(403, "You can't chud people in /h/chudrama") post.chuddedfor = f'{duration} by @{v.username}' g.db.add(post) elif request.values["reason"].startswith("/comment/"): try: comment = int(request.values["reason"].split("/comment/")[1].split(None, 1)[0]) except: abort(400) comment = get_comment(comment) if comment.post.sub == 'chudrama': abort(403, "You can't chud people in /h/chudrama") comment.chuddedfor = f'{duration} by @{v.username}' g.db.add(comment) return {"message": f"@{user.username} has been chudded {duration}!"} @app.post("/unban_user/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_BAN']) def unban_user(id, v): if id.startswith('p_'): post_id = id.split('p_')[1] post = g.db.get(Submission, post_id) user = post.author elif id.startswith('c_'): comment_id = id.split('c_')[1] comment = g.db.get(Comment, comment_id) user = comment.author else: user = get_account(id) if not user.is_banned: abort(400) if FEATURES['AWARDS'] and user.ban_reason and user.ban_reason.startswith('1-Day ban award'): abort(403, "You can't undo a ban award!") user.is_banned = None user.unban_utc = 0 user.ban_reason = None send_repeatable_notification(user.id, f"@{v.username} (a site admin) has unbanned you!") g.db.add(user) for x in get_alt_graph(user.id): if x.is_banned: send_repeatable_notification(x.id, f"@{v.username} (a site admin) has unbanned you!") x.is_banned = None x.unban_utc = 0 x.ban_reason = None g.db.add(x) ma=ModAction( kind="unban_user", user_id=v.id, target_user_id=user.id, ) g.db.add(ma) return {"message": f"@{user.username} has been unbanned!"} @app.post("/mute_user/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_BAN']) def mute_user(v:User, user_id): user = get_account(user_id) if not user.is_muted: user.is_muted = True ma = ModAction( kind='mod_mute_user', user_id=v.id, target_user_id=user.id, ) g.db.add(user) g.db.add(ma) check_for_alts(user) return {"message": f"@{user.username} has been muted!"} @app.post("/unmute_user/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_BAN']) def unmute_user(v:User, user_id): user = get_account(user_id) if user.is_muted: user.is_muted = False ma = ModAction( kind='mod_unmute_user', user_id=v.id, target_user_id=user.id, ) g.db.add(user) g.db.add(ma) return {"message": f"@{user.username} has been unmuted!"} @app.post("/admin/progstack/post/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['PROGSTACK']) def progstack_post(post_id, v): post = get_post(post_id) post.is_approved = PROGSTACK_ID post.realupvotes = floor(post.realupvotes * PROGSTACK_MUL) g.db.add(post) ma=ModAction( kind="progstack_post", user_id=v.id, target_submission_id=post.id, ) g.db.add(ma) cache.delete_memoized(frontlist) return {"message": "Progressive stack applied on post!"} @app.post("/admin/unprogstack/post/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['PROGSTACK']) def unprogstack_post(post_id, v): post = get_post(post_id) post.is_approved = None g.db.add(post) ma=ModAction( kind="unprogstack_post", user_id=v.id, target_submission_id=post.id, ) g.db.add(ma) return {"message": "Progressive stack removed from post!"} @app.post("/admin/progstack/comment/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['PROGSTACK']) def progstack_comment(comment_id, v): comment = get_comment(comment_id) comment.is_approved = PROGSTACK_ID comment.realupvotes = floor(comment.realupvotes * PROGSTACK_MUL) g.db.add(comment) ma=ModAction( kind="progstack_comment", user_id=v.id, target_comment_id=comment.id, ) g.db.add(ma) cache.delete_memoized(comment_idlist) return {"message": "Progressive stack applied on comment!"} @app.post("/admin/unprogstack/comment/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['PROGSTACK']) def unprogstack_comment(comment_id, v): comment = get_comment(comment_id) comment.is_approved = None g.db.add(comment) ma=ModAction( kind="unprogstack_comment", user_id=v.id, target_comment_id=comment.id, ) g.db.add(ma) return {"message": "Progressive stack removed from comment!"} @app.post("/remove_post/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_MODERATION']) def remove_post(post_id, v): post = get_post(post_id) post.is_banned = True post.is_approved = None if not FEATURES['AWARDS'] or not post.stickied or not post.stickied.endswith(PIN_AWARD_TEXT): post.stickied = None post.is_pinned = False post.ban_reason = v.username g.db.add(post) ma=ModAction( kind="ban_post", user_id=v.id, target_submission_id=post.id, ) g.db.add(ma) cache.delete_memoized(frontlist) v.pay_account('coins', 1) g.db.add(v) purge_files_in_cache(f"https://{SITE}/") return {"message": "Post removed!"} @app.post("/approve_post/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_MODERATION']) def approve_post(post_id, v): post = get_post(post_id) if not complies_with_chud(post): abort(400, "You can't bypass the chud award!") if post.is_banned: ma=ModAction( kind="unban_post", user_id=v.id, target_submission_id=post.id, ) g.db.add(ma) post.is_banned = False post.ban_reason = None post.is_approved = v.id g.db.add(post) cache.delete_memoized(frontlist) v.charge_account('coins', 1) g.db.add(v) return {"message": "Post approved!"} @app.post("/distinguish/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_DISTINGUISH']) def distinguish_post(post_id, v): post = get_post(post_id) if post.distinguish_level: post.distinguish_level = 0 kind = 'undistinguish_post' else: post.distinguish_level = v.admin_level kind = 'distinguish_post' g.db.add(post) ma = ModAction( kind=kind, user_id=v.id, target_submission_id=post.id ) g.db.add(ma) if post.distinguish_level: return {"message": "Post distinguished!"} else: return {"message": "Post undistinguished!"} @app.post("/sticky/") @feature_required('PINS') @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_MODERATION']) def sticky_post(post_id, v): post = get_post(post_id) if post.is_banned: abort(403, "Can't sticky removed posts!") if FEATURES['AWARDS'] and post.stickied and post.stickied.endswith(PIN_AWARD_TEXT) and v.admin_level < PERMS["UNDO_AWARD_PINS"]: abort(403, "Can't pin award pins!") pins = g.db.query(Submission).filter(Submission.stickied != None, Submission.is_banned == False).count() if not post.stickied_utc: post.stickied_utc = int(time.time()) + 3600 pin_time = 'for 1 hour' code = 200 if v.id != post.author_id: send_repeatable_notification(post.author_id, f"@{v.username} (a site admin) has pinned [{post.title}](/post/{post_id})") else: if pins >= PIN_LIMIT + 1: abort(403, f"Can't exceed {PIN_LIMIT} pinned posts limit!") post.stickied_utc = None pin_time = 'permanently' code = 201 post.stickied = v.username g.db.add(post) ma=ModAction( kind="pin_post", user_id=v.id, target_submission_id=post.id, _note=pin_time ) g.db.add(ma) cache.delete_memoized(frontlist) return {"message": f"Post pinned {pin_time}!"}, code @app.post("/unsticky/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_MODERATION']) def unsticky_post(post_id, v): post = get_post(post_id) if post.stickied: if FEATURES['AWARDS'] and post.stickied.endswith(PIN_AWARD_TEXT) and v.admin_level < PERMS["UNDO_AWARD_PINS"]: abort(403, "Can't unpin award pins!") if post.author_id == LAWLZ_ID and post.stickied_utc and SITE_NAME == 'rDrama': abort(403, "Can't unpin lawlzposts!") post.stickied = None post.stickied_utc = None g.db.add(post) ma=ModAction( kind="unpin_post", user_id=v.id, target_submission_id=post.id ) g.db.add(ma) if v.id != post.author_id: send_repeatable_notification(post.author_id, f"@{v.username} (a site admin) has unpinned [{post.title}](/post/{post_id})") cache.delete_memoized(frontlist) return {"message": "Post unpinned!"} @app.post("/sticky_comment/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_MODERATION']) def sticky_comment(cid, v): comment = get_comment(cid, v=v) if comment.is_banned: abort(403, "Can't sticky removed comments!") if FEATURES['AWARDS'] and comment.stickied and comment.stickied.endswith(PIN_AWARD_TEXT) and v.admin_level < PERMS["UNDO_AWARD_PINS"]: abort(403, "Can't pin award pins!") if not comment.stickied: comment.stickied = v.username g.db.add(comment) ma=ModAction( kind="pin_comment", user_id=v.id, target_comment_id=comment.id ) g.db.add(ma) if v.id != comment.author_id: message = f"@{v.username} (a site admin) has pinned your [comment]({comment.shortlink})" send_repeatable_notification(comment.author_id, message) c = comment while c.level > 2: c = c.parent_comment c.stickied_child_id = comment.id g.db.add(c) return {"message": "Comment pinned!"} @app.post("/unsticky_comment/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_MODERATION']) def unsticky_comment(cid, v): comment = get_comment(cid, v=v) if comment.stickied: if FEATURES['AWARDS'] and comment.stickied.endswith(PIN_AWARD_TEXT) and v.admin_level < PERMS["UNDO_AWARD_PINS"]: abort(403, "Can't unpin award pins!") comment.stickied = None g.db.add(comment) ma=ModAction( kind="unpin_comment", user_id=v.id, target_comment_id=comment.id ) g.db.add(ma) if v.id != comment.author_id: message = f"@{v.username} (a site admin) has unpinned your [comment]({comment.shortlink})" send_repeatable_notification(comment.author_id, message) cleanup = g.db.query(Comment).filter_by(stickied_child_id=comment.id).all() for c in cleanup: c.stickied_child_id = None g.db.add(c) return {"message": "Comment unpinned!"} @app.post("/remove_comment/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_MODERATION']) def remove_comment(c_id, v): comment = get_comment(c_id) comment.is_banned = True comment.is_approved = None comment.ban_reason = v.username g.db.add(comment) ma=ModAction( kind="ban_comment", user_id=v.id, target_comment_id=comment.id, ) g.db.add(ma) return {"message": "Comment removed!"} @app.post("/approve_comment/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_MODERATION']) def approve_comment(c_id, v): comment = get_comment(c_id) if not complies_with_chud(comment): abort(400, "You can't bypass the chud award!") if comment.is_banned: ma=ModAction( kind="unban_comment", user_id=v.id, target_comment_id=comment.id, ) g.db.add(ma) comment.is_banned = False comment.ban_reason = None comment.is_approved = v.id g.db.add(comment) return {"message": "Comment approved!"} @app.post("/distinguish_comment/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_DISTINGUISH']) def admin_distinguish_comment(c_id, v): comment = get_comment(c_id, v=v) if comment.distinguish_level: comment.distinguish_level = 0 kind = 'undistinguish_comment' else: comment.distinguish_level = v.admin_level kind = 'distinguish_comment' g.db.add(comment) ma = ModAction( kind=kind, user_id=v.id, target_comment_id=comment.id ) g.db.add(ma) if comment.distinguish_level: return {"message": "Comment distinguished!"} else: return {"message": "Comment undistinguished!"} @app.get("/admin/banned_domains/") @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['DOMAINS_BAN']) def admin_banned_domains(v): banned_domains = g.db.query(BannedDomain) \ .order_by(BannedDomain.reason).all() return render_template("admin/banned_domains.html", v=v, banned_domains=banned_domains) @app.post("/admin/ban_domain") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['DOMAINS_BAN']) def ban_domain(v): domain=request.values.get("domain", "").strip().lower() if not domain: abort(400) reason=request.values.get("reason", "").strip() if not reason: abort(400, 'Reason is required!') if len(reason) > 100: abort(400, 'Reason is too long (max 100 characters)!') if len(reason) > 100: abort(400, 'Reason is too long!') existing = g.db.get(BannedDomain, domain) if not existing: d = BannedDomain(domain=domain, reason=reason) g.db.add(d) ma = ModAction( kind="ban_domain", user_id=v.id, _note=filter_emojis_only(f'{domain}, reason: {reason}') ) g.db.add(ma) return redirect("/admin/banned_domains/") @app.post("/admin/unban_domain/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['DOMAINS_BAN']) def unban_domain(v:User, domain): existing = g.db.get(BannedDomain, domain) if not existing: abort(400, 'Domain is not banned!') g.db.delete(existing) ma = ModAction( kind="unban_domain", user_id=v.id, _note=filter_emojis_only(domain) ) g.db.add(ma) return {"message": f"{domain} has been unbanned!"} @app.post("/admin/nuke_user") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_MODERATION']) def admin_nuke_user(v): user=get_user(request.values.get("user")) for post in g.db.query(Submission).filter_by(author_id=user.id).all(): if post.is_banned: continue post.is_banned = True post.ban_reason = v.username g.db.add(post) for comment in g.db.query(Comment).filter_by(author_id=user.id).all(): if comment.is_banned: continue comment.is_banned = True comment.ban_reason = v.username g.db.add(comment) ma=ModAction( kind="nuke_user", user_id=v.id, target_user_id=user.id, ) g.db.add(ma) return {"message": f"@{user.username}'s content has been removed!"} @app.post("/admin/unnuke_user") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['POST_COMMENT_MODERATION']) def admin_nunuke_user(v): user=get_user(request.values.get("user")) for post in g.db.query(Submission).filter_by(author_id=user.id).all(): if not post.is_banned: continue post.is_banned = False post.ban_reason = None post.is_approved = v.id g.db.add(post) for comment in g.db.query(Comment).filter_by(author_id=user.id).all(): if not comment.is_banned: continue comment.is_banned = False comment.ban_reason = None comment.is_approved = v.id g.db.add(comment) ma=ModAction( kind="unnuke_user", user_id=v.id, target_user_id=user.id, ) g.db.add(ma) return {"message": f"@{user.username}'s content has been approved!"} @app.post("/blacklist/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_BLACKLIST']) def blacklist_user(user_id, v): user = get_account(user_id) if user.admin_level > v.admin_level: abort(403) user.blacklisted_by = v.id g.db.add(user) check_for_alts(user) ma = ModAction( kind="blacklist_user", user_id=v.id, target_user_id=user.id ) g.db.add(ma) return {"message": f"@{user.username} has been blacklisted from restricted holes!"} @app.post("/unblacklist/") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['USER_BLACKLIST']) def unblacklist_user(user_id, v): user = get_account(user_id) user.blacklisted_by = None g.db.add(user) for alt in get_alt_graph(user.id): alt.blacklisted_by = None g.db.add(alt) ma = ModAction( kind="unblacklist_user", user_id=v.id, target_user_id=user.id ) g.db.add(ma) return {"message": f"@{user.username} has been unblacklisted from restricted holes!"} @app.get('/admin/delete_media') @limiter.limit(DEFAULT_RATELIMIT) @limiter.limit(DEFAULT_RATELIMIT, key_func=get_ID) @admin_level_required(PERMS['DELETE_MEDIA']) def delete_media_get(v): return render_template("admin/delete_media.html", v=v) @app.post("/admin/delete_media") @limiter.limit('1/second', scope=rpath) @limiter.limit('1/second', scope=rpath, key_func=get_ID) @limiter.limit("50/day") @limiter.limit("50/day", key_func=get_ID) @admin_level_required(PERMS['DELETE_MEDIA']) def delete_media_post(v): url = request.values.get("url") if not url: return render_template("admin/delete_media.html", v=v, url=url, error="No url provided!") if not image_link_regex.fullmatch(url) and not video_link_regex.fullmatch(url): return render_template("admin/delete_media.html", v=v, url=url, error="Invalid url!") path = url.split(SITE)[1] if not os.path.isfile(path): return render_template("admin/delete_media.html", v=v, url=url, error="File not found on the server!") os.remove(path) ma=ModAction( kind="delete_media", user_id=v.id, _note=url, ) g.db.add(ma) purge_files_in_cache(url) return render_template("admin/delete_media.html", v=v, msg="Media deleted successfully!")