# automeme/automeme.py
import base64
import io
import json
import re
import meme_generator
from RDramaAPIInterface import RDramaAPIInterface
from datetime import datetime
from os.path import exists, join, realpath, split
from threading import Timer
from BotModels import Base, Comment, Post, User
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
import os
from markdown import markdown
from bs4 import BeautifulSoup
# Toggle between the local dev instance (True) and production rdrama.net.
TEST_MODE = True
# NOTE(review): credential committed to source control — even if test-only,
# prefer loading it from a file like the production token; confirm rotation.
TEST_AUTH_TOKEN = "ED3eURMKP9FKBFbi-JUxo8MPGWkEihuyIlAUGtVL7xwx0NEy4Nf6J_mxWYTPgAQx1iy1X91hx7PPHyEBS79hvKVIy5DMEzOyAe9PAc5pmqSJlLGq_-ROewMwFzGrqer4"
# Failsafe timer budget: the process is hard-killed after this many minutes.
MINUTES_BEFORE_FORCED_SHUTDOWN = 10
# SQLite file (resolved next to this script) holding reply bookkeeping.
DB_FILENAME = "automeme_database.db"
# How many pages of recent comments to scan per run.
PAGES_TO_SCAN = 5
AUTOMEME_ID = 3 #TODO
# Anti-spam caps: max bot replies per post, and per author per day.
ALLOWED_COMMENTS_PER_POST = 20
ALLOWED_COMMENTS_PER_USER_PER_DAY = 20
def get_real_filename(filename: str):
    """Resolve *filename* relative to the directory containing this script."""
    script_directory, _ = split(realpath(__file__))
    return join(script_directory, filename)
# --- scratch/example invocation kept for reference (dead code) ---
# rdrama = RDramaAPIInterface(TEST_AUTH_TOKEN, "localhost", sleep=5, https=False)
# print(open('emoji_cache/klanjak.webp', 'rb'))
# image = meme_generator.create_classic_meme_from_emoji("marseyannoyed", "THAT FACE WHEN YOU", "FORGET TO TAKE OUT THE TRASH")
# output = io.BytesIO()
# image.save(output, format="webp")
# file = {'file': ('based.webp', output.getvalue(), 'image/webp')}
# rdrama.reply_to_comment_easy(175, 1, "assddsfssdssdsd", file=file)
def comment_with_image(image, comment_id, post_id):
    """Serialize *image* (a PIL-style object with .save) to webp and post it
    as a reply to the given comment.

    NOTE(review): relies on the module-level `rdrama` client, which is only
    bound inside the __main__ block — calling this from an import context
    raises NameError; confirm intended usage.
    """
    output = io.BytesIO()
    image.save(output, format="webp")
    file = {'file': ('based.webp', output.getvalue(), 'image/webp')}
    rdrama.reply_to_comment_easy(comment_id, post_id, "ffffff", file=file)
class TextElement:
    """Marker base class for parsed comment fragments (see Text and Emoji)."""
class Text(TextElement):
    """A plain-text fragment of a parsed comment line."""

    def __init__(self, text):
        self.text = text

    def __repr__(self) -> str:
        return "Text({})".format(self.text)
class Emoji(TextElement):
    """An emoji fragment; `big` marks the '#'-prefixed doubled-size form."""

    def __init__(self, emoji, big):
        self.emoji = emoji
        self.big = big

    def __repr__(self) -> str:
        return "Emoji({}, big={})".format(self.emoji, self.big)
def get_text_only(text_elements: list[TextElement]) -> str:
    """Join the .text of every Text element with spaces, ignoring emoji."""
    parts = []
    for element in text_elements:
        if isinstance(element, Text):
            parts.append(element.text)
    return " ".join(parts)
def text_elements(string: str):
    """Parse one comment line into a list of Text and Emoji elements.

    Emoji are `:name:` tokens; a '#' anywhere inside the token marks the
    emoji as "big".  Plain-text runs are stripped of surrounding whitespace.
    """
    EMOJI_REGEX = r"(:[^ ]*:)"
    parsed = []
    for chunk in re.split(EMOJI_REGEX, string):
        if chunk == "":
            continue
        if re.match(EMOJI_REGEX, chunk):
            big = "#" in chunk
            name = chunk.replace("#", "") if big else chunk
            parsed.append(Emoji(name.strip(":"), big))
        else:
            parsed.append(Text(chunk.strip()))
    return parsed
def strip_markdown(markdown_string):
    """Flatten a markdown comment into plain text suitable for meme captions.

    Drops quoted (`>`) lines, renders the markdown to text, removes bot
    commands (!blackjack, fortune, factcheck, !slots), double quotes,
    @-mentions and the site motto.

    Returns "" when the result contains no ASCII letters at all (e.g. a
    pure-emoji or punctuation-only comment).
    """
    # Remove quote lines before rendering so they don't leak into captions.
    markdown_string = re.sub(r">.*\n", "", markdown_string)
    try:
        html = markdown(markdown_string)
    except AttributeError:
        # markdown() raises on some inputs; fall back to the raw string.
        html = markdown_string
    soup = BeautifulSoup(html, "html.parser")
    # find_all(string=True) is the modern spelling of findAll(text=True).
    text = ''.join(soup.find_all(string=True))
    # Strip bot-command tokens and quoting that would break captions.
    text = re.sub(r"!blackjack[^ ]*", "", text)
    text = re.sub(r"fortune", "", text)    # NOTE: also matches mid-word
    text = re.sub(r"factcheck", "", text)
    text = re.sub(r"!slots.*?\s", "", text)
    text = re.sub(r"\"", "'", text)
    # Bail out if nothing letter-like survived.
    if not re.search(r"[a-z]", text.lower()):
        return ""
    text = re.sub(r"@(.*?)\s", "", text)
    # Second !slots pass: catches tokens exposed by the @-mention removal.
    text = re.sub(r"!slots.*?\s", "", text)
    text = re.sub(r"(?i)trans lives matter", "", text)
    return text
def remove_duplicates(items):
    """Return *items* with exact duplicates removed.

    Elements must be JSON-serializable; equality is by serialized form, so
    dicts with the same keys in a different insertion order are NOT merged.
    Result order is unspecified (set-based).
    """
    # Parameter renamed from `list`, which shadowed the builtin.
    return [json.loads(serialized) for serialized in {json.dumps(item) for item in items}]
def get_eligible_comments(rdrama : RDramaAPIInterface, session : Session):
    """Fetch recent site comments and filter to the ones the bot may answer.

    Filters applied in order: no bot authors, not the bot's own comments,
    per-post reply cap, per-author daily cap, and not already replied to
    (checked against the local database via *session*).
    """
    comments = rdrama.get_comments(number_of_pages=PAGES_TO_SCAN)['data']
    comments = [comment for comment in comments if not comment['is_bot']] #No bots
    # NOTE(review): BBBB_ID is only bound in the TEST_MODE branch of
    # __main__ — this raises NameError when TEST_MODE is False; confirm.
    comments = [comment for comment in comments if not comment['author']['id'] == BBBB_ID] #Don't reply to self
    comments = [comment for comment in comments if Post.get_number_of_replies(comment['post_id'], session) < ALLOWED_COMMENTS_PER_POST] #Don't spam posts
    comments = [comment for comment in comments if User.get_number_of_comments(comment['author']['id'], session) < ALLOWED_COMMENTS_PER_USER_PER_DAY] #Don't spam users
    comments = [comment for comment in comments if Comment.get_comment(comment['id'], session) is None] #Double check that we haven't replied to the comment
    comments = remove_duplicates(comments) #Remove the duplicates
    return comments
def main_processing_task(rdrama : RDramaAPIInterface, session : Session):
    """One bot cycle: check account status, then meme-reply to eligible comments.

    The bot stops communicating entirely when muted (Marsey Award, Bird Site
    Award, or a ban).  Only comments shaped as exactly two lines, each
    starting with an emoji, are turned into a soy-vs-chad meme.
    """
    is_chudded = False #Do we have the chud award?
    can_communicate = True #Can we send any message at all?
    is_pizzad = False  # Pizzashill Award flag; set below but not otherwise read here
    rdrama.get_front_page()
    # NOTE(review): BBBB_ID is only bound in the TEST_MODE branch of
    # __main__ — NameError when TEST_MODE is False; confirm.
    bbbb_information = rdrama.get_user_information(BBBB_ID)
    print(f"coins: {bbbb_information['coins']} comments: {bbbb_information['comment_count']}")
    # Awards attached to the account change what the bot may/must do.
    for badge in bbbb_information['badges']:
        if (badge['name'] == "Marsey Award"):
            print("We have the marsey award. STOP.")
            can_communicate = False
        if (badge['name'] == "Chud"):
            print("We have the CHUD award. CONTINUE")
            is_chudded = True
        if (badge['name'] == "Bird Site Award"):
            print("We have the Bird Site Award. STOP.")
            can_communicate = False
        if (badge['name'] == "Pizzashill Award"):
            print("We have the Pizzashill Award. CONTINUE.")
            is_pizzad = True
    if bbbb_information['is_banned']:
        print("We are banned. STOP.")
        can_communicate = False
    if can_communicate:
        eligible_comments = get_eligible_comments(rdrama, session)
        for eligible_comment in eligible_comments:
            comment_text = eligible_comment['body']
            cleaned_comment_text = strip_markdown(comment_text)
            comment_lines = cleaned_comment_text.split("\n")
            comment_lines = [comment_line for comment_line in comment_lines if comment_line != ""]
            element_lines = [text_elements(line) for line in comment_lines]
            image = None
            # Two lines, each led by an emoji: the emoji become the meme heads
            # and the remainder of each line becomes its caption.
            if (len(element_lines) == 2):
                if isinstance(element_lines[0][0], Emoji) and isinstance(element_lines[1][0], Emoji):
                    emoji1 = element_lines[0][0].emoji
                    emoji2 = element_lines[1][0].emoji
                    caption1 = get_text_only(element_lines[0][1:])
                    caption2 = get_text_only(element_lines[1][1:])
                    image = meme_generator.create_soy_vs_chad_meme(emoji1, emoji2, caption1, caption2)
            if image != None:
                comment_with_image(image, eligible_comment['id'], eligible_comment['post_id'])
if __name__ == "__main__":
    # NOTE(review): redundant — reassigns the same literal already set at
    # module level; keep credentials out of source and load from disk/env.
    TEST_AUTH_TOKEN = "ED3eURMKP9FKBFbi-JUxo8MPGWkEihuyIlAUGtVL7xwx0NEy4Nf6J_mxWYTPgAQx1iy1X91hx7PPHyEBS79hvKVIy5DMEzOyAe9PAc5pmqSJlLGq_-ROewMwFzGrqer4"
    print(f"======= RUNNING AT {datetime.now().hour}:{datetime.now().minute} ======= ")
    if TEST_MODE:
        website = "localhost"
        auth = TEST_AUTH_TOKEN
        https = False
        timeout = 1
        BBBB_ID = 6  # bot account id on the local instance
        OPERATOR_ID = 9
        ACTUALLY_CALL_OPEN_AI = False  # presumably consumed elsewhere — TODO confirm
    else:
        website = "rdrama.net"
        # Production auth token is read from a file next to the script.
        with open(get_real_filename("rdrama_auth_token"), "r") as f:
            auth = f.read()
        https = True
        timeout = 10
        # NOTE(review): BBBB_ID / OPERATOR_ID are not set on this branch, so
        # main_processing_task raises NameError in production — confirm.
    rdrama = RDramaAPIInterface(auth, website, timeout, https=https)
    #Set up fail safe
    def exitfunc():
        # Hard-exit the process if the run exceeds its time budget.
        print("*That's it, you're going in the retard squisher.*")
        print("FAILSAFE FORCED SHUTDOWN", datetime.now())
        os._exit(0)
    timer = Timer(60*MINUTES_BEFORE_FORCED_SHUTDOWN, exitfunc)
    timer.start()
    engine = create_engine(f"sqlite:///{get_real_filename(DB_FILENAME)}")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        main_processing_task(rdrama, session)
        session.commit()  # persist reply bookkeeping only after a clean run
    timer.cancel()