remove typing (useless code bloat)

master
Aevann 2023-07-30 03:42:06 +03:00
parent 51d6d6f3cf
commit 8beb2d7e8e
38 changed files with 245 additions and 261 deletions

View File

@ -1,4 +1,3 @@
from typing import Any, Callable, Optional, Tuple, Union
from sqlalchemy import Column, func
from flask import g
@ -25,9 +24,8 @@ class Leaderboard:
user_func = None
value_func = None
def __init__(self, header_name:str, table_header_name:str, html_id:str, table_column_name:str,
user_relative_url:Optional[str], query_function:Callable[..., Tuple[Any, Any, Any]],
criteria, v:User, value_func:Optional[Callable[[User], Union[int, Column]]], users, limit=LEADERBOARD_LIMIT):
def __init__(self, header_name, table_header_name, html_id, table_column_name,
user_relative_url, query_function, criteria, v, value_func, users, limit=LEADERBOARD_LIMIT):
self.header_name = header_name
self.table_header_name = table_header_name
self.html_id = html_id
@ -48,7 +46,7 @@ class Leaderboard:
self.value_func = lambda u: u[1] or 0
@classmethod
def get_simple_lb(cls, order_by, v:User, users, limit:int):
def get_simple_lb(cls, order_by, v, users, limit):
leaderboard = users.order_by(order_by.desc()).limit(limit).all()
position = None
if v not in leaderboard:
@ -65,7 +63,7 @@ class Leaderboard:
return func.rank().over(order_by=func.count(criteria).desc()).label("rank")
@classmethod
def get_badge_emoji_lb(cls, lb_criteria, v:User, users:Any, limit):
def get_badge_emoji_lb(cls, lb_criteria, v, users, limit):
sq = g.db.query(lb_criteria, cls.count_and_label(lb_criteria), cls.rank_filtered_rank_label_by_desc(lb_criteria))
if lb_criteria == Emoji.author_id:
sq = sq.filter(Emoji.kind.in_(["Marsey", "Platy", "Wolf", "Capy", "Carp", "Marsey Flags", "Marsey Alphabet"]))
@ -87,7 +85,7 @@ class Leaderboard:
return (leaderboard, position[0], position[1])
@classmethod
def get_blockers_lb(cls, lb_criteria, v:User, users:Any, limit):
def get_blockers_lb(cls, lb_criteria, v, users, limit):
if lb_criteria != UserBlock.target_id:
raise ValueError("This leaderboard function only supports UserBlock.target_id")
sq = g.db.query(lb_criteria, cls.count_and_label(lb_criteria)).group_by(lb_criteria).subquery()
@ -100,7 +98,7 @@ class Leaderboard:
return (leaderboard, position[0], position[1])
@classmethod
def get_hat_lb(cls, lb_criteria, v:User, users:Any, limit):
def get_hat_lb(cls, lb_criteria, v, users, limit):
leaderboard = g.db.query(User, func.count(lb_criteria)).join(lb_criteria).group_by(User).order_by(func.count(lb_criteria).desc())
sq = g.db.query(User.id, cls.count_and_label(lb_criteria), cls.rank_filtered_rank_label_by_desc(lb_criteria)).join(lb_criteria).group_by(User).subquery()
position = g.db.query(sq.c.rank, sq.c.count).filter(sq.c.id == v.id).limit(1).one_or_none()
@ -109,7 +107,7 @@ class Leaderboard:
return (leaderboard, position[0], position[1])
@classmethod
def get_upvotes_lb(cls, lb_criteria, v, users:Any, limit):
def get_upvotes_lb(cls, lb_criteria, v, users, limit):
users13 = cache.get("users13") or []
users13_1 = cache.get("users13_1") or []
users13_2 = cache.get("users13_2") or []
@ -125,7 +123,7 @@ class Leaderboard:
return (users13_accs, pos13[0], pos13[1])
@classmethod
def get_downvotes_lb(cls, lb_criteria, v:User, users:Any, limit):
def get_downvotes_lb(cls, lb_criteria, v, users, limit):
users9 = cache.get("users9") or []
users9_1 = cache.get("users9_1") or []
users9_2 = cache.get("users9_2") or []

View File

@ -1,6 +1,5 @@
import random
import time
from typing import Optional
from sqlalchemy import Column
from sqlalchemy.ext.mutable import MutableList
@ -55,7 +54,7 @@ class Sub(Base):
@property
@lazy
def has_banners(self) -> bool:
def has_banners(self):
return bool(self.bannerurls)
@property

View File

@ -1,6 +1,5 @@
import random
from operator import *
from typing import Union
import re
import pyotp
@ -480,7 +479,7 @@ class User(Base):
return self.offsitementions or self.admin_level >= PERMS['NOTIFICATIONS_REDDIT']
@lazy
def can_edit(self, target:Union[Post, Comment]) -> bool:
def can_edit(self, target):
if isinstance(target, Comment) and not target.post: return False
if self.id == target.author_id: return True
if not isinstance(target, Post): return False
@ -850,7 +849,7 @@ class User(Base):
return g.db.query(Follow).filter_by(target_id=self.id, user_id=user.id).one_or_none()
@lazy
def is_visible_to(self, user) -> bool:
def is_visible_to(self, user):
if not self.is_private: return True
if not user: return False
if self.id == user.id: return True
@ -1070,7 +1069,7 @@ class User(Base):
return f'{tier_name} - Donates ${tier_money}/month'
@classmethod
def can_see_content(cls, user:Optional["User"], other:Union[Post, Comment, Sub]) -> bool:
def can_see_content(cls, user, other):
'''
Whether a user can see this item (be it a post or comment)'s content.
If False, they won't be able to view its content.
@ -1087,7 +1086,7 @@ class User(Base):
return True
@classmethod
def can_see(cls, user:Optional["User"], other:Union[Post, Comment, Sub, "User"]) -> bool:
def can_see(cls, user, other):
'''
Whether a user can strictly see this item. can_see_content is used where
content of a thing can be hidden from view

View File

@ -1,6 +1,5 @@
import random
import time
from typing import Type
from urllib.parse import quote
from sqlalchemy.sql import func
import gevent
@ -26,8 +25,6 @@ from files.helpers.slots import check_slots_command
from files.routes.routehelpers import check_for_alts
post_target_type = Union[Post, User]
def _archiveorg(url):
try:
requests.get(f'https://web.archive.org/save/{url}',
@ -54,7 +51,7 @@ def snappy_report(post, reason):
message = f'@Snappy reported [{post.title}]({post.shortlink})\n\n> {reason}'
send_repeatable_notification(post.author_id, message)
def execute_snappy(post:Post, v:User):
def execute_snappy(post, v):
if post.sub and g.db.query(Exile.user_id).filter_by(user_id=SNAPPY_ID, sub=post.sub).one_or_none():
return
@ -259,7 +256,7 @@ def execute_snappy(post:Post, v:User):
post.comment_count += 1
post.replies = [c]
def execute_zozbot(c:Comment, level:int, post_target:post_target_type, v):
def execute_zozbot(c, level, post_target, v):
if SITE_NAME != 'rDrama': return
posting_to_post = isinstance(post_target, Post)
if random.random() >= 0.001: return
@ -324,7 +321,7 @@ def execute_zozbot(c:Comment, level:int, post_target:post_target_type, v):
push_notif({v.id}, f'New reply by @{c2.author_name}', "zoz", c2)
def execute_longpostbot(c:Comment, level:int, body, body_html, post_target:post_target_type, v:User):
def execute_longpostbot(c, level, body, body_html, post_target, v):
if SITE_NAME != 'rDrama': return
posting_to_post = isinstance(post_target, Post)
if not len(c.body.split()) >= 200: return
@ -425,7 +422,7 @@ def execute_antispam_post_check(title, v, url):
return False
return True
def execute_antispam_duplicate_comment_check(v:User, body_html:str):
def execute_antispam_duplicate_comment_check(v, body_html):
if v.admin_level >= PERMS['USE_ADMIGGER_THREADS']: return
'''
@ -445,7 +442,7 @@ def execute_antispam_duplicate_comment_check(v:User, body_html:str):
g.db.commit()
abort(403, "Too much spam!")
def execute_antispam_comment_check(body:str, v:User):
def execute_antispam_comment_check(body, v):
if v.admin_level >= PERMS['USE_ADMIGGER_THREADS']: return
if v.id in ANTISPAM_BYPASS_IDS: return
@ -483,7 +480,7 @@ def execute_antispam_comment_check(body:str, v:User):
g.db.commit()
abort(403, "Too much spam!")
def execute_dylan(v:User):
def execute_dylan(v):
if "dylan" in v.username.lower() and "hewitt" in v.username.lower():
v.shadowbanned = AUTOJANNY_ID
v.ban_reason = "Dylan"
@ -496,7 +493,7 @@ def execute_dylan(v:User):
)
g.db.add(ma)
def execute_under_siege(v:User, target:Optional[Union[Post, Comment]], body, kind:str) -> bool:
def execute_under_siege(v, target, body, kind):
if v.shadowbanned: return
if SITE == 'watchpeopledie.tv':
@ -546,7 +543,7 @@ def execute_under_siege(v:User, target:Optional[Union[Post, Comment]], body, kin
g.db.add(n)
def execute_lawlz_actions(v:User, p:Post):
def execute_lawlz_actions(v, p):
if v.id != LAWLZ_ID: return
if SITE_NAME != 'rDrama': return
if not FEATURES['PINS']: return
@ -577,7 +574,7 @@ def execute_lawlz_actions(v:User, p:Post):
g.db.add(ma_3)
def process_poll_options(v:User, target:Union[Post, Comment]):
def process_poll_options(v, target):
patterns = [(poll_regex, 0), (choice_regex, 1)]

View File

@ -24,7 +24,7 @@ def get_game_feed(game):
return list(map(format_game, games))
def get_user_stats(u:User, game:str, include_ties=False):
def get_user_stats(u, game, include_ties=False):
games = g.db.query(CasinoGame.user_id, CasinoGame.winnings).filter(CasinoGame.kind == game, CasinoGame.user_id == u.id)
wins = games.filter(CasinoGame.winnings > 0).count()
ties = games.filter(CasinoGame.winnings == 0).count() if include_ties else 0

View File

@ -1,5 +1,4 @@
import json
from typing import List, Optional, Union
import requests
@ -10,7 +9,7 @@ CLOUDFLARE_REQUEST_TIMEOUT_SECS = 5
CLOUDFLARE_AVAILABLE = CF_ZONE and CF_ZONE != DEFAULT_CONFIG_VALUE
def _request_from_cloudflare(url:str, method:str, post_data_str) -> bool:
def _request_from_cloudflare(url, method, post_data_str):
if not CLOUDFLARE_AVAILABLE: return False
try:
res = str(requests.request(method, f"{CLOUDFLARE_API_URL}/zones/{CF_ZONE}/{url}", headers=CF_HEADERS, data=post_data_str, timeout=CLOUDFLARE_REQUEST_TIMEOUT_SECS))
@ -18,13 +17,13 @@ def _request_from_cloudflare(url:str, method:str, post_data_str) -> bool:
return False
return res == "<Response [200]>"
def set_security_level(under_attack="high") -> bool:
def set_security_level(under_attack="high"):
return _request_from_cloudflare("settings/security_level", "PATCH", f'{{"value":"{under_attack}"}}')
def clear_entire_cache() -> bool:
def clear_entire_cache():
return _request_from_cloudflare("purge_cache", "POST", '{"purge_everything":true}')
def purge_files_in_cache(files:Union[List[str],str]) -> bool:
def purge_files_in_cache(files):
if not CLOUDFLARE_AVAILABLE: return False
if isinstance(files, str):
files = [files]

View File

@ -1,4 +1,3 @@
from typing import Callable, Iterable, List, Optional, Union
from flask import *
from sqlalchemy import and_, any_, or_
@ -8,11 +7,11 @@ from files.classes import Comment, CommentVote, Hat, Sub, Post, User, UserBlock,
from files.helpers.config.const import *
from files.__main__ import cache
def sanitize_username(username:str) -> str:
def sanitize_username(username):
if not username: return username
return username.replace('\\', '').replace('_', '\_').replace('%', '').replace('(', '').replace(')', '').strip()
def get_id(username:str, graceful=False) -> Optional[int]:
def get_id(username, graceful=False):
username = sanitize_username(username)
if not username:
if graceful: return None
@ -33,7 +32,7 @@ def get_id(username:str, graceful=False) -> Optional[int]:
return user[0]
def get_user(username:Optional[str], v:Optional[User]=None, graceful=False, include_blocks=False) -> Optional[User]:
def get_user(username, v=None, graceful=False, include_blocks=False):
if not username:
if graceful: return None
abort(404)
@ -62,7 +61,7 @@ def get_user(username:Optional[str], v:Optional[User]=None, graceful=False, incl
user = add_block_props(user, v)
return user
def get_users(usernames:Iterable[str], ids_only=False, graceful=False) -> List[User]:
def get_users(usernames, ids_only=False, graceful=False):
if not usernames: return []
usernames = [sanitize_username(n) for n in usernames]
if not any(usernames):
@ -90,7 +89,7 @@ def get_users(usernames:Iterable[str], ids_only=False, graceful=False) -> List[U
return users
def get_account(id:Union[str, int], v:Optional[User]=None, graceful=False, include_blocks=False) -> Optional[User]:
def get_account(id, v=None, graceful=False, include_blocks=False):
try:
id = int(id)
except:
@ -108,7 +107,7 @@ def get_account(id:Union[str, int], v:Optional[User]=None, graceful=False, inclu
return user
def get_accounts_dict(ids:Union[Iterable[str], Iterable[int]], v:Optional[User]=None, graceful=False) -> Optional[dict[int, User]]:
def get_accounts_dict(ids, v=None, graceful=False):
if not ids: return {}
try:
ids = set([int(id) for id in ids])
@ -121,7 +120,7 @@ def get_accounts_dict(ids:Union[Iterable[str], Iterable[int]], v:Optional[User]=
if len(users) != len(ids) and not graceful: abort(404)
return {u.id:u for u in users}
def get_post(i:Union[str, int], v:Optional[User]=None, graceful=False) -> Optional[Post]:
def get_post(i, v=None, graceful=False):
try: i = int(i)
except:
if graceful: return None
@ -169,7 +168,7 @@ def get_post(i:Union[str, int], v:Optional[User]=None, graceful=False) -> Option
return x
def get_posts(pids:Iterable[int], v:Optional[User]=None, extra:Optional[Callable[[Query], Query]]=None) -> List[Post]:
def get_posts(pids, v=None, extra=None):
if not pids: return []
if v:
@ -215,7 +214,7 @@ def get_posts(pids:Iterable[int], v:Optional[User]=None, extra:Optional[Callable
return sorted(output, key=lambda x: pids.index(x.id))
def get_comment(i:Union[str, int], v:Optional[User]=None, graceful=False) -> Optional[Comment]:
def get_comment(i, v=None, graceful=False):
try: i = int(i)
except:
if graceful: return None
@ -232,7 +231,7 @@ def get_comment(i:Union[str, int], v:Optional[User]=None, graceful=False) -> Opt
return add_vote_and_block_props(comment, v, CommentVote)
def add_block_props(target:Union[Post, Comment, User], v:Optional[User]):
def add_block_props(target, v):
if not v: return target
id = None
@ -267,7 +266,7 @@ def add_block_props(target:Union[Post, Comment, User], v:Optional[User]):
target.is_blocked = block and block.target_id == v.id
return target
def add_vote_props(target:Union[Post, Comment], v:Optional[User], vote_cls):
def add_vote_props(target, v, vote_cls):
if hasattr(target, 'voted'): return target
vt = g.db.query(vote_cls.vote_type).filter_by(user_id=v.id)
@ -281,12 +280,12 @@ def add_vote_props(target:Union[Post, Comment], v:Optional[User], vote_cls):
target.voted = vt.vote_type if vt else 0
return target
def add_vote_and_block_props(target:Union[Post, Comment], v:Optional[User], vote_cls):
def add_vote_and_block_props(target, v, vote_cls):
if not v: return target
target = add_block_props(target, v)
return add_vote_props(target, v, vote_cls)
def get_comments(cids:Iterable[int], v:Optional[User]=None, extra:Optional[Callable[[Query], Query]]=None) -> List[Comment]:
def get_comments(cids, v=None, extra=None):
if not cids: return []
if v:
output = get_comments_v_properties(v, None, Comment.id.in_(cids))[1]
@ -296,7 +295,7 @@ def get_comments(cids:Iterable[int], v:Optional[User]=None, extra:Optional[Calla
output = output.filter(Comment.id.in_(cids)).all()
return sorted(output, key=lambda x: cids.index(x.id))
def get_comments_v_properties(v:User, should_keep_func:Optional[Callable[[Comment], bool]]=None, *criterion):
def get_comments_v_properties(v, should_keep_func=None, *criterion):
if not v:
raise TypeError("A user is required")
votes = g.db.query(CommentVote.vote_type, CommentVote.comment_id).filter_by(user_id=v.id).subquery()
@ -332,7 +331,7 @@ def get_comments_v_properties(v:User, should_keep_func:Optional[Callable[[Commen
else: dump.append(comment)
return (comments, output)
def get_sub_by_name(sub:str, v:Optional[User]=None, graceful=False) -> Optional[Sub]:
def get_sub_by_name(sub, v=None, graceful=False):
if not sub:
if graceful: return None
else: abort(404)
@ -347,7 +346,7 @@ def get_sub_by_name(sub:str, v:Optional[User]=None, graceful=False) -> Optional[
return sub
@cache.memoize(timeout=3600)
def get_profile_picture(identifier:Union[int, str]) -> str:
def get_profile_picture(identifier):
if isinstance(identifier, int):
x = get_account(identifier, graceful=True)
else:

View File

@ -1,6 +1,6 @@
from files.helpers.config.const import *
def log_file(log_str:str, log_filename="rdrama.log"):
def log_file(log_str, log_filename="rdrama.log"):
'''
Simple method to log a string to a file
'''

View File

@ -3,7 +3,6 @@ import subprocess
import time
import requests
from shutil import copyfile
from typing import Optional
import gevent
import imagehash
@ -175,7 +174,7 @@ def process_video(file, v):
else:
return f"{SITE_FULL}{new}"
def process_image(filename:str, v, resize=0, trim=False, uploader_id:Optional[int]=None, db=None):
def process_image(filename, v, resize=0, trim=False, uploader_id=None, db=None):
# thumbnails are processed in a thread and not in the request context
# if an image is too large or webp conversion fails, it'll crash
# to avoid this, we'll simply return None instead

View File

@ -1,5 +1,4 @@
import time
from typing import Iterable
import itertools
import requests
@ -21,7 +20,7 @@ from files.classes.notifications import Notification
# with current keyword quantities. If this ever changes, consider reading the
# value from /meta (or just guessing) and doing a random selection of keywords.
def offsite_mentions_task(cache:Cache):
def offsite_mentions_task(cache):
site_mentions = get_mentions(cache, const.REDDIT_NOTIFS_SITE)
notify_mentions(site_mentions)
@ -32,7 +31,7 @@ def offsite_mentions_task(cache:Cache):
g.db.commit() # commit early otherwise localhost testing fails to commit
def get_mentions(cache:Cache, queries:Iterable[str], reddit_notifs_users=False):
def get_mentions(cache, queries, reddit_notifs_users=False):
kinds = ['post', 'comment']
mentions = []
exclude_subreddits = ['PokemonGoRaids', 'SubSimulatorGPT2', 'SubSimGPT2Interactive']

View File

@ -27,7 +27,7 @@ OWO_EXCLUDE_PATTERNS = [
re.compile(r'\bthe\b', flags=re.I|re.A), # exclude: 'the' ↦ 'teh'
]
def owoify(source: str) -> str:
def owoify(source):
word_matches = OWO_WORD_REGEX.findall(source)
space_matches = OWO_SPACE_REGEX.findall(source)

View File

@ -1,7 +1,6 @@
import random
import re
from random import choice, choices
from typing import List, Optional, Union
from .config.const import *
@ -157,7 +156,7 @@ pronouns_regex = re.compile("([a-z]{1,7})\/[a-z]{1,7}(\/[a-z]{1,7})?", flags=re.
html_title_regex = re.compile("<title>(.{1,200})</title>", flags=re.I)
def sub_matcher(match:re.Match, upper=False, replace_with:Union[dict[str, str], dict[str, List[str]]]=SLURS_FOR_REPLACING):
def sub_matcher(match, upper=False, replace_with=SLURS_FOR_REPLACING):
group_num = 0
match_str = match.group(group_num)
if match_str.startswith('<'):
@ -169,7 +168,7 @@ def sub_matcher(match:re.Match, upper=False, replace_with:Union[dict[str, str],
else:
return repl.upper()
def sub_matcher_upper(match, replace_with:Union[dict[str, str], dict[str, List[str]]]=SLURS_FOR_REPLACING):
def sub_matcher_upper(match, replace_with=SLURS_FOR_REPLACING):
return sub_matcher(match, upper=True, replace_with=replace_with)
@ -186,13 +185,13 @@ def sub_matcher_profanities(match, upper=False):
def sub_matcher_profanities_upper(match):
return sub_matcher_profanities(match, upper=True)
def censor_slurs(body:Optional[str], logged_user):
def censor_slurs(body, logged_user):
if not body: return ""
if '<pre>' in body or '<code>' in body:
return body
def replace_re(body:str, regex:re.Pattern, regex_upper:re.Pattern, sub_func, sub_func_upper):
def replace_re(body, regex, regex_upper, sub_func, sub_func_upper):
body = regex_upper.sub(sub_func_upper, body)
return regex.sub(sub_func, body)

View File

@ -4,7 +4,6 @@ import re
import signal
from functools import partial
from os import path, listdir
from typing import Any
from urllib.parse import parse_qs, urlparse, unquote
from sqlalchemy.sql import func
@ -267,7 +266,7 @@ def render_emoji(html, regexp, golden, emojis_used, b=False, is_title=False):
return html
def with_sigalrm_timeout(timeout: int):
def with_sigalrm_timeout(timeout):
'Use SIGALRM to raise an exception if the function executes for longer than timeout seconds'
# while trying to test this using time.sleep I discovered that gunicorn does in fact do some
@ -289,7 +288,7 @@ def with_sigalrm_timeout(timeout: int):
return wrapped
return inner
def remove_cuniform(sanitized:Optional[str]) -> str:
def remove_cuniform(sanitized):
if not sanitized: return ""
sanitized = sanitized.replace('\u200e','').replace('\u200b','').replace('\u202e','').replace("\ufeff", "")
sanitized = sanitized.replace("𒐪","").replace("𒐫","").replace("","")

View File

@ -19,14 +19,14 @@ _SETTINGS = {
"ddos_detected": False
}
def get_setting(setting:str):
def get_setting(setting):
if not setting or not isinstance(setting, str): raise TypeError()
return _SETTINGS[setting]
def get_settings() -> dict[str, bool]:
def get_settings():
return _SETTINGS
def toggle_setting(setting:str):
def toggle_setting(setting):
val = not _SETTINGS[setting]
_SETTINGS[setting] = val
_save_settings()

View File

@ -119,7 +119,7 @@ def shuffle(stuff):
return stuff
def check_slots_command(c:Comment, v:User, u:User):
def check_slots_command(c, v, u):
if not FEATURES['GAMBLING']: return
body = c.body.lower()

View File

@ -1,5 +1,4 @@
import time
from typing import Optional
from sqlalchemy.sql import func
@ -58,7 +57,7 @@ def sort_objects(sort, objects, cls):
else:
return objects.order_by(cls.downvotes - cls.upvotes, cls.created_utc.desc())
def make_age_string(compare:Optional[int]) -> str:
def make_age_string(compare):
if not compare or compare < 1577865600: return ""
age = int(time.time()) - compare

View File

@ -103,7 +103,7 @@ def edit_rules_post(v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@admin_level_required(PERMS['ADMIN_ADD'])
def make_admin(v:User, username):
def make_admin(v, username):
user = get_user(username)
user.admin_level = 1
@ -127,7 +127,7 @@ def make_admin(v:User, username):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@admin_level_required(PERMS['ADMIN_REMOVE'])
def remove_admin(v:User, username):
def remove_admin(v, username):
if SITE == 'devrama.net':
abort(403, "You can't remove admins on devrama!")
@ -157,7 +157,7 @@ def remove_admin(v:User, username):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@admin_level_required(PERMS['POST_BETS_DISTRIBUTE'])
def distribute(v:User, kind, option_id):
def distribute(v, kind, option_id):
autojanny = get_account(AUTOJANNY_ID)
if autojanny.coins == 0: abort(400, "@AutoJanny has 0 coins")
@ -227,7 +227,7 @@ def distribute(v:User, kind, option_id):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@admin_level_required(PERMS['ADMIN_ACTIONS_REVERT'])
def revert_actions(v:User, username):
def revert_actions(v, username):
revertee = get_user(username)
if revertee.admin_level > v.admin_level:
@ -388,7 +388,7 @@ def admin_home(v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@admin_level_required(PERMS['SITE_SETTINGS'])
def change_settings(v:User, setting):
def change_settings(v, setting):
if setting not in get_settings().keys():
abort(404, f"Setting '{setting}' not found")
@ -672,7 +672,7 @@ def alt_votes_get(v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@admin_level_required(PERMS['USER_LINK'])
def admin_view_alts(v:User, username=None):
def admin_view_alts(v, username=None):
u = get_user(username or request.values.get('username'), graceful=True)
return render_template('admin/alts.html', v=v, u=u, alts=u.alts if u else None)
@ -682,7 +682,7 @@ def admin_view_alts(v:User, username=None):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@admin_level_required(PERMS['USER_LINK'])
def admin_add_alt(v:User, username):
def admin_add_alt(v, username):
user1 = get_user(username)
user2 = get_user(request.values.get('other_username'))
if user1.id == user2.id: abort(400, "Can't add the same account as alts of each other")
@ -718,7 +718,7 @@ def admin_add_alt(v:User, username):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@admin_level_required(PERMS['USER_LINK'])
def admin_delink_relink_alt(v:User, username, other):
def admin_delink_relink_alt(v, username, other):
user1 = get_user(username)
user2 = get_account(other)
ids = [user1.id, user2.id]
@ -1203,7 +1203,7 @@ def unban_user(fullname, v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@admin_level_required(PERMS['USER_BAN'])
def mute_user(v:User, user_id):
def mute_user(v, user_id):
user = get_account(user_id)
if not user.is_muted:
@ -1226,7 +1226,7 @@ def mute_user(v:User, user_id):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@admin_level_required(PERMS['USER_BAN'])
def unmute_user(v:User, user_id):
def unmute_user(v, user_id):
user = get_account(user_id)
if user.is_muted:
@ -1711,7 +1711,7 @@ def ban_domain(v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@admin_level_required(PERMS['DOMAINS_BAN'])
def unban_domain(v:User, domain):
def unban_domain(v, domain):
existing = g.db.get(BannedDomain, domain)
if not existing: abort(400, 'Domain is not banned!')

View File

@ -55,7 +55,7 @@ def before_request():
@app.after_request
def after_request(response:Response):
def after_request(response):
user_id = None
if response.status_code < 400:
@ -91,14 +91,14 @@ def teardown_request(error):
_rollback_and_close_db()
stdout.flush()
def _commit_and_close_db() -> bool:
def _commit_and_close_db():
if not getattr(g, 'db', None): return False
g.db.commit()
g.db.close()
del g.db
return True
def _rollback_and_close_db() -> bool:
def _rollback_and_close_db():
if not getattr(g, 'db', None): return False
g.db.rollback()
g.db.close()

View File

@ -22,7 +22,7 @@ def submit_marseys_redirect():
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def submit_emojis(v:User):
def submit_emojis(v):
if v.admin_level >= PERMS['VIEW_PENDING_SUBMITTED_EMOJIS']:
emojis = g.db.query(Emoji).filter(Emoji.submitter_id != None)
else:
@ -43,7 +43,7 @@ def submit_emojis(v:User):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def submit_emoji(v:User):
def submit_emoji(v):
file = request.files["image"]
name = request.values.get('name', '').lower().strip()
tags = request.values.get('tags', '').lower().strip()
@ -103,7 +103,7 @@ def submit_emoji(v:User):
return redirect(f"/submit/emojis?msg='{name}' submitted successfully!")
def verify_permissions_and_get_asset(cls, asset_type:str, v:User, name:str, make_lower=False):
def verify_permissions_and_get_asset(cls, asset_type, v, name, make_lower=False):
if cls not in ASSET_TYPES: raise Exception("not a valid asset type")
name = name.strip()
if make_lower: name = name.lower()
@ -220,7 +220,7 @@ def approve_emoji(v, name):
return {"message": f"'{emoji.name}' approved!"}
def remove_asset(cls, type_name:str, v:User, name:str) -> dict[str, str]:
def remove_asset(cls, type_name, v, name):
if cls not in ASSET_TYPES: raise Exception("not a valid asset type")
should_make_lower = cls == Emoji
if should_make_lower: name = name.lower()
@ -262,14 +262,14 @@ def remove_asset(cls, type_name:str, v:User, name:str) -> dict[str, str]:
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def remove_emoji(v:User, name):
def remove_emoji(v, name):
return remove_asset(Emoji, "emoji", v, name)
@app.get("/submit/hats")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def submit_hats(v:User):
def submit_hats(v):
if v.admin_level >= PERMS['VIEW_PENDING_SUBMITTED_HATS']: hats = g.db.query(HatDef).filter(HatDef.submitter_id != None)
else: hats = g.db.query(HatDef).filter(HatDef.submitter_id == v.id)
hats = hats.order_by(HatDef.created_utc.desc()).all()
@ -283,7 +283,7 @@ def submit_hats(v:User):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def submit_hat(v:User):
def submit_hat(v):
name = request.values.get('name', '').strip()
description = request.values.get('description', '').strip()
username = request.values.get('author', '').strip()
@ -413,7 +413,7 @@ def approve_hat(v, name):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def remove_hat(v:User, name):
def remove_hat(v, name):
return remove_asset(HatDef, 'hat', v, name)
@app.get("/admin/update/emojis")

View File

@ -26,14 +26,14 @@ from .front import frontlist
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def shop_awards(v:User):
def shop_awards(v):
return redirect('/shop/awards')
@app.get("/shop/awards")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def shop(v:User):
def shop(v):
AWARDS = deepcopy(AWARDS_ENABLED)
if v.house:
@ -60,7 +60,7 @@ def shop(v:User):
@limiter.limit("100/minute;200/hour;1000/day", deduct_when=lambda response: response.status_code < 400)
@limiter.limit("100/minute;200/hour;1000/day", deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def buy(v:User, award):
def buy(v, award):
if award == 'ghost':
abort(403, "You can't buy this award!")

View File

@ -15,7 +15,7 @@ from files.__main__ import app, limiter
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def casino(v:User):
def casino(v):
if v.rehab:
return render_template("casino/rehab.html", v=v), 403
@ -26,7 +26,7 @@ def casino(v:User):
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def casino_game_page(v:User, game):
def casino_game_page(v, game):
if v.rehab:
return render_template("casino/rehab.html", v=v), 403
elif game not in CASINO_GAME_KINDS:
@ -56,7 +56,7 @@ def casino_game_page(v:User, game):
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def casino_game_feed(v:User, game):
def casino_game_feed(v, game):
if v.rehab:
abort(403, "You are under Rehab award effect!")
elif game not in CASINO_GAME_KINDS:
@ -71,7 +71,7 @@ def casino_game_feed(v:User, game):
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def lottershe(v:User):
def lottershe(v):
if v.rehab:
return render_template("casino/rehab.html", v=v)
@ -85,7 +85,7 @@ def lottershe(v:User):
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def pull_slots(v:User):
def pull_slots(v):
if v.rehab:
abort(403, "You are under Rehab award effect!")
@ -119,7 +119,7 @@ def pull_slots(v:User):
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def blackjack_deal_to_player(v:User):
def blackjack_deal_to_player(v):
if v.rehab:
abort(403, "You are under Rehab award effect!")
@ -141,7 +141,7 @@ def blackjack_deal_to_player(v:User):
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def blackjack_player_hit(v:User):
def blackjack_player_hit(v):
if v.rehab:
abort(403, "You are under Rehab award effect!")
@ -159,7 +159,7 @@ def blackjack_player_hit(v:User):
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def blackjack_player_stay(v:User):
def blackjack_player_stay(v):
if v.rehab:
abort(403, "You are under Rehab award effect!")
@ -177,7 +177,7 @@ def blackjack_player_stay(v:User):
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def blackjack_player_doubled_down(v:User):
def blackjack_player_doubled_down(v):
if v.rehab:
abort(403, "You are under Rehab award effect!")
@ -195,7 +195,7 @@ def blackjack_player_doubled_down(v:User):
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def blackjack_player_bought_insurance(v:User):
def blackjack_player_bought_insurance(v):
if v.rehab:
abort(403, "You are under Rehab award effect!")
@ -211,7 +211,7 @@ def blackjack_player_bought_insurance(v:User):
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def roulette_get_bets(v:User):
def roulette_get_bets(v):
if v.rehab:
abort(403, "You are under Rehab award effect!")
@ -226,7 +226,7 @@ def roulette_get_bets(v:User):
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(CASINO_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def roulette_player_placed_bet(v:User):
def roulette_player_placed_bet(v):
if v.rehab:
abort(403, "You are under Rehab award effect!")

View File

@ -95,7 +95,7 @@ def post_pid_comment_cid(cid, v, pid=None, anything=None, sub=None):
@limiter.limit("20/minute;200/hour;1000/day", deduct_when=lambda response: response.status_code < 400)
@limiter.limit("20/minute;200/hour;1000/day", deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_banned
def comment(v:User):
def comment(v):
parent_fullname = request.values.get("parent_fullname").strip()
if len(parent_fullname) < 3: abort(400)
id = parent_fullname[2:]

View File

@ -175,7 +175,7 @@ def frontlist(v=None, sort="hot", page=1, t="all", ids_only=True, filter_words='
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def random_post(v:User):
def random_post(v):
p = g.db.query(Post.id).filter(Post.deleted_utc == 0, Post.is_banned == False, Post.private == False).order_by(func.random()).first()
@ -189,7 +189,7 @@ def random_post(v:User):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def random_user(v:User):
def random_user(v):
u = g.db.query(User.username).filter(User.song != None).order_by(func.random()).first()
if u: u = u[0]
@ -230,7 +230,7 @@ def comment_idlist(v=None, page=1, sort="new", t="day", gt=0, lt=0):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def all_comments(v:User):
def all_comments(v):
page = get_page()
sort=request.values.get("sort", "new")

View File

@ -11,7 +11,7 @@ from files.__main__ import app, limiter
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def ping_groups(v:User):
def ping_groups(v):
groups = g.db.query(Group).order_by(Group.created_utc).all()
return render_template('groups.html', v=v, groups=groups, cost=GROUP_COST, msg=get_msg(), error=get_error())
@ -64,7 +64,7 @@ def create_group(v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def join_group(v:User, group_name):
def join_group(v, group_name):
group_name = group_name.strip().lower()
group = g.db.get(Group, group_name)
@ -83,7 +83,7 @@ def join_group(v:User, group_name):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def leave_group(v:User, group_name):
def leave_group(v, group_name):
group_name = group_name.strip().lower()
if group_name == 'jannies':
@ -112,7 +112,7 @@ def leave_group(v:User, group_name):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def memberships(v:User, group_name):
def memberships(v, group_name):
group_name = group_name.strip().lower()
group = g.db.get(Group, group_name)
@ -136,7 +136,7 @@ def memberships(v:User, group_name):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def group_approve(v:User, group_name, user_id):
def group_approve(v, group_name, user_id):
group_name = group_name.strip().lower()
group = g.db.get(Group, group_name)
@ -162,7 +162,7 @@ def group_approve(v:User, group_name, user_id):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def group_reject(v:User, group_name, user_id):
def group_reject(v, group_name, user_id):
group_name = group_name.strip().lower()
group = g.db.get(Group, group_name)

View File

@ -11,7 +11,7 @@ from files.__main__ import app, limiter
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def hats(v:User):
def hats(v):
owned_hat_ids = [x.hat_id for x in v.owned_hats]
sort = request.values.get("sort")
@ -73,7 +73,7 @@ def hats(v:User):
@limiter.limit('100/minute;1000/3 days', deduct_when=lambda response: response.status_code < 400)
@limiter.limit('100/minute;1000/3 days', deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def buy_hat(v:User, hat_id):
def buy_hat(v, hat_id):
try: hat_id = int(hat_id)
except: abort(404, "Hat not found!")
@ -120,7 +120,7 @@ def buy_hat(v:User, hat_id):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def equip_hat(v:User, hat_id):
def equip_hat(v, hat_id):
try: hat_id = int(hat_id)
except: abort(404, "Hat not found!")
@ -138,7 +138,7 @@ def equip_hat(v:User, hat_id):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def unequip_hat(v:User, hat_id):
def unequip_hat(v, hat_id):
try: hat_id = int(hat_id)
except: abort(404, "Hat not found!")
@ -154,7 +154,7 @@ def unequip_hat(v:User, hat_id):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def hat_owners(v:User, hat_id):
def hat_owners(v, hat_id):
try: hat_id = int(hat_id)
except: abort(404, "Hat not found!")

View File

@ -23,7 +23,7 @@ NO_LOGIN_REDIRECT_URLS = ("/login", "/logout", "/signup", "/forgot", "/reset", "
@app.get("/login")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@auth_desired
def login_get(v:Optional[User]):
def login_get(v):
redir = request.values.get("redirect", "").strip().rstrip('?').lower()
if v:
if redir and is_site_url(redir) and redir not in NO_LOGIN_REDIRECT_URLS:
@ -35,7 +35,7 @@ def login_get(v:Optional[User]):
@limiter.limit('1/second', scope=rpath)
@limiter.limit("6/minute;20/day", deduct_when=lambda response: response.status_code < 400)
@auth_desired
def login_post(v:Optional[User]):
def login_post(v):
if v: abort(400)
username = request.values.get("username")
@ -105,7 +105,7 @@ def login_post(v:Optional[User]):
return redirect(redir)
return redirect('/')
def log_failed_admin_login_attempt(account:User, type:str):
def log_failed_admin_login_attempt(account, type):
if not account or account.admin_level < PERMS['SITE_WARN_ON_INVALID_AUTH']: return
ip = get_CF()
print(f"A site admin from {ip} failed to login to account @{account.user_name} (invalid {type})")
@ -125,7 +125,7 @@ def on_login(account, redir=None):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def me(v:User):
def me(v):
if v.client: return v.json
else: return redirect(v.url)
@ -146,7 +146,7 @@ def logout(v):
@app.get("/signup")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@auth_desired
def sign_up_get(v:Optional[User]):
def sign_up_get(v):
if not get_setting('signups'):
abort(403, "New account registration is currently closed. Please come back later!")
@ -195,7 +195,7 @@ def sign_up_get(v:Optional[User]):
@limiter.limit('1/second', scope=rpath)
@limiter.limit("10/day", deduct_when=lambda response: response.status_code < 400)
@auth_desired
def sign_up_post(v:Optional[User]):
def sign_up_post(v):
if not get_setting('signups'):
abort(403, "New account registration is currently closed. Please come back later!")
@ -436,7 +436,7 @@ def get_reset():
@limiter.limit('1/second', scope=rpath)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@auth_desired
def post_reset(v:Optional[User]):
def post_reset(v):
if v: return redirect('/')
user_id = request.values.get("user_id")
timestamp = 0
@ -475,7 +475,7 @@ def post_reset(v:Optional[User]):
@app.get("/lost_2fa")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@auth_desired
def lost_2fa(v:Optional[User]):
def lost_2fa(v):
if v and not v.mfa_secret: abort(400, "You don't have two-factor authentication enabled")
return render_template("login/lost_2fa.html", v=v)

View File

@ -23,7 +23,7 @@ def lottery_start(v):
@limiter.limit("100/minute;500/hour;1000/day", deduct_when=lambda response: response.status_code < 400)
@limiter.limit("100/minute;500/hour;1000/day", deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def lottery_buy(v:User):
def lottery_buy(v):
try: quantity = int(request.values.get("quantity"))
except: abort(400, "Invalid ticket quantity!")
@ -41,7 +41,7 @@ def lottery_buy(v:User):
@limiter.limit("100/minute;500/hour;1000/day", deduct_when=lambda response: response.status_code < 400)
@limiter.limit("100/minute;500/hour;1000/day", deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def lottery_active(v:User):
def lottery_active(v):
lottery, participants = get_active_lottery_stats()
return {"message": "", "stats": {"user": v.lottery_stats, "lottery": lottery, "participants": participants}}

View File

@ -27,7 +27,7 @@ def verify_email(v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def activate(v:User):
def activate(v):
email = request.values.get("email", "").strip().lower()
if not email_regex.fullmatch(email):

View File

@ -85,7 +85,7 @@ def notifications_modmail(v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def notifications_messages(v:User):
def notifications_messages(v):
page = get_page()
# All of these queries are horrible. For whomever comes here after me,
@ -160,7 +160,7 @@ def notifications_messages(v:User):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def notifications_posts(v:User):
def notifications_posts(v):
page = get_page()
listing = g.db.query(Post).filter(
@ -207,7 +207,7 @@ def notifications_posts(v:User):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def notifications_modactions(v:User):
def notifications_modactions(v):
page = get_page()
if v.admin_level >= PERMS['NOTIFICATIONS_MODERATOR_ACTIONS']:
@ -254,7 +254,7 @@ def notifications_modactions(v:User):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def notifications_reddit(v:User):
def notifications_reddit(v):
page = get_page()
if not v.can_view_offsitementions: abort(403)
@ -293,7 +293,7 @@ def notifications_reddit(v:User):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def notifications(v:User):
def notifications(v):
page = get_page()
if not session.get("GLOBAL") and v.admin_level < PERMS['USER_SHADOWBAN'] and not request.values.get('nr'):

View File

@ -11,7 +11,7 @@ from files.__main__ import app, limiter
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def authorize_prompt(v:User):
def authorize_prompt(v):
client_id = request.values.get("client_id")
application = g.db.query(OauthApp).filter_by(client_id=client_id).one_or_none()
if not application:

View File

@ -81,7 +81,7 @@ def publish(pid, v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def submit_get(v:User, sub=None):
def submit_get(v, sub=None):
sub = get_sub_by_name(sub, graceful=True)
if request.path.startswith('/h/') and not sub: abort(404)
@ -275,7 +275,7 @@ def more_comments(v, cid):
return render_template("comments.html", v=v, comments=comments, p=p, render_replies=True)
def thumbnail_thread(pid:int, vid:int):
def thumbnail_thread(pid, vid):
db = db_session()
def expand_url(post_url, fragment_url):
if fragment_url.startswith("https://"):
@ -459,7 +459,7 @@ def is_repost(v):
@limiter.limit(POST_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(POST_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_banned
def submit_post(v:User, sub=None):
def submit_post(v, sub=None):
url = request.values.get("url", "").strip()
if '\\' in url: abort(400)
@ -952,7 +952,7 @@ def pin_post(post_id, v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def set_new_sort(post_id:int, v:User):
def set_new_sort(post_id, v):
p = get_post(post_id)
if not v.can_edit(p): abort(403, "Only the post author can do that!")
p.new = True
@ -974,7 +974,7 @@ def set_new_sort(post_id:int, v:User):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def unset_new_sort(post_id:int, v:User):
def unset_new_sort(post_id, v):
p = get_post(post_id)
if not v.can_edit(p): abort(403, "Only the post author can do that!")
p.new = None

View File

@ -157,7 +157,7 @@ def remove_report_comment(v, cid, uid):
g.db.add(ma)
return {"message": "Report removed successfully!"}
def move_post(post:Post, v:User, reason:str) -> Union[bool, str]:
def move_post(post, v, reason):
if post.ghost:
return False

View File

@ -1,7 +1,6 @@
import time
from random import randint
from typing import Optional, Union, Callable, List, Set
from sqlalchemy.orm import aliased, deferred
from sqlalchemy.sql import case, literal
from sqlalchemy.sql.expression import or_
@ -13,19 +12,19 @@ from files.helpers.config.const import *
from files.helpers.security import generate_hash, validate_hash
from files.__main__ import cache
def get_raw_formkey(u:User):
def get_raw_formkey(u):
return f"{session['session_id']}+{u.id}+{u.login_nonce}"
def get_formkey(u:Optional[User]):
def get_formkey(u):
if not u: return "" # if no user exists, give them a blank formkey
return generate_hash(get_raw_formkey(u))
def validate_formkey(u:User, formkey:Optional[str]) -> bool:
def validate_formkey(u, formkey):
if not formkey: return False
return validate_hash(get_raw_formkey(u), formkey)
@cache.memoize()
def get_alt_graph_ids(uid:int) -> Set[int]:
def get_alt_graph_ids(uid):
alt_graph_cte = g.db.query(literal(uid).label('user_id')).select_from(Alt).cte('alt_graph', recursive=True)
alt_graph_cte_inner = g.db.query(
@ -40,11 +39,11 @@ def get_alt_graph_ids(uid:int) -> Set[int]:
alt_graph_cte = alt_graph_cte.union(alt_graph_cte_inner)
return set([x[0] for x in g.db.query(User.id).filter(User.id == alt_graph_cte.c.user_id, User.id != uid).all()])
def get_alt_graph(uid:int) -> List[User]:
def get_alt_graph(uid):
alt_ids = get_alt_graph_ids(uid)
return g.db.query(User).filter(User.id.in_(alt_ids)).order_by(User.username).all()
def add_alt(user1:int, user2:int):
def add_alt(user1, user2):
if AEVANN_ID in (user1, user2) or CARP_ID in (user1, user2):
return
li = [user1, user2]
@ -56,7 +55,7 @@ def add_alt(user1:int, user2:int):
cache.delete_memoized(get_alt_graph_ids, user1)
cache.delete_memoized(get_alt_graph_ids, user2)
def check_for_alts(current:User, include_current_session=False):
def check_for_alts(current, include_current_session=False):
current_id = current.id
ids = [x[0] for x in g.db.query(User.id).all()]
past_accs = set(session.get("history", [])) if include_current_session else set()
@ -105,7 +104,7 @@ def check_for_alts(current:User, include_current_session=False):
u.blacklisted_by = current.blacklisted_by
g.db.add(u)
def execute_shadowban_viewers_and_voters(v:Optional[User], target:Union[Post, Comment]):
def execute_shadowban_viewers_and_voters(v, target):
if not v or not v.shadowbanned: return
if not target: return
if v.id != target.author_id: return

View File

@ -49,7 +49,7 @@ def searchparse(text):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def searchposts(v:User):
def searchposts(v):
query = request.values.get("q", '').strip()
if not query:
abort(403, "Empty searches aren't allowed!")
@ -185,7 +185,7 @@ def searchposts(v:User):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def searchcomments(v:User):
def searchcomments(v):
query = request.values.get("q", '').strip()
if not query:
abort(403, "Empty searches aren't allowed!")
@ -289,7 +289,7 @@ def searchcomments(v:User):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def searchmessages(v:User):
def searchmessages(v):
query = request.values.get("q", '').strip()
if not query:
abort(403, "Empty searches aren't allowed!")
@ -380,7 +380,7 @@ def searchmessages(v:User):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def searchusers(v:User):
def searchusers(v):
query = request.values.get("q", '').strip()
if not query:
abort(403, "Empty searches aren't allowed!")

View File

@ -30,14 +30,14 @@ from files.__main__ import app, cache, limiter
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def settings(v:User):
def settings(v):
return redirect("/settings/personal")
@app.get("/settings/personal")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def settings_personal(v:User):
def settings_personal(v):
return render_template("settings/personal.html", v=v, error=get_error(), msg=get_msg())
@app.delete('/settings/background')
@ -124,7 +124,7 @@ def settings_personal_post(v):
# begin common selectors #
def update_flag(column_name:str, request_name:str):
def update_flag(column_name, request_name):
if not request.values.get(request_name, ''): return False
request_flag = request.values.get(request_name, '') == 'true'
if request_flag != getattr(v, column_name):
@ -132,7 +132,7 @@ def settings_personal_post(v):
return True
return False
def update_potentially_permanent_flag(column_name:str, request_name:str, friendly_name:str, badge_id:Optional[int]):
def update_potentially_permanent_flag(column_name, request_name, friendly_name, badge_id):
if not request.values.get(request_name): return False
current_value = getattr(v, column_name)
if FEATURES['USERS_PERMANENT_WORD_FILTERS'] and current_value > 1:
@ -149,7 +149,7 @@ def settings_personal_post(v):
return True
return False
def set_selector_option(column_name:str, api_name:str, valid_values:Iterable[str], error_msg:str="value"):
def set_selector_option(column_name, api_name, valid_values, error_msg="value"):
opt = request.values.get(api_name)
if opt: opt = opt.strip()
if not opt: return False
@ -398,7 +398,7 @@ def settings_personal_post(v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def filters(v:User):
def filters(v):
filters=request.values.get("filters")[:1000].strip()
if filters == v.custom_filter_list:
@ -409,7 +409,7 @@ def filters(v:User):
return redirect("/settings/advanced?msg=Your custom filters have been updated!")
def set_color(v:User, attr:str, color:Optional[str]):
def set_color(v, attr, color):
current = getattr(v, attr)
color = color.strip().lower() if color else None
if color:
@ -621,7 +621,7 @@ def settings_images_banner(v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def settings_css_get(v:User):
def settings_css_get(v):
return render_template("settings/css.html", v=v, msg=get_msg(), profilecss=v.profilecss)
@app.post("/settings/css")
@ -657,7 +657,7 @@ def settings_profilecss(v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def settings_security(v:User):
def settings_security(v):
return render_template("settings/security.html",
v=v,
mfa_secret=pyotp.random_base32() if not v.mfa_secret else None,
@ -668,7 +668,7 @@ def settings_security(v:User):
@app.get("/settings/blocks")
@auth_required
def settings_blocks(v:User):
def settings_blocks(v):
return render_template("settings/blocks.html", v=v)
@app.post("/settings/block")
@ -721,14 +721,14 @@ def settings_unblock_user(v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def settings_apps(v:User):
def settings_apps(v):
return render_template("settings/apps.html", v=v)
@app.get("/settings/advanced")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def settings_advanced_get(v:User):
def settings_advanced_get(v):
return render_template("settings/advanced.html", v=v, msg=get_msg(), error=get_error())
@app.post("/settings/name_change")

View File

@ -57,7 +57,7 @@ def marseys_redirect():
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def emoji_list(v:User):
def emoji_list(v):
emojis = get_emoji_list()
authors = get_accounts_dict([e.author_id for e in emojis], v=v, graceful=True)
@ -105,7 +105,7 @@ def emojis(v):
@app.get('/sidebar')
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@auth_desired
def sidebar(v:Optional[User]):
def sidebar(v):
return render_template('sidebar.html', v=v)
@ -113,7 +113,7 @@ def sidebar(v:Optional[User]):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def participation_stats(v:User):
def participation_stats(v):
stats = cache.get('stats') or {}
if v.client: return stats
return render_template("stats.html", v=v, title="Content Statistics", data=stats)
@ -127,14 +127,14 @@ def chart():
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def weekly_chart(v:User):
def weekly_chart(v):
return send_file(statshelper.chart_path(kind="weekly", site=SITE))
@app.get("/daily_chart")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def daily_chart(v:User):
def daily_chart(v):
return send_file(statshelper.chart_path(kind="daily", site=SITE))
@app.get("/admin/patrons")
@ -152,7 +152,7 @@ def patrons(v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def admins(v:User):
def admins(v):
admins = g.db.query(User).filter(User.admin_level >= PERMS['ADMIN_MOP_VISIBLE']).order_by(User.admin_level.desc(), User.truescore.desc()).all()
return render_template("admins.html", v=v, admins=admins)
@ -161,7 +161,7 @@ def admins(v:User):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def log(v:User):
def log(v):
page = get_page()
@ -234,7 +234,7 @@ def log_item(id, v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def static_megathread_index(v:User):
def static_megathread_index(v):
if SITE_NAME != 'rDrama':
abort(404)
@ -259,7 +259,7 @@ def api(v):
@app.get("/admin/chat")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@auth_desired
def contact(v:Optional[User]):
def contact(v):
return render_template("contact.html", v=v, msg=get_msg())
@app.post("/contact")
@ -332,7 +332,7 @@ def badge_list(site, can_view_patron_badges):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def badges(v:User):
def badges(v):
badges, counts = badge_list(SITE, v.admin_level >= PERMS['VIEW_PATRONS'])
return render_template("badges.html", v=v, badges=badges, counts=counts)
@ -357,13 +357,13 @@ def blocks(v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def formatting(v:User):
def formatting(v):
return render_template("formatting.html", v=v)
@app.get("/app")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@auth_desired
def mobile_app(v:Optional[User]):
def mobile_app(v):
return render_template("app.html", v=v)
@app.post("/dismiss_mobile_tip")
@ -392,7 +392,7 @@ def transfers_id(id, v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def transfers(v:User):
def transfers(v):
comments = g.db.query(Comment).filter(Comment.author_id == AUTOJANNY_ID, Comment.parent_post == None, Comment.body_html.like("%</a> has transferred %"))

View File

@ -13,7 +13,7 @@ from files.__main__ import app, cache, limiter
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def exile_post(v:User, pid):
def exile_post(v, pid):
if v.shadowbanned: abort(500)
p = get_post(pid)
sub = p.sub
@ -48,7 +48,7 @@ def exile_post(v:User, pid):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def exile_comment(v:User, cid):
def exile_comment(v, cid):
if v.shadowbanned: abort(500)
c = get_comment(cid)
sub = c.post.sub
@ -83,7 +83,7 @@ def exile_comment(v:User, cid):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def unexile(v:User, sub, uid):
def unexile(v, sub, uid):
u = get_account(uid)
if not v.mods(sub): abort(403)
@ -115,7 +115,7 @@ def unexile(v:User, sub, uid):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def block_sub(v:User, sub):
def block_sub(v, sub):
sub = get_sub_by_name(sub).name
existing = g.db.query(SubBlock).filter_by(user_id=v.id, sub=sub).one_or_none()
if not existing:
@ -130,7 +130,7 @@ def block_sub(v:User, sub):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def unblock_sub(v:User, sub):
def unblock_sub(v, sub):
sub = get_sub_by_name(sub)
if not User.can_see(v, sub):
abort(403)
@ -149,7 +149,7 @@ def unblock_sub(v:User, sub):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def subscribe_sub(v:User, sub):
def subscribe_sub(v, sub):
sub = get_sub_by_name(sub).name
existing = g.db.query(SubJoin).filter_by(user_id=v.id, sub=sub).one_or_none()
if not existing:
@ -164,7 +164,7 @@ def subscribe_sub(v:User, sub):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def unsubscribe_sub(v:User, sub):
def unsubscribe_sub(v, sub):
sub = get_sub_by_name(sub).name
subscribe = g.db.query(SubJoin).filter_by(user_id=v.id, sub=sub).one_or_none()
if subscribe:
@ -178,7 +178,7 @@ def unsubscribe_sub(v:User, sub):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def follow_sub(v:User, sub):
def follow_sub(v, sub):
sub = get_sub_by_name(sub)
if not User.can_see(v, sub):
abort(403)
@ -196,7 +196,7 @@ def follow_sub(v:User, sub):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def unfollow_sub(v:User, sub):
def unfollow_sub(v, sub):
sub = get_sub_by_name(sub)
subscription = g.db.query(SubSubscription).filter_by(user_id=v.id, sub=sub.name).one_or_none()
if subscription:
@ -209,7 +209,7 @@ def unfollow_sub(v:User, sub):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def mods(v:User, sub):
def mods(v, sub):
sub = get_sub_by_name(sub)
if not User.can_see(v, sub):
abort(403)
@ -222,7 +222,7 @@ def mods(v:User, sub):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def sub_exilees(v:User, sub):
def sub_exilees(v, sub):
sub = get_sub_by_name(sub)
if not User.can_see(v, sub):
abort(403)
@ -237,7 +237,7 @@ def sub_exilees(v:User, sub):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def sub_blockers(v:User, sub):
def sub_blockers(v, sub):
sub = get_sub_by_name(sub)
if not User.can_see(v, sub):
abort(403)
@ -253,7 +253,7 @@ def sub_blockers(v:User, sub):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def sub_followers(v:User, sub):
def sub_followers(v, sub):
sub = get_sub_by_name(sub)
if not User.can_see(v, sub):
abort(403)
@ -271,7 +271,7 @@ def sub_followers(v:User, sub):
@limiter.limit("30/day", deduct_when=lambda response: response.status_code < 400)
@limiter.limit("30/day", deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def add_mod(v:User, sub):
def add_mod(v, sub):
if SITE_NAME == 'WPD': abort(403)
sub = get_sub_by_name(sub).name
if not v.mods(sub): abort(403)
@ -311,7 +311,7 @@ def add_mod(v:User, sub):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def remove_mod(v:User, sub):
def remove_mod(v, sub):
sub = get_sub_by_name(sub).name
if not v.mods(sub): abort(403)
@ -404,7 +404,7 @@ def create_sub2(v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def kick(v:User, pid):
def kick(v, pid):
post = get_post(pid)
if not post.sub: abort(403)
@ -437,7 +437,7 @@ def kick(v:User, pid):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def sub_settings(v:User, sub):
def sub_settings(v, sub):
sub = get_sub_by_name(sub)
if not v.mods(sub.name): abort(403)
return render_template('sub/settings.html', v=v, sidebar=sub.sidebar, sub=sub, css=sub.css)
@ -449,7 +449,7 @@ def sub_settings(v:User, sub):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def post_sub_sidebar(v:User, sub):
def post_sub_sidebar(v, sub):
sub = get_sub_by_name(sub)
if not v.mods(sub.name): abort(403)
if v.shadowbanned: return redirect(f'/h/{sub}/settings')
@ -479,7 +479,7 @@ def post_sub_sidebar(v:User, sub):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def post_sub_css(v:User, sub):
def post_sub_css(v, sub):
sub = get_sub_by_name(sub)
css = request.values.get('css', '').strip()
@ -522,7 +522,7 @@ def get_sub_css(sub):
@limiter.limit("50/day", deduct_when=lambda response: response.status_code < 400)
@limiter.limit("50/day", deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def upload_sub_banner(v:User, sub:str):
def upload_sub_banner(v, sub):
if g.is_tor: abort(403, "Image uploads are not allowed through Tor")
sub = get_sub_by_name(sub)
@ -552,7 +552,7 @@ def upload_sub_banner(v:User, sub:str):
@limiter.limit("1/second;30/day", deduct_when=lambda response: response.status_code < 400)
@limiter.limit("1/second;30/day", deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def delete_sub_banner(v:User, sub:str, index:int):
def delete_sub_banner(v, sub, index):
sub = get_sub_by_name(sub)
if not v.mods(sub.name): abort(403)
if v.shadowbanned: return redirect(f'/h/{sub}/settings')
@ -583,7 +583,7 @@ def delete_sub_banner(v:User, sub:str, index:int):
@limiter.limit("1/10 second;30/day", deduct_when=lambda response: response.status_code < 400)
@limiter.limit("1/10 second;30/day", deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def delete_all_sub_banners(v:User, sub:str):
def delete_all_sub_banners(v, sub):
sub = get_sub_by_name(sub)
if not v.mods(sub.name): abort(403)
if v.shadowbanned: return redirect(f'/h/{sub}/settings')
@ -611,7 +611,7 @@ def delete_all_sub_banners(v:User, sub:str):
@limiter.limit("10/day", deduct_when=lambda response: response.status_code < 400)
@limiter.limit("10/day", deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def sub_sidebar(v:User, sub):
def sub_sidebar(v, sub):
if g.is_tor: abort(403, "Image uploads are not allowed through TOR!")
sub = get_sub_by_name(sub)
@ -644,7 +644,7 @@ def sub_sidebar(v:User, sub):
@limiter.limit("10/day", deduct_when=lambda response: response.status_code < 400)
@limiter.limit("10/day", deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def sub_marsey(v:User, sub):
def sub_marsey(v, sub):
if g.is_tor: abort(403, "Image uploads are not allowed through TOR!")
sub = get_sub_by_name(sub)
@ -676,7 +676,7 @@ def sub_marsey(v:User, sub):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def subs(v:User):
def subs(v):
subs = g.db.query(Sub, func.count(Post.sub)).outerjoin(Post, Sub.name == Post.sub).group_by(Sub.name).order_by(func.count(Post.sub).desc()).all()
total_users = g.db.query(User).count()
return render_template('sub/subs.html', v=v, subs=subs, total_users=total_users)
@ -687,7 +687,7 @@ def subs(v:User):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def hole_pin(v:User, pid):
def hole_pin(v, pid):
p = get_post(pid)
if not p.sub: abort(403)
@ -723,7 +723,7 @@ def hole_pin(v:User, pid):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def hole_unpin(v:User, pid):
def hole_unpin(v, pid):
p = get_post(pid)
if not p.sub: abort(403)
@ -756,7 +756,7 @@ def hole_unpin(v:User, pid):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def sub_stealth(v:User, sub):
def sub_stealth(v, sub):
sub = get_sub_by_name(sub)
if sub.name in {'braincels','smuggies','mnn'} and v.admin_level < PERMS["MODS_EVERY_HOLE"]:
abort(403)
@ -852,7 +852,7 @@ def unpin_comment_mod(cid, v):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def hole_log(v:User, sub):
def hole_log(v, sub):
sub = get_sub_by_name(sub)
if not User.can_see(v, sub):
abort(403)

View File

@ -3,7 +3,6 @@ import json
import math
import time
from collections import Counter
from typing import Literal
import gevent
import qrcode
@ -99,7 +98,7 @@ def claim_rewards_all_users():
print(f'@{user.username} rewards claimed successfully!', flush=True)
def transfer_currency(v:User, username:str, currency_name:Literal['coins', 'marseybux'], apply_tax:bool):
def transfer_currency(v, username, currency_name, apply_tax):
MIN_CURRENCY_TRANSFER = 100
TAX_PCT = 0.03
receiver = get_user(username, v=v)
@ -180,7 +179,7 @@ def upvoters_downvoters(v, username, uid, cls, vote_cls, vote_dir, template, sta
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def upvoters_posts(v:User, username, uid):
def upvoters_posts(v, username, uid):
return upvoters_downvoters(v, username, uid, Post, Vote, 1, "userpage/voted_posts.html", None)
@ -188,7 +187,7 @@ def upvoters_posts(v:User, username, uid):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def upvoters_comments(v:User, username, uid):
def upvoters_comments(v, username, uid):
return upvoters_downvoters(v, username, uid, Comment, CommentVote, 1, "userpage/voted_comments.html", True)
@ -196,7 +195,7 @@ def upvoters_comments(v:User, username, uid):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def downvoters_posts(v:User, username, uid):
def downvoters_posts(v, username, uid):
return upvoters_downvoters(v, username, uid, Post, Vote, -1, "userpage/voted_posts.html", None)
@ -204,7 +203,7 @@ def downvoters_posts(v:User, username, uid):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def downvoters_comments(v:User, username, uid):
def downvoters_comments(v, username, uid):
return upvoters_downvoters(v, username, uid, Comment, CommentVote, -1, "userpage/voted_comments.html", True)
def upvoting_downvoting(v, username, uid, cls, vote_cls, vote_dir, template, standalone):
@ -246,7 +245,7 @@ def upvoting_downvoting(v, username, uid, cls, vote_cls, vote_dir, template, sta
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def upvoting_posts(v:User, username, uid):
def upvoting_posts(v, username, uid):
return upvoting_downvoting(v, username, uid, Post, Vote, 1, "userpage/voted_posts.html", None)
@ -254,7 +253,7 @@ def upvoting_posts(v:User, username, uid):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def upvoting_comments(v:User, username, uid):
def upvoting_comments(v, username, uid):
return upvoting_downvoting(v, username, uid, Comment, CommentVote, 1, "userpage/voted_comments.html", True)
@ -262,7 +261,7 @@ def upvoting_comments(v:User, username, uid):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def downvoting_posts(v:User, username, uid):
def downvoting_posts(v, username, uid):
return upvoting_downvoting(v, username, uid, Post, Vote, -1, "userpage/voted_posts.html", None)
@ -270,7 +269,7 @@ def downvoting_posts(v:User, username, uid):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def downvoting_comments(v:User, username, uid):
def downvoting_comments(v, username, uid):
return upvoting_downvoting(v, username, uid, Comment, CommentVote, -1, "userpage/voted_comments.html", True)
def user_voted(v, username, cls, vote_cls, template, standalone):
@ -306,7 +305,7 @@ def user_voted(v, username, cls, vote_cls, template, standalone):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def user_voted_posts(v:User, username):
def user_voted_posts(v, username):
return user_voted(v, username, Post, Vote, "userpage/voted_posts.html", None)
@ -314,14 +313,14 @@ def user_voted_posts(v:User, username):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def user_voted_comments(v:User, username):
def user_voted_comments(v, username):
return user_voted(v, username, Comment, CommentVote, "userpage/voted_comments.html", True)
@app.get("/banned")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def banned(v:User):
def banned(v):
users = g.db.query(User).filter(
User.is_banned != None,
or_(User.unban_utc == 0, User.unban_utc > time.time()),
@ -334,7 +333,7 @@ def banned(v:User):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def grassed(v:User):
def grassed(v):
users = g.db.query(User).filter(
User.ban_reason.like('grass award used by @%'),
User.unban_utc > time.time(),
@ -347,7 +346,7 @@ def grassed(v:User):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def chuds(v:User):
def chuds(v):
users = g.db.query(User).filter(
or_(User.chud == 1, User.chud > time.time()),
)
@ -357,7 +356,7 @@ def chuds(v:User):
users = users.order_by(User.username).all()
return render_template("chuds.html", v=v, users=users)
def all_upvoters_downvoters(v:User, username:str, vote_dir:int, is_who_simps_hates:bool):
def all_upvoters_downvoters(v, username, vote_dir, is_who_simps_hates):
if username == 'Snappy':
abort(403, "For performance reasons, you can't see Snappy's statistics!")
vote_str = 'votes'
@ -414,28 +413,28 @@ def all_upvoters_downvoters(v:User, username:str, vote_dir:int, is_who_simps_hat
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def upvoters(v:User, username:str):
def upvoters(v, username):
return all_upvoters_downvoters(v, username, 1, False)
@app.get("/@<username>/downvoters")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def downvoters(v:User, username:str):
def downvoters(v, username):
return all_upvoters_downvoters(v, username, -1, False)
@app.get("/@<username>/upvoting")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def upvoting(v:User, username:str):
def upvoting(v, username):
return all_upvoters_downvoters(v, username, 1, True)
@app.get("/@<username>/downvoting")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def downvoting(v:User, username:str):
def downvoting(v, username):
return all_upvoters_downvoters(v, username, -1, True)
@app.post("/@<username>/suicide")
@ -445,7 +444,7 @@ def downvoting(v:User, username:str):
@limiter.limit("5/day", deduct_when=lambda response: response.status_code < 400)
@limiter.limit("5/day", deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def suicide(v:User, username:str):
def suicide(v, username):
user = get_user(username)
suicide = f"Hi there,\n\nA [concerned user](/id/{v.id}) reached out to us about you.\n\nWhen you're in the middle of something painful, it may feel like you don't have a lot of options. But whatever you're going through, you deserve help and there are people who are here for you.\n\nThere are resources available in your area that are free, confidential, and available 24/7:\n\n- Call, Text, or Chat with Canada's [Crisis Services Canada](https://www.crisisservicescanada.ca/en/)\n\n- Call, Email, or Visit the UK's [Samaritans](https://www.samaritans.org/)\n\n- Text CHAT to America's [Crisis Text Line](https://www.crisistextline.org/) at 741741.\n\nIf you don't see a resource in your area above, the moderators keep a comprehensive list of resources and hotlines for people organized by location. Find Someone Now\n\nIf you think you may be depressed or struggling in another way, don't ignore it or brush it aside. Take yourself and your feelings seriously, and reach out to someone.\n\nIt may not feel like it, but you have options. There are people available to listen to you, and ways to move forward.\n\nYour fellow users care about you and there are people who want to help."
if not v.shadowbanned:
@ -457,7 +456,7 @@ def suicide(v:User, username:str):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def get_coins(v:User, username:str):
def get_coins(v, username):
user = get_user(username, v=v)
return {"coins": user.coins}
@ -467,7 +466,7 @@ def get_coins(v:User, username:str):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def transfer_coins(v:User, username:str):
def transfer_coins(v, username):
return transfer_currency(v, username, 'coins', True)
@app.post("/@<username>/transfer_bux")
@ -477,7 +476,7 @@ def transfer_coins(v:User, username:str):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@is_not_permabanned
def transfer_bux(v:User, username:str):
def transfer_bux(v, username):
return transfer_currency(v, username, 'marseybux', False)
@app.get("/leaderboard")
@ -485,7 +484,7 @@ def transfer_bux(v:User, username:str):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
@cache.memoize()
def leaderboard(v:User):
def leaderboard(v):
users = g.db.query(User)
coins = Leaderboard("Coins", "coins", "coins", "Coins", None, Leaderboard.get_simple_lb, User.coins, v, lambda u:u.coins, users)
@ -563,7 +562,7 @@ def get_profilecss(id):
@app.get("/@<username>/song")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
def usersong(username:str):
def usersong(username):
user = get_user(username)
if not user.song:
abort(404)
@ -677,7 +676,7 @@ def message2(v, username=None, id=None):
@limiter.limit("6/minute;50/hour;200/day", deduct_when=lambda response: response.status_code < 400)
@limiter.limit("6/minute;50/hour;200/day", deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def messagereply(v:User):
def messagereply(v):
body = request.values.get("body", "")
body = body[:COMMENT_BODY_LENGTH_LIMIT].strip()
@ -776,7 +775,7 @@ def messagereply(v:User):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def mfa_qr(v:User, secret:str):
def mfa_qr(v, secret):
x = pyotp.TOTP(secret)
qr = qrcode.QRCode(
error_correction=qrcode.constants.ERROR_CORRECT_L
@ -794,7 +793,7 @@ def mfa_qr(v:User, secret:str):
@app.get("/is_available/<name>")
@limiter.limit("100/day", deduct_when=lambda response: response.status_code < 400)
def is_available(name:str):
def is_available(name):
name=name.strip()
@ -831,14 +830,14 @@ def user_id(v, id, path=''):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def redditor_moment_redirect(v:User, username:str):
def redditor_moment_redirect(v, username):
return redirect(f"/@{username}")
@app.get("/@<username>/blockers")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def blockers(v:User, username:str):
def blockers(v, username):
u = get_user(username, v=v)
page = get_page()
@ -857,7 +856,7 @@ def blockers(v:User, username:str):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def blocking(v:User, username:str):
def blocking(v, username):
u = get_user(username, v=v)
page = get_page()
@ -876,7 +875,7 @@ def blocking(v:User, username:str):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def followers(v:User, username:str):
def followers(v, username):
u = get_user(username, v=v)
if not (v.id == u.id or v.admin_level >= PERMS['USER_FOLLOWS_VISIBLE']):
@ -898,7 +897,7 @@ def followers(v:User, username:str):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def following(v:User, username:str):
def following(v, username):
u = get_user(username, v=v)
if not (v.id == u.id or v.admin_level >= PERMS['USER_FOLLOWS_VISIBLE']):
abort(403)
@ -919,7 +918,7 @@ def following(v:User, username:str):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def visitors(v:User, username:str):
def visitors(v, username):
u = get_user(username, v=v)
page = get_page()
@ -931,7 +930,7 @@ def visitors(v:User, username:str):
return render_template("userpage/views.html", v=v, u=u, views=views, total=total, page=page)
@cache.memoize()
def userpagelisting(user:User, v=None, page:int=1, sort="new", t="all"):
def userpagelisting(user, v=None, page=1, sort="new", t="all"):
posts = g.db.query(Post).filter_by(author_id=user.id, is_pinned=False).options(load_only(Post.id))
if not (v and (v.admin_level >= PERMS['POST_COMMENT_MODERATION'] or v.id == user.id)):
posts = posts.filter_by(is_banned=False, private=False, ghost=False, deleted_utc=0)
@ -944,7 +943,7 @@ def userpagelisting(user:User, v=None, page:int=1, sort="new", t="all"):
@app.get("/@<username>")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@auth_required
def u_username_wall(v:Optional[User], username:str):
def u_username_wall(v, username):
u = get_user(username, v=v, include_blocks=True)
if username != u.username:
return redirect(f"/@{u.username}")
@ -989,7 +988,7 @@ def u_username_wall(v:Optional[User], username:str):
@app.get("/@<username>/wall/comment/<int:cid>")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@auth_required
def u_username_wall_comment(v:User, username:str, cid):
def u_username_wall_comment(v, username, cid):
comment = get_comment(cid, v=v)
if not comment.wall_user_id: abort(400)
if not User.can_see(v, comment): abort(403)
@ -1031,7 +1030,7 @@ def u_username_wall_comment(v:User, username:str, cid):
@app.get("/@<username>/posts")
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@auth_required
def u_username(v:Optional[User], username:str):
def u_username(v, username):
u = get_user(username, v=v, include_blocks=True)
if username != u.username:
return redirect(SITE_FULL + request.full_path.replace(username, u.username))
@ -1284,7 +1283,7 @@ def user_profile_name(username):
return redirect(get_profile_picture(username))
def get_saves_and_subscribes(v, template, relationship_cls, page:int, standalone=False):
def get_saves_and_subscribes(v, template, relationship_cls, page, standalone=False):
if relationship_cls in {SaveRelationship, Subscription}:
query = relationship_cls.post_id
join = relationship_cls.post
@ -1322,7 +1321,7 @@ def get_saves_and_subscribes(v, template, relationship_cls, page:int, standalone
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def saved_posts(v:User, username):
def saved_posts(v, username):
page = get_page()
return get_saves_and_subscribes(v, "userpage/posts.html", SaveRelationship, page, False)
@ -1331,7 +1330,7 @@ def saved_posts(v:User, username):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def saved_comments(v:User, username):
def saved_comments(v, username):
page = get_page()
return get_saves_and_subscribes(v, "userpage/comments.html", CommentSaveRelationship, page, True)
@ -1340,7 +1339,7 @@ def saved_comments(v:User, username):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def subscribed_posts(v:User, username):
def subscribed_posts(v, username):
page = get_page()
return get_saves_and_subscribes(v, "userpage/posts.html", Subscription, page, False)
@ -1351,7 +1350,7 @@ def subscribed_posts(v:User, username):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def fp(v:User, fp):
def fp(v, fp):
if session.get("GLOBAL"):
return '', 204
@ -1389,7 +1388,7 @@ def toggle_pins(sub, sort):
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def bid_list(v:User, bid):
def bid_list(v, bid):
try: bid = int(bid)
except: abort(400)
@ -1493,7 +1492,7 @@ def gumroad():
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400)
@limiter.limit(DEFAULT_RATELIMIT, deduct_when=lambda response: response.status_code < 400, key_func=get_ID)
@auth_required
def settings_claim_rewards(v:User):
def settings_claim_rewards(v):
if not (v.email and v.is_activated):
abort(400, f"You must have a verified email to verify {patron} status and claim your rewards!")