rDrama/files/routes/chat.py

153 lines
3.6 KiB
Python

import time
from files.helpers.jinja2 import timestamp
from files.helpers.wrappers import *
from files.helpers.sanitize import sanitize
from files.helpers.const import *
from files.helpers.alerts import *
from files.helpers.regex import *
from datetime import datetime
from flask_socketio import SocketIO, emit
from files.__main__ import app, limiter, cache
from flask import render_template, make_response, send_from_directory
import sys
import atexit
# Build the Socket.IO server bound to the Flask app. Every deployment uses
# the gevent async mode; a localhost instance additionally turns on the
# Socket.IO / Engine.IO loggers and debug mode.
_socketio_options = {'async_mode': 'gevent'}
if SITE == 'localhost':
    _socketio_options.update(
        logger=True,
        engineio_logger=True,
        debug=True,
    )
socketio = SocketIO(app, **_socketio_options)
# ---------------------------------------------------------------------------
# In-memory chat state shared by the handlers in this module.
# ---------------------------------------------------------------------------

# Usernames that are currently connected, and those currently typing.
online = []
typing = []

# Publish the current online count (zero at import time) under ONLINE_STR.
# NOTE(review): timeout=0 presumably means "never expire" for this cache
# backend -- confirm against the cache configuration.
cache.set(ONLINE_STR, len(online), timeout=0)

# State restored from the cache, where close_running_threads() stores it at
# interpreter exit. Each falls back to an empty value when nothing is cached.
#   muted    -- lowercase username -> unix time at which the mute ends
#   messages -- message payload dicts kept for catch-up / page render
#   total    -- running counter incremented by speak()
muted = cache.get(f'{SITE}_muted') or {}
messages = cache.get(f'{SITE}_chat') or []
total = cache.get(f'{SITE}_total') or 0
@app.get("/chat")
@is_not_permabanned
def chat(v):
    """Render the chat page for user ``v`` with the stored message backlog."""
    template_context = {"v": v, "messages": messages}
    return render_template("chat.html", **template_context)
@socketio.on('speak')
@limiter.limit("3/second;10/minute")
@limiter.limit("3/second;10/minute", key_func=lambda:f'{SITE}-{session.get("lo_user")}')
@is_not_permabanned
def speak(data, v):
    """Handle a chat message sent over the socket by user ``v``.

    ``data`` is the raw message text supplied by the client. The text is
    truncated (200 characters on rdrama.net, 1000 elsewhere), stripped and
    sanitized, then emitted as a ``speak`` event:

    * shadowbanned senders get the event emitted without ``broadcast``;
    * a message containing a ``blackjack`` word is emitted without
      ``broadcast``, the sender is shadowbanned and CARP_ID is notified;
    * any other message is broadcast and appended to the backlog, which is
      capped at the 100 most recent entries.

    When the sender has ``admin_level > 1``, every ``mute_regex`` match in
    the lowercased text records a mute for the matched username.

    Returns ``('', 403)`` when the message is rejected (banned or muted
    sender, non-string or empty payload) and ``('', 204)`` otherwise.
    """
    global messages, total

    if v.is_banned:
        return '', 403

    # The payload comes straight from the client; anything that is not a
    # string cannot be sliced and stripped below, so reject it up front
    # instead of raising inside the handler.
    if not isinstance(data, str):
        return '', 403

    vname = v.username.lower()
    mute_expiry = muted.get(vname)
    if mute_expiry is not None:
        if time.time() < mute_expiry:
            return '', 403
        # The mute has run out: forget it and let the message through.
        del muted[vname]

    max_length = 200 if SITE == 'rdrama.net' else 1000
    text = data[:max_length].strip()
    if not text:
        return '', 403

    text_html = sanitize(text, count_marseys=True)
    time_number = int(time.time())
    lowered = text.lower()

    # Outgoing payload; kept under its own name rather than rebinding the
    # ``data`` parameter.
    message = {
        "avatar": v.profile_url,
        "hat": v.hat_active,
        "username": v.username,
        "namecolor": v.name_color,
        "text": text,
        "text_html": text_html,
        "text_censored": censor_slurs(text_html, 'chat'),
        "time": time_number,
        "timestamp": timestamp(time_number),
    }

    if v.shadowbanned:
        emit('speak', message)
    elif blackjack and any(word in lowered for word in blackjack.split()):
        emit('speak', message)
        v.shadowbanned = 'AutoJanny'
        g.db.add(v)
        send_repeatable_notification(CARP_ID, f"{v.username} has been shadowbanned because of a chat message.")
    else:
        emit('speak', message, broadcast=True)
        messages.append(message)
        messages = messages[-100:]

    # NOTE(review): with the file's indentation lost, the counter and the
    # mute scan are taken to sit at function level (run for every accepted
    # message), not inside the broadcast branch -- confirm.
    total += 1

    if v.admin_level > 1:
        for match in mute_regex.finditer(lowered):
            username = match.group(1).lower()
            # group(2) is multiplied by 60 and added to the current unix
            # time to obtain the mute's expiry.
            muted[username] = int(int(match.group(2)) * 60 + time.time())

    # NOTE(review): the original ended with ``typing = []``. Without a
    # ``global typing`` declaration that only bound an unused local and never
    # touched the module-level typing list, so the dead store was dropped.
    return '', 204
@socketio.on('connect')
@is_not_permabanned
def connect(v):
    """Register user ``v`` as online and send the current chat state.

    A username not yet in ``online`` is appended, the updated list is
    broadcast and the cached online count is refreshed. The ``online``,
    ``catchup`` and ``typing`` events are then emitted without ``broadcast``.
    Returns ``('', 204)``.
    """
    name = v.username
    if name not in online:
        online.append(name)
        emit("online", online, broadcast=True)
        cache.set(ONLINE_STR, len(online), timeout=0)

    # Current state: who is online, the message backlog, who is typing.
    initial_state = (
        ('online', online),
        ('catchup', messages),
        ('typing', typing),
    )
    for event, payload in initial_state:
        emit(event, payload)
    return '', 204
@socketio.on('disconnect')
@is_not_permabanned
def disconnect(v):
    """Drop user ``v`` from the online and typing lists.

    If the username was listed as online, the shortened list is broadcast
    and the cached online count is refreshed. The typing list is broadcast
    in every case. Returns ``('', 204)``.
    """
    name = v.username

    try:
        online.remove(name)
    except ValueError:
        # Not listed as online: nothing to announce.
        pass
    else:
        emit("online", online, broadcast=True)
        cache.set(ONLINE_STR, len(online), timeout=0)

    try:
        typing.remove(name)
    except ValueError:
        pass
    emit('typing', typing, broadcast=True)
    return '', 204
@socketio.on('typing')
@is_not_permabanned
def typing_indicator(data, v):
    """Update the typing list for user ``v`` and broadcast it.

    A truthy ``data`` adds the username to ``typing`` if it is missing; a
    falsy ``data`` removes it if present. Returns ``('', 204)``.
    """
    name = v.username
    already_listed = name in typing

    if data:
        if not already_listed:
            typing.append(name)
    elif already_listed:
        typing.remove(name)

    emit('typing', typing, broadcast=True)
    return '', 204
@socketio.on('delete')
@admin_level_required(2)
def delete(text, v):
    """Remove every stored message whose raw text equals ``text``.

    The ``delete`` event carrying ``text`` is broadcast afterwards, whether
    or not any stored message matched. Returns ``('', 204)``.
    """
    # Rebuild the list in place. Calling remove() while iterating over the
    # same list skips the element that follows each removal, so back-to-back
    # messages with identical text were left behind. Slice assignment keeps
    # the same list object, so other handlers reading ``messages`` see the
    # change.
    messages[:] = [message for message in messages if message['text'] != text]

    emit('delete', text, broadcast=True)
    return '', 204
def close_running_threads():
    """Store the message backlog, message counter and mute table in the cache."""
    snapshot = {
        f'{SITE}_chat': messages,
        f'{SITE}_total': total,
        f'{SITE}_muted': muted,
    }
    for cache_key, value in snapshot.items():
        cache.set(cache_key, value)


# Run the save when the interpreter exits, so the module-level state can be
# read back from the cache on the next import.
atexit.register(close_running_threads)