autodrama/autodrama.py

433 lines
18 KiB
Python

TEST_MODE = False
from typing import Tuple
from numpy import average
import praw
from praw.models import Comment, Submission
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from psaw import PushshiftAPI
from os.path import exists, join, realpath, split
import langdetect
from RDramaAPIInterface import RDramaAPIInterface
from bs4 import BeautifulSoup
from markdown import markdown
import datetime
BANNED_WORDS_IN_POST = ['comment', 'promotion']
BANNED_SUBREDDITS = ['LoveIslandTV']
LANGUAGE_DETECTION_ACCURACY_THRESHOLD = 10
def get_real_filename(filename : str):
path_to_script = realpath(__file__)
path_to_script_directory, _ = split(path_to_script)
return join(path_to_script_directory, filename)
with open(get_real_filename("id")) as f:
client_id = f.read()
with open(get_real_filename("secret")) as f:
client_secret = f.read()
with open(get_real_filename("user_agent")) as f:
user_agent = f.read()
reddit = praw.Reddit(
client_id=client_id,
client_secret=client_secret,
user_agent=user_agent
)
pushshift_api = PushshiftAPI(reddit)
def is_english(string : str) -> bool:
return string != '' and langdetect.detect(string) == 'en'
def string_split(string):
return [a for a in sanitize_sentence(string).split(" ") if a != ""]
def is_submission_english(submission : Submission):
title = sanitize_sentence(submission.title)
if (title != "" and is_english(title)):
return True
elif (len(string_split(title)) <= LANGUAGE_DETECTION_ACCURACY_THRESHOLD):
description = submission.subreddit.description
if (is_english(description)):
return True
elif (len(string_split(description)) <= LANGUAGE_DETECTION_ACCURACY_THRESHOLD):
return is_english(submission.subreddit.display_name)
else:
return False
def sanitize_sentence(sentence):
to_return = ''.join([i for i in sentence.lower() if i in 'abcdefghijklmnopqrstuvwxyz '])
return to_return
def contains_banned_words(sentence):
santitized_sentence = ''.join([i for i in sentence.lower() if i in 'abcdefghijklmnopqrstuvwxyz '])
return bool(set(BANNED_WORDS_IN_POST).intersection(santitized_sentence.split(" ")))
def has_banned_submitter(submission : Submission):
if submission.author == None:
return False
return submission.author.name == "AutoModerator"
def get_based_submissions(subreddit, time_frame, limit):
subscriber_cache = {}
subreddit_name_cache = {}
submissions = []
most_based_score = 0
most_relatively_based_score = 0
for submission in reddit.subreddit(subreddit).controversial(time_frame, limit=limit):
try:
basedness = (1-submission.upvote_ratio)*submission.num_comments
if (has_banned_submitter(submission) or contains_banned_words(submission.title)):
continue
if (not is_submission_english(submission)):
print(f"Disregarding \"{submission.title}\" ({submission.id}): Not english")
continue
if (submission.subreddit not in subscriber_cache):
subscriber_cache[submission.subreddit] = submission.subreddit.subscribers
if (not submission.subreddit in subreddit_name_cache):
subreddit_name_cache[submission.subreddit] = submission.subreddit.display_name
if (subreddit_name_cache[submission.subreddit] in BANNED_SUBREDDITS):
print(f"Disregarding \"{submission.title}\" ({submission.id}): Banned subreddit")
continue
relative_basedness = ((basedness/subscriber_cache[submission.subreddit]))*100000
if (basedness > most_based_score):
most_based_score = basedness
most_based_submission = submission
if (relative_basedness > most_relatively_based_score):
most_relatively_based_score = relative_basedness
most_relatively_based_submission = submission
submissions.append((basedness, relative_basedness, submission))
print(f"(B: {basedness} RB: {relative_basedness}){submission.title}")
except Exception as e:
print(f"Error while processing {submission} : {e}")
return submissions
def strip_markdown(markdown_string):
html = markdown(markdown_string)
soup = BeautifulSoup(html, "html.parser")
text = ''.join(soup.findAll(text=True))
return text
def analyze_comments(submission : 'Submission'):
print(f"[{submission.id}]Retrieving Comments")
comments = pushshift_api.search_comments(subreddit=submission.subreddit.display_name, link_id=submission.id)
comment_list = list(comments)
print(f"[{submission.id}]Creating Network")
comment_map = {i.id:i for i in comment_list}
child_map = {}
for comment in comment_map.values():
try:
parent_id = comment.parent_id[3:]
if (parent_id not in child_map):
child_map[parent_id] = []
child_map[parent_id].append(comment)
except:
print(f"Error matching {comment} to its parent.")
sid_obj = SentimentIntensityAnalyzer()
print(f"[{submission.id}]Classifying Comments")
user_to_total_anger = {}
redditors = {}
ranked_comments = []
angry_comments = []
for comment in comment_map.values():
try:
comment_info = {
'comment' : comment
}
if (comment.body == '[deleted]' or comment.author == None):
continue
if ("t1" in comment.parent_id[0:2]): #Not a parent comment
parent = comment_map[comment.parent_id[3:]]
comment_info['parent'] = parent
parent_score = parent.score
if (comment.id in child_map):
child_scores = [i.score for i in child_map[comment.id] if isinstance(i, Comment)]
else:
child_scores = []
if len(child_scores) > 0: #More than one child - not sure how to handle the no child case
average_child_score = sum(child_scores)/len(child_scores)
if (average_child_score > 0 and parent_score > 0):
comment_score = comment.score
if (comment_score >= average_child_score and comment_score <= parent_score):
pass
else:
basedness = average_child_score - comment_score
ranked_comments.append((basedness, comment_info))
else:
#A parent comment
comment_info['parent'] = None
if (comment.id in child_map):
child_scores = [i.score for i in child_map[comment.id] if isinstance(i, Comment)]
else:
child_scores = []
if len(child_scores) > 0: #More than one child - not sure how to handle the no child case
average_child_score = sum(child_scores)/len(child_scores)
comment_score = comment.score
if (comment_score >= average_child_score):
pass
else:
basedness = average_child_score - comment_score
ranked_comments.append((basedness, comment_info))
# Add to angriness
score = sid_obj.polarity_scores(remove_quoted_text(comment.body))['compound']
if score < -0.5:
angry_comments.append((sid_obj.polarity_scores(comment.body)['compound'], comment_info))
if comment.author not in user_to_total_anger:
user_to_total_anger[comment.author] = 0.0
redditors[comment.author] = {}
redditors[comment.author]['comments'] = []
redditors[comment.author]['angriness'] = 0
redditors[comment.author]['author'] = comment.author
user_to_total_anger[comment.author]+=score
redditors[comment.author]['comments'].append((score, comment_info))
redditors[comment.author]['angriness'] += score
except Exception as e:
print(f"Error while processing {comment}: {e}")
print(f"[{submission.id}]Done")
ranked_comments.sort(reverse=True, key= lambda a : a[0])
angry_comments.sort(key=lambda a:a[0])
lolcows = [(v, k) for k, v in user_to_total_anger.items()]
lolcows.sort(key=lambda a:a[0])
redditors_ranked = [(data['angriness'], data) for username, data in redditors.items()]
redditors_ranked.sort(key=lambda a:a[0])
return {
'based' : ranked_comments,
'angry': angry_comments,
'lolcows': lolcows,
'redditors': redditors_ranked
}
#get_based_submissions("all", "hour", 25, True)
def remove_quoted_text(string):
return '\n'.join([i for i in string.split("\n") if i == "" or i[0] != ">"])
def generate_comment_display_section(submissions : 'Tuple[float, Submission]', section_title, detail_display, number_to_show, show_details = True, detail_func = lambda a : a, max_len = 1000 ):
markdown_lines = []
if len(submissions) != 0:
markdown_lines.append(f"## {section_title}")
for comment_info in submissions[:number_to_show]:
attribute = comment_info[0]
parent = comment_info[1]['parent']
comment = comment_info[1]['comment']
if (show_details):
markdown_lines.append(f"{detail_display}: {detail_func(attribute)}")
comment_indent = ""
if (parent != None):
parent_body = remove_quoted_text(parent.body)
parent_body = strip_markdown(parent_body)
parent_body = parent_body.replace("\n", "")
if len(parent_body) > max_len:
parent_body = parent_body[0:max_len-3] + "..."
markdown_lines.append(f"> {parent_body} ({parent.score})")
comment_indent = ">>"
else:
comment_indent = ">"
comment_body = remove_quoted_text(comment.body)
comment_body = strip_markdown(comment_body)
comment_body = comment_body.replace("\n", "")
if len(comment_body) > max_len:
comment_body = comment_body[0:max_len-3] + "..."
markdown_lines.append(f"{comment_indent} [{comment_body}](https://reddit.com{comment.permalink}?context=3) ({comment.score})")
return markdown_lines
def comment_basedness_score_string(basedness):
score = 0
if basedness > 1000:
score = 5
elif basedness > 500:
score = 4
elif basedness > 100:
score = 3
elif basedness > 50:
score = 2
elif basedness > 10:
score = 1
else:
score = 0
return get_score_string(score, "🔥", "🔘")
def angriness_score_string(angriness):
score = 0
if angriness < -0.95:
score = 5
elif angriness < -0.9:
score = 4
elif angriness < -0.85:
score = 3
elif angriness < -0.75:
score = 2
elif angriness < -0.6:
score = 1
else:
score = 0
return get_score_string(score, "😡", "🔘")
def get_reddit_full_url(partial_url):
return f"https://reddit.com{partial_url}"
def generate_lolcow_display_section(lolcows):
markdown_lines = []
biggest_lolcow_info = lolcows[0]
biggest_lolcow_score = biggest_lolcow_info[0]
biggest_lolcow = biggest_lolcow_info[1]['author']
number_of_comments = len(biggest_lolcow_info[1]['comments'])
lolcow_comments = biggest_lolcow_info[1]['comments']
lolcow_score_string = get_score_string(-1*biggest_lolcow_score, "🐮", "🔘", allow_over=True)
markdown_lines.append(f"# Biggest Lolcow: /u/{biggest_lolcow.name}")
markdown_lines.append(f"Score: {lolcow_score_string}")
markdown_lines.append(f"Number of comments: {number_of_comments}")
comment_angryness_scores = [-1*a[0] for a in biggest_lolcow_info[1]['comments']]
average_angriness = average(comment_angryness_scores)
maximum_angry_info = min(lolcow_comments, key=lambda a : a[0])
minimum_angry_info = max(lolcow_comments, key=lambda a : a[0])
markdown_lines.append(f"Average angriness: {angriness_score_string(average_angriness)}")
markdown_lines.append(f"Maximum angriness: [{angriness_score_string(maximum_angry_info[0])}]({get_reddit_full_url(maximum_angry_info[1]['comment'].permalink)}?context=3)")
markdown_lines.append(f"Minimum angriness: [{angriness_score_string(minimum_angry_info[0])}]({get_reddit_full_url(minimum_angry_info[1]['comment'].permalink)}?context=3)")
return markdown_lines
def generate_submission_report(submission : 'Submission', absolute: bool):
markdown_lines = []
comment_analysis_results = analyze_comments(submission)
basedness_display_func = lambda a : get_comment_basedness_out_of_five(a, absolute)
markdown_lines.extend(generate_comment_display_section(comment_analysis_results['based'], "Most Based Comments", "Basedness", 3, detail_func=basedness_display_func))
markdown_lines.extend(generate_comment_display_section(comment_analysis_results['angry'], "Angriest Comments", "Angriness", 3, detail_func=angriness_score_string))
markdown_lines.extend(generate_lolcow_display_section(comment_analysis_results['redditors']))
markdown_lines.append(f"")
markdown_lines.append("*:marppy: autodrama: automating away the jobs of dramautists. :marseycapitalistmanlet: Ping HeyMoon if there are any problems or you have a suggestion :marseyjamming:*")
return "\n\n".join(markdown_lines)
def create_file_report(submission : 'Submission', absolute):
submission_name = submission.title
print(f"Generating submission for https://reddit.com{submission.permalink}")
filename = "".join([i.lower() for i in submission_name if i.lower() in "abcdefghijklmnopqrstuvwxyz "])[:30].replace(" ", "_") + "_" + submission.subreddit.name + ".md"
submission_report = generate_submission_report(submission, absolute)
print(submission_report)
with open(filename, "wb") as f:
f.write(submission_report.encode("utf-8"))
def create_file_reports_for_list_of_submissions(submissions : 'list[Tuple[float, float, Submission]]'):
for i in submissions:
try:
submission = i[2]
create_file_report(submission)
except Exception as e:
print(f"Yikes, had a bit of a fucky wucky: {e}")
def get_basedness_score_out_of_five(basedness : int) -> int:
if basedness > 10000:
return 5
elif basedness > 5000:
return 4
elif basedness > 1000:
return 3
elif basedness > 100:
return 2
elif basedness > 10:
return 1
else:
return 0
def get_comment_basedness_out_of_five(basedness: int, absolute : bool):
if (absolute):
if basedness > 1000:
score = 5
elif basedness > 500:
score = 4
elif basedness > 100:
score = 3
elif basedness > 50:
score = 2
elif basedness > 10:
score = 1
else:
score = 0
else:
if basedness > 100:
score = 5
elif basedness > 50:
score = 4
elif basedness > 10:
score = 3
elif basedness > 5:
score = 2
elif basedness > 1:
score = 1
else:
score = 0
return get_score_string(score, "🔥", "🔘")
def get_score_string(score: int, filled_emoji, empty_emoji, allow_over = False) -> str:
to_return = "".join([filled_emoji if ((i+1) <= score) else empty_emoji for i in range(5)])
if (allow_over):
if (score > 5):
to_return += f"(+{int(score)-5}{filled_emoji})"
return to_return
def create_rdrama_report(rdrama : RDramaAPIInterface, submission : 'Submission', basedness: int, absolute_basedness: bool):
score = get_basedness_score_out_of_five(basedness)
score_string = get_score_string(score, "🔥" if absolute_basedness else "🤓", "🔘")
title = f"[{score_string}] {submission.title}"
url = f"https://reddit.com{submission.permalink}"
body = generate_submission_report(submission, absolute_basedness)
if len(body) > 20000:
body = body[0:19997] + "..."
try:
rdrama.make_post(title, url, body)
except Exception as e:
print(f"Yikes, a fucky wucky occured! {e}")
def get_first_unposted(rdrama : RDramaAPIInterface, submissions : 'list[Submission]'):
for submission in submissions:
if (not rdrama.has_url_been_posted(f"https://www.reddit.com{submission.permalink}")):
return submission
return None
def daily_drama_post(rdrama : RDramaAPIInterface):
print("Performing Daily Drama Post!")
based_submissions = get_based_submissions("all", "day", 150)
print("Posting the most relatively based submission for the day...")
based_submissions.sort(reverse=True, key = lambda a : a[1]) #Sort by relative basedness
most_relatively_based_submission = get_first_unposted(rdrama, [i[2] for i in based_submissions])
create_rdrama_report(rdrama, most_relatively_based_submission, based_submissions[0][1], False)
print("Posting the most based submission for the day...")
based_submissions.sort(reverse=True, key = lambda a : a[0]) #Sort by basedness
most_absolutely_based_submission = get_first_unposted(rdrama, [i[2] for i in based_submissions])
create_rdrama_report(rdrama, most_absolutely_based_submission, based_submissions[0][0], True)
print("Done!")
TEST_AUTH_TOKEN = "jU_k7alzoqfogYqQgcPJ3vIWILiDtI7UWdMTmKbvuttMih-YbhRCs8B3BBCRSKkdSJ0w_JfzJn2YBkdDEw5DIf3UXb3vGTRvLB_9BQ9zBiTz9opp3MFGSudH_s_C7keq" #todo - parameterize
if TEST_MODE:
website = "localhost"
auth = TEST_AUTH_TOKEN
https = False
timeout = 1
else:
website = "rdrama.net"
with open(get_real_filename("rdrama_auth_token"), "r") as f:
auth = f.read()
https = True
timeout = 10
rdrama = RDramaAPIInterface(auth, website, https=https)
def create_report(submission : Submission, absolute):
file_name = sanitize_sentence(submission.title).replace(" ", "_")
with open(f"{file_name}.md", "w+", encoding="utf-8") as f:
f.write(generate_submission_report(submission, absolute))
if __name__ == "__main__":
print(f"Starting at {datetime.datetime.now()}")
daily_drama_post(rdrama)
print(f"Ending at {datetime.datetime.now()}")