rDrama/files/helpers/get.py

338 lines
8.5 KiB
Python
Raw Normal View History

from typing import Callable, Iterable, List, Optional, Union
from sqlalchemy.orm import joinedload, selectinload
2022-05-04 23:09:46 +00:00
from files.classes import *
from flask import g
def sanitize_username(username:str) -> str:
    """Clean a username before it is used as a case-insensitive LIKE pattern.

    Backslashes, percent signs and parentheses are removed, every underscore
    is prefixed with a backslash, and surrounding whitespace is trimmed.
    Falsy input (``None`` or an empty string) is returned unchanged.
    """
    if not username: return username
    # Drop existing backslashes first so the escape added for "_" survives.
    cleaned = username.replace('\\', '')
    # '\\_' is the two characters backslash + underscore; the previous
    # spelling '\_' relied on an invalid escape sequence.
    cleaned = cleaned.replace('_', '\\_')
    for unwanted in ('%', '(', ')'):
        cleaned = cleaned.replace(unwanted, '')
    return cleaned.strip()
2022-10-28 00:44:34 +00:00
def get_id(username:str, graceful=False) -> Optional[int]:
    """Look up a user's id by current or original username.

    The name is sanitized and then matched with ILIKE against both
    ``User.username`` and ``User.original_username``.

    Returns the matching user's id. When the sanitized name is empty or no
    row matches, returns ``None`` if ``graceful`` is truthy and calls
    ``abort(404)`` otherwise.
    """
    username = sanitize_username(username)
    if username:
        name_matches = or_(
            User.username.ilike(username),
            User.original_username.ilike(username),
        )
        row = g.db.query(User.id).filter(name_matches).one_or_none()
        if row:
            return row[0]

    # Shared "not found" exit for both the empty-name and the no-match case.
    if not graceful:
        abort(404)
    return None
account linking improvements (#448) currently account delinking is very messy and can sometimes just not work we do codey stuff so it's not as bad also we create a pretty page for mops to mop up borked account links * alts: allow proper delinking * fix prev commit * url fix * fix 500 * fixes * :pepodrool: * flag * :pepodrool: redux * sdsdsdsds * correct endpoint * fix html page * alts: only adjust session history if flag is set * fix 500 * allow relinking * fsdsds * :pepodrool: redux * alts: don't fail if an alt isn't history * use postToastSwitch + some API changes * remove unnecessary variables * d-none * delink accounts mod action * fa-link-slash * alts: add form to create alt * remove copied and pasted template * rounded section * UI improvement + fix * \n * fix status * admin: remove duplicate route admin: do a permissions check on 2 pages that need it admin: set the manual flag for manually flagged alts * variable change * fix 500 * alts * add shadowban icon to alt link tool * shadowbanned tooltip * add user info section * fix 500, remove unnecessary form, and add alt votes button * trans and also link to page * margin * sdsdsd * stop the count * fix prev commit * with ctx * plural * alts * don't show shadowbanned users to those who can't see them this is... extremely rare and won't ever be seen in production however if perms were ever rearranged in the future, this keeps permissions correct * shadowban check in alt list * let shadow realm enthusiasts see shadowban alts * sdsdsds * test * be graceful where needed * sdsdsdsds * alts: don't allow adding the same account alts: clarify wording * rename and reorder on admin panel * EOL * remove frankly unnecessary check * try with a set * test * Revert "try with a set" This reverts commit 72be353fba5ffa39b37590cc5d3bf584c94ee06e. * Revert "Revert "try with a set"" This reverts commit 81e41890a192e8b46d0463477998e905fddf56ba. 
* Revert "Revert "Revert "try with a set""" This reverts commit be51592135a3c09848f993f0154bd2ac862ae505. * clean up test
2022-11-14 17:32:13 +00:00
def get_user(username:Optional[str], v:Optional[User]=None, graceful=False, include_blocks=False, include_shadowbanned=True) -> Optional[User]:
    """Fetch a user by current or original username.

    ``username`` is sanitized and matched with ILIKE against
    ``User.username`` and ``User.original_username``. A shadowbanned user is
    treated as missing unless ``include_shadowbanned`` is truthy or the
    viewer ``v`` has ``can_see_shadowbanned``. When both ``v`` and
    ``include_blocks`` are truthy the result is passed through
    ``add_block_props``.

    On any "not found" condition returns ``None`` if ``graceful`` is truthy,
    otherwise calls ``abort(404)``.
    """
    # A falsy name is rejected before sanitizing; a name that sanitizes to
    # nothing is rejected afterwards. Both take the same exit.
    cleaned = sanitize_username(username) if username else None
    if not cleaned:
        if not graceful:
            abort(404)
        return None

    name_matches = or_(
        User.username.ilike(cleaned),
        User.original_username.ilike(cleaned),
    )
    found = g.db.query(User).filter(name_matches).one_or_none()

    if found:
        hidden = (
            found.shadowbanned
            and not include_shadowbanned
            and not (v and v.can_see_shadowbanned)
        )
        if not hidden:
            if v and include_blocks:
                found = add_block_props(found, v)
            return found

    if not graceful:
        abort(404)
    return None
def get_users(usernames:Iterable[str], graceful=False) -> List[User]:
    """Fetch every user whose current or original username is in ``usernames``.

    Each name is sanitized and matched with ILIKE against ``User.username``
    and ``User.original_username``.

    Returns the list of matching users (``[]`` for falsy input). Without
    ``graceful``, calls ``abort(404)`` when no usable name remains after
    sanitizing or when the number of users found differs from the number of
    names requested. With ``graceful``, whatever was found is returned.
    """
    if not usernames: return []
    usernames = [sanitize_username(n) for n in usernames]
    if not any(usernames):
        # Every name sanitized to nothing. Graceful callers get an empty
        # result instead of a 404; the old `len(usernames) == 0` condition
        # could never be true for a non-empty list of names.
        if graceful: return []
        abort(404)

    users = g.db.query(User).filter(
        or_(
            User.username.ilike(any_(usernames)),
            User.original_username.ilike(any_(usernames)),
        )
    ).all()

    if len(users) != len(usernames) and not graceful:
        abort(404)

    return users
def get_account(id:Union[str, int], v:Optional[User]=None, graceful=False, include_blocks=False, include_shadowbanned=True) -> Optional[User]:
    """Fetch a user by primary key.

    ``id`` is coerced with ``int()``. A shadowbanned user is treated as
    missing unless ``include_shadowbanned`` is truthy or the viewer ``v`` has
    ``can_see_shadowbanned``. When ``include_blocks`` is truthy the result is
    passed through ``add_block_props``.

    If ``id`` is not an integer or no visible user exists, returns ``None``
    when ``graceful`` is truthy and calls ``abort(404)`` otherwise.
    """
    try:
        user_id = int(id)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "bad id"; a bare except would also
        # swallow KeyboardInterrupt/SystemExit.
        if graceful: return None
        abort(404)

    user = g.db.get(User, user_id)

    if not user or (user.shadowbanned and not (include_shadowbanned or (v and v.can_see_shadowbanned))):
        if graceful: return None
        abort(404)

    if include_blocks:
        user = add_block_props(user, v)
    return user
def get_post(i:Union[str, int], v:Optional[User]=None, graceful=False) -> Optional[Submission]:
    """Fetch a single submission by id.

    ``i`` is coerced with ``int()``; a value that cannot be converted, or
    that converts to 0, counts as not found. With a viewer ``v`` the
    submission is loaded together with the viewer's vote and block rows, and
    ``voted`` / ``is_blocking`` are set on it (0 when there is no matching
    row). Without a viewer the submission is loaded as-is.

    If nothing is found, returns ``None`` when ``graceful`` is truthy and
    calls ``abort(404)`` otherwise.
    """
    try:
        pid = int(i)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "bad id"; a bare except would also
        # swallow KeyboardInterrupt/SystemExit.
        pid = None

    if not pid:
        if graceful: return None
        abort(404)

    if not v:
        post = g.db.get(Submission, pid)
        if not post:
            if graceful: return None
            abort(404)
        return post

    vt = g.db.query(Vote).filter_by(user_id=v.id, submission_id=pid).subquery()
    blocking = v.blocking.subquery()

    row = g.db.query(
        Submission,
        vt.c.vote_type,
        blocking.c.target_id,
    ).filter(
        Submission.id == pid
    ).join(
        vt,
        vt.c.submission_id == Submission.id,
        isouter=True
    ).join(
        blocking,
        blocking.c.target_id == Submission.author_id,
        isouter=True
    ).one_or_none()

    if not row:
        if graceful: return None
        abort(404)

    post = row[0]
    # The outer joins yield NULL when the viewer has no vote / block row.
    post.voted = row[1] or 0
    post.is_blocking = row[2] or 0
    return post
def get_posts(pids:Iterable[int], v:Optional[User]=None, eager:bool=False) -> List[Submission]:
    """Fetch several submissions, returned in the order their ids appear in ``pids``.

    With a viewer ``v`` each submission also gets ``voted``, ``is_blocking``
    and ``is_blocked`` set from the viewer's vote and block rows (0 when
    there is no matching row). With ``eager`` the author (equipped hats and
    their definitions, badges, sub mods, sub exiles) and the submission's
    flags, awards and options are loaded up front.

    Returns ``[]`` for falsy or empty ``pids``.
    """
    # Materialize once so any iterable works and the ids can be re-read for
    # ordering after the query.
    pids = list(pids) if pids else []
    if not pids: return []

    if v:
        vt = g.db.query(Vote.vote_type, Vote.submission_id).filter(
            Vote.submission_id.in_(pids),
            Vote.user_id == v.id
        ).subquery()

        blocking = v.blocking.subquery()
        blocked = v.blocked.subquery()

        query = g.db.query(
            Submission,
            vt.c.vote_type,
            blocking.c.target_id,
            blocked.c.target_id,
        ).filter(
            Submission.id.in_(pids)
        ).join(
            vt, vt.c.submission_id == Submission.id, isouter=True
        ).join(
            blocking,
            blocking.c.target_id == Submission.author_id,
            isouter=True
        ).join(
            blocked,
            blocked.c.user_id == Submission.author_id,
            isouter=True
        )
    else:
        query = g.db.query(Submission).filter(Submission.id.in_(pids))

    if eager:
        query = query.options(
            selectinload(Submission.author).options(
                selectinload(User.hats_equipped.and_(Hat.equipped == True)) \
                    .joinedload(Hat.hat_def, innerjoin=True),
                selectinload(User.badges),
                selectinload(User.sub_mods),
                selectinload(User.sub_exiles),
            ),
            selectinload(Submission.flags),
            selectinload(Submission.awards),
            selectinload(Submission.options),
        )

    results = query.all()

    if v:
        output = []
        for post, vote_type, blocking_target, blocked_target in results:
            post.voted = vote_type or 0
            post.is_blocking = blocking_target or 0
            post.is_blocked = blocked_target or 0
            output.append(post)
    else:
        output = results

    # First position of each id, matching what list.index() would return
    # but without rescanning the list for every submission.
    position = {}
    for idx, pid in enumerate(pids):
        position.setdefault(pid, idx)

    return sorted(output, key=lambda post: position[post.id])
def get_comment(i:Union[str, int], v:Optional[User]=None, graceful=False) -> Optional[Comment]:
    """Fetch a single comment by id, with vote/block properties for ``v``.

    ``i`` is coerced with ``int()``; a value that cannot be converted, or
    that converts to 0, counts as not found. The loaded comment is passed
    through ``add_vote_and_block_props`` with ``CommentVote``.

    If nothing is found, returns ``None`` when ``graceful`` is truthy and
    calls ``abort(404)`` otherwise.
    """
    try:
        cid = int(i)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "bad id"; a bare except would also
        # swallow KeyboardInterrupt/SystemExit.
        cid = None

    if not cid:
        if graceful: return None
        abort(404)

    comment = g.db.get(Comment, cid)
    if not comment:
        if graceful: return None
        abort(404)

    return add_vote_and_block_props(comment, v, CommentVote)
def add_block_props(target:Union[Submission, Comment, User], v:Optional[User]):
    """Set ``is_blocking`` / ``is_blocked`` on ``target`` relative to viewer ``v``.

    For a submission or comment the other party is its author; for a user it
    is the user itself. ``is_blocking`` is truthy when ``v`` blocks that
    party, ``is_blocked`` when that party blocks ``v``.

    Returns ``target``. It is returned untouched when there is no viewer or
    when both attributes are already present.

    Raises ``TypeError`` if a viewer is given and ``target`` is not a
    submission, comment or user.
    """
    if not v: return target

    if isinstance(target, (Submission, Comment)):
        other_id = target.author_id
    elif isinstance(target, User):
        other_id = target.id
    else:
        raise TypeError("add_block_props only supports non-None submissions, comments, and users")

    if hasattr(target, 'is_blocking') and hasattr(target, 'is_blocked'):
        return target

    if v.id == other_id or other_id == AUTOJANNY_ID: # users can't block or be blocked by themselves or AutoJanny
        target.is_blocking = False
        target.is_blocked = False
        return target

    # Each direction is its own UserBlock row, so mutual blocks yield two
    # rows. Looking at only the first one would report just one direction.
    blocks = g.db.query(UserBlock).filter(
        or_(
            and_(
                UserBlock.user_id == v.id,
                UserBlock.target_id == other_id
            ),
            and_(
                UserBlock.user_id == other_id,
                UserBlock.target_id == v.id
            )
        )
    ).all()

    target.is_blocking = any(block.user_id == v.id for block in blocks)
    target.is_blocked = any(block.target_id == v.id for block in blocks)
    return target
def add_vote_props(target:Union[Submission, Comment], v:Optional[User], vote_cls):
    """Set ``target.voted`` to the vote type ``v`` cast on ``target``.

    ``vote_cls`` selects the vote table: ``Vote`` is filtered by
    ``submission_id`` and ``CommentVote`` by ``comment_id``. Any other class
    results in ``voted = 0``, as does the absence of a vote row.

    Returns ``target``; it is returned untouched if it already has a
    ``voted`` attribute.
    """
    if hasattr(target, 'voted'):
        return target

    by_voter = g.db.query(vote_cls.vote_type).filter_by(user_id=v.id)

    if vote_cls == Vote:
        vote_row = by_voter.filter_by(submission_id=target.id).one_or_none()
    elif vote_cls == CommentVote:
        vote_row = by_voter.filter_by(comment_id=target.id).one_or_none()
    else:
        vote_row = None

    if vote_row:
        target.voted = vote_row.vote_type
    else:
        target.voted = 0
    return target
def add_vote_and_block_props(target:Union[Submission, Comment], v:Optional[User], vote_cls):
    """Apply ``add_block_props`` and then ``add_vote_props`` to ``target``.

    Returns ``target``; when there is no viewer ``v`` it is returned
    untouched.
    """
    if v:
        with_blocks = add_block_props(target, v)
        return add_vote_props(with_blocks, v, vote_cls)
    return target
2022-05-04 23:09:46 +00:00
def get_comments(cids:Iterable[int], v:Optional[User]=None) -> List[Comment]:
    """Fetch several comments, returned in the order their ids appear in ``cids``.

    With a viewer ``v`` the comments come from ``get_comments_v_properties``
    (shadowbanned authors included). Without a viewer, comments whose author
    has ``shadowbanned`` set are excluded.

    Returns ``[]`` for falsy or empty ``cids``.
    """
    # Materialize once so any iterable works and the ids can be re-read for
    # ordering after the query.
    cids = list(cids) if cids else []
    if not cids: return []

    if v:
        output = get_comments_v_properties(v, True, None, Comment.id.in_(cids))[1]
    else:
        output = g.db.query(Comment).join(Comment.author).filter(
            User.shadowbanned == None,
            Comment.id.in_(cids),
        ).all()

    # First position of each id, matching what list.index() would return
    # but without rescanning the list for every comment.
    position = {}
    for idx, cid in enumerate(cids):
        position.setdefault(cid, idx)

    return sorted(output, key=lambda comment: position[comment.id])
def get_comments_v_properties(v:User, include_shadowbanned=True, should_keep_func:Optional[Callable[[Comment], bool]]=None, *criterion):
    """Query comments with the viewer's vote and block state attached.

    Comments matching ``criterion`` are loaded with outer joins on the
    viewer's comment votes and block rows. Each loaded comment gets
    ``voted``, ``is_blocking`` and ``is_blocked`` set (0 when there is no
    matching row). Comments by shadowbanned authors are filtered out only
    when ``include_shadowbanned`` is falsy and ``v`` lacks
    ``can_see_shadowbanned``.

    Returns a tuple ``(query, kept)``: the query object that was executed
    and the list of comments for which ``should_keep_func`` returned a
    truthy value (all of them when no function is given). Rejected comments
    still have the properties set on them.

    Raises ``TypeError`` if ``v`` is falsy.
    """
    if not v:
        raise TypeError("A user is required")

    votes = g.db.query(CommentVote.vote_type, CommentVote.comment_id).filter_by(user_id=v.id).subquery()
    blocking = v.blocking.subquery()
    blocked = v.blocked.subquery()

    comments = g.db.query(
        Comment,
        votes.c.vote_type,
        blocking.c.target_id,
        blocked.c.target_id,
    )

    if not include_shadowbanned and not v.can_see_shadowbanned:
        comments = comments.join(Comment.author).filter(User.shadowbanned == None)

    comments = comments.filter(*criterion)
    comments = comments.join(
        votes,
        votes.c.comment_id == Comment.id,
        isouter=True
    ).join(
        blocking,
        blocking.c.target_id == Comment.author_id,
        isouter=True
    ).join(
        blocked,
        blocked.c.user_id == Comment.author_id,
        isouter=True
    )

    output = []
    for comment, vote_type, blocking_target, blocked_target in comments.all():
        comment.voted = vote_type or 0
        comment.is_blocking = blocking_target or 0
        comment.is_blocked = blocked_target or 0
        if not should_keep_func or should_keep_func(comment):
            output.append(comment)

    return (comments, output)
2022-10-28 00:44:34 +00:00
def get_sub_by_name(sub:str, v:Optional[User]=None, graceful=False) -> Optional[Sub]:
    """Fetch a ``Sub`` by name.

    Every ``/h/`` occurrence is removed from ``sub``, then the name is
    stripped and lowercased before being used as the primary key. ``v`` is
    accepted but not used here.

    If the name is empty or no sub exists, returns ``None`` when
    ``graceful`` is truthy and calls ``abort(404)`` otherwise.
    """
    name = sub.replace('/h/', '').strip().lower() if sub else None
    found = g.db.get(Sub, name) if name else None
    if found:
        return found

    # Empty input, empty normalized name, and unknown sub all end up here.
    if not graceful:
        abort(404)
    return None