bbbb/bbbb.py

518 lines
22 KiB
Python

from difflib import SequenceMatcher
import json
from lib2to3.pytree import Base
from numpy import roll
import traceback
import openai
from RDramaAPIInterface import RDramaAPIInterface
import pprint
from os.path import exists, join, realpath, split
import random
from markdown import markdown
import re
from bs4 import BeautifulSoup
from better_profanity import profanity
import time
from datetime import datetime
from BBBB_Models import Base, Comment, OpenAIToken, Post, User
from sqlalchemy.orm import Session
from threading import Timer
import os
from sqlalchemy import create_engine
TEST_MODE = False
TIME_NORMALIZATION_MODE = True
MAX_INPUT_TOKENS = 512
random.seed(time.time())
pp = pprint.PrettyPrinter()
NAUGHTY_WORDS = [
"faggot","fag","nigger","kike","spic","gook"]
SNARKY_MARSIES = [
'marseysneed',
'marseyseethe',
'marseyeyeroll',
'marseycope',
'marseyl',
'marseybrainlet',
'marseymalding',
'soyquack',
'soymad',
'soycry',
'seethejak',
]
SM_THRESHOLD = 0.6
BBBB_ID = 12125
BBBB_USERNAME = "bbbb"
OPERATOR_ID = 3635
PAGES_TO_SCAN = 5
COMMENT_MEMORY = 100
ALLOWED_COMMENTS_PER_USER_PER_DAY = 10
ALLOWED_COMMENTS_PER_POST = 20
ALLOWED_CONVERSATION_DEPTH = 3
MINUTES_BEFORE_FORCED_SHUTDOWN = 10
ACTUALLY_CALL_OPEN_AI = True
MIDNIGHT_PROBABILITY_MULTIPLIER = 4
AFTERNOON_MORNING_MULTIPLIER = 3
NOON_MULTIPLER = 2
profanity.load_censor_words(NAUGHTY_WORDS)
def strip_markdown(markdown_string):
markdown_string = re.sub(">.*\n", "", markdown_string)
try:
html = markdown(markdown_string)
except AttributeError:
html = markdown_string #if there is no markdown in the string you get an error
soup = BeautifulSoup(html, "html.parser")
text = ''.join(soup.findAll(text=True))
text = re.sub(r":[^ ]*:", "", text) #remove marseys
text = re.sub(r"!blackjack[^ ]*", "", text)
text = re.sub(r"fortune", "", text)
text = re.sub(r"factcheck", "", text)
text = re.sub(r"!slots", "", text)
text = re.sub(r"([^\.])\n", r"\1. ", text)
text = re.sub(r"(\.)\n", ". ", text)
text = re.sub(r"\n", "", text)
text = re.sub(r"http://[^ ]*", "", text)
text = re.sub(r"https://[^ ]*", "", text)
text = re.sub(r"\"", "'", text)
text = remove_naughty_words(text)
# make sure there are only letters in the string.
if len(set(list(text.lower())).intersection(set(["a",'b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']))) == 0:
text = ""
text = re.sub("@?bbbb is", "you are", text, flags=re.IGNORECASE)
text = re.sub("@?bbbb's", "your", text, flags=re.IGNORECASE)
text = re.sub("@?bbbb", "you", text, flags=re.IGNORECASE)
text = re.sub("@(.*?)\s", "", text)
text = re.sub("!slots.*?\s", "", text)
text = re.sub("(?i)trans lives matter", "", text)
return text
def normalize_for_quick_substring_detection(string):
string = string.lower()
string = re.sub(r"[!?>-]", ".", string)
string = ''.join([letter for letter in string if letter in ["a",'b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z', '.']])
return string
def quick_substring_detection(x, y):
x_normalize = normalize_for_quick_substring_detection(x)
y_normalize = normalize_for_quick_substring_detection(y)
if (x_normalize in y_normalize):
print(f"[QSD] entire string x \"({x_normalize})\" is in y \"({y_normalize})\"")
return True
for sentence in x_normalize.split("."):
if len(sentence) < 20:
continue
if sentence == "":
continue
if sentence in y_normalize:
print(f"[QSD] sentence \"{sentence}\" is in y (\"{y_normalize}\")")
return True
return False
def remove_duplicates(list):
return [json.loads(j) for j in set([json.dumps(i) for i in list])]
def remove_naughty_words(string):
return profanity.censor(string)
def get_real_filename(filename : str):
path_to_script = realpath(__file__)
path_to_script_directory, _ = split(path_to_script)
return join(path_to_script_directory, filename)
with open(get_real_filename("open_ai_token"), "r") as f:
openai.api_key = f.read()
def rude_reply(comment, session):
if ACTUALLY_CALL_OPEN_AI:
response = OpenAIToken.call_open_ai(f"Write an abrasive reply to this comment: \"{comment}\"", session)
else:
print("NOT Calling OPENAI...")
message = "foo"*random.randrange(1,20)
response = {
'choices': [{
'text': f"\n\n{message}"
}]
}
return response
def rude_reply_wrapper(comment, past_comments, session):
print(f"[RRW] Getting reply for \"{comment}\"")
if (len(comment.split(" ")) > MAX_INPUT_TOKENS):
print("[RRW]")
new_comment = ""
for sentence in comment.split("."):
if len(sentence) + len(new_comment) < MAX_INPUT_TOKENS:
new_comment+=sentence
else:
break
print(f"[RRW] Holy fuck that's a lot of words. New comment is \"{new_comment}\"")
comment = new_comment
valid_replies = []
short_reply = ""
for i in range(5):
reply = rude_reply(comment, session)['choices'][0]['text']
reply = reply.strip()
print(f"[RRW] Trying \"{reply}\"")
if reply[0] == "\"" and reply[-1] == "\"":
print("[RRW] (Removing the outer parens)]")
reply = reply[1:-1]
original_reply = reply
sm_ratio = SequenceMatcher(None, original_reply.lower(), comment.lower()).ratio()
if sm_ratio > SM_THRESHOLD:
print(f"[RRW] SM says too similar. ratio = {sm_ratio}")
reply = ""
if (quick_substring_detection(original_reply, comment)):
print(f"[RRW] QSD says too similar.")
reply = ""
elif len(reply.split(" ")) < 5:
print(f"[RRW] Too short")
short_reply = reply
#valid_replies.append(reply)
continue
else:
print("[RRW] Under the original system, we would accept this answer.")
valid_replies.append(reply)
if (len(valid_replies) ==0 ):
print("[RRW] No valid replies.")
if (short_reply != ""):
return short_reply
return ""
scored_replies = []
for reply in valid_replies:
high_score = 0
high_scoring_string = ""
for past_comment in past_comments:
sm_ratio = SequenceMatcher(None, reply.lower(), past_comment.lower()).ratio()
if sm_ratio > high_score:
high_score = sm_ratio
high_scoring_string = past_comment
scored_replies.append((reply, high_score))
scored_replies.sort(key=lambda a : a[1])
print(f"[RRW] returning {scored_replies[0]}")
return scored_replies[0][0]
def get_conversation_depth(comment_id : int, rdrama : RDramaAPIInterface) -> int:
depth = 0
was_last_bbbb = True
last_comment_id = comment_id
while True:
comment = rdrama.get_comment(last_comment_id)
if comment['level'] == 1:
return depth
if comment['author_name'] == BBBB_USERNAME:
was_last_bbbb = True
last_comment_id = comment['parent_comment_id']
depth += 1
elif not was_last_bbbb:
return depth
else:
was_last_bbbb = False
last_comment_id = comment['parent_comment_id']
def parse_tracking_file(filename, func):
if not exists(filename):
return []
to_return = []
with open(filename, "r") as f:
for id in f.readlines():
to_return.append(func(id))
return to_return
def parse_id_file(filename):
return parse_tracking_file(filename, lambda a : int(a))
def parse_comments_file(filename):
return parse_tracking_file(filename, lambda a : str(a).strip())
def write_tracking_file(filename, ids):
if (len(ids) > COMMENT_MEMORY):
ids = ids[0:COMMENT_MEMORY]
with open(filename, "w+") as f:
for id in ids:
f.write(f"{id}\n")
def get_rude_reply(comment_to_reply_to, is_chudded, is_pizzad, session):
past_comments = Comment.get_past_comments(session)
reply = ""
if (comment_to_reply_to == ""):
marsey = random.choice(SNARKY_MARSIES)
reply = f":{marsey}:"
else:
reply = rude_reply_wrapper(comment_to_reply_to, past_comments, session)
if reply == "":
marsey = random.choice(SNARKY_MARSIES)
reply = f":{marsey}:"
else:
past_comments = past_comments.insert(0, reply)
if is_chudded:
reply += "\n\nTRANS LIVES MATTER"
if is_pizzad:
if len(reply) < 280:
reply += "\n\n"
while len(reply) < 280: reply += 'pizza'
return reply
def do_rude_reply(rdrama, comment_id, post_id, user_id, comment_to_reply_to, is_chudded, is_pizzad, session : Session, depth = 0):
rude_reply = get_rude_reply(comment_to_reply_to, is_chudded, is_pizzad, session)
reply = rdrama.reply_to_comment_easy(comment_id, post_id, rude_reply)
reply_id = reply['id']
Comment.create_new_comment(comment_id, reply_id, depth, rude_reply, session)
Post.increment_replies(post_id, session)
User.increase_number_of_comments(user_id, session)
session.commit()
def do_rude_post_reply(rdrama : RDramaAPIInterface, post_id, user_id, post_contents, is_chudded, is_pizzad, session):
rdrama.reply_to_post(post_id, get_rude_reply(post_contents, is_chudded, is_pizzad, session))
Post.register_post_reply(post_id, session)
Post.increment_replies(post_id, session)
User.increase_number_of_comments(user_id, session)
session.commit()
def can_reply_to_comment(parent_comment_id, user_id, post_id, session : Session, bulk = False) -> bool:
if (not bulk and parent_comment_id != None and Comment.get_conversation_depth(parent_comment_id, session) > ALLOWED_CONVERSATION_DEPTH):
print(f"Cannot reply to comment. Conversation is too deep. Depth = {Comment.get_conversation_depth(parent_comment_id, session)}")
return False
elif (Post.get_number_of_replies(post_id, session) > ALLOWED_COMMENTS_PER_POST):
print(f"Cannot reply to comment. Post has too many bbbb replies.")
return False
elif (User.get_number_of_comments(user_id, session) > ALLOWED_COMMENTS_PER_USER_PER_DAY):
print(f"Cannot reply to comment. User has replied too many times since last refresh.")
return False
else:
return True
def can_reply_to_post(post_id, user_id, session : Session) -> bool:
if (Post.get_number_of_replies(post_id, session) > ALLOWED_COMMENTS_PER_POST):
print(f"Cannot reply to post. Post has too many replies.")
return False
elif (User.get_number_of_comments(user_id, session) > ALLOWED_COMMENTS_PER_USER_PER_DAY):
print(f"Cannot reply to post. User has replied too many times since last refresh.")
return False
elif (Post.has_replied_to_post(post_id, session)):
print(f"Cannot reply to post. We have already replied to the post.")
return False
else:
return True
def handle_comment_ping(rdrama : RDramaAPIInterface, notification : dict, is_chudded : bool, is_pizzad : bool, session : Session):
print("I got pinged.")
comment_info = rdrama.get_comment(notification['id'])
if comment_info['level'] != 1:
parent_comment_info = rdrama.get_comment(comment_info['parent_comment_id'])
if not (parent_comment_info['is_bot'] or parent_comment_info['author_name'] == BBBB_USERNAME or parent_comment_info['author_name'] == "👻"):
print("I can reply to the parent comment.")
comment_to_reply_to = strip_markdown(parent_comment_info['body'])
comment_id = parent_comment_info['id']
comment_post_id = notification['post_id']
comment_user_id = notification['user_id']
Comment.has_replied_to_comment(parent_comment_info['id'], session)
parent_comment_id = None if 'parent_comment_id' not in parent_comment_info else parent_comment_info['parent_comment_id']
if (can_reply_to_comment(parent_comment_id, comment_post_id, comment_user_id, session) and Comment.get_comment(parent_comment_info['id'], session) == None):
depth = 0 if parent_comment_id == None else Comment.get_conversation_depth(parent_comment_info['parent_comment_id'], session)
do_rude_reply(rdrama, comment_id, comment_post_id, comment_user_id, comment_to_reply_to, is_chudded, is_pizzad, session, depth = depth+1)
else:
print("I already replied to this comment.")
else:
print("I cannot reply to the parent comment.")
else:
#It's a post
print("The parent is a post.")
post_id = comment_info['post_id']
post = rdrama.get_post(post_id)
post_body = post['body']
post_user_id = post['author']['id']
if (can_reply_to_post(post_id, post_user_id, session)):
do_rude_post_reply(rdrama, post_id, post_user_id, strip_markdown(post_body), is_chudded, is_pizzad, session)
def handle_post_ping(rdrama : RDramaAPIInterface, notification : dict, is_chudded : bool, is_pizzad : bool, session : Session):
print("I was pinged in a post.")
if not Post.has_replied_to_post(notification['post_id'], session):
post_id = notification['post_id']
#comment_id_blacklist.insert(0, post_id) #TODO: Add record of this.
post = rdrama.get_post(post_id)
post_body = post['body']
post_user_id = post['author']['id']
do_rude_post_reply(rdrama, post_id, post_user_id, strip_markdown(post_body), is_chudded, is_pizzad, session)
def handle_comment_mention(rdrama : RDramaAPIInterface, notification : dict, is_chudded : bool, is_pizzad : bool, session : Session):
comment_to_reply_to = strip_markdown(notification['message'])
#comment_id_blacklist.insert(0, notification['id']) #TODO: Record keeping
depth = Comment.get_conversation_depth(notification['parent_comment_id'], session)
print(f"Someone replied to me / pinged me. Depth = {depth}")
if (can_reply_to_comment(notification['parent_comment_id'], notification['user_id'], notification['post_id'], session) and Comment.get_comment(notification['id'], session) == None):
print("I can reply, the depth is not too much.")
do_rude_reply(rdrama, notification['id'], notification['post_id'], notification['user_id'], comment_to_reply_to, is_chudded, is_pizzad, session, depth=depth+1)
else:
return
if ("@bbbb" in notification['message']):
handle_comment_ping(rdrama, notification, is_chudded, is_pizzad, session)
def handle_direct_message(rdrama : RDramaAPIInterface, notification : dict, is_chudded : bool, is_pizzad : bool, session : Session):
print(notification['message'])
message = notification['message']
if (notification['user_id'] == OPERATOR_ID):
if (len(message.split(" ")) == 1):
comment_id = int(message)
comment_info = rdrama.get_comment(comment_id)
pp.pprint(comment_info)
post_id = comment_info['post_id']
user_id = comment_info['user_id']
comment_to_reply_to = strip_markdown(comment_info['body'])
#comment_id_blacklist.insert(0, comment_id) #TODO: Add a record here.
do_rude_reply(rdrama, comment_id, post_id, user_id, comment_to_reply_to, is_chudded, is_pizzad, session)
print(F"DOING A REQUESTED RUDE REPLY TO {comment_id}")
def reply_to_notifications(rdrama : RDramaAPIInterface, is_chudded, is_pizzad, session):
unread = rdrama.get_parsed_notification()
for notification in unread:
try:
if (notification['type'] == 'direct_message'):
handle_direct_message(rdrama, notification, is_chudded, is_pizzad, session)
elif (notification['type'] == 'comment_mention'):
handle_comment_mention(rdrama, notification, is_chudded, is_pizzad, session)
elif (notification['type'] == 'post_mention'):
handle_post_ping(rdrama, notification, is_chudded, is_pizzad, session)
else:
print("Unhandled notification")
pp.pprint(notification)
except BaseException as e:
print(f"problem with with a notification :( {e}")
pp.pprint(notification)
traceback.print_exc()
def is_self(comment):
try:
if comment['author'] == "👻":
return True
return comment['author']['id'] == BBBB_ID
except Exception as e:
print(f"YIKES! This comment is a wierd one... {e}")
pp.pprint(comment)
return True
def reply_to_random_comment(rdrama : RDramaAPIInterface, is_chudded, is_pizzad, session):
comments = rdrama.get_comments(number_of_pages=PAGES_TO_SCAN)['data']
comments = [comment for comment in comments if not comment['is_bot']] #No bots
comments = [comment for comment in comments if not is_self(comment)] #Don't reply to self
comments = [comment for comment in comments if can_reply_to_comment(None, comment['author']['id'], comment['post_id'], session)]
comments = [comment for comment in comments if Comment.get_comment(comment['id'], session) is None]
comments = [{
'body' : strip_markdown(comment['body']),
'id' : comment['id'],
'author_id': comment['author']['id'],
'parent_id' : comment['post_id']
} for comment in comments] #Normal form, easier to work with
comments = [comment for comment in comments if comment['body'] != ""]
comments = [comment for comment in comments if len(comment['body'].split(" ")) > 10] #Get rid of short sentences
comments = remove_duplicates(comments)
if len(comments) == 0:
print("No comments to reply to")
return
comment_to_reply_to = random.choice(comments)
do_rude_reply(rdrama, comment_to_reply_to['id'], comment_to_reply_to['parent_id'], comment_to_reply_to['author_id'], comment_to_reply_to['body'], is_chudded, is_pizzad, session)
pp.pprint(comment_to_reply_to)
def main_processing_task(rdrama : RDramaAPIInterface, session : Session):
is_chudded = False #Do we have the chud award?
can_communicate = True #Can we send any message at all?
is_pizzad = False
rdrama.get_front_page()
bbbb_information = rdrama.get_user_information(BBBB_ID)
print(f"coins: {bbbb_information['coins']} comments: {bbbb_information['comment_count']}")
for badge in bbbb_information['badges']:
if (badge['name'] == "Marsey Award"):
print("We have the marsey award. STOP.")
can_communicate = False
if (badge['name'] == "Chud"):
print("We have the CHUD award. CONTINUE")
is_chudded = True
if (badge['name'] == "Bird Site Award"):
print("We have the Bird Site Award. STOP.")
can_communicate = False
if (badge['name'] == "Pizzashill Award"):
print("We have the Pizzashill Award. CONTINUE.")
is_pizzad = True
if bbbb_information['is_banned']:
print("We are banned. STOP.")
can_communicate = False
if can_communicate:
try:
reply_to_notifications(rdrama, is_chudded, is_pizzad, session)
except BaseException as e:
print(f"Problem replying to notifications :( {e} (Aevann probably fucked up the /unread endpoint lol)")
traceback.print_exc()
hours_since_noon = abs(datetime.now().hour-12)
odds = 0
if hours_since_noon > 9:
odds = MIDNIGHT_PROBABILITY_MULTIPLIER*hours_since_noon
elif hours_since_noon > 6:
odds = AFTERNOON_MORNING_MULTIPLIER *hours_since_noon
else:
odds = NOON_MULTIPLER * hours_since_noon
rolled_number = random.randrange(1,odds+2)
if (not TIME_NORMALIZATION_MODE or rolled_number == 1):
if (not TIME_NORMALIZATION_MODE):
print("TIME NORMALIZATION MODE IS OFF")
try:
reply_to_random_comment(rdrama, is_chudded, is_pizzad, session)
except BaseException as e:
print(f"Problem with replying to random comment :( {e}")
traceback.print_exc()
else:
print(f"Check failed. Odds = 1/{odds+1}. Rolled = {rolled_number}")
if __name__ == "__main__":
TEST_AUTH_TOKEN = "lNYlc-A77HDaAgUSbu6Px_T2I6QFEZR-P7sDtTZ4Lw-3S9LlETGijvVb-hysKUHgKKRwO3bE4eH1pPrmvBMgbY0C1XsAgaSHEaY2OyBdSEUWSxYIVp_1fGnkegdZDORZ" #todo - parameterize
print(f"======= RUNNING AT {datetime.now().hour}:{datetime.now().minute} ======= ")
if TEST_MODE:
website = "localhost"
auth = TEST_AUTH_TOKEN
https = False
timeout = 1
BBBB_ID = 6
OPERATOR_ID = 9
ACTUALLY_CALL_OPEN_AI = False
else:
website = "rdrama.net"
with open(get_real_filename("rdrama_auth_token"), "r") as f:
auth = f.read()
https = True
timeout = 10
rdrama = RDramaAPIInterface(auth, website, timeout, https=https)
#Set up fail safe
def exitfunc():
print("*That's it, you're going in the retard squisher.*")
print("FAILSAFE FORCED SHUTDOWN", datetime.now())
os._exit(0)
timer = Timer(60*MINUTES_BEFORE_FORCED_SHUTDOWN, exitfunc)
timer.start()
db_filename = "bbbb_database.db"
engine = create_engine(f"sqlite:///{get_real_filename(db_filename)}")
Base.metadata.create_all(engine)
with Session(engine) as session:
main_processing_task(rdrama, session)
session.commit()
timer.cancel()