rDrama/files/helpers/get.py

372 lines
9.0 KiB
Python

from flask import *
from sqlalchemy import and_, any_, or_
from sqlalchemy.orm import joinedload, Query, load_only
from files.classes import Comment, CommentVote, Hat, Hole, Post, User, UserBlock, Vote
from files.helpers.config.const import *
from files.__main__ import cache
# Escape SQL pattern-matching special characters
def escape_for_search(string):
    """Escape SQL pattern-matching wildcards in *string*.

    Backslashes are removed first, then ``_`` and ``%`` are each prefixed
    with a backslash, and finally surrounding whitespace is stripped.

    The replacement strings are written as ``'\\_'`` / ``'\\%'`` because
    ``'\_'`` and ``'\%'`` are invalid escape sequences that newer Python
    versions warn about; the resulting text is the same.
    """
    without_backslashes = string.replace('\\', '')
    escaped = without_backslashes.replace('_', '\\_').replace('%', '\\%')
    return escaped.strip()
def sanitize_username(username):
    """Normalise a user-supplied name before it is used in a search.

    Leading ``@`` characters and every parenthesis are removed, and the
    remainder is passed through ``escape_for_search``.
    """
    cleaned = username.lstrip('@')
    for paren in ('(', ')'):
        cleaned = cleaned.replace(paren, '')
    return escape_for_search(cleaned)
def get_user(username, v=None, graceful=False, include_blocks=False, attributes=None):
    """Fetch one user by name, matching case-insensitively (ILIKE).

    The sanitised name is compared against the ``username``,
    ``original_username``, ``extra_username`` and ``prelock_username``
    columns.

    username: raw name; it is run through ``sanitize_username`` first.
    v: viewing user; only used when ``include_blocks`` is true.
    graceful: return ``None`` instead of aborting on empty/unknown names.
    include_blocks: when true (and ``v`` is given), pass the result through
        ``add_block_props``.
    attributes: optional column attributes handed to ``load_only``.

    Aborts with 400 for an empty name and 404 when nothing matches, unless
    ``graceful`` is set.
    """
    search_name = sanitize_username(username) if username else None
    if not search_name:
        if graceful:
            return None
        abort(400, "Empty username.")

    name_columns = (
        User.username,
        User.original_username,
        User.extra_username,
        User.prelock_username,
    )
    lookup = g.db.query(User).filter(
        or_(*(column.ilike(search_name) for column in name_columns))
    )
    if attributes:
        lookup = lookup.options(load_only(*attributes))

    found = lookup.one_or_none()
    if not found:
        if graceful:
            return None
        abort(404, f"A user with the name '{username}' was not found!")

    if v and include_blocks:
        found = add_block_props(found, v)
    return found
def get_users(usernames, ids_only=False, graceful=False):
    """Fetch several users by name in one query.

    usernames: iterable of raw names; each is run through
        ``sanitize_username``.
    ids_only: return a list of user ids instead of ``User`` objects.
    graceful: skip the 404 when the row count differs from the name count.

    Aborts with 400 when every sanitised name is empty, and with 404 when
    the number of rows differs from the number of names (unless graceful).
    """
    if not usernames:
        return []

    usernames = [sanitize_username(name) for name in usernames]
    if not any(usernames):
        if graceful and not usernames:
            return []
        abort(400, "Empty usernames.")

    name_columns = (
        User.username,
        User.original_username,
        User.extra_username,
        User.prelock_username,
    )
    selection = g.db.query(User.id) if ids_only else g.db.query(User)
    rows = selection.filter(
        or_(*(column.ilike(any_(usernames)) for column in name_columns))
    ).all()

    # NOTE(review): this compares counts only, so duplicate input names would
    # trip the 404 as well — confirm callers deduplicate.
    if not graceful and len(rows) != len(usernames):
        abort(404, "Users not found.")

    if ids_only:
        return [row[0] for row in rows]
    return rows
def get_account(id, v=None, graceful=False, include_blocks=False):
    """Fetch a user by primary key.

    id: anything ``int()`` accepts.
    v: viewing user, forwarded to ``add_block_props``.
    graceful: return ``None`` instead of aborting on a bad or unknown id.
    include_blocks: when true, pass the result through ``add_block_props``.

    Aborts with 400 when ``id`` is not an integer and 404 when no user has
    that id, unless ``graceful`` is set.
    """
    try:
        id = int(id)
    # Only conversion failures are expected here; a bare ``except`` would
    # also swallow KeyboardInterrupt/SystemExit.
    except (TypeError, ValueError, OverflowError):
        if graceful:
            return None
        abort(400, "User ID needs to be an integer.")

    user = g.db.get(User, id)
    if not user:
        if graceful:
            return None
        abort(404, "User not found.")

    if include_blocks:
        user = add_block_props(user, v)
    return user
def get_accounts_dict(ids, v=None, graceful=False):
    """Fetch several users by primary key and return them keyed by id.

    ids: iterable of values ``int()`` accepts; duplicates are collapsed.
    v: accepted but not used by this function.
    graceful: return ``None`` for unparsable ids and skip the 404 when some
        ids are missing.

    Returns ``{}`` for empty input, otherwise ``{user.id: user}``.
    Aborts with 400 for non-integer ids and 404 when the number of users
    found differs from the number of distinct ids, unless ``graceful``.
    """
    if not ids:
        return {}

    try:
        ids = {int(raw_id) for raw_id in ids}
    # Only conversion/iteration failures are expected; a bare ``except``
    # would also swallow KeyboardInterrupt/SystemExit.
    except (TypeError, ValueError, OverflowError):
        if graceful:
            return None
        abort(400, "User IDs need to be an integer.")

    users = g.db.query(User).filter(User.id.in_(ids)).all()
    if len(users) != len(ids) and not graceful:
        abort(404, "Users not found.")
    return {u.id: u for u in users}
def get_post(i, v=None, graceful=False):
    """Fetch a single post by id.

    i: anything ``int()`` accepts; ``0`` counts as empty.
    v: viewing user. When given, the post is loaded together with that
        user's vote and blocking row, and ``voted`` / ``is_blocking`` are
        set on the returned post (``0`` when there is no matching row).
    graceful: return ``None`` instead of aborting.

    Aborts with 400 for a non-integer or empty id and 404 when the post
    does not exist, unless ``graceful`` is set.
    """
    try:
        i = int(i)
    # Only conversion failures are expected here; a bare ``except`` would
    # also swallow KeyboardInterrupt/SystemExit.
    except (TypeError, ValueError, OverflowError):
        if graceful:
            return None
        abort(400, "Post ID needs to be an integer.")

    if not i:
        if graceful:
            return None
        abort(400, "Empty post ID.")

    if v:
        vt = g.db.query(Vote).filter_by(user_id=v.id, post_id=i).subquery()
        blocking = v.blocking.subquery()
        row = g.db.query(
            Post,
            vt.c.vote_type,
            blocking.c.target_id,
        ).filter(
            Post.id == i
        ).outerjoin(
            vt,
            vt.c.post_id == Post.id,
        ).outerjoin(
            blocking,
            blocking.c.target_id == Post.author_id,
        ).one_or_none()
    else:
        row = g.db.get(Post, i)

    if not row:
        if graceful:
            return None
        abort(404, "Post not found.")

    if not v:
        return row

    # With a viewer, the row is (Post, vote_type, blocking target id).
    post = row[0]
    post.voted = row[1] or 0
    post.is_blocking = row[2] or 0
    return post
def get_posts(pids, v=None, extra=None):
    """Fetch several posts and return them in the order given by ``pids``.

    pids: sequence of post ids; returned posts are sorted by the position
        of their id in this sequence (first occurrence wins).
    v: viewing user. When given, each post gets ``voted``, ``is_blocking``
        and ``is_blocked`` set from the joined vote/block rows (``0`` when
        there is no matching row).
    extra: optional callable that receives the query and returns a
        modified query before it is executed.

    Returns ``[]`` for empty input.
    """
    if not pids:
        return []

    if v:
        vt = g.db.query(Vote.vote_type, Vote.post_id).filter(
            Vote.post_id.in_(pids),
            Vote.user_id == v.id,
        ).subquery()
        blocking = v.blocking.subquery()
        blocked = v.blocked.subquery()
        query = g.db.query(
            Post,
            vt.c.vote_type,
            blocking.c.target_id,
            blocked.c.target_id,
        ).filter(
            Post.id.in_(pids)
        ).outerjoin(
            vt, vt.c.post_id == Post.id
        ).outerjoin(
            blocking,
            blocking.c.target_id == Post.author_id,
        ).outerjoin(
            blocked,
            blocked.c.user_id == Post.author_id,
        )
    else:
        query = g.db.query(Post).filter(Post.id.in_(pids))

    if extra:
        query = extra(query)
    results = query.all()

    if v:
        output = []
        for row in results:
            post = row[0]
            post.voted = row[1] or 0
            post.is_blocking = row[2] or 0
            post.is_blocked = row[3] or 0
            output.append(post)
    else:
        output = results

    # Build the id -> position map once instead of calling pids.index()
    # per post, which made the sort quadratic in len(pids).
    position = {}
    for index, pid in enumerate(pids):
        position.setdefault(pid, index)
    return sorted(output, key=lambda post: position[post.id])
def get_comment(i, v=None, graceful=False):
    """Fetch a single comment by id.

    i: anything ``int()`` accepts; ``0`` counts as empty.
    v: viewing user, forwarded to ``add_vote_and_block_props``.
    graceful: return ``None`` instead of aborting.

    Aborts with 404 for a non-integer id, an empty id, or a missing
    comment, unless ``graceful`` is set.
    """
    try:
        i = int(i)
    # Only conversion failures are expected here; a bare ``except`` would
    # also swallow KeyboardInterrupt/SystemExit.
    except (TypeError, ValueError, OverflowError):
        if graceful:
            return None
        # NOTE(review): get_post answers the same situation with 400; the
        # 404 is kept here so client-visible behavior does not change.
        abort(404, "Comment ID needs to be an integer.")

    if not i:
        if graceful:
            return None
        abort(404, "Empty comment ID.")

    comment = g.db.get(Comment, i)
    if not comment:
        if graceful:
            return None
        abort(404, "Comment not found.")

    return add_vote_and_block_props(comment, v, CommentVote)
def add_block_props(target, v):
    """Set ``is_blocking`` / ``is_blocked`` on *target* relative to viewer *v*.

    target: a ``Post`` or ``Comment`` (its author is checked) or a ``User``.
    v: viewing user; when falsy the target is returned untouched.

    Raises ``TypeError`` for any other target type. A target that already
    carries both attributes is returned unchanged.
    """
    if not v:
        return target

    if isinstance(target, (Post, Comment)):
        other_id = target.author_id
    elif isinstance(target, User):
        other_id = target.id
    else:
        raise TypeError("add_block_props only supports non-None posts, comments, and users")

    already_annotated = hasattr(target, 'is_blocking') and hasattr(target, 'is_blocked')
    if already_annotated:
        return target

    # users can't block or be blocked by themselves or AutoJanny
    if other_id == v.id or other_id == AUTOJANNY_ID:
        target.is_blocking = False
        target.is_blocked = False
        return target

    viewer_blocks_other = and_(
        UserBlock.user_id == v.id,
        UserBlock.target_id == other_id,
    )
    other_blocks_viewer = and_(
        UserBlock.user_id == other_id,
        UserBlock.target_id == v.id,
    )
    # NOTE(review): only the first matching row is inspected, so if blocks
    # exist in both directions just one flag ends up truthy — confirm intent.
    block = g.db.query(UserBlock).filter(
        or_(viewer_blocks_other, other_blocks_viewer)
    ).first()

    # When no row exists both attributes are set to the falsy ``block``
    # value itself (None) rather than False.
    target.is_blocking = block and block.user_id == v.id
    target.is_blocked = block and block.target_id == v.id
    return target
def add_vote_props(target, v, vote_cls):
    """Set ``target.voted`` to the viewer's vote type, or 0 if there is none.

    target: object whose ``id`` is matched against the vote table.
    v: viewing user whose vote is looked up.
    vote_cls: ``Vote`` (matched on ``post_id``) or ``CommentVote`` (matched
        on ``comment_id``); any other class yields ``voted = 0``.

    A target that already has a ``voted`` attribute is returned unchanged.
    """
    if hasattr(target, 'voted'):
        return target

    viewer_votes = g.db.query(vote_cls.vote_type).filter_by(user_id=v.id)
    if vote_cls == Vote:
        vote_row = viewer_votes.filter_by(post_id=target.id).one_or_none()
    elif vote_cls == CommentVote:
        vote_row = viewer_votes.filter_by(comment_id=target.id).one_or_none()
    else:
        vote_row = None

    target.voted = vote_row.vote_type if vote_row else 0
    return target
def add_vote_and_block_props(target, v, vote_cls):
    """Apply ``add_block_props`` and then ``add_vote_props`` to *target*.

    When there is no viewer ``v`` the target is returned untouched.
    """
    if v:
        with_blocks = add_block_props(target, v)
        return add_vote_props(with_blocks, v, vote_cls)
    return target
def get_comments(cids, v=None, extra=None):
    """Fetch several comments and return them in the order given by ``cids``.

    cids: sequence of comment ids; results are sorted by the position of
        their id in this sequence (first occurrence wins).
    v: viewing user. When given, comments are loaded through
        ``get_comments_v_properties``.
    extra: optional callable that receives the query and returns a modified
        query. NOTE(review): it is only applied when ``v`` is not given —
        confirm that is intended.

    Returns ``[]`` for empty input.
    """
    if not cids:
        return []

    if v:
        output = get_comments_v_properties(v, None, Comment.id.in_(cids))[1]
    else:
        query = g.db.query(Comment).join(Comment.author)
        if extra:
            query = extra(query)
        output = query.filter(Comment.id.in_(cids)).all()

    # Build the id -> position map once instead of calling cids.index()
    # per comment, which made the sort quadratic in len(cids).
    position = {}
    for index, cid in enumerate(cids):
        position.setdefault(cid, index)
    return sorted(output, key=lambda comment: position[comment.id])
def get_comments_v_properties(v, should_keep_func=None, *criterion):
    """Load comments matching *criterion* with the viewer's vote/block data.

    v: viewing user (required).
    should_keep_func: optional predicate called with each comment; comments
        for which it returns a falsy value are left out of the output list.
    *criterion: filter expressions passed to the query's ``filter``.

    Every loaded comment gets ``voted``, ``is_blocking`` and ``is_blocked``
    set (``0`` when there is no matching row), including those that are
    filtered out.

    Returns a tuple ``(query, kept_comments)``: the query object itself
    and the list of comments that were kept.
    Raises ``TypeError`` when ``v`` is falsy.
    """
    if not v:
        raise TypeError("A user is required")

    votes = g.db.query(
        CommentVote.vote_type, CommentVote.comment_id
    ).filter_by(user_id=v.id).subquery()
    blocking = v.blocking.subquery()
    blocked = v.blocked.subquery()

    comments = g.db.query(
        Comment,
        votes.c.vote_type,
        blocking.c.target_id,
        blocked.c.target_id,
    )
    comments = comments.filter(*criterion)
    comments = comments.outerjoin(
        votes,
        votes.c.comment_id == Comment.id,
    ).outerjoin(
        blocking,
        blocking.c.target_id == Comment.author_id,
    ).outerjoin(
        blocked,
        blocked.c.user_id == Comment.author_id,
    )

    output = []
    for row in comments.all():
        comment = row[0]
        comment.voted = row[1] or 0
        comment.is_blocking = row[2] or 0
        comment.is_blocked = row[3] or 0
        if not should_keep_func or should_keep_func(comment):
            output.append(comment)
    return (comments, output)
def get_hole(hole_name, v=None, graceful=False):
    """Fetch a hole by name.

    hole_name: name with or without an ``/h/`` or ``h/`` marker; those
        substrings are removed, then the name is stripped and lower-cased.
    v: accepted but not used by this function.
    graceful: return ``None`` instead of aborting with 404.
    """
    hole = None
    if hole_name:
        hole_name = hole_name.replace('/h/', '').replace('h/', '').strip().lower()
        if hole_name:
            hole = g.db.get(Hole, hole_name)

    if hole:
        return hole
    if graceful:
        return None
    # hole_name is whatever it was at the point of failure: the raw falsy
    # input, the emptied name, or the normalised name that was not found.
    abort(404, f"/h/{hole_name} was not found.")
@cache.memoize(timeout=3600)
def get_profile_picture(identifier):
    """Return the profile URL for a user id or username.

    Integer identifiers go through ``get_account``, everything else through
    ``get_user``; both are called gracefully. Returns the string
    ``'not_found'`` when no user is found. Memoized with a 3600 timeout.
    """
    lookup = get_account if isinstance(identifier, int) else get_user
    user = lookup(identifier, graceful=True)
    if not user:
        return 'not_found'
    return user.profile_url
def get_msg():
    """Return the ``msg`` request value, but only for same-page referrers.

    The value is returned only when the referrer, with its query string cut
    off, equals ``request.base_url``; otherwise ``None``.
    """
    referrer = request.referrer
    if not referrer:
        return None
    if referrer.split('?')[0] != request.base_url:
        return None
    return request.values.get("msg")
def get_error():
    """Return the ``error`` request value, but only for same-page referrers.

    The value is returned only when the referrer, with its query string cut
    off, equals ``request.base_url``; otherwise ``None``.
    """
    referrer = request.referrer
    if not referrer:
        return None
    if referrer.split('?')[0] != request.base_url:
        return None
    return request.values.get("error")
def get_page():
    """Return the requested page number, never less than 1.

    Reads ``page`` from ``request.values`` (default 1). Any failure while
    reading or converting it falls back to page 1.
    """
    try:
        page = int(request.values.get("page", 1))
    # Deliberately broad so the fallback stays best-effort, but no longer a
    # bare ``except`` that would also swallow KeyboardInterrupt/SystemExit.
    except Exception:
        return 1
    return max(page, 1)
def get_obj_hole(obj):
    """Return ``obj.hole``, resolving it first when *obj* is a ``Comment``.

    For a comment, ``hole`` is set to the hole of its parent post, or to
    ``None`` when it has no parent post. Other objects are not modified.
    """
    if isinstance(obj, Comment):
        resolved = None
        if obj.parent_post:
            resolved = g.db.query(Post.hole).filter_by(id=obj.parent_post).one()[0]
        obj.hole = resolved
    return obj.hole