getting it running for the first time

master
HeyMoon 2022-08-28 17:38:40 -05:00
parent 9b3d96fb7e
commit e26e7056cd
3 changed files with 282 additions and 189 deletions

View File

@ -139,5 +139,24 @@ class CommentString(Base):
''' '''
Mostly for keeping track of how long it's been since the last run, so we can run things without worrying about set number of pages Mostly for keeping track of how long it's been since the last run, so we can run things without worrying about set number of pages
''' '''
class ScriptCalls(Base): class ScriptCall(Base):
pass __tablename__ = "script_call"
id = Column(Integer, primary_key = True)
time = Column(DateTime)
def register(session : Session):
sc = ScriptCall(time = datetime.now())
session.add(sc)
def get_time_of_last_run(session) -> datetime:
stmt = sqlalchemy.select(ScriptCall).order_by(ScriptCall.time.desc()).limit(1)
sc = session.execute(stmt).scalar_one_or_none()
if (sc == None):
return datetime.now()
else:
return sc.time

View File

@ -1,16 +1,15 @@
import base64
import io
import json import json
from random import random from random import choice, choices, random
import re import re
import traceback
from typing import Callable, TypeVar from typing import Callable, TypeVar
import meme_generator import meme_generator
from meme_generator import WebcomicPanel, OneCharacterWebcomicPanel, TwoCharacterWebcomicPanel, TitleCardWebcomicPanel, add_watermark, create_webcomic from meme_generator import WebcomicPanel, OneCharacterWebcomicPanel, TwoCharacterWebcomicPanel, TitleCardWebcomicPanel, add_watermark, create_webcomic
from RDramaAPIInterface import RDramaAPIInterface from RDramaAPIInterface import RDramaAPIInterface
from datetime import datetime from datetime import datetime, timedelta
from os.path import exists, join, realpath, split from os.path import exists, join, realpath, split
from threading import Timer from threading import Timer
from BotModels import Base, Comment, CommentString, Post, User from BotModels import Base, Comment, CommentString, Post, ScriptCall, User
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
import os import os
@ -18,25 +17,50 @@ from markdown import markdown
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from utils import get_real_filename from utils import get_real_filename
TEST_MODE = True ANTISPAM_MESSAGES = [
'i have a meme for u :marseyshy:',
'anon -- can you suck my penis please?',
'uwu i love memes',
'~ oh... dramatardchan -- i wish ud stop talking about trains',
'hey dramatardchan, its ok if u have autism',
'its okay silly ~ i know u are retarded',
'on rdrama someone told me to sneed, what does that mean?',
'what if no-one likes my shitposts :marseycry:',
'a ha ha ha, that\'s very funny dramatardchan',
'im going to a party ~ no you dont have to come ~ love u',
'oh dramatardchan, your such a special boy',
'omg hes talking about "trannys" again, im so wet down there',
'idk what that means but i still love u',
'i love u but pls dont send me penis pictures, its kinda gross >.<',
'aha thats very funny, thank u',
'hey dramatardchan... i was thinking... maybe u could take a shower... if u want',
'i love u becuz ur so sweet, even if you are kind of a pushover, dramatardchan',
'maybe u should delete ur account - not trying to be mean tho',
'look at this cute penis picture i found on twitter... no, its ok, u dont have to show me urs',
'FUCK YOU. I HAVE HAD IT UP TO HERE WITH YOUR FUCKING DRAMA BULLSHIT. FUCK. OFF.',
'idk what u mean by mommy milkers 😭 im not ur mommy pls stop saying that',
'goldstein sama is so cute ~ what a cute nose... urs is fine to tho'
]
TEST_MODE = False
DRY_MODE = False DRY_MODE = False
TEST_AUTH_TOKEN = "ED3eURMKP9FKBFbi-JUxo8MPGWkEihuyIlAUGtVL7xwx0NEy4Nf6J_mxWYTPgAQx1iy1X91hx7PPHyEBS79hvKVIy5DMEzOyAe9PAc5pmqSJlLGq_-ROewMwFzGrqer4" BETA_MODE = False
TEST_AUTH_TOKEN = "lawoSNzuNeBRJBld0boApOceCNSEqBhiRu0aoUWh9kTK7AV37NECZoAK-mEUJ1PM1SsTfTY4f3t_LjooMq4QgPLqPC-F7LNGHQ6_0RFacZmIvC2ixOHPKM821RroJexn"
MINUTES_BEFORE_FORCED_SHUTDOWN = 10 MINUTES_BEFORE_FORCED_SHUTDOWN = 10
DB_FILENAME = "automeme_database.db" DB_FILENAME = "automeme_database.db"
PAGES_TO_SCAN = 5 PAGES_TO_SCAN = 5
AUTOMEME_ID = 13427 AUTOMEME_ID = 13427
ALLOWED_COMMENTS_PER_POST = 20 ALLOWED_COMMENTS_PER_POST = 20
ALLOWED_COMMENTS_PER_USER_PER_DAY = 20 ALLOWED_COMMENTS_PER_USER_PER_DAY = 20
MESSAGE = "i have a meme for u :marseyshy: \nPROTIP: I reply randomly. Put !!meme in your message to make sure I reply"
SOY_VS_CHAD_TRIGGER_CHANGE = 1.0 SOY_VS_CHAD_TRIGGER_CHANGE = 1.0
MODERN_MEME_WITH_MARSEY_TRIGGER_CHANGE = 1.0 #0.1 MODERN_MEME_WITH_MARSEY_TRIGGER_CHANGE = 0.01
MODERN_MEME_WITH_IMAGE_TRIGGER_CHANGE = 1.0 MODERN_MEME_WITH_IMAGE_TRIGGER_CHANGE = 0.05
CLASSIC_MEME_WITH_MARSEY_TRIGGER_CHANGE = 1.0 #0.5 CLASSIC_MEME_WITH_MARSEY_TRIGGER_CHANGE = 0.02
CLASSIC_MEME_WITH_IMAGE_TRIGGER_CHANGE = 1.0 CLASSIC_MEME_WITH_IMAGE_TRIGGER_CHANGE = 0.1
WEBCOMIC_TRIGGER_CHANCE = 1.0 WEBCOMIC_TRIGGER_CHANCE = 1.0
FREE_POSTS = [6] FREE_POSTS = [6, 97416, 98286]
RESTRICTED_POSTS = [5, 16583, 75878, 35835] RESTRICTED_POSTS = [5, 16583, 75878, 35835]
EMOJI_REGEX = r":[#!a-zA-Z0-9]*:" EMOJI_REGEX = r":[#!a-zA-Z0-9]*:"
@ -133,7 +157,7 @@ class Emoji(TextElement):
def __repr__(self) -> str: def __repr__(self) -> str:
return f"Emoji({self.emoji}, big={self.big})" return f"Emoji({self.emoji}, big={self.big})"
def get_text_only(text_elements : list[TextElement]) -> str: def get_text_only(text_elements : 'list[TextElement]') -> str:
text = [i.text for i in text_elements if isinstance(i, Text)] text = [i.text for i in text_elements if isinstance(i, Text)]
return " ".join(text) return " ".join(text)
@ -195,26 +219,46 @@ def remove_duplicates(list):
return [json.loads(j) for j in set([json.dumps(i) for i in list])] return [json.loads(j) for j in set([json.dumps(i) for i in list])]
def get_eligible_comments(rdrama : RDramaAPIInterface, session : Session): def get_eligible_comments(rdrama : RDramaAPIInterface, session : Session):
comments = rdrama.get_comments(number_of_pages=PAGES_TO_SCAN)['data'] last_time = ScriptCall.get_time_of_last_run(session)
comments = [comment for comment in comments if not comment['is_bot']] #No bots current_time = int(datetime.now().timestamp())
comments = [comment for comment in comments if comment['author'] == '👻' or not comment['author']['id'] == AUTOMEME_ID] #Don't reply to self
comments = [comment for comment in comments if Post.get_number_of_replies(comment['post_id'], session) < ALLOWED_COMMENTS_PER_POST] #Don't spam posts begin = datetime.now()
comments = [comment for comment in comments if User.get_number_of_comments(0 if comment['author'] == '👻' else comment['author']['id'], session) < ALLOWED_COMMENTS_PER_USER_PER_DAY] #Don't spam users comments = comment_chunk(last_time, rdrama) #rdrama.get_comments(number_of_pages=3, upper_bound=current_time, lower_bound=last_time, sort="old")['data']
comments = [comment for comment in comments if Comment.get_comment(comment['id'], session) is None] #Double check that we haven't replied to the comment end = datetime.now()
comments = remove_duplicates(comments) #Remove the duplicates print(end-begin)
#comments = [comment for comment in comments if comment['created_utc'] > last_time]
for comment in comments: for comment in comments:
user_id = 0 if comment['author'] == '👻' else comment['author']['id'] user_id = 0 if comment['author'] == '👻' else comment['author']['id']
comment_id = comment['id'] comment_id = comment['id']
text = comment['body'] text = comment['body']
time = datetime.fromtimestamp(comment['created_utc']) time = datetime.fromtimestamp(comment['created_utc'])
CommentString.add_comment(comment_id, user_id, time, text, session) CommentString.add_comment(comment_id, user_id, time, text, session)
print([comment['id'] for comment in comments])
comments = [comment for comment in comments if not comment['is_bot']] #No bots
print([comment['id'] for comment in comments])
comments = [comment for comment in comments if comment['author'] == '👻' or not comment['author']['id'] == AUTOMEME_ID] #Don't reply to self
print([comment['id'] for comment in comments])
comments = [comment for comment in comments if Post.get_number_of_replies(comment['post_id'], session) < ALLOWED_COMMENTS_PER_POST] #Don't spam posts
print([comment['id'] for comment in comments])
comments = [comment for comment in comments if User.get_number_of_comments(0 if comment['author'] == '👻' else comment['author']['id'], session) < ALLOWED_COMMENTS_PER_USER_PER_DAY] #Don't spam users
print([comment['id'] for comment in comments])
comments = [comment for comment in comments if Comment.get_comment(comment['id'], session) is None] #Double check that we haven't replied to the comment
if BETA_MODE:
comments = [comment for comment in comments if comment['post_id'] in FREE_POSTS]
print([comment['id'] for comment in comments])
comments = remove_duplicates(comments) #Remove the duplicates
print([comment['id'] for comment in comments])
return comments return comments
T = TypeVar('T') T = TypeVar('T')
def lambda_count(list : list[T], predicate : 'Callable[[T], bool]' ): def lambda_count(list : 'list[T]', predicate : 'Callable[[T], bool]' ):
return sum(1 for i in list if predicate(i)) return sum(1 for i in list if predicate(i))
def get_full_rdrama_image_url(partial_url) -> str: def get_full_rdrama_image_url(partial_url) -> str:
@ -232,8 +276,12 @@ def create_comment_message(chud: bool, pizza: bool, bird : bool, marsey : bool):
if (marsey): if (marsey):
return ":marseyshy:" return ":marseyshy:"
message = MESSAGE
message = choice(ANTISPAM_MESSAGES) +"\n"
message += f"<source src=\"{''.join(choices(['a','b','c','d','f','g','h','j','k','l','m','n','p','q','r','s','t','v','w','x','z'], k=500))}.mp3\">"
if (chud): if (chud):
message += "\ntrans lives matter" message += "\ntrans lives matter"
if (pizza): if (pizza):
@ -269,173 +317,192 @@ def main_processing_task(rdrama : RDramaAPIInterface, session : Session):
if automeme_information['is_banned']: if automeme_information['is_banned']:
print("We are banned. STOP.") print("We are banned. STOP.")
can_communicate = False can_communicate = False
if not can_communicate:
print("I can't communicate. Why????")
if can_communicate: if can_communicate:
eligible_comments = get_eligible_comments(rdrama, session) eligible_comments = get_eligible_comments(rdrama, session)
for eligible_comment in eligible_comments: for eligible_comment in eligible_comments:
begin = datetime.now() try:
under_post_limit = Post.get_number_of_replies(eligible_comment['post_id'], session) < ALLOWED_COMMENTS_PER_POST begin = datetime.now()
under_user_limit = User.get_number_of_comments(0 if eligible_comment['author'] == '👻' else eligible_comment['author']['id'], session) < ALLOWED_COMMENTS_PER_USER_PER_DAY under_post_limit = Post.get_number_of_replies(eligible_comment['post_id'], session) < ALLOWED_COMMENTS_PER_POST
has_not_replied_to_comment = Comment.get_comment(eligible_comment['id'], session) is None under_user_limit = User.get_number_of_comments(0 if eligible_comment['author'] == '👻' else eligible_comment['author']['id'], session) < ALLOWED_COMMENTS_PER_USER_PER_DAY
is_not_restricted_post = eligible_comment['post_id'] not in RESTRICTED_POSTS has_not_replied_to_comment = Comment.get_comment(eligible_comment['id'], session) is None
if (not (under_post_limit and under_user_limit and has_not_replied_to_comment and is_not_restricted_post)): is_not_restricted_post = eligible_comment['post_id'] not in RESTRICTED_POSTS
continue if (not (under_post_limit and under_user_limit and has_not_replied_to_comment and is_not_restricted_post)):
continue
comment_text = eligible_comment['body'] comment_text = eligible_comment['body']
directives = extract_directives(comment_text) directives = extract_directives(comment_text)
cleaned_comment_text = strip_markdown(comment_text) cleaned_comment_text = strip_markdown(comment_text)
if ("meme" in directives): if ("meme" in directives or eligible_comment['post_id'] in FREE_POSTS):
random_float = 0.0 random_float = 0.0
else:
random_float = random()
comment_lines = cleaned_comment_text.split("\n")
comment_lines = [comment_line for comment_line in comment_lines if comment_line != ""]
element_lines = [TextLine(line) for line in comment_lines]
argument_lines_count, dialog_lines_count, text_lines_count, big_marsey_lines_count = 0,0,0,0
dialog_lines = list(filter(lambda a : a.is_dialogue_line, element_lines))
argument_lines = list(filter(lambda a : a.is_argument_line, element_lines))
pure_text_lines = list(filter(lambda a : a.is_pure_text_line, element_lines))
big_marsey_lines = list(filter(lambda a : a.is_big_marsey_line, element_lines))
image_lines = list(filter(lambda a : a.is_image_line, element_lines))
argument_lines_count = len(argument_lines)
dialog_lines_count = len(dialog_lines)
pure_text_lines_count = len(pure_text_lines)
big_marsey_lines_count = len(big_marsey_lines)
image_lines_count = len(image_lines)
image = None
if (dialog_lines_count == 2):
print(f"[{eligible_comment['id']}] SOY_VS_CHAD")
if (random_float <= SOY_VS_CHAD_TRIGGER_CHANGE):
#Soy vs Chad
line1 = dialog_lines[0]
line2 = dialog_lines[1]
emoji1 = line1.emojis[0].emoji
emoji2 = line2.emojis[0].emoji
caption1 = line1.text
caption2 = line2.text
image = meme_generator.create_soy_vs_chad_meme(emoji1, emoji2, caption1, caption2)
elif (big_marsey_lines_count == 1 and pure_text_lines_count == 1):
print(f"[{eligible_comment['id']}] MODERN_MEME_WITH_MARSEY")
if (random_float <= MODERN_MEME_WITH_MARSEY_TRIGGER_CHANGE):
# Modern Meme with Marsey
text_line = pure_text_lines[0]
marsey_line = big_marsey_lines[0]
marsey = marsey_line.emojis[0].emoji
caption = text_line.text
image = meme_generator.create_modern_meme_from_emoji(marsey, caption)
elif (image_lines_count == 1 and pure_text_lines_count == 1):
print(f"[{eligible_comment['id']}] MODERN_MEME_WITH_IMAGE")
if (random_float <= MODERN_MEME_WITH_IMAGE_TRIGGER_CHANGE):
# Modern Meme with Image
text_line = pure_text_lines[0]
image_line = image_lines[0]
image = image_line.images[0]
full_image_url = image.url
caption = text_line.text
image = meme_generator.create_modern_meme_from_url(full_image_url, caption)
elif (big_marsey_lines_count == 1 and pure_text_lines_count == 2):
print(f"[{eligible_comment['id']}] CLASSIC_MEME_WITH_MARSEY")
if (random_float <= CLASSIC_MEME_WITH_MARSEY_TRIGGER_CHANGE):
# Classic Meme with big marsey
top_text_line = pure_text_lines[0]
bottom_text_line = pure_text_lines[1]
marsey_line = big_marsey_lines[0]
emoji = marsey_line.emojis[0].emoji
top_caption = top_text_line.text
bottom_caption = bottom_text_line.text
image = meme_generator.create_classic_meme_from_emoji(emoji, top_caption, bottom_caption)
elif (image_lines_count == 1 and pure_text_lines_count == 2):
print(f"[{eligible_comment['id']}] CLASSIC_MEME_WITH_IMAGE")
if (random_float <= CLASSIC_MEME_WITH_IMAGE_TRIGGER_CHANGE):
# Classic Meme with Image
top_text_line = pure_text_lines[0]
bottom_text_line = pure_text_lines[1]
image_line = image_lines[0]
image = image_line.images[0]
full_image_url = image.url
top_caption = top_text_line.text
bottom_caption = bottom_text_line.text
image = meme_generator.create_classic_meme_from_url(full_image_url, top_caption, bottom_caption)
elif (argument_lines_count + dialog_lines_count >= 2):
print(f"[{eligible_comment['id']}] WEBCOMIC")
if (random_float <= WEBCOMIC_TRIGGER_CHANCE):
panels : 'list[WebcomicPanel]' = []
for element_line in element_lines:
if element_line.is_dialogue_line:
caption = element_line.text
emoji = element_line.emojis[0].emoji
if len(caption) > 100:
in_background = True
else:
in_background = False
oneCharacterWebcomicPanel = OneCharacterWebcomicPanel(emoji, caption, in_background)
panels.append(oneCharacterWebcomicPanel)
elif element_line.is_argument_line:
left_caption = element_line.captions[0].text
if len(element_line.captions) == 2:
right_caption = element_line.captions[1].text
else:
right_caption = ""
left_emoji = element_line.emojis[0].emoji
right_emoji = element_line.emojis[1].emoji
twoCharacterWebcomicPanel = TwoCharacterWebcomicPanel(left_emoji, left_caption, right_emoji, right_caption)
panels.append(twoCharacterWebcomicPanel)
elif element_line.is_pure_text_line:
panels.append(TitleCardWebcomicPanel(element_line.text))
image = create_webcomic(panels)
if image != None:
print(f"[{eligible_comment['id']}] posting...")
image = add_watermark(image, eligible_comment['author']['username'])
user_id = 0 if eligible_comment['author'] == '👻' else eligible_comment['author']['id']
parent_comment_id = eligible_comment['id']
post_id = eligible_comment['post_id']
message = create_comment_message(is_chudded, is_pizzad, is_birdsite, is_marseyed)
if not DRY_MODE:
automeme_comment_id = comment_with_image(message, image, eligible_comment['id'], eligible_comment['post_id'])
else: else:
automeme_comment_id = None random_float = random()
image.save(f"dry/{eligible_comment['id']}.webp")
Comment.create_new_comment(parent_comment_id, automeme_comment_id, session) comment_lines = cleaned_comment_text.split("\n")
if post_id not in FREE_POSTS: comment_lines = [comment_line for comment_line in comment_lines if comment_line != ""]
Post.increment_replies(post_id, session) element_lines = [TextLine(line) for line in comment_lines]
User.increase_number_of_comments(user_id, session)
argument_lines_count, dialog_lines_count, text_lines_count, big_marsey_lines_count = 0,0,0,0
dialog_lines = list(filter(lambda a : a.is_dialogue_line, element_lines))
argument_lines = list(filter(lambda a : a.is_argument_line, element_lines))
pure_text_lines = list(filter(lambda a : a.is_pure_text_line, element_lines))
big_marsey_lines = list(filter(lambda a : a.is_big_marsey_line, element_lines))
image_lines = list(filter(lambda a : a.is_image_line, element_lines))
argument_lines_count = len(argument_lines)
dialog_lines_count = len(dialog_lines)
pure_text_lines_count = len(pure_text_lines)
big_marsey_lines_count = len(big_marsey_lines)
image_lines_count = len(image_lines)
image = None
if (dialog_lines_count == 2):
print(f"[{eligible_comment['id']}] SOY_VS_CHAD")
if (random_float <= SOY_VS_CHAD_TRIGGER_CHANGE):
#Soy vs Chad
line1 = dialog_lines[0]
line2 = dialog_lines[1]
emoji1 = line1.emojis[0].emoji
emoji2 = line2.emojis[0].emoji
caption1 = line1.text
caption2 = line2.text
image = meme_generator.create_soy_vs_chad_meme(emoji1, emoji2, caption1, caption2)
elif (big_marsey_lines_count == 1 and pure_text_lines_count == 1):
print(f"[{eligible_comment['id']}] MODERN_MEME_WITH_MARSEY")
if (random_float <= MODERN_MEME_WITH_MARSEY_TRIGGER_CHANGE):
# Modern Meme with Marsey
text_line = pure_text_lines[0]
marsey_line = big_marsey_lines[0]
marsey = marsey_line.emojis[0].emoji
caption = text_line.text
image = meme_generator.create_modern_meme_from_emoji(marsey, caption)
elif (image_lines_count == 1 and pure_text_lines_count == 1):
print(f"[{eligible_comment['id']}] MODERN_MEME_WITH_IMAGE")
if (random_float <= MODERN_MEME_WITH_IMAGE_TRIGGER_CHANGE):
# Modern Meme with Image
text_line = pure_text_lines[0]
image_line = image_lines[0]
image = image_line.images[0]
full_image_url = image.url
caption = text_line.text
image = meme_generator.create_modern_meme_from_url(full_image_url, caption)
elif (big_marsey_lines_count == 1 and pure_text_lines_count == 2):
print(f"[{eligible_comment['id']}] CLASSIC_MEME_WITH_MARSEY")
if (random_float <= CLASSIC_MEME_WITH_MARSEY_TRIGGER_CHANGE):
# Classic Meme with big marsey
top_text_line = pure_text_lines[0]
bottom_text_line = pure_text_lines[1]
marsey_line = big_marsey_lines[0]
emoji = marsey_line.emojis[0].emoji
top_caption = top_text_line.text
bottom_caption = bottom_text_line.text
image = meme_generator.create_classic_meme_from_emoji(emoji, top_caption, bottom_caption)
elif (image_lines_count == 1 and pure_text_lines_count == 2):
print(f"[{eligible_comment['id']}] CLASSIC_MEME_WITH_IMAGE")
if (random_float <= CLASSIC_MEME_WITH_IMAGE_TRIGGER_CHANGE):
# Classic Meme with Image
top_text_line = pure_text_lines[0]
bottom_text_line = pure_text_lines[1]
image_line = image_lines[0]
image = image_line.images[0]
full_image_url = image.url
top_caption = top_text_line.text
bottom_caption = bottom_text_line.text
image = meme_generator.create_classic_meme_from_url(full_image_url, top_caption, bottom_caption)
elif (argument_lines_count + dialog_lines_count >= 2):
print(f"[{eligible_comment['id']}] WEBCOMIC")
if (random_float <= WEBCOMIC_TRIGGER_CHANCE):
panels : 'list[WebcomicPanel]' = []
for element_line in element_lines:
if element_line.is_dialogue_line:
caption = element_line.text
emoji = element_line.emojis[0].emoji
if len(caption) > 100:
in_background = True
else:
in_background = False
oneCharacterWebcomicPanel = OneCharacterWebcomicPanel(emoji, caption, in_background)
panels.append(oneCharacterWebcomicPanel)
elif element_line.is_argument_line:
left_caption = element_line.captions[0].text
if len(element_line.captions) == 2:
right_caption = element_line.captions[1].text
else:
right_caption = ""
left_emoji = element_line.emojis[0].emoji
right_emoji = element_line.emojis[1].emoji
twoCharacterWebcomicPanel = TwoCharacterWebcomicPanel(left_emoji, left_caption, right_emoji, right_caption)
panels.append(twoCharacterWebcomicPanel)
elif element_line.is_pure_text_line:
panels.append(TitleCardWebcomicPanel(element_line.text))
image = create_webcomic(panels)
if image != None:
print(f"[{eligible_comment['id']}] posting...")
image = add_watermark(image, eligible_comment['author']['username'])
user_id = 0 if eligible_comment['author'] == '👻' else eligible_comment['author']['id']
parent_comment_id = eligible_comment['id']
post_id = eligible_comment['post_id']
message = create_comment_message(is_chudded, is_pizzad, is_birdsite, is_marseyed)
if not DRY_MODE:
automeme_comment_id = comment_with_image(message, image, eligible_comment['id'], eligible_comment['post_id'])
else:
automeme_comment_id = None
image.save(get_real_filename(f"dry/{eligible_comment['id']}.webp"))
Comment.create_new_comment(parent_comment_id, automeme_comment_id, session)
if post_id not in FREE_POSTS:
Post.increment_replies(post_id, session)
User.increase_number_of_comments(user_id, session)
else:
print(f"[{eligible_comment['id']}] is a free post.")
end = datetime.now()
print(end-begin)
else: else:
print(f"[{eligible_comment['id']}] is a free post.") Comment.create_new_comment(eligible_comment['id'], None, session)
end = datetime.now() except BaseException as e:
print(end-begin) print(f"YIKERINOS! GOT AN EXCEPTION: {e}")
else: traceback.print_exc()
Comment.create_new_comment(eligible_comment['id'], None, session)
ScriptCall.register(session)
def comment_chunk(time : datetime, api: RDramaAPIInterface):
current_time_cur = int(time.timestamp()) # int(time.time() - 60*60*8)
comments = []
while True:
res = api.get_comments(sort="old", lower_bound=current_time_cur)
if len(res['data']) == 0:
break
comments = comments + res['data']
current_time_cur = res['data'][-1]['created_utc']
return comments
if __name__ == "__main__": if __name__ == "__main__":
TEST_AUTH_TOKEN = "ED3eURMKP9FKBFbi-JUxo8MPGWkEihuyIlAUGtVL7xwx0NEy4Nf6J_mxWYTPgAQx1iy1X91hx7PPHyEBS79hvKVIy5DMEzOyAe9PAc5pmqSJlLGq_-ROewMwFzGrqer4"
print(f"======= RUNNING AT {datetime.now().hour}:{datetime.now().minute} ======= ") print(f"======= RUNNING AT {datetime.now().hour}:{datetime.now().minute} ======= ")
if TEST_MODE: if TEST_MODE:
website = "localhost" website = "localhost"
auth = TEST_AUTH_TOKEN auth = TEST_AUTH_TOKEN
https = False https = False
timeout = 1 timeout = 1
AUTOMEME_ID = 7 AUTOMEME_ID = 6
OPERATOR_ID = 9 OPERATOR_ID = 9
ACTUALLY_CALL_OPEN_AI = False ACTUALLY_CALL_OPEN_AI = False
else: else:
@ -460,4 +527,4 @@ if __name__ == "__main__":
with Session(engine) as session: with Session(engine) as session:
main_processing_task(rdrama, session) main_processing_task(rdrama, session)
session.commit() session.commit()
timer.cancel() timer.cancel()

View File

@ -1,5 +1,4 @@
import math import math
from tkinter import UNITS
from typing import Tuple, Union from typing import Tuple, Union
from PIL import Image from PIL import Image
from PIL import ImageDraw, ImageSequence from PIL import ImageDraw, ImageSequence
@ -118,7 +117,7 @@ def create_modern_meme_from_emoji(emoji: str, caption: str):
class WebcomicPanel(): class WebcomicPanel():
PANEL_SIZE = 400 PANEL_SIZE = 400
FONT = "Impact.ttf" FONT = "impact.ttf"
COLOR = ColorScheme.WHITE_WITH_BLACK_BORDER COLOR = ColorScheme.WHITE_WITH_BLACK_BORDER
def __init__(self): def __init__(self):
@ -127,7 +126,7 @@ class WebcomicPanel():
def create_image(self) -> Image: def create_image(self) -> Image:
return AnimatedImage.new((self.PANEL_SIZE,self.PANEL_SIZE)) return AnimatedImage.new((self.PANEL_SIZE,self.PANEL_SIZE))
def add_text_box(self, base : Image, caption : str, region_size : tuple[int, int], coordinates : tuple[int, int], align="") -> 'AnimatedImage': def add_text_box(self, base : Image, caption : str, region_size : 'tuple[int, int]', coordinates : 'tuple[int, int]', align="") -> 'AnimatedImage':
return add_text_box(base, caption, region_size, coordinates, align=align, font=self.FONT, color=self.COLOR, init_font_size=int(self.PANEL_SIZE/10)) return add_text_box(base, caption, region_size, coordinates, align=align, font=self.FONT, color=self.COLOR, init_font_size=int(self.PANEL_SIZE/10))
class OneCharacterWebcomicPanel(WebcomicPanel): class OneCharacterWebcomicPanel(WebcomicPanel):
@ -228,7 +227,9 @@ def create_webcomic(layout : 'list[WebcomicPanel]'):
image = image.paste(panel.create_image(), (x*assumed_panel_x_size, y*assumed_panel_y_size)) image = image.paste(panel.create_image(), (x*assumed_panel_x_size, y*assumed_panel_y_size))
return image return image
def add_text_box(base : Image, caption : str, region_size : tuple[int, int], coordinates : tuple[int, int], font : str= "arial.ttf", init_font_size = 45, align :str = "", color = ColorScheme.BLACK): def add_text_box(base : Image, caption : str, region_size : 'tuple[int, int]', coordinates : 'tuple[int, int]', font : str= "arial.ttf", init_font_size = 45, align :str = "", color = ColorScheme.BLACK):
font = get_real_filename(font)
if caption == "": if caption == "":
return base return base
if init_font_size == 0: if init_font_size == 0:
@ -244,6 +245,7 @@ def add_text_box(base : Image, caption : str, region_size : tuple[int, int], coo
fill_color = (255,255,255) fill_color = (255,255,255)
stroke = (0,0,0) stroke = (0,0,0)
stroke_size = 3 stroke_size = 3
caption = caption.upper()
line_image = ImageText((region_x_size+stroke_size, region_y_size)) line_image = ImageText((region_x_size+stroke_size, region_y_size))
place = "left" place = "left"
if "ch" in align: if "ch" in align:
@ -285,7 +287,9 @@ def add_text_box(base : Image, caption : str, region_size : tuple[int, int], coo
else: else:
return add_text_box(base, caption, region_size, coordinates, font=font, init_font_size=init_font_size-1, align=align, color=color) return add_text_box(base, caption, region_size, coordinates, font=font, init_font_size=init_font_size-1, align=align, color=color)
def add_text(base : Image, caption : str, region_size : tuple[int, int], coordinates : tuple[int, int], font : str= "arial.ttf"): def add_text(base : Image, caption : str, region_size : 'tuple[int, int]', coordinates : 'tuple[int, int]', font : str= "arial.ttf"):
font = get_real_filename(font)
print(font)
if caption == "": if caption == "":
return return
region_x_size, region_y_size = region_size region_x_size, region_y_size = region_size
@ -312,7 +316,7 @@ def add_watermark(image : Image, name_of_other_creator):
return base return base
def center_and_paste(base : Image, to_paste : Image, coordinates: tuple[int, int], box_size : tuple[int, int]) -> 'AnimatedImage': def center_and_paste(base : Image, to_paste : Image, coordinates: 'tuple[int, int]', box_size : 'tuple[int, int]') -> 'AnimatedImage':
to_paste = to_paste.fit(box_size) to_paste = to_paste.fit(box_size)
image_size_x, image_size_y = to_paste.size image_size_x, image_size_y = to_paste.size
@ -346,7 +350,7 @@ def get_emoji_from_rdrama(emoji_name) -> 'AnimatedImage':
should_flip = True should_flip = True
if (exists(get_real_filename(f"emoji_cache/{cleaned_emoji_name}.webp"))): if (exists(get_real_filename(f"emoji_cache/{cleaned_emoji_name}.webp"))):
image = AnimatedImage.from_image(Image.open(f"emoji_cache/{cleaned_emoji_name}.webp")) image = AnimatedImage.from_image(Image.open(get_real_filename(f"emoji_cache/{cleaned_emoji_name}.webp")))
else: else:
image = get_image_file_from_url(f"https://www.rdrama.net/e/{cleaned_emoji_name}.webp") image = get_image_file_from_url(f"https://www.rdrama.net/e/{cleaned_emoji_name}.webp")
@ -375,11 +379,14 @@ def parse_caption_file(filename):
watermark_captions = parse_caption_file(get_real_filename(CAPTION_FILENAME)) watermark_captions = parse_caption_file(get_real_filename(CAPTION_FILENAME))
class AnimatedImage(): class AnimatedImage():
def __init__(self, frames : list[Image.Image]): def __init__(self, frames : 'list[Image.Image]'):
self.frames = frames if len(frames) > 100:
self.frames = frames[0:100]
else:
self.frames = frames
@property @property
def size(self) -> Tuple[int, int]: def size(self) -> 'Tuple[int, int]':
return self.frames[0].size return self.frames[0].size
@property @property
@ -431,11 +438,11 @@ class AnimatedImage():
new_frames.append(new_frame) new_frames.append(new_frame)
return AnimatedImage(new_frames) return AnimatedImage(new_frames)
def new(size : Tuple[int, int], color=(255,255,255)) -> 'AnimatedImage': def new(size : 'Tuple[int, int]', color=(255,255,255)) -> 'AnimatedImage':
base = Image.new(mode="RGB", size=size, color=color) base = Image.new(mode="RGB", size=size, color=color)
return AnimatedImage.from_image(base) return AnimatedImage.from_image(base)
def fit(self, region : Tuple[int, int]) -> 'AnimatedImage': def fit(self, region : 'Tuple[int, int]') -> 'AnimatedImage':
new_frames = [] new_frames = []
for frame in self.frames: for frame in self.frames:
new_frames.append(ImageOps.contain(frame, region)) new_frames.append(ImageOps.contain(frame, region))