automeme/automeme.py

624 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import json
from random import choice, choices, random
import re
import traceback
from typing import Callable, TypeVar
import meme_generator
from meme_generator import AnimatedImage, WebcomicPanel, OneCharacterWebcomicPanel, TwoCharacterWebcomicPanel, TitleCardWebcomicPanel, add_watermark, create_webcomic
from RDramaAPIInterface import RDramaAPIInterface
from datetime import datetime, timedelta
from os.path import exists, join, realpath, split
from threading import Timer
from BotModels import Base, Comment, CommentString, Post, ScriptCall, User
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
import os
from markdown import markdown
from bs4 import BeautifulSoup
from utils import get_real_filename
from pympler import tracker, asizeof, classtracker
ANTISPAM_MESSAGES = [
'i have a meme for u :marseyshy:',
'anon -- can you suck my penis please?',
'uwu i love memes',
'~ oh... dramatardchan -- i wish ud stop talking about trains',
'hey dramatardchan, its ok if u have autism',
'its okay silly ~ i know u are retarded',
'on rdrama someone told me to sneed, what does that mean?',
'what if no-one likes my shitposts :marseycry:',
'a ha ha ha, that\'s very funny dramatardchan',
'im going to a party ~ no you dont have to come ~ love u',
'oh dramatardchan, your such a special boy',
'omg hes talking about "trannys" again, im so wet down there',
'idk what that means but i still love u',
'i love u but pls dont send me penis pictures, its kinda gross >.<',
'aha thats very funny, thank u',
'hey dramatardchan... i was thinking... maybe u could take a shower... if u want',
'i love u becuz ur so sweet, even if you are kind of a pushover, dramatardchan',
'maybe u should delete ur account - not trying to be mean tho',
'look at this cute penis picture i found on twitter... no, its ok, u dont have to show me urs',
'FUCK YOU. I HAVE HAD IT UP TO HERE WITH YOUR FUCKING DRAMA BULLSHIT. FUCK. OFF.',
'idk what u mean by mommy milkers 😭 im not ur mommy pls stop saying that',
'goldstein sama is so cute ~ what a cute nose... urs is fine to tho'
]
DARRELL_QUOTES = [
":#marseyobjection: \nFor the record, your honor, I do not consent to being called that name, nor do I know anyone by that name",
":#brooksannoyed: \nI see what you're doing. You must think you're slick.",
":#marseyobjection: \nThe jury deserves to know, your honor.",
":#marseyjurisdiction: \nCan we please address :marseyjurisdiction:?",
":#marseygrouns:",
":#marseygrouns:\n:marseygrouns: for the substain?",
":#marseygrouns:\nWho would you say is the plantiff in this matter?",
":#marseygrouns:\nWould it be fair to say that the State of Wisconsin is not a living, breathing person?",
":#marseygrouns:\nThe defense calls the State of Wisconsin.",
":#brooksannoyed:\nHow can you even call yourself a judge?",
":#brooksannoyed:\n\*takes off shirt for some reason\*",
":#brooksannoyed:\nMind boggling. Ming boggling.",
":#brooksannoyed:\nSHE SAID SHE WAS 18!",
":#marseyrelevancy:\nuhh what's the :marseyrelevancy:?",
":#marseyrelevancy:\n[\*talks for fifty minutes straight\*](https://rdrama.net/post/113381/marseygrounsmarseyjudge-darrel-brooks-entire-50minute-long)",
":#marseyrelevancy:\nI see what you're doing. Trying to be slick.",
":#brooksannoyed:\nYour honor, that's a load of crap.",
":#marseytakit:\nIs that lawfully law?",
":#marseytakit:\nIs that a :marseytakit:?",
":#brooksannoyed:\nI AM A GROWN MAN WITH GROWN KIDS AINT NOBODY TELL ME WHEN TO TALK",
":#marseybiast:\nIs this a common law court or an admiralty court?",
":#marseybiast:\nSeems to me like a lot of your answers are coached.",
":#marseyjurisdiction:\nWe STILL have yet to discuss :#marseyjurisdiction:.",
":#marseyjurisdiction:\nWhat da bid'ness?",
":#marseyjurisdiction:\nI need a signed affidavit of your oath of office."
]
DARRELL_KEYWORDS = [
"brooks",
"darrell"
]
DARRELL_DISCLAIMER = "_I am not Darrell Brooks, I am a third party intervener, here on behalf of my client_"
LLM_KEYWORDS = [
"llm",
"landlordmessiah",
"landlord messiah"
]
TEST_MODE = False
DRY_MODE = False
BETA_MODE = False
TEST_AUTH_TOKEN = "2PR7FAqx0UVM71HayM33p1OTy_zFGOK1rJlCNi1qzvFJqQGK6NOm7I-mSfYjZgs02FU_LqLTPqDa8KN-LUiGSJ1idFfE5jKnG3DUk9rQjn_ZrAtG_VU9GK5F_EGmixg1"
MINUTES_BEFORE_FORCED_SHUTDOWN = 10
DB_FILENAME = "automeme_database.db"
MAX_PAGES_TO_SCAN = 10
AUTOMEME_ID = 13427
ALLOWED_COMMENTS_PER_POST = 20
ALLOWED_COMMENTS_PER_USER_PER_DAY = 20
SOY_VS_CHAD_TRIGGER_CHANGE = 0.01
MODERN_MEME_WITH_MARSEY_TRIGGER_CHANGE = 0.001
MODERN_MEME_WITH_IMAGE_TRIGGER_CHANGE = 0.005
CLASSIC_MEME_WITH_MARSEY_TRIGGER_CHANGE = 0.002
CLASSIC_MEME_WITH_IMAGE_TRIGGER_CHANGE = 0.001
WEBCOMIC_TRIGGER_CHANCE = 0.01
DARRELL_CHANCE = 1.0
FREE_POSTS = [6, 97416, 98286]
RESTRICTED_POSTS = [5, 16583, 75878, 35835]
EMOJI_REGEX = r":[#!a-zA-Z0-9]*:"
IMAGE_REGEX = r"!\[\]\(/images/([1234567890]*)\.webp\)"
GIF_REGEX = r"https:\/\/media\.giphy\.com\/media\/([a-zA-Z0-9]*)\/giphy\.webp"
PARSED_IMAGE_REGEX = r"IMAGE:/images/[1234567890]*\.webp"
PARSED_GIF_REGEX = r"GIF:[a-zA-Z0-9]*"
INJECTABLE_IMAGE_REGEX = r"IMAGE:/images/\1\.webp"
INJECTABLE_GIF_REGEX = r"GIF:\1"
# rdrama = RDramaAPIInterface(TEST_AUTH_TOKEN, "localhost", sleep=5, https=False)
# print(open('emoji_cache/klanjak.webp', 'rb'))
# image = meme_generator.create_classic_meme_from_emoji("marseyannoyed", "THAT FACE WHEN YOU", "FORGET TO TAKE OUT THE TRASH")
# output = io.BytesIO()
# image.save(output, format="webp")
# file = {'file': ('based.webp', output.getvalue(), 'image/webp')}
# rdrama.reply_to_comment_easy(175, 1, "assddsfssdssdsd", file=file)
def comment_with_image(message, image, comment_id, post_id):
file = {'file': ('based.webp', image.get_binary(), 'image/webp')}
return rdrama.reply_to_comment_easy(comment_id, post_id, message, file=file)['id']
class TextLine:
def __init__(self, string):
self.line = text_elements(string)
@property
def text(self) -> str:
text = [i.text for i in self.line if isinstance(i, Text)]
return " ".join(text)
@property
def captions(self) -> 'list[Text]':
return [i for i in self.line if isinstance(i, Text)]
@property
def images(self) -> 'list[Image]':
return [i for i in self.line if isinstance(i, Image)]
@property
def emojis(self) -> 'list[Emoji]':
return [i for i in self.line if isinstance(i, Emoji)]
@property
def is_dialogue_line(self):
return len(self.emojis) == 1 and len(self.captions) == 1 and isinstance(self.line[0], Emoji)
@property
def is_argument_line(self):
return len(self.emojis) == 2 and ((len(self.captions) == 0) or (len(self.captions) == 2 or len(self.captions) == 1) and isinstance(self.line[0], Emoji) and isinstance(self.line[2], Emoji))
@property
def is_pure_text_line(self):
return len(self.emojis) == 0 and len(self.images) == 0
@property
def is_big_marsey_line(self):
return len(self.emojis) == 1 and self.emojis[0].big and len(self.captions) == 0
@property
def is_image_line(self):
return len(self.images) == 1 and len(self.captions) == 0
def __str__(self):
return str(self.line)
def __repr__(self) -> str:
return str(self.line)
class TextElement():
pass
class Text(TextElement):
def __init__(self, text):
self.text = text
def __repr__(self) -> str:
return f"Text({self.text})"
class Image(TextElement):
def __init__(self, link, gif):
self.link = link
self.gif = gif
def __repr__(self) -> str:
return f"Image({self.link}, gif = {self.gif})"
@property
def url(self):
if not self.gif:
return get_full_rdrama_image_url(self.link)
else:
return f"https://media.giphy.com/media/{self.link}/giphy.webp"
class Emoji(TextElement):
def __init__(self, emoji, big):
self.emoji = emoji
self.big = big
def __repr__(self) -> str:
return f"Emoji({self.emoji}, big={self.big})"
def get_text_only(text_elements : 'list[TextElement]') -> str:
text = [i.text for i in text_elements if isinstance(i, Text)]
return " ".join(text)
def text_elements(string : str):
FULL_REGEX = rf"({EMOJI_REGEX})|({PARSED_IMAGE_REGEX})|({PARSED_GIF_REGEX})"
elements = re.split(FULL_REGEX, string)
to_return = []
for element in elements:
if element == None:
continue
if element.strip() == "":
continue
if re.match(EMOJI_REGEX, element):
if "#" in element:
big = True
element = element.replace("#","")
else:
big = False
element = element.strip(":")
to_return.append(Emoji(element, big))
elif re.match(PARSED_GIF_REGEX, element):
to_return.append(Image(element.strip()[4:], True))
elif re.match(PARSED_IMAGE_REGEX, element):
to_return.append(Image(element.strip()[6:], False))
else:
to_return.append(Text(element.strip()))
return to_return
def strip_markdown(markdown_string):
markdown_string = re.sub(IMAGE_REGEX, INJECTABLE_IMAGE_REGEX, markdown_string)
markdown_string = re.sub(GIF_REGEX, INJECTABLE_GIF_REGEX, markdown_string)
markdown_string = re.sub("!![^\s]*", "", markdown_string)
markdown_string = re.sub(">.*\n", "", markdown_string)
try:
html = markdown(markdown_string)
except AttributeError:
html = markdown_string #if there is no markdown in the string you get an error
soup = BeautifulSoup(html, "html.parser")
text = ''.join(soup.findAll(text=True))
text = re.sub(r"!blackjack[^ ]*", "", text)
text = re.sub(r"fortune", "", text)
text = re.sub(r"factcheck", "", text)
text = re.sub("!slots.*?\s", "", text)
text = re.sub(r"\"", "'", text)
# make sure there are only letters in the string.
if len(set(list(text.lower())).intersection(set(["a",'b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']))) == 0:
return ""
text = re.sub("@(.*?)\s", "", text)
text = re.sub("!slots.*?\s", "", text)
text = re.sub("(?i)detrans lives matter", "", text)
text = re.sub("(?i)trans lives matter", "", text)
return text.strip()
def remove_duplicates(list):
return [json.loads(j) for j in set([json.dumps(i) for i in list])]
def get_eligible_comments(rdrama : RDramaAPIInterface, session : Session):
last_time = ScriptCall.get_time_of_last_run(session)
current_time = int(datetime.now().timestamp())
begin = datetime.now()
comments = comment_chunk(last_time, rdrama) #rdrama.get_comments(number_of_pages=3, upper_bound=current_time, lower_bound=last_time, sort="old")['data']
end = datetime.now()
print(end-begin)
#comments = [comment for comment in comments if comment['created_utc'] > last_time]
for comment in comments:
user_id = 0 if comment['author'] == '👻' else comment['author']['id']
comment_id = comment['id']
text = comment['body']
time = datetime.fromtimestamp(comment['created_utc'])
CommentString.add_comment(comment_id, user_id, time, text, session)
print([comment['id'] for comment in comments])
comments = [comment for comment in comments if not comment['is_bot']] #No bots
print([comment['id'] for comment in comments])
comments = [comment for comment in comments if comment['author'] == '👻' or not comment['author']['id'] == AUTOMEME_ID] #Don't reply to self
print([comment['id'] for comment in comments])
comments = [comment for comment in comments if Post.get_number_of_replies(comment['post_id'], session) < ALLOWED_COMMENTS_PER_POST] #Don't spam posts
print([comment['id'] for comment in comments])
comments = [comment for comment in comments if User.get_number_of_comments(0 if comment['author'] == '👻' else comment['author']['id'], session) < ALLOWED_COMMENTS_PER_USER_PER_DAY] #Don't spam users
print([comment['id'] for comment in comments])
comments = [comment for comment in comments if Comment.get_comment(comment['id'], session) is None] #Double check that we haven't replied to the comment
if BETA_MODE:
comments = [comment for comment in comments if comment['post_id'] in FREE_POSTS]
print([comment['id'] for comment in comments])
comments = remove_duplicates(comments) #Remove the duplicates
print([comment['id'] for comment in comments])
ScriptCall.register(session)
session.commit()
return comments
T = TypeVar('T')
def lambda_count(list : 'list[T]', predicate : 'Callable[[T], bool]' ):
return sum(1 for i in list if predicate(i))
def get_full_rdrama_image_url(partial_url) -> str:
if (TEST_MODE):
return f"http://localhost{partial_url}"
else:
return f"https://rdrama.net{partial_url}"
def extract_directives(string : str):
list = re.findall("!![^\s]*", string)
list = [i.lower()[2:] for i in list]
return list
def create_comment_message(chud: bool, pizza: bool, bird : bool, marsey : bool, mention : str = None):
if (marsey):
return ":marseyshy:"
message = choice(ANTISPAM_MESSAGES) +"\n"
if (mention != None):
message += f"@{mention}\n"
if (chud):
message += "\ntrans lives matter"
if (pizza):
message += "\nUpon closer examination, one can see how Bikini Bottom is an allegory for the destiny of Faustian europe. Squidward clearly represents the Apollonian aryan man, stern, cultured, full of the creative Hyperborean soul and solar aristocratic character, and yet there is a tragic aspect to his character, in that his spiritual and creative passions are given no place in the degenrate modernistic kosher bacchanale that is modern Krabs-run Bikini Bottom. Spongebob represents the Dionysian aspect of aryan culture- he is the unawakened gentile, who has succumbed to cultural marxist brainwashing. Always maintaining a cheerful, carefree demeanor, and yet fawning and servile before the semitic crustacean power structure. Patrick represents the introduction of the primitive negroidic blood into the formerly pure white ethnos- He is an utter buffoon and unproductive anti-social drain on society, and yet the good goy Spongebob has been conditioned to accept his friendship, unaware of how his own way of life is gradually succumbing to the cthonic, subterranean negroid elements. As I’m sure we’re all aware, well aware, Mr. Krabs represents the eternal merchant himself, as while not only is he a filthy money-grubber, it is shown, in season 1 episode 12, that he is pushing race-mixing upon the racially unaware Spongebob by trying to set him up on a date with his ball busting yenta whale daughter Pearl. Clearly, this show was ahead of its time."
return message
def handle_comment(comment : dict, rdrama : RDramaAPIInterface, session : Session, chud: bool, pizza: bool, bird : bool, is_marsey : bool, starting_directives : 'list[str]'= [], mention : str = None):
try:
begin = datetime.now()
under_post_limit = Post.get_number_of_replies(comment['post_id'], session) < ALLOWED_COMMENTS_PER_POST
under_user_limit = User.get_number_of_comments(0 if comment['author'] == '👻' else comment['author']['id'], session) < ALLOWED_COMMENTS_PER_USER_PER_DAY
has_not_replied_to_comment = Comment.get_comment(comment['id'], session) is None
is_not_restricted_post = comment['post_id'] not in RESTRICTED_POSTS
if (not (under_post_limit and under_user_limit and (has_not_replied_to_comment or len(starting_directives) != 0) and is_not_restricted_post)):
return
comment_text = comment['body']
directives = extract_directives(comment_text) if len(starting_directives) == 0 else starting_directives
cleaned_comment_text = strip_markdown(comment_text)
if (cleaned_comment_text == "" and len(directives) > 0):
parent_id = comment['parent_comment_id']
username = None if comment['author'] == '👻' else comment['author']['username']
parent = rdrama.get_comment(parent_id)
handle_comment(parent, rdrama, session, chud, pizza, bird, is_marsey, starting_directives=directives, mention=username)
return
if ("meme" in directives or comment['post_id'] in FREE_POSTS):
random_float = 0.0
else:
random_float = random()
comment_lines = cleaned_comment_text.split("\n")
comment_lines = [comment_line for comment_line in comment_lines if comment_line != ""]
element_lines = [TextLine(line) for line in comment_lines]
argument_lines_count, dialog_lines_count, text_lines_count, big_marsey_lines_count = 0,0,0,0
dialog_lines = list(filter(lambda a : a.is_dialogue_line, element_lines))
argument_lines = list(filter(lambda a : a.is_argument_line, element_lines))
pure_text_lines = list(filter(lambda a : a.is_pure_text_line, element_lines))
big_marsey_lines = list(filter(lambda a : a.is_big_marsey_line, element_lines))
image_lines = list(filter(lambda a : a.is_image_line, element_lines))
argument_lines_count = len(argument_lines)
dialog_lines_count = len(dialog_lines)
pure_text_lines_count = len(pure_text_lines)
big_marsey_lines_count = len(big_marsey_lines)
image_lines_count = len(image_lines)
image = None
if (dialog_lines_count == 2):
print(f"[{comment['id']}] SOY_VS_CHAD")
if (random_float <= SOY_VS_CHAD_TRIGGER_CHANGE):
#Soy vs Chad
line1 = dialog_lines[0]
line2 = dialog_lines[1]
emoji1 = line1.emojis[0].emoji
emoji2 = line2.emojis[0].emoji
caption1 = line1.text
caption2 = line2.text
image = meme_generator.create_soy_vs_chad_meme(emoji1, emoji2, caption1, caption2)
elif (big_marsey_lines_count == 1 and pure_text_lines_count == 1):
print(f"[{comment['id']}] MODERN_MEME_WITH_MARSEY")
if (random_float <= MODERN_MEME_WITH_MARSEY_TRIGGER_CHANGE):
# Modern Meme with Marsey
text_line = pure_text_lines[0]
marsey_line = big_marsey_lines[0]
marsey = marsey_line.emojis[0].emoji
caption = text_line.text
image = meme_generator.create_modern_meme_from_emoji(marsey, caption)
elif (image_lines_count == 1 and pure_text_lines_count == 1):
print(f"[{comment['id']}] MODERN_MEME_WITH_IMAGE")
if (random_float <= MODERN_MEME_WITH_IMAGE_TRIGGER_CHANGE):
# Modern Meme with Image
text_line = pure_text_lines[0]
image_line = image_lines[0]
image = image_line.images[0]
full_image_url = image.url
caption = text_line.text
image = meme_generator.create_modern_meme_from_url(full_image_url, caption)
elif (big_marsey_lines_count == 1 and pure_text_lines_count == 2):
print(f"[{comment['id']}] CLASSIC_MEME_WITH_MARSEY")
if (random_float <= CLASSIC_MEME_WITH_MARSEY_TRIGGER_CHANGE):
# Classic Meme with big marsey
top_text_line = pure_text_lines[0]
bottom_text_line = pure_text_lines[1]
marsey_line = big_marsey_lines[0]
emoji = marsey_line.emojis[0].emoji
top_caption = top_text_line.text
bottom_caption = bottom_text_line.text
image = meme_generator.create_classic_meme_from_emoji(emoji, top_caption, bottom_caption)
elif (image_lines_count == 1 and pure_text_lines_count == 2):
print(f"[{comment['id']}] CLASSIC_MEME_WITH_IMAGE")
if (random_float <= CLASSIC_MEME_WITH_IMAGE_TRIGGER_CHANGE):
# Classic Meme with Image
top_text_line = pure_text_lines[0]
bottom_text_line = pure_text_lines[1]
image_line = image_lines[0]
image = image_line.images[0]
full_image_url = image.url
top_caption = top_text_line.text
bottom_caption = bottom_text_line.text
image = meme_generator.create_classic_meme_from_url(full_image_url, top_caption, bottom_caption)
elif (argument_lines_count + dialog_lines_count >= 2):
print(f"[{comment['id']}] WEBCOMIC")
if (random_float <= WEBCOMIC_TRIGGER_CHANCE):
panels : 'list[WebcomicPanel]' = []
for element_line in element_lines:
if element_line.is_dialogue_line:
caption = element_line.text
emoji = element_line.emojis[0].emoji
if len(caption) > 100:
in_background = True
else:
in_background = False
oneCharacterWebcomicPanel = OneCharacterWebcomicPanel(emoji, caption, in_background)
panels.append(oneCharacterWebcomicPanel)
elif element_line.is_argument_line:
if len(element_line.captions) == 2:
left_caption = element_line.captions[0].text
right_caption = element_line.captions[1].text
elif (len(element_line.captions) == 1):
left_caption = element_line.captions[0].text
right_caption = ""
else:
left_caption = ""
right_caption = ""
left_emoji = element_line.emojis[0].emoji
right_emoji = element_line.emojis[1].emoji
twoCharacterWebcomicPanel = TwoCharacterWebcomicPanel(left_emoji, left_caption, right_emoji, right_caption)
panels.append(twoCharacterWebcomicPanel)
else:
panels.append(TitleCardWebcomicPanel(element_line.text))
image = create_webcomic(panels)
print(asizeof.asized(image, detail=1).format())
if image != None:
print(f"[{comment['id']}] posting...")
image = add_watermark(image, comment['author']['username'])
user_id = 0 if comment['author'] == '👻' else comment['author']['id']
parent_comment_id = comment['id']
post_id = comment['post_id']
message = create_comment_message(chud, pizza, bird, is_marsey, mention=mention)
if not DRY_MODE:
automeme_comment_id = comment_with_image(message, image, comment['id'], comment['post_id'])
else:
automeme_comment_id = None
image.save(get_real_filename(f"dry/{comment['id']}.webp"))
Comment.create_new_comment(parent_comment_id, automeme_comment_id, session)
if post_id not in FREE_POSTS:
Post.increment_replies(post_id, session)
User.increase_number_of_comments(user_id, session)
else:
print(f"[{comment['id']}] is a free post.")
end = datetime.now()
print(end-begin)
elif comment_contains_keyword(comment_text, DARRELL_KEYWORDS):
if random_float <= DARRELL_CHANCE:
message = choice(DARRELL_QUOTES)
message += "\n\n"
message += DARRELL_DISCLAIMER
post_id = comment['post_id']
user_id = 0 if comment['author'] == '👻' else comment['author']['id']
new_comment_id = rdrama.reply_to_comment_easy(comment['id'], post_id, message)['id']
Comment.create_new_comment(comment['id'], new_comment_id, session)
Post.increment_replies(post_id, session)
User.increase_number_of_comments(user_id, session)
elif comment_contains_keyword(comment_text, LLM_KEYWORDS):
message = "![](/images/1669576960061573.webp)"
post_id = comment['post_id']
user_id = 0 if comment['author'] == '👻' else comment['author']['id']
new_comment_id = rdrama.reply_to_comment_easy(comment['id'], post_id, message)['id']
Comment.create_new_comment(comment['id'], new_comment_id, session)
Post.increment_replies(post_id, session)
User.increase_number_of_comments(user_id, session)
else:
Comment.create_new_comment(comment['id'], None, session)
except BaseException as e:
print(f"YIKERINOS! GOT AN EXCEPTION: {e}")
traceback.print_exc()
def comment_contains_keyword(comment, keywords):
comment_words = comment.lower().split(" ")
return len(set.intersection(set(comment_words), set(keywords))) != 0
def main_processing_task(rdrama : RDramaAPIInterface, session : Session):
is_chudded = False #Do we have the chud award?
can_communicate = True #Can we send any message at all?
is_pizzad = False
is_marseyed = False
is_birdsite = False
rdrama.get_front_page()
automeme_information = rdrama.get_user_information(AUTOMEME_ID)
print(f"coins: {automeme_information['coins']} comments: {automeme_information['comment_count']}")
for badge in automeme_information['badges']:
if (badge['name'] == "Marsey Award"):
print("We have the marsey award. STOP.")
is_marseyed = True
can_communicate = True
if (badge['name'] == "Chud"):
print("We have the CHUD award. CONTINUE")
is_chudded = True
if (badge['name'] == "Bird Site Award"):
print("We have the Bird Site Award. STOP.")
can_communicate = False
is_birdsite = True
if (badge['name'] == "Pizzashill Award"):
print("We have the Pizzashill Award. CONTINUE.")
is_pizzad = True
if automeme_information['is_banned']:
print("We are banned. STOP.")
can_communicate = False
if not can_communicate:
print("I can't communicate. Why????")
if can_communicate:
eligible_comments = get_eligible_comments(rdrama, session)
for eligible_comment in eligible_comments:
handle_comment(eligible_comment, rdrama, session, is_chudded, is_pizzad, is_birdsite, is_marseyed)
def comment_chunk(time : datetime, api: RDramaAPIInterface):
current_time_cur = int(time.timestamp()) # int(time.time() - 60*60*8)
comments = []
pages = 0
while True:
pages+=1
res = api.get_comments(sort="old", lower_bound=current_time_cur)
if len(res['data']) == 0:
break
if pages > MAX_PAGES_TO_SCAN:
print("WARNING: Had to drop some pages. Can't make an omelette without cracking a few eggs")
break
comments = comments + res['data']
current_time_cur = res['data'][-1]['created_utc']
return comments
def get_rdrama():
global AUTOMEME_ID, OPERATOR_ID
if TEST_MODE:
website = "localhost"
auth = TEST_AUTH_TOKEN
https = False
timeout = 1
AUTOMEME_ID = 6
OPERATOR_ID = 9
else:
website = "rdrama.net"
with open(get_real_filename("rdrama_auth_token"), "r") as f:
auth = f.read()
https = True
timeout = 10
rdrama = RDramaAPIInterface(auth, website, https=https)
return rdrama
if __name__ == "__main__":
print(f"======= RUNNING AT {datetime.now().hour}:{datetime.now().minute} ======= ")
rdrama = get_rdrama()
#Set up fail safe
def exitfunc():
print("*That's it, you're going in the retard squisher.*")
print("FAILSAFE FORCED SHUTDOWN", datetime.now())
os._exit(0)
timer = Timer(60*MINUTES_BEFORE_FORCED_SHUTDOWN, exitfunc)
timer.start()
engine = create_engine(f"sqlite:///{get_real_filename(DB_FILENAME)}")
Base.metadata.create_all(engine)
with Session(engine) as session:
main_processing_task(rdrama, session)
session.commit()
timer.cancel()