first
commit
884c67c6b5
|
@ -0,0 +1,9 @@
|
|||
*.db
|
||||
bbbb_reddit_client_id
|
||||
bbbb_reddit_secret
|
||||
bbbb_user_agent
|
||||
log
|
||||
open_ai_token
|
||||
output
|
||||
rdrama_auth_token
|
||||
__pycache__/
|
|
@ -0,0 +1,188 @@
|
|||
from datetime import datetime
|
||||
from email.policy import default
|
||||
import time
|
||||
import openai
|
||||
import sqlalchemy
|
||||
from sqlalchemy.orm import declarative_base, Session
|
||||
from sqlalchemy import Column, DateTime, String, ForeignKey, Integer, Boolean, Table, and_, or_
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
class User(Base):
|
||||
__tablename__ = "user"
|
||||
|
||||
id = Column(Integer, primary_key = True)
|
||||
number_of_comments = Column(Integer, default = 0)
|
||||
|
||||
def get_user(user_id : int, session : Session):
|
||||
stmt = sqlalchemy.select(User).where(User.id == user_id)
|
||||
user = session.execute(stmt).scalar_one_or_none()
|
||||
if (user == None):
|
||||
user = User(id = user_id, number_of_comments = 0)
|
||||
session.add(user)
|
||||
return user
|
||||
|
||||
def increase_number_of_comments(user_id : int, session : Session):
|
||||
User.get_user(user_id, session).number_of_comments+=1
|
||||
|
||||
def get_number_of_comments(user_id : int, session : Session) -> int:
|
||||
return User.get_user(user_id, session).number_of_comments
|
||||
|
||||
def reset_number_of_comments(user_id : int, session : Session):
|
||||
User.get_user(user_id, session).number_of_comments = 0
|
||||
|
||||
def reset_all_comments(session : Session):
|
||||
stmt = sqlalchemy.select(User)
|
||||
all_comments = session.execute(stmt).scalars().fetchall()
|
||||
|
||||
for comment in all_comments:
|
||||
comment.number_of_comments = 0
|
||||
|
||||
session.flush()
|
||||
session.commit()
|
||||
|
||||
|
||||
class Comment(Base):
|
||||
__tablename__ = "comment"
|
||||
|
||||
id = Column(Integer, primary_key = True)
|
||||
user_comment_id = Column(Integer)
|
||||
bbbb_comment_id = Column(Integer)
|
||||
conversation_depth = Column(Integer)
|
||||
comment_string = Column(String)
|
||||
|
||||
def get_past_comments(session : Session) -> 'list[str]':
|
||||
stmt = sqlalchemy.select(Comment)
|
||||
return [i.comment_string for i in session.execute(stmt).scalars().fetchall()[0:100]]
|
||||
|
||||
def get_user_comment(user_comment_id:int, session : Session):
|
||||
stmt = sqlalchemy.select(Comment).where(Comment.user_comment_id == user_comment_id)
|
||||
comments = session.execute(stmt).scalars().fetchall()
|
||||
|
||||
if len(comments) == 0:
|
||||
return None
|
||||
else:
|
||||
return comments[0]
|
||||
|
||||
def get_bbbb_comment(bbbb_comment_id:int, session : Session):
|
||||
stmt = sqlalchemy.select(Comment).where(Comment.bbbb_comment_id == bbbb_comment_id)
|
||||
comments = session.execute(stmt).scalars().fetchall()
|
||||
|
||||
if len(comments) == 0:
|
||||
return None
|
||||
else:
|
||||
return comments[0]
|
||||
|
||||
def get_comment(comment_id : int, session : Session):
|
||||
user_comment = Comment.get_user_comment(comment_id, session)
|
||||
if (user_comment is not None):
|
||||
return user_comment
|
||||
else:
|
||||
return Comment.get_bbbb_comment(comment_id, session)
|
||||
|
||||
def has_replied_to_comment(comment_id : int, session : Session):
|
||||
return Comment.get_comment(comment_id, session) == None
|
||||
|
||||
def get_conversation_depth(parent_comment_id : int, session : Session):
|
||||
looked_up_comment = Comment.get_comment(parent_comment_id, session)
|
||||
if (looked_up_comment is not None):
|
||||
return looked_up_comment.conversation_depth
|
||||
else:
|
||||
return 0
|
||||
|
||||
def create_new_comment(user_comment_id : int, bbbb_comment_id : int, conversation_depth : int, comment_string : str, session : Session):
|
||||
comment = Comment(user_comment_id = user_comment_id, bbbb_comment_id = bbbb_comment_id, conversation_depth = conversation_depth, comment_string = comment_string)
|
||||
session.add(comment)
|
||||
|
||||
class Post(Base):
|
||||
__tablename__ = "post"
|
||||
|
||||
id = Column(Integer, primary_key = True)
|
||||
has_replied = Column(Boolean, default = False)
|
||||
replies_to_post = Column(Integer, default = 0)
|
||||
|
||||
def get_post(post_id : int, session : Session):
|
||||
stmt = sqlalchemy.select(Post).where(Post.id == post_id)
|
||||
post = session.execute(stmt).scalar_one_or_none()
|
||||
if (post == None):
|
||||
post = Post(id = post_id)
|
||||
session.add(post)
|
||||
return post
|
||||
|
||||
def has_replied_to_post(post_id : int, session : Session):
|
||||
return Post.get_post(post_id, session).has_replied
|
||||
|
||||
def increment_replies(post_id : int, session : Session):
|
||||
Post.get_post(post_id, session).replies_to_post += 1
|
||||
|
||||
def get_number_of_replies(post_id : int, session : Session):
|
||||
replies = Post.get_post(post_id, session).replies_to_post
|
||||
if replies == None:
|
||||
replies = 0
|
||||
|
||||
return replies
|
||||
|
||||
def register_post_reply(post_id : int, session : Session):
|
||||
Post.get_post(post_id, session).has_replied = True
|
||||
|
||||
|
||||
class OpenAIToken(Base):
|
||||
__tablename__ = "openaikey"
|
||||
|
||||
id = Column(Integer, primary_key = True)
|
||||
token = Column(String)
|
||||
is_active = Column(Boolean,default=False)
|
||||
is_expended = Column(Boolean,default=False)
|
||||
number_of_requests = Column(Integer, default=0)
|
||||
registered_time = Column(DateTime)
|
||||
begin_time = Column(DateTime)
|
||||
end_time = Column(DateTime)
|
||||
|
||||
def add_token(token : str, session : Session):
|
||||
openAIToken = OpenAIToken(token = token,
|
||||
is_active = False,
|
||||
is_expended = False,
|
||||
registered_time = datetime.now())
|
||||
session.add(openAIToken)
|
||||
session.flush()
|
||||
session.commit()
|
||||
|
||||
def get_all_valid_tokens(session : Session) -> 'list[OpenAIToken]':
|
||||
stmt = sqlalchemy.select(OpenAIToken).where(OpenAIToken.is_expended == False)
|
||||
open_ai_tokens = session.execute(stmt).scalars().fetchall()
|
||||
return open_ai_tokens
|
||||
|
||||
def get_active_token(session : Session) -> 'OpenAIToken':
|
||||
stmt = sqlalchemy.select(OpenAIToken).where(OpenAIToken.is_active == True)
|
||||
open_ai_token = session.execute(stmt).scalar_one_or_none()
|
||||
if (open_ai_token is None):
|
||||
print("Aw shucks, it's None")
|
||||
valid_tokens = OpenAIToken.get_all_valid_tokens(session)
|
||||
if len(valid_tokens) == 0:
|
||||
raise BaseException("WE ARE OUT OF TOKENS!!!!")
|
||||
else:
|
||||
print("Activating token")
|
||||
new_active_token = valid_tokens[0]
|
||||
new_active_token.is_active = True
|
||||
new_active_token.begin_time = datetime.now()
|
||||
return new_active_token
|
||||
else:
|
||||
print("Returning token")
|
||||
return open_ai_token
|
||||
|
||||
def call_open_ai(prompt : str, session : Session) -> 'str':
|
||||
openAIToken = OpenAIToken.get_active_token(session)
|
||||
if (openAIToken.number_of_requests == None):
|
||||
openAIToken.number_of_requests = 1
|
||||
else:
|
||||
openAIToken.number_of_requests+=1
|
||||
print(f"Calling OPENAI. Token has been used {openAIToken.number_of_requests} times, beginning at {openAIToken.begin_time}.")
|
||||
openai.api_key = openAIToken.token
|
||||
try:
|
||||
return openai.Completion.create(model="text-davinci-002", prompt=prompt, temperature=0.9, max_tokens=256)
|
||||
except openai.error.RateLimitError:
|
||||
openAIToken.is_active = False
|
||||
openAIToken.is_expended = True
|
||||
openAIToken.end_time = datetime.now()
|
||||
print(f"Expended this token! Token lasted {openAIToken.end_time - openAIToken.begin_time}")
|
||||
return OpenAIToken.call_open_ai(prompt, session)
|
|
@ -0,0 +1,517 @@
|
|||
from difflib import SequenceMatcher
|
||||
import json
|
||||
from lib2to3.pytree import Base
|
||||
from numpy import roll
|
||||
import traceback
|
||||
import openai
|
||||
from RDramaAPIInterface import RDramaAPIInterface
|
||||
import pprint
|
||||
from os.path import exists, join, realpath, split
|
||||
import random
|
||||
from markdown import markdown
|
||||
import re
|
||||
from bs4 import BeautifulSoup
|
||||
from better_profanity import profanity
|
||||
import time
|
||||
from datetime import datetime
|
||||
from BBBB_Models import Base, Comment, OpenAIToken, Post, User
|
||||
from sqlalchemy.orm import Session
|
||||
from threading import Timer
|
||||
import os
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
|
||||
TEST_MODE = False
|
||||
TIME_NORMALIZATION_MODE = True
|
||||
MAX_INPUT_TOKENS = 512
|
||||
random.seed(time.time())
|
||||
pp = pprint.PrettyPrinter()
|
||||
NAUGHTY_WORDS = [
|
||||
"faggot","fag","nigger","kike","spic","gook"]
|
||||
SNARKY_MARSIES = [
|
||||
'marseysneed',
|
||||
'marseyseethe',
|
||||
'marseyeyeroll',
|
||||
'marseycope',
|
||||
'marseyl',
|
||||
'marseybrainlet',
|
||||
'marseymalding',
|
||||
'soyquack',
|
||||
'soymad',
|
||||
'soycry',
|
||||
'seethejak',
|
||||
]
|
||||
SM_THRESHOLD = 0.6
|
||||
BBBB_ID = 12125
|
||||
BBBB_USERNAME = "bbbb"
|
||||
OPERATOR_ID = 3635
|
||||
PAGES_TO_SCAN = 5
|
||||
COMMENT_MEMORY = 100
|
||||
ALLOWED_COMMENTS_PER_USER_PER_DAY = 10
|
||||
ALLOWED_COMMENTS_PER_POST = 20
|
||||
ALLOWED_CONVERSATION_DEPTH = 3
|
||||
MINUTES_BEFORE_FORCED_SHUTDOWN = 10
|
||||
ACTUALLY_CALL_OPEN_AI = True
|
||||
MIDNIGHT_PROBABILITY_MULTIPLIER = 4
|
||||
AFTERNOON_MORNING_MULTIPLIER = 3
|
||||
NOON_MULTIPLER = 2
|
||||
profanity.load_censor_words(NAUGHTY_WORDS)
|
||||
|
||||
def strip_markdown(markdown_string):
|
||||
markdown_string = re.sub(">.*\n", "", markdown_string)
|
||||
try:
|
||||
html = markdown(markdown_string)
|
||||
except AttributeError:
|
||||
html = markdown_string #if there is no markdown in the string you get an error
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
text = ''.join(soup.findAll(text=True))
|
||||
text = re.sub(r":[^ ]*:", "", text) #remove marseys
|
||||
text = re.sub(r"!blackjack[^ ]*", "", text)
|
||||
text = re.sub(r"fortune", "", text)
|
||||
text = re.sub(r"factcheck", "", text)
|
||||
text = re.sub(r"!slots", "", text)
|
||||
text = re.sub(r"([^\.])\n", r"\1. ", text)
|
||||
text = re.sub(r"(\.)\n", ". ", text)
|
||||
text = re.sub(r"\n", "", text)
|
||||
text = re.sub(r"http://[^ ]*", "", text)
|
||||
text = re.sub(r"https://[^ ]*", "", text)
|
||||
text = re.sub(r"\"", "'", text)
|
||||
|
||||
text = remove_naughty_words(text)
|
||||
# make sure there are only letters in the string.
|
||||
if len(set(list(text.lower())).intersection(set(["a",'b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']))) == 0:
|
||||
text = ""
|
||||
|
||||
text = re.sub("@?bbbb is", "you are", text, flags=re.IGNORECASE)
|
||||
text = re.sub("@?bbbb's", "your", text, flags=re.IGNORECASE)
|
||||
text = re.sub("@?bbbb", "you", text, flags=re.IGNORECASE)
|
||||
|
||||
text = re.sub("@(.*?)\s", "", text)
|
||||
text = re.sub("!slots.*?\s", "", text)
|
||||
|
||||
text = re.sub("(?i)trans lives matter", "", text)
|
||||
|
||||
return text
|
||||
|
||||
def normalize_for_quick_substring_detection(string):
|
||||
string = string.lower()
|
||||
string = re.sub(r"[!?>-]", ".", string)
|
||||
string = ''.join([letter for letter in string if letter in ["a",'b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z', '.']])
|
||||
return string
|
||||
|
||||
def quick_substring_detection(x, y):
|
||||
x_normalize = normalize_for_quick_substring_detection(x)
|
||||
y_normalize = normalize_for_quick_substring_detection(y)
|
||||
if (x_normalize in y_normalize):
|
||||
print(f"[QSD] entire string x \"({x_normalize})\" is in y \"({y_normalize})\"")
|
||||
return True
|
||||
for sentence in x_normalize.split("."):
|
||||
if len(sentence) < 20:
|
||||
continue
|
||||
if sentence == "":
|
||||
continue
|
||||
if sentence in y_normalize:
|
||||
print(f"[QSD] sentence \"{sentence}\" is in y (\"{y_normalize}\")")
|
||||
return True
|
||||
return False
|
||||
|
||||
def remove_duplicates(list):
|
||||
return [json.loads(j) for j in set([json.dumps(i) for i in list])]
|
||||
|
||||
def remove_naughty_words(string):
|
||||
return profanity.censor(string)
|
||||
|
||||
def get_real_filename(filename : str):
|
||||
path_to_script = realpath(__file__)
|
||||
path_to_script_directory, _ = split(path_to_script)
|
||||
return join(path_to_script_directory, filename)
|
||||
|
||||
with open(get_real_filename("open_ai_token"), "r") as f:
|
||||
openai.api_key = f.read()
|
||||
|
||||
def rude_reply(comment, session):
|
||||
if ACTUALLY_CALL_OPEN_AI:
|
||||
response = OpenAIToken.call_open_ai(f"Write an abrasive reply to this comment: \"{comment}\"", session)
|
||||
else:
|
||||
print("NOT Calling OPENAI...")
|
||||
message = "foo"*random.randrange(1,20)
|
||||
response = {
|
||||
'choices': [{
|
||||
'text': f"\n\n{message}"
|
||||
}]
|
||||
}
|
||||
return response
|
||||
|
||||
def rude_reply_wrapper(comment, past_comments, session):
|
||||
print(f"[RRW] Getting reply for \"{comment}\"")
|
||||
if (len(comment.split(" ")) > MAX_INPUT_TOKENS):
|
||||
print("[RRW]")
|
||||
new_comment = ""
|
||||
for sentence in comment.split("."):
|
||||
if len(sentence) + len(new_comment) < MAX_INPUT_TOKENS:
|
||||
new_comment+=sentence
|
||||
else:
|
||||
break
|
||||
print(f"[RRW] Holy fuck that's a lot of words. New comment is \"{new_comment}\"")
|
||||
comment = new_comment
|
||||
valid_replies = []
|
||||
short_reply = ""
|
||||
for i in range(5):
|
||||
reply = rude_reply(comment, session)['choices'][0]['text']
|
||||
reply = reply.strip()
|
||||
print(f"[RRW] Trying \"{reply}\"")
|
||||
if reply[0] == "\"" and reply[-1] == "\"":
|
||||
print("[RRW] (Removing the outer parens)]")
|
||||
reply = reply[1:-1]
|
||||
original_reply = reply
|
||||
sm_ratio = SequenceMatcher(None, original_reply.lower(), comment.lower()).ratio()
|
||||
if sm_ratio > SM_THRESHOLD:
|
||||
print(f"[RRW] SM says too similar. ratio = {sm_ratio}")
|
||||
reply = ""
|
||||
if (quick_substring_detection(original_reply, comment)):
|
||||
print(f"[RRW] QSD says too similar.")
|
||||
reply = ""
|
||||
elif len(reply.split(" ")) < 5:
|
||||
print(f"[RRW] Too short")
|
||||
short_reply = reply
|
||||
#valid_replies.append(reply)
|
||||
continue
|
||||
else:
|
||||
print("[RRW] Under the original system, we would accept this answer.")
|
||||
valid_replies.append(reply)
|
||||
|
||||
if (len(valid_replies) ==0 ):
|
||||
print("[RRW] No valid replies.")
|
||||
if (short_reply != ""):
|
||||
return short_reply
|
||||
return ""
|
||||
|
||||
scored_replies = []
|
||||
for reply in valid_replies:
|
||||
high_score = 0
|
||||
high_scoring_string = ""
|
||||
for past_comment in past_comments:
|
||||
sm_ratio = SequenceMatcher(None, reply.lower(), past_comment.lower()).ratio()
|
||||
if sm_ratio > high_score:
|
||||
high_score = sm_ratio
|
||||
high_scoring_string = past_comment
|
||||
scored_replies.append((reply, high_score))
|
||||
|
||||
scored_replies.sort(key=lambda a : a[1])
|
||||
print(f"[RRW] returning {scored_replies[0]}")
|
||||
return scored_replies[0][0]
|
||||
|
||||
def get_conversation_depth(comment_id : int, rdrama : RDramaAPIInterface) -> int:
|
||||
depth = 0
|
||||
was_last_bbbb = True
|
||||
last_comment_id = comment_id
|
||||
while True:
|
||||
comment = rdrama.get_comment(last_comment_id)
|
||||
|
||||
if comment['level'] == 1:
|
||||
return depth
|
||||
|
||||
if comment['author_name'] == BBBB_USERNAME:
|
||||
was_last_bbbb = True
|
||||
last_comment_id = comment['parent_comment_id']
|
||||
depth += 1
|
||||
elif not was_last_bbbb:
|
||||
return depth
|
||||
else:
|
||||
was_last_bbbb = False
|
||||
last_comment_id = comment['parent_comment_id']
|
||||
|
||||
def parse_tracking_file(filename, func):
|
||||
if not exists(filename):
|
||||
return []
|
||||
to_return = []
|
||||
with open(filename, "r") as f:
|
||||
for id in f.readlines():
|
||||
to_return.append(func(id))
|
||||
return to_return
|
||||
|
||||
def parse_id_file(filename):
|
||||
return parse_tracking_file(filename, lambda a : int(a))
|
||||
|
||||
def parse_comments_file(filename):
|
||||
return parse_tracking_file(filename, lambda a : str(a).strip())
|
||||
|
||||
def write_tracking_file(filename, ids):
|
||||
if (len(ids) > COMMENT_MEMORY):
|
||||
ids = ids[0:COMMENT_MEMORY]
|
||||
with open(filename, "w+") as f:
|
||||
for id in ids:
|
||||
f.write(f"{id}\n")
|
||||
|
||||
def get_rude_reply(comment_to_reply_to, is_chudded, is_pizzad, session):
|
||||
past_comments = Comment.get_past_comments(session)
|
||||
reply = ""
|
||||
if (comment_to_reply_to == ""):
|
||||
marsey = random.choice(SNARKY_MARSIES)
|
||||
reply = f":{marsey}:"
|
||||
else:
|
||||
reply = rude_reply_wrapper(comment_to_reply_to, past_comments, session)
|
||||
if reply == "":
|
||||
marsey = random.choice(SNARKY_MARSIES)
|
||||
reply = f":{marsey}:"
|
||||
else:
|
||||
past_comments = past_comments.insert(0, reply)
|
||||
|
||||
if is_chudded:
|
||||
reply += "\n\nTRANS LIVES MATTER"
|
||||
if is_pizzad:
|
||||
if len(reply) < 280:
|
||||
reply += "\n\n"
|
||||
while len(reply) < 280: reply += 'pizza'
|
||||
return reply
|
||||
|
||||
def do_rude_reply(rdrama, comment_id, post_id, user_id, comment_to_reply_to, is_chudded, is_pizzad, session : Session, depth = 0):
|
||||
rude_reply = get_rude_reply(comment_to_reply_to, is_chudded, is_pizzad, session)
|
||||
reply = rdrama.reply_to_comment_easy(comment_id, post_id, rude_reply)
|
||||
reply_id = reply['id']
|
||||
Comment.create_new_comment(comment_id, reply_id, depth, rude_reply, session)
|
||||
Post.increment_replies(post_id, session)
|
||||
User.increase_number_of_comments(user_id, session)
|
||||
session.commit()
|
||||
|
||||
def do_rude_post_reply(rdrama : RDramaAPIInterface, post_id, user_id, post_contents, is_chudded, is_pizzad, session):
|
||||
rdrama.reply_to_post(post_id, get_rude_reply(post_contents, is_chudded, is_pizzad, session))
|
||||
Post.register_post_reply(post_id, session)
|
||||
Post.increment_replies(post_id, session)
|
||||
User.increase_number_of_comments(user_id, session)
|
||||
session.commit()
|
||||
|
||||
def can_reply_to_comment(parent_comment_id, user_id, post_id, session : Session, bulk = False) -> bool:
|
||||
if (not bulk and parent_comment_id != None and Comment.get_conversation_depth(parent_comment_id, session) > ALLOWED_CONVERSATION_DEPTH):
|
||||
print(f"Cannot reply to comment. Conversation is too deep. Depth = {Comment.get_conversation_depth(parent_comment_id, session)}")
|
||||
return False
|
||||
elif (Post.get_number_of_replies(post_id, session) > ALLOWED_COMMENTS_PER_POST):
|
||||
print(f"Cannot reply to comment. Post has too many bbbb replies.")
|
||||
return False
|
||||
elif (User.get_number_of_comments(user_id, session) > ALLOWED_COMMENTS_PER_USER_PER_DAY):
|
||||
print(f"Cannot reply to comment. User has replied too many times since last refresh.")
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def can_reply_to_post(post_id, user_id, session : Session) -> bool:
|
||||
if (Post.get_number_of_replies(post_id, session) > ALLOWED_COMMENTS_PER_POST):
|
||||
print(f"Cannot reply to post. Post has too many replies.")
|
||||
return False
|
||||
elif (User.get_number_of_comments(user_id, session) > ALLOWED_COMMENTS_PER_USER_PER_DAY):
|
||||
print(f"Cannot reply to post. User has replied too many times since last refresh.")
|
||||
return False
|
||||
elif (Post.has_replied_to_post(post_id, session)):
|
||||
print(f"Cannot reply to post. We have already replied to the post.")
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def handle_comment_ping(rdrama : RDramaAPIInterface, notification : dict, is_chudded : bool, is_pizzad : bool, session : Session):
|
||||
print("I got pinged.")
|
||||
comment_info = rdrama.get_comment(notification['id'])
|
||||
if comment_info['level'] != 1:
|
||||
parent_comment_info = rdrama.get_comment(comment_info['parent_comment_id'])
|
||||
if not (parent_comment_info['is_bot'] or parent_comment_info['author_name'] == BBBB_USERNAME or parent_comment_info['author_name'] == "👻"):
|
||||
print("I can reply to the parent comment.")
|
||||
comment_to_reply_to = strip_markdown(parent_comment_info['body'])
|
||||
comment_id = parent_comment_info['id']
|
||||
comment_post_id = notification['post_id']
|
||||
comment_user_id = notification['user_id']
|
||||
Comment.has_replied_to_comment(parent_comment_info['id'], session)
|
||||
parent_comment_id = None if 'parent_comment_id' not in parent_comment_info else parent_comment_info['parent_comment_id']
|
||||
if (can_reply_to_comment(parent_comment_id, comment_post_id, comment_user_id, session) and Comment.get_comment(parent_comment_info['id'], session) == None):
|
||||
depth = 0 if parent_comment_id == None else Comment.get_conversation_depth(parent_comment_info['parent_comment_id'], session)
|
||||
do_rude_reply(rdrama, comment_id, comment_post_id, comment_user_id, comment_to_reply_to, is_chudded, is_pizzad, session, depth = depth+1)
|
||||
else:
|
||||
print("I already replied to this comment.")
|
||||
else:
|
||||
print("I cannot reply to the parent comment.")
|
||||
else:
|
||||
#It's a post
|
||||
print("The parent is a post.")
|
||||
post_id = comment_info['post_id']
|
||||
post = rdrama.get_post(post_id)
|
||||
post_body = post['body']
|
||||
post_user_id = post['author']['id']
|
||||
if (can_reply_to_post(post_id, post_user_id, session)):
|
||||
do_rude_post_reply(rdrama, post_id, post_user_id, strip_markdown(post_body), is_chudded, is_pizzad, session)
|
||||
|
||||
def handle_post_ping(rdrama : RDramaAPIInterface, notification : dict, is_chudded : bool, is_pizzad : bool, session : Session):
|
||||
print("I was pinged in a post.")
|
||||
if not Post.has_replied_to_post(notification['post_id'], session):
|
||||
post_id = notification['post_id']
|
||||
#comment_id_blacklist.insert(0, post_id) #TODO: Add record of this.
|
||||
post = rdrama.get_post(post_id)
|
||||
post_body = post['body']
|
||||
post_user_id = post['author']['id']
|
||||
do_rude_post_reply(rdrama, post_id, post_user_id, strip_markdown(post_body), is_chudded, is_pizzad, session)
|
||||
|
||||
def handle_comment_mention(rdrama : RDramaAPIInterface, notification : dict, is_chudded : bool, is_pizzad : bool, session : Session):
|
||||
comment_to_reply_to = strip_markdown(notification['message'])
|
||||
#comment_id_blacklist.insert(0, notification['id']) #TODO: Record keeping
|
||||
|
||||
depth = Comment.get_conversation_depth(notification['parent_comment_id'], session)
|
||||
print(f"Someone replied to me / pinged me. Depth = {depth}")
|
||||
if (can_reply_to_comment(notification['parent_comment_id'], notification['user_id'], notification['post_id'], session) and Comment.get_comment(notification['id'], session) == None):
|
||||
print("I can reply, the depth is not too much.")
|
||||
do_rude_reply(rdrama, notification['id'], notification['post_id'], notification['user_id'], comment_to_reply_to, is_chudded, is_pizzad, session, depth=depth+1)
|
||||
else:
|
||||
return
|
||||
|
||||
if ("@bbbb" in notification['message']):
|
||||
handle_comment_ping(rdrama, notification, is_chudded, is_pizzad, session)
|
||||
|
||||
def handle_direct_message(rdrama : RDramaAPIInterface, notification : dict, is_chudded : bool, is_pizzad : bool, session : Session):
|
||||
print(notification['message'])
|
||||
message = notification['message']
|
||||
if (notification['user_id'] == OPERATOR_ID):
|
||||
if (len(message.split(" ")) == 1):
|
||||
comment_id = int(message)
|
||||
comment_info = rdrama.get_comment(comment_id)
|
||||
pp.pprint(comment_info)
|
||||
post_id = comment_info['post_id']
|
||||
user_id = comment_info['user_id']
|
||||
comment_to_reply_to = strip_markdown(comment_info['body'])
|
||||
|
||||
#comment_id_blacklist.insert(0, comment_id) #TODO: Add a record here.
|
||||
do_rude_reply(rdrama, comment_id, post_id, user_id, comment_to_reply_to, is_chudded, is_pizzad, session)
|
||||
print(F"DOING A REQUESTED RUDE REPLY TO {comment_id}")
|
||||
|
||||
def reply_to_notifications(rdrama : RDramaAPIInterface, is_chudded, is_pizzad, session):
|
||||
unread = rdrama.get_parsed_notification()
|
||||
for notification in unread:
|
||||
try:
|
||||
if (notification['type'] == 'direct_message'):
|
||||
handle_direct_message(rdrama, notification, is_chudded, is_pizzad, session)
|
||||
elif (notification['type'] == 'comment_mention'):
|
||||
handle_comment_mention(rdrama, notification, is_chudded, is_pizzad, session)
|
||||
elif (notification['type'] == 'post_mention'):
|
||||
handle_post_ping(rdrama, notification, is_chudded, is_pizzad, session)
|
||||
else:
|
||||
print("Unhandled notification")
|
||||
pp.pprint(notification)
|
||||
except BaseException as e:
|
||||
print(f"problem with with a notification :( {e}")
|
||||
pp.pprint(notification)
|
||||
traceback.print_exc()
|
||||
|
||||
def is_self(comment):
|
||||
try:
|
||||
if comment['author'] == "👻":
|
||||
return True
|
||||
return comment['author']['id'] == BBBB_ID
|
||||
except Exception as e:
|
||||
print(f"YIKES! This comment is a wierd one... {e}")
|
||||
pp.pprint(comment)
|
||||
return True
|
||||
|
||||
def reply_to_random_comment(rdrama : RDramaAPIInterface, is_chudded, is_pizzad, session):
|
||||
comments = rdrama.get_comments(number_of_pages=PAGES_TO_SCAN)['data']
|
||||
comments = [comment for comment in comments if not comment['is_bot']] #No bots
|
||||
comments = [comment for comment in comments if not is_self(comment)] #Don't reply to self
|
||||
comments = [comment for comment in comments if can_reply_to_comment(None, comment['author']['id'], comment['post_id'], session)]
|
||||
comments = [comment for comment in comments if Comment.get_comment(comment['id'], session) is None]
|
||||
comments = [{
|
||||
'body' : strip_markdown(comment['body']),
|
||||
'id' : comment['id'],
|
||||
'author_id': comment['author']['id'],
|
||||
'parent_id' : comment['post_id']
|
||||
} for comment in comments] #Normal form, easier to work with
|
||||
comments = [comment for comment in comments if comment['body'] != ""]
|
||||
comments = [comment for comment in comments if len(comment['body'].split(" ")) > 10] #Get rid of short sentences
|
||||
comments = remove_duplicates(comments)
|
||||
if len(comments) == 0:
|
||||
print("No comments to reply to")
|
||||
return
|
||||
comment_to_reply_to = random.choice(comments)
|
||||
do_rude_reply(rdrama, comment_to_reply_to['id'], comment_to_reply_to['parent_id'], comment_to_reply_to['author_id'], comment_to_reply_to['body'], is_chudded, is_pizzad, session)
|
||||
pp.pprint(comment_to_reply_to)
|
||||
|
||||
def main_processing_task(rdrama : RDramaAPIInterface, session : Session):
|
||||
is_chudded = False #Do we have the chud award?
|
||||
can_communicate = True #Can we send any message at all?
|
||||
is_pizzad = False
|
||||
rdrama.get_front_page()
|
||||
bbbb_information = rdrama.get_user_information(BBBB_ID)
|
||||
print(f"coins: {bbbb_information['coins']} comments: {bbbb_information['comment_count']}")
|
||||
for badge in bbbb_information['badges']:
|
||||
if (badge['name'] == "Marsey Award"):
|
||||
print("We have the marsey award. STOP.")
|
||||
can_communicate = False
|
||||
if (badge['name'] == "Chud"):
|
||||
print("We have the CHUD award. CONTINUE")
|
||||
is_chudded = True
|
||||
if (badge['name'] == "Bird Site Award"):
|
||||
print("We have the Bird Site Award. STOP.")
|
||||
can_communicate = False
|
||||
if (badge['name'] == "Pizzashill Award"):
|
||||
print("We have the Pizzashill Award. CONTINUE.")
|
||||
is_pizzad = True
|
||||
|
||||
if bbbb_information['is_banned']:
|
||||
print("We are banned. STOP.")
|
||||
can_communicate = False
|
||||
|
||||
if can_communicate:
|
||||
try:
|
||||
reply_to_notifications(rdrama, is_chudded, is_pizzad, session)
|
||||
except BaseException as e:
|
||||
print(f"Problem replying to notifications :( {e} (Aevann probably fucked up the /unread endpoint lol)")
|
||||
traceback.print_exc()
|
||||
hours_since_noon = abs(datetime.now().hour-12)
|
||||
odds = 0
|
||||
if hours_since_noon > 9:
|
||||
odds = MIDNIGHT_PROBABILITY_MULTIPLIER*hours_since_noon
|
||||
elif hours_since_noon > 6:
|
||||
odds = AFTERNOON_MORNING_MULTIPLIER *hours_since_noon
|
||||
else:
|
||||
odds = NOON_MULTIPLER * hours_since_noon
|
||||
rolled_number = random.randrange(1,odds+2)
|
||||
if (not TIME_NORMALIZATION_MODE or rolled_number == 1):
|
||||
if (not TIME_NORMALIZATION_MODE):
|
||||
print("TIME NORMALIZATION MODE IS OFF")
|
||||
try:
|
||||
reply_to_random_comment(rdrama, is_chudded, is_pizzad, session)
|
||||
except BaseException as e:
|
||||
print(f"Problem with replying to random comment :( {e}")
|
||||
traceback.print_exc()
|
||||
else:
|
||||
print(f"Check failed. Odds = 1/{odds+1}. Rolled = {rolled_number}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
TEST_AUTH_TOKEN = "lNYlc-A77HDaAgUSbu6Px_T2I6QFEZR-P7sDtTZ4Lw-3S9LlETGijvVb-hysKUHgKKRwO3bE4eH1pPrmvBMgbY0C1XsAgaSHEaY2OyBdSEUWSxYIVp_1fGnkegdZDORZ" #todo - parameterize
|
||||
print(f"======= RUNNING AT {datetime.now().hour}:{datetime.now().minute} ======= ")
|
||||
if TEST_MODE:
|
||||
website = "localhost"
|
||||
auth = TEST_AUTH_TOKEN
|
||||
https = False
|
||||
timeout = 1
|
||||
BBBB_ID = 6
|
||||
OPERATOR_ID = 9
|
||||
ACTUALLY_CALL_OPEN_AI = False
|
||||
else:
|
||||
website = "rdrama.net"
|
||||
with open(get_real_filename("rdrama_auth_token"), "r") as f:
|
||||
auth = f.read()
|
||||
https = True
|
||||
timeout = 10
|
||||
rdrama = RDramaAPIInterface(auth, website, timeout, https=https)
|
||||
|
||||
#Set up fail safe
|
||||
def exitfunc():
|
||||
print("*That's it, you're going in the retard squisher.*")
|
||||
print("FAILSAFE FORCED SHUTDOWN", datetime.now())
|
||||
os._exit(0)
|
||||
|
||||
timer = Timer(60*MINUTES_BEFORE_FORCED_SHUTDOWN, exitfunc)
|
||||
timer.start()
|
||||
db_filename = "bbbb_database.db"
|
||||
engine = create_engine(f"sqlite:///{get_real_filename(db_filename)}")
|
||||
Base.metadata.create_all(engine)
|
||||
|
||||
with Session(engine) as session:
|
||||
main_processing_task(rdrama, session)
|
||||
session.commit()
|
||||
timer.cancel()
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
from BBBB_Models import Comment, OpenAIToken, User, Base
|
||||
from bbbb import get_real_filename
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import create_engine
|
||||
import sys
|
||||
|
||||
if __name__ == "__main__":
|
||||
db_filename = "bbbb_database.db"
|
||||
engine = create_engine(f"sqlite:///{get_real_filename(db_filename)}")
|
||||
Base.metadata.create_all(engine)
|
||||
|
||||
with Session(engine) as session:
|
||||
command = sys.argv[1]
|
||||
if (command == "add_token"):
|
||||
token = sys.argv[2]
|
||||
OpenAIToken.add_token(token, session)
|
||||
print("Added the token!")
|
||||
elif (command == "reset_users"):
|
||||
User.reset_all_comments(session)
|
||||
print("Reset the users!")
|
||||
elif (command == "test"):
|
||||
print(OpenAIToken.call_open_ai("THEOREM: Let c be the hypotenuse of a right triangle and let a, b be the other two sides; then a^2 + b^2 = c^2\n\nLet's think step by step.", session))
|
||||
elif (command == "has_replied"):
|
||||
comment_id = sys.argv[2]
|
||||
print(Comment.get_comment(comment_id, session))
|
||||
print(Comment.has_replied_to_comment(comment_id))
|
||||
else:
|
||||
print(f"command {command} not understood.")
|
||||
|
||||
session.commit()
|
||||
|
||||
|
Loading…
Reference in New Issue