"""Automeme bot.

Scans recent rdrama comments, parses each comment into lines made of text,
emojis and uploaded images, and, when the line layout matches one of the
supported meme formats, replies to the comment with a generated meme image.
Reply bookkeeping (per comment, per post, per user) is kept in a SQLite
database through the models in ``BotModels``.
"""
import base64
import io
import json
import os
import re
from datetime import datetime
from os.path import exists, join, realpath, split
from threading import Timer
from typing import Callable, TypeVar

from bs4 import BeautifulSoup
from markdown import markdown
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

import meme_generator
from BotModels import Base, Comment, Post, User
from meme_generator import (
    OneCharacterWebcomicPanel,
    TitleCardWebcomicPanel,
    TwoCharacterWebcomicPanel,
    WebcomicPanel,
    add_watermark,
    create_webcomic,
)
from RDramaAPIInterface import RDramaAPIInterface
from utils import get_real_filename

TEST_MODE = True
# Token used when TEST_MODE is on (the bot then talks to "localhost").
# NOTE(review): consider loading this from the environment or a file, like the
# production token, instead of committing it to source.
TEST_AUTH_TOKEN = "ED3eURMKP9FKBFbi-JUxo8MPGWkEihuyIlAUGtVL7xwx0NEy4Nf6J_mxWYTPgAQx1iy1X91hx7PPHyEBS79hvKVIy5DMEzOyAe9PAc5pmqSJlLGq_-ROewMwFzGrqer4"
MINUTES_BEFORE_FORCED_SHUTDOWN = 10
DB_FILENAME = "automeme_database.db"
PAGES_TO_SCAN = 5
AUTOMEME_ID = 13427
ALLOWED_COMMENTS_PER_POST = 20
ALLOWED_COMMENTS_PER_USER_PER_DAY = 20

EMOJI_REGEX = r":[^ ]*:"
IMAGE_REGEX = r"!\[\]\(/images/([1234567890]*)\.webp\)"
PARSED_IMAGE_REGEX = r"IMAGE:/images/[1234567890]*\.webp"
# Replacement template for IMAGE_REGEX. ``re.sub`` leaves the ``\.`` escape
# untouched, so the substituted text contains a literal backslash.
# NOTE(review): presumably the later markdown() pass turns "\." back into "."
# so that PARSED_IMAGE_REGEX can match - confirm before changing this value.
INJECTABLE_IMAGE_REGEX = r"IMAGE:/images/\1\.webp"

# Length of the "IMAGE:" marker that INJECTABLE_IMAGE_REGEX puts in front of
# an image path.
_IMAGE_MARKER_LENGTH = len("IMAGE:")

# Manual smoke-test snippet (kept for reference):
# rdrama = RDramaAPIInterface(TEST_AUTH_TOKEN, "localhost", sleep=5, https=False)
# print(open('emoji_cache/klanjak.webp', 'rb'))
# image = meme_generator.create_classic_meme_from_emoji("marseyannoyed", "THAT FACE WHEN YOU", "FORGET TO TAKE OUT THE TRASH")
# output = io.BytesIO()
# image.save(output, format="webp")
# file = {'file': ('based.webp', output.getvalue(), 'image/webp')}
# rdrama.reply_to_comment_easy(175, 1, "assddsfssdssdsd", file=file)


def comment_with_image(message, image, comment_id, post_id, rdrama_api=None):
    """Reply to a comment with ``message`` and ``image`` attached.

    Args:
        message: Text body of the reply.
        image: Object exposing ``get_binary()``; the result is uploaded as
            ``based.webp`` with the ``image/webp`` content type.
        comment_id: Id of the comment being replied to.
        post_id: Id of the post that contains the comment.
        rdrama_api: API client used to send the reply. When omitted, the
            module-level ``rdrama`` client (created when the file is run as
            a script) is used, which is the original behaviour.

    Returns:
        The ``'id'`` field of the API response.
    """
    api = rdrama if rdrama_api is None else rdrama_api
    file = {'file': ('based.webp', image.get_binary(), 'image/webp')}
    return api.reply_to_comment_easy(comment_id, post_id, message, file=file)['id']


class TextLine:
    """One line of a cleaned comment, parsed into text/emoji/image elements."""

    def __init__(self, string):
        self.line = text_elements(string)

    @property
    def text(self) -> str:
        """All text fragments of the line joined with single spaces."""
        return get_text_only(self.line)

    @property
    def captions(self) -> 'list[Text]':
        return [i for i in self.line if isinstance(i, Text)]

    @property
    def images(self) -> 'list[Image]':
        return [i for i in self.line if isinstance(i, Image)]

    @property
    def emojis(self) -> 'list[Emoji]':
        return [i for i in self.line if isinstance(i, Emoji)]

    @property
    def is_dialogue_line(self):
        """Exactly one emoji and one caption."""
        return len(self.emojis) == 1 and len(self.captions) == 1

    @property
    def is_argument_line(self):
        """Exactly two emojis with one or two captions."""
        return len(self.emojis) == 2 and len(self.captions) in (1, 2)

    @property
    def is_pure_text_line(self):
        """No emojis and no images."""
        return len(self.emojis) == 0 and len(self.images) == 0

    @property
    def is_big_marsey_line(self):
        """A single big emoji and no caption."""
        emojis = self.emojis
        return len(emojis) == 1 and emojis[0].big and len(self.captions) == 0

    @property
    def is_image_line(self):
        """A single image and no caption."""
        return len(self.images) == 1 and len(self.captions) == 0


class TextElement():
    """Base class for the pieces a comment line is split into."""
    pass


class Text(TextElement):
    """A plain text fragment."""

    def __init__(self, text):
        self.text = text

    def __repr__(self) -> str:
        return f"Text({self.text})"


class Image(TextElement):
    """An uploaded image, identified by its site-relative link."""

    def __init__(self, link):
        self.link = link

    def __repr__(self) -> str:
        return f"Link({self.link})"


class Emoji(TextElement):
    """An emoji name plus whether it was written in its big ("#") form."""

    def __init__(self, emoji, big):
        self.emoji = emoji
        self.big = big

    def __repr__(self) -> str:
        return f"Emoji({self.emoji}, big={self.big})"


def get_text_only(text_elements : list[TextElement]) -> str:
    """Join the text of every ``Text`` element with single spaces."""
    return " ".join(i.text for i in text_elements if isinstance(i, Text))


def text_elements(string : str):
    """Split ``string`` into ``Emoji``, ``Image`` and ``Text`` elements.

    Emojis are ``:name:`` tokens (a ``#`` anywhere in the token marks it as
    big and is dropped from the name); images are the ``IMAGE:/images/N.webp``
    markers produced by ``strip_markdown``; everything else is text.
    Whitespace-only fragments are discarded.
    """
    FULL_REGEX = rf"({EMOJI_REGEX})|({PARSED_IMAGE_REGEX})"
    to_return = []
    for element in re.split(FULL_REGEX, string):
        # re.split yields None for the capture group that did not take part
        # in a match.
        if element is None or element.strip() == "":
            continue
        if re.match(EMOJI_REGEX, element):
            big = "#" in element
            if big:
                element = element.replace("#", "")
            to_return.append(Emoji(element.strip(":"), big))
        elif re.match(PARSED_IMAGE_REGEX, element):
            to_return.append(Image(element.strip()[_IMAGE_MARKER_LENGTH:]))
        else:
            to_return.append(Text(element.strip()))
    return to_return


def strip_markdown(markdown_string):
    """Reduce a raw comment body to the plain text used for meme parsing.

    Uploaded-image markdown is rewritten to ``IMAGE:`` markers, everything
    from a ``>`` up to and including the next newline is dropped, the rest is
    rendered through markdown and stripped of HTML, and a handful of
    keywords, commands and @mentions are removed.

    Returns:
        The cleaned text, or ``""`` when the text contains no a-z letter.
    """
    markdown_string = re.sub(IMAGE_REGEX, INJECTABLE_IMAGE_REGEX, markdown_string)
    markdown_string = re.sub(r">.*\n", "", markdown_string)
    try:
        html = markdown(markdown_string)
    except AttributeError:
        html = markdown_string  # if there is no markdown in the string you get an error
    soup = BeautifulSoup(html, "html.parser")
    text = ''.join(soup.find_all(string=True))
    text = re.sub(r"!blackjack[^ ]*", "", text)
    text = re.sub(r"fortune", "", text)
    text = re.sub(r"factcheck", "", text)
    text = re.sub(r"!slots.*?\s", "", text)
    text = text.replace('"', "'")

    # Give up unless the text contains at least one a-z letter.
    if re.search(r"[a-z]", text.lower()) is None:
        return ""

    text = re.sub(r"@(.*?)\s", "", text)
    # Repeated on purpose: removing mentions can expose a new "!slots" match.
    text = re.sub(r"!slots.*?\s", "", text)
    text = re.sub(r"(?i)trans lives matter", "", text)
    return text


def remove_duplicates(list):
    """Return the JSON-serialisable items of ``list`` without duplicates.

    Items are compared by their ``json.dumps`` form. The first occurrence of
    each item is kept and the original order is preserved; the returned
    items are fresh copies produced by ``json.loads``.
    """
    unique_dumps = dict.fromkeys(json.dumps(i) for i in list)
    return [json.loads(j) for j in unique_dumps]


def get_eligible_comments(rdrama : RDramaAPIInterface, session : Session):
    """Fetch recent comments and keep those the bot may reply to.

    A comment is kept when it is not from a bot, not written by the bot
    itself, its post and its author are both under their reply limits, and
    no reply to it has been recorded yet. Duplicates are removed.
    """
    comments = rdrama.get_comments(number_of_pages=PAGES_TO_SCAN)['data']
    comments = [
        comment for comment in comments
        if not comment['is_bot']  # No bots
        and comment['author']['id'] != AUTOMEME_ID  # Don't reply to self
        and Post.get_number_of_replies(comment['post_id'], session) < ALLOWED_COMMENTS_PER_POST  # Don't spam posts
        and User.get_number_of_comments(comment['author']['id'], session) < ALLOWED_COMMENTS_PER_USER_PER_DAY  # Don't spam users
        and Comment.get_comment(comment['id'], session) is None  # Double check that we haven't replied to the comment
    ]
    return remove_duplicates(comments)


T = TypeVar('T')


def lambda_count(list : list[T], predicate : 'Callable[[T], bool]' ):
    """Count the items of ``list`` for which ``predicate`` is truthy."""
    return sum(1 for i in list if predicate(i))


def get_full_rdrama_image_url(partial_url) -> str:
    """Prefix a site-relative image path with the host for the current mode."""
    host = "http://localhost" if TEST_MODE else "https://rdrama.net"
    return f"{host}{partial_url}"


def _bot_can_communicate(automeme_information) -> bool:
    """Report whether the bot's badges and ban status allow it to post."""
    can_communicate = True
    for badge in automeme_information['badges']:
        badge_name = badge['name']
        if badge_name == "Marsey Award":
            print("We have the marsey award. STOP.")
            can_communicate = False
        elif badge_name == "Chud":
            print("We have the CHUD award. CONTINUE")
        elif badge_name == "Bird Site Award":
            print("We have the Bird Site Award. STOP.")
            can_communicate = False
        elif badge_name == "Pizzashill Award":
            print("We have the Pizzashill Award. CONTINUE.")
    if automeme_information['is_banned']:
        print("We are banned. STOP.")
        can_communicate = False
    return can_communicate


def _build_webcomic_panels(element_lines) -> 'list[WebcomicPanel]':
    """Turn dialogue, argument and pure-text lines into webcomic panels."""
    panels : 'list[WebcomicPanel]' = []
    for element_line in element_lines:
        if element_line.is_dialogue_line:
            caption = element_line.text
            emoji = element_line.emojis[0].emoji
            in_background = len(caption) > 100
            panels.append(OneCharacterWebcomicPanel(emoji, caption, in_background))
        elif element_line.is_argument_line:
            captions = element_line.captions
            left_caption = captions[0].text
            right_caption = captions[1].text if len(captions) == 2 else ""
            left_emoji = element_line.emojis[0].emoji
            right_emoji = element_line.emojis[1].emoji
            panels.append(TwoCharacterWebcomicPanel(left_emoji, left_caption, right_emoji, right_caption))
        elif element_line.is_pure_text_line:
            panels.append(TitleCardWebcomicPanel(element_line.text))
    return panels


def _build_meme(element_lines):
    """Pick the meme format matching the comment's lines and render it.

    Formats are tried in a fixed priority order; returns ``None`` when the
    line layout matches none of them.
    """
    dialog_lines = [line for line in element_lines if line.is_dialogue_line]
    argument_lines = [line for line in element_lines if line.is_argument_line]
    pure_text_lines = [line for line in element_lines if line.is_pure_text_line]
    big_marsey_lines = [line for line in element_lines if line.is_big_marsey_line]
    image_lines = [line for line in element_lines if line.is_image_line]

    if len(dialog_lines) == 2:  # Soy vs Chad
        line1, line2 = dialog_lines
        return meme_generator.create_soy_vs_chad_meme(
            line1.emojis[0].emoji, line2.emojis[0].emoji, line1.text, line2.text)

    if len(big_marsey_lines) == 1 and len(pure_text_lines) == 1:  # Modern Meme with Marsey
        marsey = big_marsey_lines[0].emojis[0].emoji
        caption = pure_text_lines[0].text
        return meme_generator.create_modern_meme_from_emoji(marsey, caption)

    if len(image_lines) == 1 and len(pure_text_lines) == 1:  # Modern Meme with Image
        full_image_url = get_full_rdrama_image_url(image_lines[0].images[0].link)
        caption = pure_text_lines[0].text
        return meme_generator.create_modern_meme_from_url(full_image_url, caption)

    if len(big_marsey_lines) == 1 and len(pure_text_lines) == 2:  # Classic Meme with big marsey
        emoji = big_marsey_lines[0].emojis[0].emoji
        top_caption = pure_text_lines[0].text
        bottom_caption = pure_text_lines[1].text
        return meme_generator.create_classic_meme_from_emoji(emoji, top_caption, bottom_caption)

    if len(image_lines) == 1 and len(pure_text_lines) == 2:  # Classic Meme with Image
        full_image_url = get_full_rdrama_image_url(image_lines[0].images[0].link)
        top_caption = pure_text_lines[0].text
        bottom_caption = pure_text_lines[1].text
        return meme_generator.create_classic_meme_from_url(full_image_url, top_caption, bottom_caption)

    if len(argument_lines) >= 1 or len(dialog_lines) >= 1:  # Webcomic
        return create_webcomic(_build_webcomic_panels(element_lines))

    return None


def main_processing_task(rdrama : RDramaAPIInterface, session : Session):
    """Run one pass of the bot: scan comments and reply with memes.

    Args:
        rdrama: API client used to read site data and post replies.
        session: Database session used for the reply bookkeeping. It is not
            committed here; the caller does that.
    """
    rdrama.get_front_page()
    automeme_information = rdrama.get_user_information(AUTOMEME_ID)
    print(f"coins: {automeme_information['coins']} comments: {automeme_information['comment_count']}")

    if not _bot_can_communicate(automeme_information):
        return

    for eligible_comment in get_eligible_comments(rdrama, session):
        begin = datetime.now()
        user_id = eligible_comment['author']['id']
        parent_comment_id = eligible_comment['id']
        post_id = eligible_comment['post_id']

        # Re-check the limits: replies made earlier in this loop change the
        # counts that get_eligible_comments filtered on.
        under_post_limit = Post.get_number_of_replies(post_id, session) < ALLOWED_COMMENTS_PER_POST
        under_user_limit = User.get_number_of_comments(user_id, session) < ALLOWED_COMMENTS_PER_USER_PER_DAY
        has_not_replied_to_comment = Comment.get_comment(parent_comment_id, session) is None
        if not (under_post_limit and under_user_limit and has_not_replied_to_comment):
            continue

        cleaned_comment_text = strip_markdown(eligible_comment['body'])
        element_lines = [
            TextLine(comment_line)
            for comment_line in cleaned_comment_text.split("\n")
            if comment_line != ""
        ]

        image = _build_meme(element_lines)
        if image is None:
            continue

        image = add_watermark(image, eligible_comment['author']['username'])
        automeme_comment_id = comment_with_image(
            "yo got a meme for ya nigga", image, parent_comment_id, post_id, rdrama_api=rdrama)
        Comment.create_new_comment(parent_comment_id, automeme_comment_id, session)
        Post.increment_replies(post_id, session)
        User.increase_number_of_comments(user_id, session)

        # NOTE(review): the source's indentation was lost; the timing report
        # is assumed to apply only to comments that received a reply.
        end = datetime.now()
        print(end - begin)


if __name__ == "__main__":
    now = datetime.now()
    print(f"======= RUNNING AT {now.hour}:{now.minute} ======= ")

    if TEST_MODE:
        website = "localhost"
        auth = TEST_AUTH_TOKEN
        https = False
        timeout = 1
        AUTOMEME_ID = 7
        OPERATOR_ID = 9
        ACTUALLY_CALL_OPEN_AI = False
    else:
        website = "rdrama.net"
        with open(get_real_filename("rdrama_auth_token"), "r") as f:
            # Drop the trailing newline editors usually leave in the file.
            auth = f.read().strip()
        https = True
        timeout = 10
    rdrama = RDramaAPIInterface(auth, website, timeout, https=https)

    # Set up fail safe
    def exitfunc():
        print("*That's it, you're going in the retard squisher.*")
        print("FAILSAFE FORCED SHUTDOWN", datetime.now())
        os._exit(0)

    timer = Timer(60 * MINUTES_BEFORE_FORCED_SHUTDOWN, exitfunc)
    timer.start()
    try:
        engine = create_engine(f"sqlite:///{get_real_filename(DB_FILENAME)}")
        Base.metadata.create_all(engine)

        with Session(engine) as session:
            main_processing_task(rdrama, session)
            session.commit()
    finally:
        # Always stop the failsafe so a crash surfaces immediately instead of
        # leaving the process alive until the timer fires.
        timer.cancel()