diff --git a/BotModels.py b/BotModels.py index a79b1c6..81337d7 100644 --- a/BotModels.py +++ b/BotModels.py @@ -1,5 +1,6 @@ from datetime import datetime from email.policy import default +from sqlite3 import Date import time import openai import sqlalchemy @@ -110,3 +111,33 @@ class Post(Base): def register_post_reply(post_id : int, session : Session): Post.get_post(post_id, session).has_replied = True + +''' +Just for keeping a log of all comments we encounter +''' +class CommentString(Base): + __tablename__ = "comment_string" + id = Column(Integer, primary_key = True) + user_id = Column(Integer) + time = Column(DateTime) + text = Column(String) + + def add_comment(comment_id : int, user_id : int, time : datetime, text: str, session : Session): + # Check if it exists + + stmt = sqlalchemy.select(CommentString).where(CommentString.id == comment_id) + comment = session.execute(stmt).scalar_one_or_none() + if (comment == None): + comment = CommentString( + id = comment_id, + user_id = user_id, + time = time, + text = text + ) + session.add(comment) + +''' +Mostly for keeping track of how long it's been since the last run, so we can run things without worrying about set number of pages +''' +class ScriptCalls(Base): + pass \ No newline at end of file diff --git a/automeme.py b/automeme.py index b6ec54c..d4bd260 100644 --- a/automeme.py +++ b/automeme.py @@ -10,7 +10,7 @@ from RDramaAPIInterface import RDramaAPIInterface from datetime import datetime from os.path import exists, join, realpath, split from threading import Timer -from BotModels import Base, Comment, Post, User +from BotModels import Base, Comment, CommentString, Post, User from sqlalchemy import create_engine from sqlalchemy.orm import Session import os @@ -19,7 +19,7 @@ from bs4 import BeautifulSoup from utils import get_real_filename TEST_MODE = True -DRY_MODE = True +DRY_MODE = False TEST_AUTH_TOKEN = "ED3eURMKP9FKBFbi-JUxo8MPGWkEihuyIlAUGtVL7xwx0NEy4Nf6J_mxWYTPgAQx1iy1X91hx7PPHyEBS79hvKVIy5DMEzOyAe9PAc5pmqSJlLGq_-ROewMwFzGrqer4" MINUTES_BEFORE_FORCED_SHUTDOWN = 10 DB_FILENAME = "automeme_database.db" @@ -36,7 +36,10 @@ CLASSIC_MEME_WITH_MARSEY_TRIGGER_CHANGE = 1.0 #0.5 CLASSIC_MEME_WITH_IMAGE_TRIGGER_CHANGE = 1.0 WEBCOMIC_TRIGGER_CHANCE = 1.0 -EMOJI_REGEX = r":[^ ]*:" +FREE_POSTS = [6] +RESTRICTED_POSTS = [5, 16583, 75878, 35835] + +EMOJI_REGEX = r":[#!a-zA-Z0-9]*:" IMAGE_REGEX = r"!\[\]\(/images/([1234567890]*)\.webp\)" GIF_REGEX = r"https:\/\/media\.giphy\.com\/media\/([a-zA-Z0-9]*)\/giphy\.webp" PARSED_IMAGE_REGEX = r"IMAGE:/images/[1234567890]*\.webp" @@ -80,11 +83,11 @@ class TextLine: @property def is_dialogue_line(self): - return len(self.emojis) == 1 and len(self.captions) == 1 + return len(self.emojis) == 1 and len(self.captions) == 1 and isinstance(self.line[0], Emoji) @property def is_argument_line(self): - return len(self.emojis) == 2 and (len(self.captions) == 2 or len(self.captions) == 1) + return len(self.emojis) == 2 and (len(self.captions) == 2 or len(self.captions) == 1) and isinstance(self.line[0], Emoji) and isinstance(self.line[2], Emoji) @property def is_pure_text_line(self): @@ -194,11 +197,20 @@ def remove_duplicates(list): def get_eligible_comments(rdrama : RDramaAPIInterface, session : Session): comments = rdrama.get_comments(number_of_pages=PAGES_TO_SCAN)['data'] comments = [comment for comment in comments if not comment['is_bot']] #No bots - comments = [comment for comment in comments if not comment['author']['id'] == AUTOMEME_ID] #Don't reply to self + comments = [comment for comment in comments if comment['author'] == '👻' or not comment['author']['id'] == AUTOMEME_ID] #Don't reply to self comments = [comment for comment in comments if Post.get_number_of_replies(comment['post_id'], session) < ALLOWED_COMMENTS_PER_POST] #Don't spam posts - comments = [comment for comment in comments if User.get_number_of_comments(comment['author']['id'], session) < ALLOWED_COMMENTS_PER_USER_PER_DAY] #Don't spam users + comments = [comment for comment in comments if User.get_number_of_comments(0 if comment['author'] == '👻' else comment['author']['id'], session) < ALLOWED_COMMENTS_PER_USER_PER_DAY] #Don't spam users comments = [comment for comment in comments if Comment.get_comment(comment['id'], session) is None] #Double check that we haven't replied to the comment comments = remove_duplicates(comments) #Remove the duplicates + + for comment in comments: + user_id = 0 if comment['author'] == '👻' else comment['author']['id'] + comment_id = comment['id'] + text = comment['body'] + time = datetime.fromtimestamp(comment['created_utc']) + + CommentString.add_comment(comment_id, user_id, time, text, session) + return comments T = TypeVar('T') @@ -263,10 +275,10 @@ def main_processing_task(rdrama : RDramaAPIInterface, session : Session): for eligible_comment in eligible_comments: begin = datetime.now() under_post_limit = Post.get_number_of_replies(eligible_comment['post_id'], session) < ALLOWED_COMMENTS_PER_POST - under_user_limit = User.get_number_of_comments(eligible_comment['author']['id'], session) < ALLOWED_COMMENTS_PER_USER_PER_DAY + under_user_limit = User.get_number_of_comments(0 if eligible_comment['author'] == '👻' else eligible_comment['author']['id'], session) < ALLOWED_COMMENTS_PER_USER_PER_DAY has_not_replied_to_comment = Comment.get_comment(eligible_comment['id'], session) is None - - if (not (under_post_limit and under_user_limit and has_not_replied_to_comment)): + is_not_restricted_post = eligible_comment['post_id'] not in RESTRICTED_POSTS + if (not (under_post_limit and under_user_limit and has_not_replied_to_comment and is_not_restricted_post)): continue comment_text = eligible_comment['body'] @@ -298,7 +310,6 @@ def main_processing_task(rdrama : RDramaAPIInterface, session : Session): image_lines_count = len(image_lines) image = None - print(f"ROLLED = {random_float}") if (dialog_lines_count == 2): print(f"[{eligible_comment['id']}] SOY_VS_CHAD") if (random_float <= SOY_VS_CHAD_TRIGGER_CHANGE): @@ -363,7 +374,7 @@ def main_processing_task(rdrama : RDramaAPIInterface, session : Session): bottom_caption = bottom_text_line.text image = meme_generator.create_classic_meme_from_url(full_image_url, top_caption, bottom_caption) - elif (argument_lines_count >= 1 or dialog_lines_count >= 1): + elif (argument_lines_count + dialog_lines_count >= 2): print(f"[{eligible_comment['id']}] WEBCOMIC") if (random_float <= WEBCOMIC_TRIGGER_CHANCE): panels : 'list[WebcomicPanel]' = [] @@ -395,7 +406,7 @@ def main_processing_task(rdrama : RDramaAPIInterface, session : Session): if image != None: print(f"[{eligible_comment['id']}] posting...") image = add_watermark(image, eligible_comment['author']['username']) - user_id = eligible_comment['author']['id'] + user_id = 0 if eligible_comment['author'] == '👻' else eligible_comment['author']['id'] parent_comment_id = eligible_comment['id'] post_id = eligible_comment['post_id'] message = create_comment_message(is_chudded, is_pizzad, is_birdsite, is_marseyed) @@ -404,9 +415,13 @@ def main_processing_task(rdrama : RDramaAPIInterface, session : Session): else: automeme_comment_id = None image.save(f"dry/{eligible_comment['id']}.webp") + Comment.create_new_comment(parent_comment_id, automeme_comment_id, session) - Post.increment_replies(post_id, session) - User.increase_number_of_comments(user_id, session) + if post_id not in FREE_POSTS: + Post.increment_replies(post_id, session) + User.increase_number_of_comments(user_id, session) + else: + print(f"[{eligible_comment['id']}] is a free post.") end = datetime.now() print(end-begin) else: diff --git a/meme_generator.py b/meme_generator.py index fee9b31..4b9ef09 100644 --- a/meme_generator.py +++ b/meme_generator.py @@ -15,8 +15,6 @@ from random import choice HIGHLIGHT_MODE = False CAPTION_FILENAME = "watermark_captions.txt" - - class ColorScheme: BLACK = 0 WHITE_WITH_BLACK_BORDER = 1 @@ -347,22 +345,24 @@ def get_emoji_from_rdrama(emoji_name) -> 'AnimatedImage': cleaned_emoji_name = cleaned_emoji_name.replace("!", "") should_flip = True - if (exists(f"emoji_cache/{cleaned_emoji_name}.webp")): + if (exists(get_real_filename(f"emoji_cache/{cleaned_emoji_name}.webp"))): image = AnimatedImage.from_image(Image.open(f"emoji_cache/{cleaned_emoji_name}.webp")) else: image = get_image_file_from_url(f"https://www.rdrama.net/e/{cleaned_emoji_name}.webp") - image.save(f"emoji_cache/{cleaned_emoji_name}.webp") + image.save(get_real_filename(f"emoji_cache/{cleaned_emoji_name}.webp")) if should_flip: image = image.flip() return image def get_image_file_from_url(url) -> 'AnimatedImage': - r = requests.get(url) - image_file = io.BytesIO(r.content) - im = Image.open(image_file) - return AnimatedImage.from_image(im) - + try: + r = requests.get(url) + image_file = io.BytesIO(r.content) + im = Image.open(image_file) + return AnimatedImage.from_image(im) + except: + print(f"ERROR GETTING FILE FROM {url}") def parse_caption_file(filename): if not exists(filename): return [] @@ -372,7 +372,7 @@ def parse_caption_file(filename): to_return.append(id.strip()) return to_return -watermark_captions = parse_caption_file(CAPTION_FILENAME) +watermark_captions = parse_caption_file(get_real_filename(CAPTION_FILENAME)) class AnimatedImage(): def __init__(self, frames : list[Image.Image]):