463 lines
21 KiB
Python
463 lines
21 KiB
Python
import base64
|
||
import io
|
||
import json
|
||
from random import random
|
||
import re
|
||
from typing import Callable, TypeVar
|
||
import meme_generator
|
||
from meme_generator import WebcomicPanel, OneCharacterWebcomicPanel, TwoCharacterWebcomicPanel, TitleCardWebcomicPanel, add_watermark, create_webcomic
|
||
from RDramaAPIInterface import RDramaAPIInterface
|
||
from datetime import datetime
|
||
from os.path import exists, join, realpath, split
|
||
from threading import Timer
|
||
from BotModels import Base, Comment, CommentString, Post, User
|
||
from sqlalchemy import create_engine
|
||
from sqlalchemy.orm import Session
|
||
import os
|
||
from markdown import markdown
|
||
from bs4 import BeautifulSoup
|
||
from utils import get_real_filename
|
||
|
||
# --- Runtime switches --------------------------------------------------------
# TEST_MODE selects the localhost instance (see get_full_rdrama_image_url and
# the __main__ block); DRY_MODE saves memes under "dry/" instead of posting.
TEST_MODE = True
DRY_MODE = False
# NOTE(review): credentials should not be committed to source control;
# consider loading this from a file or the environment like the live token.
TEST_AUTH_TOKEN = "ED3eURMKP9FKBFbi-JUxo8MPGWkEihuyIlAUGtVL7xwx0NEy4Nf6J_mxWYTPgAQx1iy1X91hx7PPHyEBS79hvKVIy5DMEzOyAe9PAc5pmqSJlLGq_-ROewMwFzGrqer4"
# Delay (in minutes) before the failsafe timer force-exits the process.
MINUTES_BEFORE_FORCED_SHUTDOWN = 10
DB_FILENAME = "automeme_database.db"
# Number of comment-feed pages requested per run.
PAGES_TO_SCAN = 5
# User id of the bot account (used to skip its own comments).
AUTOMEME_ID = 13427
# Reply caps enforced through the Post / User tables.
ALLOWED_COMMENTS_PER_POST = 20
ALLOWED_COMMENTS_PER_USER_PER_DAY = 20
MESSAGE = "i have a meme for u :marseyshy: \nPROTIP: I reply randomly. Put !!meme in your message to make sure I reply"

# --- Trigger thresholds ------------------------------------------------------
# A meme of the given kind is generated when random() <= threshold, so 1.0
# means "always". ("CHANGE" is presumably a typo for "CHANCE"; the names are
# kept because other code references them.)
SOY_VS_CHAD_TRIGGER_CHANGE = 1.0
MODERN_MEME_WITH_MARSEY_TRIGGER_CHANGE = 1.0 #0.1
MODERN_MEME_WITH_IMAGE_TRIGGER_CHANGE = 1.0
CLASSIC_MEME_WITH_MARSEY_TRIGGER_CHANGE = 1.0 #0.5
CLASSIC_MEME_WITH_IMAGE_TRIGGER_CHANGE = 1.0
WEBCOMIC_TRIGGER_CHANCE = 1.0

# Posts where replies do not count toward the per-post / per-user caps.
FREE_POSTS = [6]
# Posts the bot never replies in.
RESTRICTED_POSTS = [5, 16583, 75878, 35835]

# --- Parsing patterns --------------------------------------------------------
# Raw markdown forms as they appear in comment bodies.
EMOJI_REGEX = r":[#!a-zA-Z0-9]*:"
IMAGE_REGEX = r"!\[\]\(/images/([1234567890]*)\.webp\)"
GIF_REGEX = r"https:\/\/media\.giphy\.com\/media\/([a-zA-Z0-9]*)\/giphy\.webp"
# Placeholder tokens that stand in for images/gifs once markdown is stripped.
PARSED_IMAGE_REGEX = r"IMAGE:/images/[1234567890]*\.webp"
PARSED_GIF_REGEX = r"GIF:[a-zA-Z0-9]*"
# re.sub replacement templates producing the placeholder tokens.
# NOTE(review): in a replacement template "\." is emitted literally
# (backslash + dot); presumably the later markdown pass un-escapes it --
# confirm before "fixing" this.
INJECTABLE_IMAGE_REGEX = r"IMAGE:/images/\1\.webp"
INJECTABLE_GIF_REGEX = r"GIF:\1"
|
||
|
||
# rdrama = RDramaAPIInterface(TEST_AUTH_TOKEN, "localhost", sleep=5, https=False)
|
||
# print(open('emoji_cache/klanjak.webp', 'rb'))
|
||
|
||
# image = meme_generator.create_classic_meme_from_emoji("marseyannoyed", "THAT FACE WHEN YOU", "FORGET TO TAKE OUT THE TRASH")
|
||
# output = io.BytesIO()
|
||
# image.save(output, format="webp")
|
||
# file = {'file': ('based.webp', output.getvalue(), 'image/webp')}
|
||
# rdrama.reply_to_comment_easy(175, 1, "assddsfssdssdsd", file=file)
|
||
|
||
def comment_with_image(message, image, comment_id, post_id):
    """Reply to a comment with ``message`` plus ``image`` and return the new comment's id.

    ``image`` must expose ``get_binary()``; its bytes are uploaded as
    ``based.webp``. This function uses the module-level ``rdrama`` client,
    which is only created when the file runs as a script.
    """
    attachment = ('based.webp', image.get_binary(), 'image/webp')
    response = rdrama.reply_to_comment_easy(comment_id, post_id, message, file={'file': attachment})
    return response['id']
|
||
|
||
class TextLine:
    """One line of a cleaned comment, parsed into Text / Image / Emoji elements."""

    def __init__(self, string):
        # Parsed elements, in the order text_elements() returned them.
        self.line = text_elements(string)

    @property
    def text(self) -> str:
        """All caption text on the line, joined with single spaces."""
        return " ".join(caption.text for caption in self.captions)

    @property
    def captions(self) -> 'list[Text]':
        """The Text elements of the line, in order."""
        return [element for element in self.line if isinstance(element, Text)]

    @property
    def images(self) -> 'list[Image]':
        """The Image elements of the line, in order."""
        return [element for element in self.line if isinstance(element, Image)]

    @property
    def emojis(self) -> 'list[Emoji]':
        """The Emoji elements of the line, in order."""
        return [element for element in self.line if isinstance(element, Emoji)]

    @property
    def is_dialogue_line(self):
        """True when the line starts with its only emoji and has exactly one caption."""
        return (
            len(self.emojis) == 1
            and len(self.captions) == 1
            and isinstance(self.line[0], Emoji)
        )

    @property
    def is_argument_line(self):
        """True when the line has two emojis (as 1st and 3rd element) and one or two captions."""
        if len(self.emojis) != 2 or len(self.captions) not in (1, 2):
            return False
        # Two emojis plus at least one caption guarantee index 2 exists.
        return isinstance(self.line[0], Emoji) and isinstance(self.line[2], Emoji)

    @property
    def is_pure_text_line(self):
        """True when the line contains neither emojis nor images."""
        return len(self.emojis) == 0 and len(self.images) == 0

    @property
    def is_big_marsey_line(self):
        """True when the line is a single big emoji with no caption."""
        return len(self.emojis) == 1 and self.emojis[0].big and len(self.captions) == 0

    @property
    def is_image_line(self):
        """True when the line is a single image with no caption."""
        return len(self.images) == 1 and len(self.captions) == 0
|
||
|
||
class TextElement:
    """Common base class for the parsed pieces of a comment line."""
|
||
|
||
class Text(TextElement):
    """A run of plain caption text."""

    def __init__(self, text):
        self.text = text

    def __repr__(self) -> str:
        return f"Text({self.text})"
|
||
|
||
class Image(TextElement):
    """An image reference: either a site-relative image path or a giphy id."""

    def __init__(self, link, gif):
        # For gifs ``link`` is the giphy media id; otherwise it is the
        # site-relative path handed to get_full_rdrama_image_url().
        self.link = link
        self.gif = gif

    def __repr__(self) -> str:
        return f"Image({self.link}, gif = {self.gif})"

    @property
    def url(self):
        """Absolute URL the image can be downloaded from."""
        if self.gif:
            return f"https://media.giphy.com/media/{self.link}/giphy.webp"
        return get_full_rdrama_image_url(self.link)
|
||
class Emoji(TextElement):
    """An emoji reference; ``big`` records whether it was written with a ``#`` modifier."""

    def __init__(self, emoji, big):
        self.emoji = emoji
        self.big = big

    def __repr__(self) -> str:
        return f"Emoji({self.emoji}, big={self.big})"
|
||
|
||
def get_text_only(text_elements: 'list[TextElement]') -> str:
    """Join the text of every Text element in ``text_elements`` with single spaces.

    Non-Text elements (images, emojis) are ignored.
    """
    return " ".join(
        element.text for element in text_elements if isinstance(element, Text)
    )
|
||
|
||
def text_elements(string: str) -> 'list[TextElement]':
    """Split ``string`` into an ordered list of Emoji, Image and Text elements.

    Emojis (``:name:``, with ``#`` marking a big emoji) and the
    ``IMAGE:...`` / ``GIF:...`` placeholders are recognised; everything in
    between becomes whitespace-trimmed Text. Blank fragments are dropped.
    """
    full_regex = rf"({EMOJI_REGEX})|({PARSED_IMAGE_REGEX})|({PARSED_GIF_REGEX})"
    parsed = []
    # Because the pattern has capturing groups, re.split also yields the
    # matched tokens -- and None for every alternative that did not match.
    for piece in re.split(full_regex, string):
        if piece is None or not piece.strip():
            continue
        if re.match(EMOJI_REGEX, piece):
            big = "#" in piece
            name = piece.replace("#", "").strip(":")
            parsed.append(Emoji(name, big))
        elif re.match(PARSED_GIF_REGEX, piece):
            # Drop the "GIF:" prefix, keeping the giphy id.
            parsed.append(Image(piece.strip()[4:], True))
        elif re.match(PARSED_IMAGE_REGEX, piece):
            # Drop the "IMAGE:" prefix, keeping the site-relative path.
            parsed.append(Image(piece.strip()[6:], False))
        else:
            parsed.append(Text(piece.strip()))
    return parsed
|
||
|
||
def strip_markdown(markdown_string):
    """Reduce a raw comment body to the plain text used for meme captions.

    Images and giphy links are replaced by ``IMAGE:``/``GIF:`` placeholders,
    ``!!directive`` tokens and quoted text are removed, markdown is rendered
    and stripped to text, and site command words and user mentions are
    deleted. Returns ``""`` when the remaining text contains no ASCII letter.
    """
    markdown_string = re.sub(IMAGE_REGEX, INJECTABLE_IMAGE_REGEX, markdown_string)
    markdown_string = re.sub(GIF_REGEX, INJECTABLE_GIF_REGEX, markdown_string)
    markdown_string = re.sub(r"!![^\s]*", "", markdown_string)
    # Drops everything from a ">" up to and including the next newline.
    markdown_string = re.sub(r">.*\n", "", markdown_string)
    try:
        html = markdown(markdown_string)
    except AttributeError:
        html = markdown_string #if there is no markdown in the string you get an error
    soup = BeautifulSoup(html, "html.parser")
    text = ''.join(soup.find_all(string=True))

    # Remove command tokens / trigger words before captioning.
    text = re.sub(r"!blackjack[^ ]*", "", text)
    text = text.replace("fortune", "")
    text = text.replace("factcheck", "")
    text = re.sub(r"!slots.*?\s", "", text)

    text = text.replace('"', "'")

    # Give up unless at least one ASCII letter is left.
    if not any("a" <= char <= "z" for char in text.lower()):
        return ""

    text = re.sub(r"@(.*?)\s", "", text)
    # Second pass over !slots, after mentions have been removed.
    text = re.sub(r"!slots.*?\s", "", text)

    text = re.sub("(?i)trans lives matter", "", text)

    return text
|
||
|
||
def remove_duplicates(list):
    """Return the JSON-serialisable items of ``list`` without duplicates.

    Items are compared by their ``json.dumps`` form and returned as fresh
    JSON round-tripped copies. First-seen order is preserved, so the result
    is deterministic (a plain ``set`` would scramble it between runs).
    """
    # dict.fromkeys keeps insertion order while discarding repeated keys.
    serialized = dict.fromkeys(json.dumps(item) for item in list)
    return [json.loads(blob) for blob in serialized]
|
||
|
||
def get_eligible_comments(rdrama : RDramaAPIInterface, session : Session):
    """Fetch recent comments and return those the bot may still reply to.

    A comment is kept when it is not from a bot, not written by the bot
    itself, its post and its author are under their reply caps, and no
    Comment row exists for it yet. Every returned comment is also recorded
    through ``CommentString.add_comment``.
    """
    def author_id(comment):
        # Anonymous authors arrive as the literal string '👻' and are tracked as user 0.
        author = comment['author']
        return 0 if author == '👻' else author['id']

    def is_eligible(comment):
        if comment['is_bot']:  # No bots
            return False
        author = comment['author']
        if author != '👻' and author['id'] == AUTOMEME_ID:  # Don't reply to self
            return False
        if Post.get_number_of_replies(comment['post_id'], session) >= ALLOWED_COMMENTS_PER_POST:
            return False  # Don't spam posts
        if User.get_number_of_comments(author_id(comment), session) >= ALLOWED_COMMENTS_PER_USER_PER_DAY:
            return False  # Don't spam users
        # Double check that we haven't replied to the comment
        return Comment.get_comment(comment['id'], session) is None

    fetched = rdrama.get_comments(number_of_pages=PAGES_TO_SCAN)['data']
    # Deduplicate first so repeated comments do not trigger repeated DB lookups.
    comments = [comment for comment in remove_duplicates(fetched) if is_eligible(comment)]

    for comment in comments:
        created = datetime.fromtimestamp(comment['created_utc'])
        CommentString.add_comment(comment['id'], author_id(comment), created, comment['body'], session)

    return comments
|
||
|
||
T = TypeVar('T')


def lambda_count(list : 'list[T]', predicate : 'Callable[[T], bool]') -> int:
    """Return how many items of ``list`` satisfy ``predicate``."""
    return sum(1 for item in list if predicate(item))
|
||
|
||
def get_full_rdrama_image_url(partial_url) -> str:
    """Prefix a site-relative image path with the host for the current mode.

    Uses ``http://localhost`` when TEST_MODE is set, otherwise
    ``https://rdrama.net``.
    """
    host = "http://localhost" if TEST_MODE else "https://rdrama.net"
    return f"{host}{partial_url}"
|
||
|
||
def extract_directives(string : str):
    """Return the lower-cased names of every ``!!directive`` token in ``string``.

    A directive is ``!!`` followed by any run of non-whitespace characters;
    the leading ``!!`` is removed from each result.
    """
    tokens = re.findall(r"!!\S*", string)
    return [token.lower()[2:] for token in tokens]
|
||
|
||
def create_comment_message(chud: bool, pizza: bool, bird : bool, marsey : bool):
|
||
if (marsey):
|
||
return ":marseyshy:"
|
||
|
||
message = MESSAGE
|
||
|
||
if (chud):
|
||
message += "\ntrans lives matter"
|
||
if (pizza):
|
||
message += "\nUpon closer examination, one can see how Bikini Bottom is an allegory for the destiny of Faustian europe. Squidward clearly represents the Apollonian aryan man, stern, cultured, full of the creative Hyperborean soul and solar aristocratic character, and yet there is a tragic aspect to his character, in that his spiritual and creative passions are given no place in the degenrate modernistic kosher bacchanale that is modern Krabs-run Bikini Bottom. Spongebob represents the Dionysian aspect of aryan culture- he is the unawakened gentile, who has succumbed to cultural marxist brainwashing. Always maintaining a cheerful, carefree demeanor, and yet fawning and servile before the semitic crustacean power structure. Patrick represents the introduction of the primitive negroidic blood into the formerly pure white ethnos- He is an utter buffoon and unproductive anti-social drain on society, and yet the good goy Spongebob has been conditioned to accept his friendship, unaware of how his own way of life is gradually succumbing to the cthonic, subterranean negroid elements. As I’m sure we’re all aware, well aware, Mr. Krabs represents the eternal merchant himself, as while not only is he a filthy money-grubber, it is shown, in season 1 episode 12, that he is pushing race-mixing upon the racially unaware Spongebob by trying to set him up on a date with his ball busting yenta whale daughter Pearl. Clearly, this show was ahead of its time."
|
||
|
||
return message
|
||
|
||
def _build_webcomic_panels(element_lines):
    """Turn parsed comment lines into webcomic panels; unrecognised lines are skipped."""
    panels : 'list[WebcomicPanel]' = []
    for element_line in element_lines:
        if element_line.is_dialogue_line:
            caption = element_line.text
            emoji = element_line.emojis[0].emoji
            # Long captions push the character into the background.
            in_background = len(caption) > 100
            panels.append(OneCharacterWebcomicPanel(emoji, caption, in_background))
        elif element_line.is_argument_line:
            captions = element_line.captions
            left_caption = captions[0].text
            right_caption = captions[1].text if len(captions) == 2 else ""
            left_emoji = element_line.emojis[0].emoji
            right_emoji = element_line.emojis[1].emoji
            panels.append(TwoCharacterWebcomicPanel(left_emoji, left_caption, right_emoji, right_caption))
        elif element_line.is_pure_text_line:
            panels.append(TitleCardWebcomicPanel(element_line.text))
    return panels


def _select_meme(comment_id, element_lines, random_float):
    """Pick the meme format matching the comment's shape and render it.

    Only the first matching format is considered. Returns the rendered image,
    or None when no format matches or the random roll exceeds that format's
    trigger threshold.
    """
    dialog_lines = [line for line in element_lines if line.is_dialogue_line]
    argument_lines = [line for line in element_lines if line.is_argument_line]
    pure_text_lines = [line for line in element_lines if line.is_pure_text_line]
    big_marsey_lines = [line for line in element_lines if line.is_big_marsey_line]
    image_lines = [line for line in element_lines if line.is_image_line]

    if len(dialog_lines) == 2:
        print(f"[{comment_id}] SOY_VS_CHAD")
        if random_float <= SOY_VS_CHAD_TRIGGER_CHANGE:
            line1, line2 = dialog_lines
            return meme_generator.create_soy_vs_chad_meme(
                line1.emojis[0].emoji, line2.emojis[0].emoji, line1.text, line2.text)
    elif len(big_marsey_lines) == 1 and len(pure_text_lines) == 1:
        print(f"[{comment_id}] MODERN_MEME_WITH_MARSEY")
        if random_float <= MODERN_MEME_WITH_MARSEY_TRIGGER_CHANGE:
            marsey = big_marsey_lines[0].emojis[0].emoji
            return meme_generator.create_modern_meme_from_emoji(marsey, pure_text_lines[0].text)
    elif len(image_lines) == 1 and len(pure_text_lines) == 1:
        print(f"[{comment_id}] MODERN_MEME_WITH_IMAGE")
        if random_float <= MODERN_MEME_WITH_IMAGE_TRIGGER_CHANGE:
            full_image_url = image_lines[0].images[0].url
            return meme_generator.create_modern_meme_from_url(full_image_url, pure_text_lines[0].text)
    elif len(big_marsey_lines) == 1 and len(pure_text_lines) == 2:
        print(f"[{comment_id}] CLASSIC_MEME_WITH_MARSEY")
        if random_float <= CLASSIC_MEME_WITH_MARSEY_TRIGGER_CHANGE:
            emoji = big_marsey_lines[0].emojis[0].emoji
            top_caption = pure_text_lines[0].text
            bottom_caption = pure_text_lines[1].text
            return meme_generator.create_classic_meme_from_emoji(emoji, top_caption, bottom_caption)
    elif len(image_lines) == 1 and len(pure_text_lines) == 2:
        print(f"[{comment_id}] CLASSIC_MEME_WITH_IMAGE")
        if random_float <= CLASSIC_MEME_WITH_IMAGE_TRIGGER_CHANGE:
            full_image_url = image_lines[0].images[0].url
            top_caption = pure_text_lines[0].text
            bottom_caption = pure_text_lines[1].text
            return meme_generator.create_classic_meme_from_url(full_image_url, top_caption, bottom_caption)
    elif len(argument_lines) + len(dialog_lines) >= 2:
        print(f"[{comment_id}] WEBCOMIC")
        if random_float <= WEBCOMIC_TRIGGER_CHANCE:
            return create_webcomic(_build_webcomic_panels(element_lines))
    return None


def main_processing_task(rdrama : RDramaAPIInterface, session : Session):
    """Run one bot pass: inspect the bot's badges, then answer eligible comments with memes.

    Nothing is posted when the account is banned or holds the Bird Site
    Award. Each processed comment gets a Comment row (with a None reply id
    when no meme was produced) so it is not considered again.
    """
    is_chudded = False #Do we have the chud award?
    can_communicate = True #Can we send any message at all?
    is_pizzad = False
    is_marseyed = False
    is_birdsite = False
    rdrama.get_front_page()
    automeme_information = rdrama.get_user_information(AUTOMEME_ID)
    print(f"coins: {automeme_information['coins']} comments: {automeme_information['comment_count']}")
    for badge in automeme_information['badges']:
        badge_name = badge['name']
        if badge_name == "Marsey Award":
            print("We have the marsey award. STOP.")
            # Does not touch can_communicate: resetting it to True here used to
            # undo an earlier Bird Site badge depending on badge order.
            is_marseyed = True
        elif badge_name == "Chud":
            print("We have the CHUD award. CONTINUE")
            is_chudded = True
        elif badge_name == "Bird Site Award":
            print("We have the Bird Site Award. STOP.")
            can_communicate = False
            is_birdsite = True
        elif badge_name == "Pizzashill Award":
            print("We have the Pizzashill Award. CONTINUE.")
            is_pizzad = True

    if automeme_information['is_banned']:
        print("We are banned. STOP.")
        can_communicate = False

    if not can_communicate:
        return

    for eligible_comment in get_eligible_comments(rdrama, session):
        begin = datetime.now()
        comment_id = eligible_comment['id']
        post_id = eligible_comment['post_id']
        author = eligible_comment['author']
        # Anonymous authors arrive as the literal string '👻' and are tracked as user 0.
        is_ghost = author == '👻'
        user_id = 0 if is_ghost else author['id']

        # Re-check the limits: earlier replies in this same run count too.
        under_post_limit = Post.get_number_of_replies(post_id, session) < ALLOWED_COMMENTS_PER_POST
        under_user_limit = User.get_number_of_comments(user_id, session) < ALLOWED_COMMENTS_PER_USER_PER_DAY
        has_not_replied_to_comment = Comment.get_comment(comment_id, session) is None
        is_not_restricted_post = post_id not in RESTRICTED_POSTS
        if not (under_post_limit and under_user_limit and has_not_replied_to_comment and is_not_restricted_post):
            continue

        comment_text = eligible_comment['body']
        directives = extract_directives(comment_text)
        cleaned_comment_text = strip_markdown(comment_text)

        # "!!meme" forces a reply: a roll of 0.0 passes every threshold.
        random_float = 0.0 if "meme" in directives else random()

        element_lines = [TextLine(line) for line in cleaned_comment_text.split("\n") if line != ""]
        image = _select_meme(comment_id, element_lines, random_float)

        if image is None:
            # Record the comment as handled without a reply.
            Comment.create_new_comment(comment_id, None, session)
            continue

        print(f"[{comment_id}] posting...")
        # Ghost authors have no 'username' field; indexing the string used to raise.
        watermark_name = author if is_ghost else author['username']
        image = add_watermark(image, watermark_name)
        message = create_comment_message(is_chudded, is_pizzad, is_birdsite, is_marseyed)
        if not DRY_MODE:
            automeme_comment_id = comment_with_image(message, image, comment_id, post_id)
        else:
            automeme_comment_id = None
            image.save(f"dry/{comment_id}.webp")

        Comment.create_new_comment(comment_id, automeme_comment_id, session)
        if post_id not in FREE_POSTS:
            Post.increment_replies(post_id, session)
            User.increase_number_of_comments(user_id, session)
        else:
            print(f"[{comment_id}] is a free post.")
        end = datetime.now()
        print(end-begin)
|
||
|
||
if __name__ == "__main__":
    # Script entry point: configure the API client for the selected mode, arm
    # the failsafe timer, then run one processing pass inside a DB session.
    started_at = datetime.now()
    print(f"======= RUNNING AT {started_at.hour}:{started_at.minute} ======= ")
    if TEST_MODE:
        website = "localhost"
        auth = TEST_AUTH_TOKEN
        https = False
        timeout = 1
        AUTOMEME_ID = 7
        OPERATOR_ID = 9
        ACTUALLY_CALL_OPEN_AI = False
    else:
        website = "rdrama.net"
        with open(get_real_filename("rdrama_auth_token"), "r") as f:
            # Strip so a trailing newline in the token file does not end up in the token.
            auth = f.read().strip()
        https = True
        timeout = 10
    rdrama = RDramaAPIInterface(auth, website, timeout, https=https)

    #Set up fail safe
    def exitfunc():
        print("FAILSAFE FORCED SHUTDOWN", datetime.now())
        os._exit(0)

    timer = Timer(60*MINUTES_BEFORE_FORCED_SHUTDOWN, exitfunc)
    timer.start()
    try:
        engine = create_engine(f"sqlite:///{get_real_filename(DB_FILENAME)}")
        Base.metadata.create_all(engine)

        with Session(engine) as session:
            main_processing_task(rdrama, session)
            session.commit()
    finally:
        # Always disarm the failsafe; otherwise an exception above would leave
        # the timer thread keeping the process alive until it force-exits.
        timer.cancel()