# automeme/automeme.py

import base64
import io
import json
import re
from typing import Callable, TypeVar
import meme_generator
from meme_generator import WebcomicPanel, OneCharacterWebcomicPanel, TwoCharacterWebcomicPanel, TitleCardWebcomicPanel, add_watermark, create_webcomic
from RDramaAPIInterface import RDramaAPIInterface
from datetime import datetime
from os.path import exists, join, realpath, split
from threading import Timer
from BotModels import Base, Comment, Post, User
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
import os
from markdown import markdown
from bs4 import BeautifulSoup
from utils import get_real_filename
TEST_MODE = True
TEST_AUTH_TOKEN = "ED3eURMKP9FKBFbi-JUxo8MPGWkEihuyIlAUGtVL7xwx0NEy4Nf6J_mxWYTPgAQx1iy1X91hx7PPHyEBS79hvKVIy5DMEzOyAe9PAc5pmqSJlLGq_-ROewMwFzGrqer4"
MINUTES_BEFORE_FORCED_SHUTDOWN = 10
DB_FILENAME = "automeme_database.db"
PAGES_TO_SCAN = 5
AUTOMEME_ID = 13427
ALLOWED_COMMENTS_PER_POST = 20
ALLOWED_COMMENTS_PER_USER_PER_DAY = 20
EMOJI_REGEX = r":[^ ]*:"  # matches :emoji: tokens
IMAGE_REGEX = r"!\[\]\(/images/([1234567890]*)\.webp\)"  # inline rdrama image markdown; group 1 is the numeric image id
PARSED_IMAGE_REGEX = r"IMAGE:/images/[1234567890]*\.webp"  # placeholder token that strip_markdown injects
INJECTABLE_IMAGE_REGEX = r"IMAGE:/images/\1\.webp"  # re.sub replacement template (\1 = image id), not a search pattern
# rdrama = RDramaAPIInterface(TEST_AUTH_TOKEN, "localhost", sleep=5, https=False)
# print(open('emoji_cache/klanjak.webp', 'rb'))
# image = meme_generator.create_classic_meme_from_emoji("marseyannoyed", "THAT FACE WHEN YOU", "FORGET TO TAKE OUT THE TRASH")
# output = io.BytesIO()
# image.save(output, format="webp")
# file = {'file': ('based.webp', output.getvalue(), 'image/webp')}
# rdrama.reply_to_comment_easy(175, 1, "assddsfssdssdsd", file=file)
def comment_with_image(message, image, comment_id, post_id):
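    """Encode a PIL image as webp in memory and post it as a reply to the given
    comment via the module-level `rdrama` client (created in __main__).
    Returns the id of the newly created comment."""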
output = io.BytesIO()
image.save(output, format="webp")
file = {'file': ('based.webp', output.getvalue(), 'image/webp')}
return rdrama.reply_to_comment_easy(comment_id, post_id, message, file=file)['id']
class TextLine:
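    """A single line of a comment, parsed into Text/Emoji/Image elements and
    classified (dialogue, argument, pure text, big marsey, or image line)."""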
def __init__(self, string):
self.line = text_elements(string)
@property
def text(self) -> str:
text = [i.text for i in self.line if isinstance(i, Text)]
return " ".join(text)
@property
def captions(self) -> 'list[Text]':
return [i for i in self.line if isinstance(i, Text)]
@property
def images(self) -> 'list[Image]':
return [i for i in self.line if isinstance(i, Image)]
@property
def emojis(self) -> 'list[Emoji]':
return [i for i in self.line if isinstance(i, Emoji)]
@property
def is_dialogue_line(self):
return len(self.emojis) == 1 and len(self.captions) == 1
@property
def is_argument_line(self):
return len(self.emojis) == 2 and (len(self.captions) == 2 or len(self.captions) == 1)
@property
def is_pure_text_line(self):
return len(self.emojis) == 0 and len(self.images) == 0
@property
def is_big_marsey_line(self):
return len(self.emojis) == 1 and self.emojis[0].big and len(self.captions) == 0
@property
def is_image_line(self):
return len(self.images) == 1 and len(self.captions) == 0
class TextElement():
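    """Base class for the pieces a comment line is parsed into."""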
pass
class Text(TextElement):
def __init__(self, text):
self.text = text
def __repr__(self) -> str:
return f"Text({self.text})"
class Image(TextElement):
def __init__(self, link):
self.link = link
def __repr__(self) -> str:
return f"Link({self.link})"
class Emoji(TextElement):
def __init__(self, emoji, big):
self.emoji = emoji
self.big = big
def __repr__(self) -> str:
return f"Emoji({self.emoji}, big={self.big})"
def get_text_only(text_elements : list[TextElement]) -> str:
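    """Join the Text elements of a parsed line into a single string."""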
text = [i.text for i in text_elements if isinstance(i, Text)]
return " ".join(text)
def text_elements(string : str):
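    """Split a string into Text, Emoji, and Image elements. Emoji and parsed-image
    tokens are kept by re.split because the pattern wraps them in capture groups;
    a '#' inside an emoji token marks it as big."""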
FULL_REGEX = rf"({EMOJI_REGEX})|({PARSED_IMAGE_REGEX})"
elements = re.split(FULL_REGEX, string)
to_return = []
for element in elements:
        if element is None:
continue
if element.strip() == "":
continue
if re.match(EMOJI_REGEX, element):
if "#" in element:
big = True
element = element.replace("#","")
else:
big = False
element = element.strip(":")
to_return.append(Emoji(element, big))
elif re.match(PARSED_IMAGE_REGEX, element):
to_return.append(Image(element.strip()[6:]))
else:
to_return.append(Text(element.strip()))
return to_return
def strip_markdown(markdown_string):
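    """Reduce a comment's markdown to plain text: rewrite inline images to
    IMAGE:/images/<id>.webp placeholders, drop '>' quote lines, render the rest
    to HTML and extract its text, then scrub bot commands and @mentions."""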
markdown_string = re.sub(IMAGE_REGEX, INJECTABLE_IMAGE_REGEX, markdown_string)
markdown_string = re.sub(">.*\n", "", markdown_string)
try:
html = markdown(markdown_string)
except AttributeError:
html = markdown_string #if there is no markdown in the string you get an error
soup = BeautifulSoup(html, "html.parser")
    text = ''.join(soup.find_all(string=True))
text = re.sub(r"!blackjack[^ ]*", "", text)
text = re.sub(r"fortune", "", text)
text = re.sub(r"factcheck", "", text)
text = re.sub("!slots.*?\s", "", text)
text = re.sub(r"\"", "'", text)
    # make sure the text contains at least one ASCII letter.
    if not set(text.lower()) & set("abcdefghijklmnopqrstuvwxyz"):
        return ""
text = re.sub("@(.*?)\s", "", text)
text = re.sub("!slots.*?\s", "", text)
text = re.sub("(?i)trans lives matter", "", text)
return text
def remove_duplicates(items):
    """Remove duplicate dicts from a list by round-tripping them through JSON."""
    return [json.loads(j) for j in set(json.dumps(i) for i in items)]
def get_eligible_comments(rdrama : RDramaAPIInterface, session : Session):
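    """Fetch the latest comments (PAGES_TO_SCAN pages) and filter out bots, the
    bot's own comments, posts and users that have hit their reply limits,
    comments already replied to, and duplicates."""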
comments = rdrama.get_comments(number_of_pages=PAGES_TO_SCAN)['data']
comments = [comment for comment in comments if not comment['is_bot']] #No bots
comments = [comment for comment in comments if not comment['author']['id'] == AUTOMEME_ID] #Don't reply to self
comments = [comment for comment in comments if Post.get_number_of_replies(comment['post_id'], session) < ALLOWED_COMMENTS_PER_POST] #Don't spam posts
comments = [comment for comment in comments if User.get_number_of_comments(comment['author']['id'], session) < ALLOWED_COMMENTS_PER_USER_PER_DAY] #Don't spam users
comments = [comment for comment in comments if Comment.get_comment(comment['id'], session) is None] #Double check that we haven't replied to the comment
comments = remove_duplicates(comments) #Remove the duplicates
return comments
T = TypeVar('T')
def lambda_count(list : list[T], predicate : 'Callable[[T], bool]' ):
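    """Count the items in the list for which the predicate is true."""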
return sum(1 for i in list if predicate(i))
def get_full_rdrama_image_url(partial_url) -> str:
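    """Turn a site-relative image path into an absolute URL for the configured instance."""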
if (TEST_MODE):
return f"http://localhost{partial_url}"
else:
return f"https://rdrama.net{partial_url}"
def main_processing_task(rdrama : RDramaAPIInterface, session : Session):
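    """One bot pass: check the account's badges and ban state to decide whether it
    may post, then for each eligible comment parse its lines, pick a meme format
    (soy-vs-chad, modern or classic macro, or webcomic), generate the image,
    watermark it with the author's username, reply with it, and record the reply
    in the database."""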
is_chudded = False #Do we have the chud award?
can_communicate = True #Can we send any message at all?
is_pizzad = False
rdrama.get_front_page()
automeme_information = rdrama.get_user_information(AUTOMEME_ID)
print(f"coins: {automeme_information['coins']} comments: {automeme_information['comment_count']}")
for badge in automeme_information['badges']:
if (badge['name'] == "Marsey Award"):
print("We have the marsey award. STOP.")
can_communicate = False
if (badge['name'] == "Chud"):
print("We have the CHUD award. CONTINUE")
is_chudded = True
if (badge['name'] == "Bird Site Award"):
print("We have the Bird Site Award. STOP.")
can_communicate = False
if (badge['name'] == "Pizzashill Award"):
print("We have the Pizzashill Award. CONTINUE.")
is_pizzad = True
if automeme_information['is_banned']:
print("We are banned. STOP.")
can_communicate = False
if can_communicate:
eligible_comments = get_eligible_comments(rdrama, session)
for eligible_comment in eligible_comments:
under_post_limit = Post.get_number_of_replies(eligible_comment['post_id'], session) < ALLOWED_COMMENTS_PER_POST
under_user_limit = User.get_number_of_comments(eligible_comment['author']['id'], session) < ALLOWED_COMMENTS_PER_USER_PER_DAY
has_not_replied_to_comment = Comment.get_comment(eligible_comment['id'], session) is None
if (not (under_post_limit and under_user_limit and has_not_replied_to_comment)):
continue
comment_text = eligible_comment['body']
cleaned_comment_text = strip_markdown(comment_text)
comment_lines = cleaned_comment_text.split("\n")
comment_lines = [comment_line for comment_line in comment_lines if comment_line != ""]
element_lines = [TextLine(line) for line in comment_lines]
            argument_lines_count, dialog_lines_count, pure_text_lines_count, big_marsey_lines_count, image_lines_count = 0, 0, 0, 0, 0
dialog_lines = list(filter(lambda a : a.is_dialogue_line, element_lines))
argument_lines = list(filter(lambda a : a.is_argument_line, element_lines))
pure_text_lines = list(filter(lambda a : a.is_pure_text_line, element_lines))
big_marsey_lines = list(filter(lambda a : a.is_big_marsey_line, element_lines))
image_lines = list(filter(lambda a : a.is_image_line, element_lines))
argument_lines_count = len(argument_lines)
dialog_lines_count = len(dialog_lines)
pure_text_lines_count = len(pure_text_lines)
big_marsey_lines_count = len(big_marsey_lines)
image_lines_count = len(image_lines)
image = None
if (dialog_lines_count == 2):
#Soy vs Chad
line1 = dialog_lines[0]
line2 = dialog_lines[1]
emoji1 = line1.emojis[0].emoji
emoji2 = line2.emojis[0].emoji
caption1 = line1.text
caption2 = line2.text
image = meme_generator.create_soy_vs_chad_meme(emoji1, emoji2, caption1, caption2)
elif (big_marsey_lines_count == 1 and pure_text_lines_count == 1):
# Modern Meme with Marsey
text_line = pure_text_lines[0]
marsey_line = big_marsey_lines[0]
marsey = marsey_line.emojis[0].emoji
caption = text_line.text
image = meme_generator.create_modern_meme_from_emoji(marsey, caption)
elif (image_lines_count == 1 and pure_text_lines_count == 1):
# Modern Meme with Image
text_line = pure_text_lines[0]
image_line = image_lines[0]
image = image_line.images[0].link
full_image_url = get_full_rdrama_image_url(image)
caption = text_line.text
image = meme_generator.create_modern_meme_from_url(full_image_url, caption)
elif (big_marsey_lines_count == 1 and pure_text_lines_count == 2):
# Classic Meme with big marsey
top_text_line = pure_text_lines[0]
bottom_text_line = pure_text_lines[1]
marsey_line = big_marsey_lines[0]
emoji = marsey_line.emojis[0].emoji
top_caption = top_text_line.text
bottom_caption = bottom_text_line.text
image = meme_generator.create_classic_meme_from_emoji(emoji, top_caption, bottom_caption)
elif (image_lines_count == 1 and pure_text_lines_count == 2):
# Classic Meme with Image
top_text_line = pure_text_lines[0]
bottom_text_line = pure_text_lines[1]
image_line = image_lines[0]
image = image_line.images[0].link
full_image_url = get_full_rdrama_image_url(image)
top_caption = top_text_line.text
bottom_caption = bottom_text_line.text
image = meme_generator.create_classic_meme_from_url(full_image_url, top_caption, bottom_caption)
elif (argument_lines_count >= 1 or dialog_lines_count >= 1):
panels : 'list[WebcomicPanel]' = []
for element_line in element_lines:
if element_line.is_dialogue_line:
caption = element_line.text
emoji = element_line.emojis[0].emoji
if len(caption) > 100:
in_background = True
else:
in_background = False
oneCharacterWebcomicPanel = OneCharacterWebcomicPanel(emoji, caption, in_background)
panels.append(oneCharacterWebcomicPanel)
elif element_line.is_argument_line:
left_caption = element_line.captions[0].text
if len(element_line.captions) == 2:
right_caption = element_line.captions[1].text
else:
right_caption = ""
left_emoji = element_line.emojis[0].emoji
right_emoji = element_line.emojis[1].emoji
twoCharacterWebcomicPanel = TwoCharacterWebcomicPanel(left_emoji, left_caption, right_emoji, right_caption)
panels.append(twoCharacterWebcomicPanel)
elif element_line.is_pure_text_line:
panels.append(TitleCardWebcomicPanel(element_line.text))
image = create_webcomic(panels)
            if image is not None:
image = add_watermark(image, eligible_comment['author']['username'])
user_id = eligible_comment['author']['id']
parent_comment_id = eligible_comment['id']
post_id = eligible_comment['post_id']
automeme_comment_id = comment_with_image("yo got a meme for ya nigga", image, eligible_comment['id'], eligible_comment['post_id'])
Comment.create_new_comment(parent_comment_id, automeme_comment_id, session)
Post.increment_replies(post_id, session)
User.increase_number_of_comments(user_id, session)
if __name__ == "__main__":
TEST_AUTH_TOKEN = "ED3eURMKP9FKBFbi-JUxo8MPGWkEihuyIlAUGtVL7xwx0NEy4Nf6J_mxWYTPgAQx1iy1X91hx7PPHyEBS79hvKVIy5DMEzOyAe9PAc5pmqSJlLGq_-ROewMwFzGrqer4"
print(f"======= RUNNING AT {datetime.now().hour}:{datetime.now().minute} ======= ")
if TEST_MODE:
website = "localhost"
auth = TEST_AUTH_TOKEN
https = False
timeout = 1
AUTOMEME_ID = 7
OPERATOR_ID = 9
ACTUALLY_CALL_OPEN_AI = False
else:
website = "rdrama.net"
with open(get_real_filename("rdrama_auth_token"), "r") as f:
auth = f.read()
https = True
timeout = 10
rdrama = RDramaAPIInterface(auth, website, timeout, https=https)
#Set up fail safe
def exitfunc():
print("*That's it, you're going in the retard squisher.*")
print("FAILSAFE FORCED SHUTDOWN", datetime.now())
os._exit(0)
timer = Timer(60*MINUTES_BEFORE_FORCED_SHUTDOWN, exitfunc)
timer.start()
engine = create_engine(f"sqlite:///{get_real_filename(DB_FILENAME)}")
Base.metadata.create_all(engine)
with Session(engine) as session:
main_processing_task(rdrama, session)
session.commit()
timer.cancel()