forked from rDrama/rDrama
Implement Deux login with rDrama challenge.
parent
63ed000b0b
commit
88108594ad
|
@ -1010,6 +1010,7 @@ DISCORD_BOT_TOKEN = environ.get("DISCORD_BOT_TOKEN",'').strip()
|
|||
DISCORD_AUTH = environ.get("DISCORD_AUTH",'').strip()
|
||||
GIPHY_KEY = environ.get('GIPHY_KEY').strip()
|
||||
MASTER_KEY = environ.get("MASTER_KEY")
|
||||
TRUSTED_SERVER_PSK = environ.get("TRUSTED_SERVER_PSK", None)
|
||||
FP = environ.get("FP")
|
||||
|
||||
if SITE_NAME in ('Cringetopia', 'WPD'): PATRON_DEFAULT = 7
|
||||
|
|
|
@ -4,6 +4,7 @@ from files.helpers.const import *
|
|||
from files.helpers.get import *
|
||||
from files.__main__ import db_session, limiter
|
||||
from random import randint
|
||||
import functools
|
||||
import user_agents
|
||||
|
||||
def get_logged_in_user():
|
||||
|
@ -107,6 +108,23 @@ def auth_required(f):
|
|||
wrapper.__name__ = f.__name__
|
||||
return wrapper
|
||||
|
||||
def auth_trusted_server(func):
|
||||
@functools.wraps(func)
|
||||
def inner(*args, **kwargs):
|
||||
if not TRUSTED_SERVER_PSK: abort(401)
|
||||
|
||||
auth = request.headers.get("Authorization", None)
|
||||
if not auth: abort(401)
|
||||
|
||||
auth_words = auth.split(' ')
|
||||
if len(auth_words) != 2 or auth_words[0] != 'TrustedServer':
|
||||
abort(401)
|
||||
|
||||
if not auth_words[1] == TRUSTED_SERVER_PSK:
|
||||
abort(403)
|
||||
|
||||
return make_response(func(*args, **kwargs))
|
||||
return inner
|
||||
|
||||
def is_not_permabanned(f):
|
||||
|
||||
|
@ -156,4 +174,4 @@ def lottery_required(f):
|
|||
return make_response(f(v=v))
|
||||
|
||||
wrapper.__name__ = f.__name__
|
||||
return wrapper
|
||||
return wrapper
|
||||
|
|
|
@ -6,6 +6,7 @@ from files.helpers.regex import *
|
|||
from files.helpers.actions import *
|
||||
from files.helpers.get import *
|
||||
import requests
|
||||
import secrets
|
||||
|
||||
@app.get("/login")
|
||||
@auth_desired
|
||||
|
@ -135,21 +136,79 @@ def login_post():
|
|||
hash=hash,
|
||||
failed=True,
|
||||
)
|
||||
|
||||
else:
|
||||
abort(400)
|
||||
|
||||
on_login(account)
|
||||
|
||||
redir = request.values.get("redirect")
|
||||
if redir:
|
||||
redir = redir.replace("/logged_out", "").strip()
|
||||
if is_site_url(redir): return redirect(redir)
|
||||
return redirect('/')
|
||||
|
||||
def on_login(account, redir=None):
|
||||
session["lo_user"] = account.id
|
||||
session["login_nonce"] = account.login_nonce
|
||||
if account.id == AEVANN_ID: session["verified"] = time.time()
|
||||
|
||||
check_for_alts(account.id)
|
||||
|
||||
@app.get("/loginshared/auth/<site_for>")
|
||||
@auth_required
|
||||
def loginshared_authenticate(v, site_for):
|
||||
# Despite providing an interface for general site_for for forward-compat,
|
||||
# loginshared_* is only designed at present for login on Deux using rDrama.
|
||||
if not (SITE == 'rdrama.net' and site_for == 'deuxrama.net'):
|
||||
abort(403)
|
||||
|
||||
token = loginshared_secret_token(site_for, v.id)
|
||||
|
||||
# Must be https! Downgrading security leaks secrets in query string.
|
||||
redirect_url = f'https://deuxrama.net/loginshared/verify/' \
|
||||
+ f'rdrama.net/{v.id}/{token}'
|
||||
return redirect(redirect_url)
|
||||
|
||||
@app.get("/loginshared/secret/<site_for>/<user_id>")
|
||||
@auth_trusted_server
|
||||
def loginshared_secret(site_for, user_id):
|
||||
if not (SITE == 'rdrama.net' and site_for == 'deuxrama.net'):
|
||||
abort(403)
|
||||
|
||||
return loginshared_secret_token(site_for, user_id)
|
||||
|
||||
def loginshared_secret_token(site_for, user_id):
|
||||
cache_key = f'loginshared_secret_token:{site_for}:{user_id}'
|
||||
|
||||
token = cache.get(cache_key)
|
||||
if token is None:
|
||||
token = secrets.token_urlsafe(32)
|
||||
cache.set(cache_key, token, timeout=15)
|
||||
|
||||
return token
|
||||
|
||||
@app.get("/loginshared/verify/<site_from>/<user_id>/<token>")
|
||||
def loginshared_verify(site_from, user_id, token):
|
||||
if not TRUSTED_SERVER_PSK:
|
||||
abort(403)
|
||||
if not (SITE == 'deuxrama.net' and site_from == 'rdrama.net'):
|
||||
abort(403)
|
||||
|
||||
provider_url = f'https://rdrama.net/loginshared/secret/deuxrama.net/{user_id}'
|
||||
provider_auth = f'TrustedServer {TRUSTED_SERVER_PSK}'
|
||||
provider_resp = requests.get(provider_url,
|
||||
headers={'Authorization': provider_auth},
|
||||
timeout=5)
|
||||
if provider_resp.status_code != 200:
|
||||
abort(500)
|
||||
|
||||
if provider_resp.text != token:
|
||||
time.sleep(random.uniform(0, 2))
|
||||
return render_template("login.html", failed=True)
|
||||
|
||||
account = get_account(user_id)
|
||||
on_login(account)
|
||||
|
||||
redir = request.values.get("redirect")
|
||||
if redir:
|
||||
redir = redir.replace("/logged_out", "").strip()
|
||||
if is_site_url(redir): return redirect(redir)
|
||||
return redirect('/')
|
||||
|
||||
@app.get("/me")
|
||||
|
|
Loading…
Reference in New Issue