from difflib import SequenceMatcher import json from lib2to3.pytree import Base from numpy import roll import traceback import openai from RDramaAPIInterface import RDramaAPIInterface import pprint from os.path import exists, join, realpath, split import random from markdown import markdown import re from bs4 import BeautifulSoup from better_profanity import profanity import time from datetime import datetime from BBBB_Models import Base, Comment, OpenAIToken, Post, User from sqlalchemy.orm import Session from threading import Timer import os from sqlalchemy import create_engine TEST_MODE = False TIME_NORMALIZATION_MODE = True MAX_INPUT_TOKENS = 512 random.seed(time.time()) pp = pprint.PrettyPrinter() NAUGHTY_WORDS = [ "faggot","fag","nigger","kike","spic","gook"] SNARKY_MARSIES = [ 'marseysneed', 'marseyseethe', 'marseyeyeroll', 'marseycope', 'marseyl', 'marseybrainlet', 'marseymalding', 'soyquack', 'soymad', 'soycry', 'seethejak', ] SM_THRESHOLD = 0.6 BBBB_ID = 12125 BBBB_USERNAME = "bbbb" OPERATOR_ID = 3635 PAGES_TO_SCAN = 5 COMMENT_MEMORY = 100 ALLOWED_COMMENTS_PER_USER_PER_DAY = 10 ALLOWED_COMMENTS_PER_POST = 20 ALLOWED_CONVERSATION_DEPTH = 3 MINUTES_BEFORE_FORCED_SHUTDOWN = 10 ACTUALLY_CALL_OPEN_AI = True MIDNIGHT_PROBABILITY_MULTIPLIER = 4 AFTERNOON_MORNING_MULTIPLIER = 3 NOON_MULTIPLER = 2 profanity.load_censor_words(NAUGHTY_WORDS) def strip_markdown(markdown_string): markdown_string = re.sub(">.*\n", "", markdown_string) try: html = markdown(markdown_string) except AttributeError: html = markdown_string #if there is no markdown in the string you get an error soup = BeautifulSoup(html, "html.parser") text = ''.join(soup.findAll(text=True)) text = re.sub(r":[^ ]*:", "", text) #remove marseys text = re.sub(r"!blackjack[^ ]*", "", text) text = re.sub(r"fortune", "", text) text = re.sub(r"factcheck", "", text) text = re.sub(r"!slots", "", text) text = re.sub(r"([^\.])\n", r"\1. ", text) text = re.sub(r"(\.)\n", ". ", text) text = re.sub(r"\n", "", text) text = re.sub(r"http://[^ ]*", "", text) text = re.sub(r"https://[^ ]*", "", text) text = re.sub(r"\"", "'", text) text = remove_naughty_words(text) # make sure there are only letters in the string. if len(set(list(text.lower())).intersection(set(["a",'b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']))) == 0: text = "" text = re.sub("@?bbbb is", "you are", text, flags=re.IGNORECASE) text = re.sub("@?bbbb's", "your", text, flags=re.IGNORECASE) text = re.sub("@?bbbb", "you", text, flags=re.IGNORECASE) text = re.sub("@(.*?)\s", "", text) text = re.sub("!slots.*?\s", "", text) text = re.sub("(?i)trans lives matter", "", text) return text def normalize_for_quick_substring_detection(string): string = string.lower() string = re.sub(r"[!?>-]", ".", string) string = ''.join([letter for letter in string if letter in ["a",'b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z', '.']]) return string def quick_substring_detection(x, y): x_normalize = normalize_for_quick_substring_detection(x) y_normalize = normalize_for_quick_substring_detection(y) if (x_normalize in y_normalize): print(f"[QSD] entire string x \"({x_normalize})\" is in y \"({y_normalize})\"") return True for sentence in x_normalize.split("."): if len(sentence) < 20: continue if sentence == "": continue if sentence in y_normalize: print(f"[QSD] sentence \"{sentence}\" is in y (\"{y_normalize}\")") return True return False def remove_duplicates(list): return [json.loads(j) for j in set([json.dumps(i) for i in list])] def remove_naughty_words(string): return profanity.censor(string) def get_real_filename(filename : str): path_to_script = realpath(__file__) path_to_script_directory, _ = split(path_to_script) return join(path_to_script_directory, filename) with open(get_real_filename("open_ai_token"), "r") as f: openai.api_key = f.read() def rude_reply(comment, session): if ACTUALLY_CALL_OPEN_AI: response = OpenAIToken.call_open_ai(f"Write an abrasive reply to this comment: \"{comment}\"", session) else: print("NOT Calling OPENAI...") message = "foo"*random.randrange(1,20) response = { 'choices': [{ 'text': f"\n\n{message}" }] } return response def rude_reply_wrapper(comment, past_comments, session): print(f"[RRW] Getting reply for \"{comment}\"") if (len(comment.split(" ")) > MAX_INPUT_TOKENS): print("[RRW]") new_comment = "" for sentence in comment.split("."): if len(sentence) + len(new_comment) < MAX_INPUT_TOKENS: new_comment+=sentence else: break print(f"[RRW] Holy fuck that's a lot of words. New comment is \"{new_comment}\"") comment = new_comment valid_replies = [] short_reply = "" for i in range(5): reply = rude_reply(comment, session)['choices'][0]['text'] reply = reply.strip() print(f"[RRW] Trying \"{reply}\"") if reply[0] == "\"" and reply[-1] == "\"": print("[RRW] (Removing the outer parens)]") reply = reply[1:-1] original_reply = reply sm_ratio = SequenceMatcher(None, original_reply.lower(), comment.lower()).ratio() if sm_ratio > SM_THRESHOLD: print(f"[RRW] SM says too similar. ratio = {sm_ratio}") reply = "" if (quick_substring_detection(original_reply, comment)): print(f"[RRW] QSD says too similar.") reply = "" elif len(reply.split(" ")) < 5: print(f"[RRW] Too short") short_reply = reply #valid_replies.append(reply) continue else: print("[RRW] Under the original system, we would accept this answer.") valid_replies.append(reply) if (len(valid_replies) ==0 ): print("[RRW] No valid replies.") if (short_reply != ""): return short_reply return "" scored_replies = [] for reply in valid_replies: high_score = 0 high_scoring_string = "" for past_comment in past_comments: sm_ratio = SequenceMatcher(None, reply.lower(), past_comment.lower()).ratio() if sm_ratio > high_score: high_score = sm_ratio high_scoring_string = past_comment scored_replies.append((reply, high_score)) scored_replies.sort(key=lambda a : a[1]) print(f"[RRW] returning {scored_replies[0]}") return scored_replies[0][0] def get_conversation_depth(comment_id : int, rdrama : RDramaAPIInterface) -> int: depth = 0 was_last_bbbb = True last_comment_id = comment_id while True: comment = rdrama.get_comment(last_comment_id) if comment['level'] == 1: return depth if comment['author_name'] == BBBB_USERNAME: was_last_bbbb = True last_comment_id = comment['parent_comment_id'] depth += 1 elif not was_last_bbbb: return depth else: was_last_bbbb = False last_comment_id = comment['parent_comment_id'] def parse_tracking_file(filename, func): if not exists(filename): return [] to_return = [] with open(filename, "r") as f: for id in f.readlines(): to_return.append(func(id)) return to_return def parse_id_file(filename): return parse_tracking_file(filename, lambda a : int(a)) def parse_comments_file(filename): return parse_tracking_file(filename, lambda a : str(a).strip()) def write_tracking_file(filename, ids): if (len(ids) > COMMENT_MEMORY): ids = ids[0:COMMENT_MEMORY] with open(filename, "w+") as f: for id in ids: f.write(f"{id}\n") def get_rude_reply(comment_to_reply_to, is_chudded, is_pizzad, session): past_comments = Comment.get_past_comments(session) reply = "" if (comment_to_reply_to == ""): marsey = random.choice(SNARKY_MARSIES) reply = f":{marsey}:" else: reply = rude_reply_wrapper(comment_to_reply_to, past_comments, session) if reply == "": marsey = random.choice(SNARKY_MARSIES) reply = f":{marsey}:" else: past_comments = past_comments.insert(0, reply) if is_chudded: reply += "\n\nTRANS LIVES MATTER" if is_pizzad: if len(reply) < 280: reply += "\n\n" while len(reply) < 280: reply += 'pizza' return reply def do_rude_reply(rdrama, comment_id, post_id, user_id, comment_to_reply_to, is_chudded, is_pizzad, session : Session, depth = 0): rude_reply = get_rude_reply(comment_to_reply_to, is_chudded, is_pizzad, session) reply = rdrama.reply_to_comment_easy(comment_id, post_id, rude_reply) reply_id = reply['id'] Comment.create_new_comment(comment_id, reply_id, depth, rude_reply, session) Post.increment_replies(post_id, session) User.increase_number_of_comments(user_id, session) session.commit() def do_rude_post_reply(rdrama : RDramaAPIInterface, post_id, user_id, post_contents, is_chudded, is_pizzad, session): rdrama.reply_to_post(post_id, get_rude_reply(post_contents, is_chudded, is_pizzad, session)) Post.register_post_reply(post_id, session) Post.increment_replies(post_id, session) User.increase_number_of_comments(user_id, session) session.commit() def can_reply_to_comment(parent_comment_id, user_id, post_id, session : Session, bulk = False) -> bool: if (not bulk and parent_comment_id != None and Comment.get_conversation_depth(parent_comment_id, session) > ALLOWED_CONVERSATION_DEPTH): print(f"Cannot reply to comment. Conversation is too deep. Depth = {Comment.get_conversation_depth(parent_comment_id, session)}") return False elif (Post.get_number_of_replies(post_id, session) > ALLOWED_COMMENTS_PER_POST): print(f"Cannot reply to comment. Post has too many bbbb replies.") return False elif (User.get_number_of_comments(user_id, session) > ALLOWED_COMMENTS_PER_USER_PER_DAY): print(f"Cannot reply to comment. User has replied too many times since last refresh.") return False else: return True def can_reply_to_post(post_id, user_id, session : Session) -> bool: if (Post.get_number_of_replies(post_id, session) > ALLOWED_COMMENTS_PER_POST): print(f"Cannot reply to post. Post has too many replies.") return False elif (User.get_number_of_comments(user_id, session) > ALLOWED_COMMENTS_PER_USER_PER_DAY): print(f"Cannot reply to post. User has replied too many times since last refresh.") return False elif (Post.has_replied_to_post(post_id, session)): print(f"Cannot reply to post. We have already replied to the post.") return False else: return True def handle_comment_ping(rdrama : RDramaAPIInterface, notification : dict, is_chudded : bool, is_pizzad : bool, session : Session): print("I got pinged.") comment_info = rdrama.get_comment(notification['id']) if comment_info['level'] != 1: parent_comment_info = rdrama.get_comment(comment_info['parent_comment_id']) if not (parent_comment_info['is_bot'] or parent_comment_info['author_name'] == BBBB_USERNAME or parent_comment_info['author_name'] == "👻"): print("I can reply to the parent comment.") comment_to_reply_to = strip_markdown(parent_comment_info['body']) comment_id = parent_comment_info['id'] comment_post_id = notification['post_id'] comment_user_id = notification['user_id'] Comment.has_replied_to_comment(parent_comment_info['id'], session) parent_comment_id = None if 'parent_comment_id' not in parent_comment_info else parent_comment_info['parent_comment_id'] if (can_reply_to_comment(parent_comment_id, comment_post_id, comment_user_id, session) and Comment.get_comment(parent_comment_info['id'], session) == None): depth = 0 if parent_comment_id == None else Comment.get_conversation_depth(parent_comment_info['parent_comment_id'], session) do_rude_reply(rdrama, comment_id, comment_post_id, comment_user_id, comment_to_reply_to, is_chudded, is_pizzad, session, depth = depth+1) else: print("I already replied to this comment.") else: print("I cannot reply to the parent comment.") else: #It's a post print("The parent is a post.") post_id = comment_info['post_id'] post = rdrama.get_post(post_id) post_body = post['body'] post_user_id = post['author']['id'] if (can_reply_to_post(post_id, post_user_id, session)): do_rude_post_reply(rdrama, post_id, post_user_id, strip_markdown(post_body), is_chudded, is_pizzad, session) def handle_post_ping(rdrama : RDramaAPIInterface, notification : dict, is_chudded : bool, is_pizzad : bool, session : Session): print("I was pinged in a post.") if not Post.has_replied_to_post(notification['post_id'], session): post_id = notification['post_id'] #comment_id_blacklist.insert(0, post_id) #TODO: Add record of this. post = rdrama.get_post(post_id) post_body = post['body'] post_user_id = post['author']['id'] do_rude_post_reply(rdrama, post_id, post_user_id, strip_markdown(post_body), is_chudded, is_pizzad, session) def handle_comment_mention(rdrama : RDramaAPIInterface, notification : dict, is_chudded : bool, is_pizzad : bool, session : Session): comment_to_reply_to = strip_markdown(notification['message']) #comment_id_blacklist.insert(0, notification['id']) #TODO: Record keeping depth = Comment.get_conversation_depth(notification['parent_comment_id'], session) print(f"Someone replied to me / pinged me. Depth = {depth}") if (can_reply_to_comment(notification['parent_comment_id'], notification['user_id'], notification['post_id'], session) and Comment.get_comment(notification['id'], session) == None): print("I can reply, the depth is not too much.") do_rude_reply(rdrama, notification['id'], notification['post_id'], notification['user_id'], comment_to_reply_to, is_chudded, is_pizzad, session, depth=depth+1) else: return if ("@bbbb" in notification['message']): handle_comment_ping(rdrama, notification, is_chudded, is_pizzad, session) def handle_direct_message(rdrama : RDramaAPIInterface, notification : dict, is_chudded : bool, is_pizzad : bool, session : Session): print(notification['message']) message = notification['message'] if (notification['user_id'] == OPERATOR_ID): if (len(message.split(" ")) == 1): comment_id = int(message) comment_info = rdrama.get_comment(comment_id) pp.pprint(comment_info) post_id = comment_info['post_id'] user_id = comment_info['user_id'] comment_to_reply_to = strip_markdown(comment_info['body']) #comment_id_blacklist.insert(0, comment_id) #TODO: Add a record here. do_rude_reply(rdrama, comment_id, post_id, user_id, comment_to_reply_to, is_chudded, is_pizzad, session) print(F"DOING A REQUESTED RUDE REPLY TO {comment_id}") def reply_to_notifications(rdrama : RDramaAPIInterface, is_chudded, is_pizzad, session): unread = rdrama.get_parsed_notification() for notification in unread: try: if (notification['type'] == 'direct_message'): handle_direct_message(rdrama, notification, is_chudded, is_pizzad, session) elif (notification['type'] == 'comment_mention'): handle_comment_mention(rdrama, notification, is_chudded, is_pizzad, session) elif (notification['type'] == 'post_mention'): handle_post_ping(rdrama, notification, is_chudded, is_pizzad, session) else: print("Unhandled notification") pp.pprint(notification) except BaseException as e: print(f"problem with with a notification :( {e}") pp.pprint(notification) traceback.print_exc() def is_self(comment): try: if comment['author'] == "👻": return True return comment['author']['id'] == BBBB_ID except Exception as e: print(f"YIKES! This comment is a wierd one... {e}") pp.pprint(comment) return True def reply_to_random_comment(rdrama : RDramaAPIInterface, is_chudded, is_pizzad, session): comments = rdrama.get_comments(number_of_pages=PAGES_TO_SCAN)['data'] comments = [comment for comment in comments if not comment['is_bot']] #No bots comments = [comment for comment in comments if not is_self(comment)] #Don't reply to self comments = [comment for comment in comments if can_reply_to_comment(None, comment['author']['id'], comment['post_id'], session)] comments = [comment for comment in comments if Comment.get_comment(comment['id'], session) is None] comments = [{ 'body' : strip_markdown(comment['body']), 'id' : comment['id'], 'author_id': comment['author']['id'], 'parent_id' : comment['post_id'] } for comment in comments] #Normal form, easier to work with comments = [comment for comment in comments if comment['body'] != ""] comments = [comment for comment in comments if len(comment['body'].split(" ")) > 10] #Get rid of short sentences comments = remove_duplicates(comments) if len(comments) == 0: print("No comments to reply to") return comment_to_reply_to = random.choice(comments) do_rude_reply(rdrama, comment_to_reply_to['id'], comment_to_reply_to['parent_id'], comment_to_reply_to['author_id'], comment_to_reply_to['body'], is_chudded, is_pizzad, session) pp.pprint(comment_to_reply_to) def main_processing_task(rdrama : RDramaAPIInterface, session : Session): is_chudded = False #Do we have the chud award? can_communicate = True #Can we send any message at all? is_pizzad = False rdrama.get_front_page() bbbb_information = rdrama.get_user_information(BBBB_ID) print(f"coins: {bbbb_information['coins']} comments: {bbbb_information['comment_count']}") for badge in bbbb_information['badges']: if (badge['name'] == "Marsey Award"): print("We have the marsey award. STOP.") can_communicate = False if (badge['name'] == "Chud"): print("We have the CHUD award. CONTINUE") is_chudded = True if (badge['name'] == "Bird Site Award"): print("We have the Bird Site Award. STOP.") can_communicate = False if (badge['name'] == "Pizzashill Award"): print("We have the Pizzashill Award. CONTINUE.") is_pizzad = True if bbbb_information['is_banned']: print("We are banned. STOP.") can_communicate = False if can_communicate: try: reply_to_notifications(rdrama, is_chudded, is_pizzad, session) except BaseException as e: print(f"Problem replying to notifications :( {e} (Aevann probably fucked up the /unread endpoint lol)") traceback.print_exc() hours_since_noon = abs(datetime.now().hour-12) odds = 0 if hours_since_noon > 9: odds = MIDNIGHT_PROBABILITY_MULTIPLIER*hours_since_noon elif hours_since_noon > 6: odds = AFTERNOON_MORNING_MULTIPLIER *hours_since_noon else: odds = NOON_MULTIPLER * hours_since_noon rolled_number = random.randrange(1,odds+2) if (not TIME_NORMALIZATION_MODE or rolled_number == 1): if (not TIME_NORMALIZATION_MODE): print("TIME NORMALIZATION MODE IS OFF") try: reply_to_random_comment(rdrama, is_chudded, is_pizzad, session) except BaseException as e: print(f"Problem with replying to random comment :( {e}") traceback.print_exc() else: print(f"Check failed. Odds = 1/{odds+1}. Rolled = {rolled_number}") if __name__ == "__main__": TEST_AUTH_TOKEN = "lNYlc-A77HDaAgUSbu6Px_T2I6QFEZR-P7sDtTZ4Lw-3S9LlETGijvVb-hysKUHgKKRwO3bE4eH1pPrmvBMgbY0C1XsAgaSHEaY2OyBdSEUWSxYIVp_1fGnkegdZDORZ" #todo - parameterize print(f"======= RUNNING AT {datetime.now().hour}:{datetime.now().minute} ======= ") if TEST_MODE: website = "localhost" auth = TEST_AUTH_TOKEN https = False timeout = 1 BBBB_ID = 6 OPERATOR_ID = 9 ACTUALLY_CALL_OPEN_AI = False else: website = "rdrama.net" with open(get_real_filename("rdrama_auth_token"), "r") as f: auth = f.read() https = True timeout = 10 rdrama = RDramaAPIInterface(auth, website, timeout, https=https) #Set up fail safe def exitfunc(): print("*That's it, you're going in the retard squisher.*") print("FAILSAFE FORCED SHUTDOWN", datetime.now()) os._exit(0) timer = Timer(60*MINUTES_BEFORE_FORCED_SHUTDOWN, exitfunc) timer.start() db_filename = "bbbb_database.db" engine = create_engine(f"sqlite:///{get_real_filename(db_filename)}") Base.metadata.create_all(engine) with Session(engine) as session: main_processing_task(rdrama, session) session.commit() timer.cancel()