TEST_MODE = False

from typing import Tuple

from numpy import average
import praw
from praw.models import Comment, Submission
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from psaw import PushshiftAPI
from os.path import exists, join, realpath, split
import langdetect
from RDramaAPIInterface import RDramaAPIInterface
from bs4 import BeautifulSoup
from markdown import markdown
import datetime

# Posts whose titles contain these words, or that come from these
# subreddits, are skipped entirely.
BANNED_WORDS_IN_POST = ['comment', 'promotion']
BANNED_SUBREDDITS = ['LoveIslandTV']
# Minimum word count before langdetect results are trusted.
LANGUAGE_DETECTION_ACCURACY_THRESHOLD = 10

def get_real_filename(filename: str):
    """Resolve a filename relative to the directory containing this script."""
    path_to_script = realpath(__file__)
    path_to_script_directory, _ = split(path_to_script)
    return join(path_to_script_directory, filename)

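# For example, if this script lived at /opt/autodrama/autodrama.py (path is
# hypothetical), get_real_filename("secret") would return "/opt/autodrama/secret".
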
with open(get_real_filename("id")) as f:
|
|
client_id = f.read()
|
|
with open(get_real_filename("secret")) as f:
|
|
client_secret = f.read()
|
|
with open(get_real_filename("user_agent")) as f:
|
|
user_agent = f.read()
|
|
|
|
reddit = praw.Reddit(
|
|
client_id=client_id,
|
|
client_secret=client_secret,
|
|
user_agent=user_agent
|
|
)
|
|
|
|
pushshift_api = PushshiftAPI(reddit)
|
|
|
|
def is_english(string: str) -> bool:
    return string != '' and langdetect.detect(string) == 'en'


def string_split(string):
    return [a for a in sanitize_sentence(string).split(" ") if a != ""]

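# e.g. string_split("Hello,  world!") returns ["hello", "world"].
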
def is_submission_english(submission: Submission):
    # Trust langdetect on the title only if the title is long enough;
    # otherwise fall back to the subreddit description, then its name.
    title = sanitize_sentence(submission.title)
    if title != "" and is_english(title):
        return True
    elif len(string_split(title)) <= LANGUAGE_DETECTION_ACCURACY_THRESHOLD:
        description = submission.subreddit.description
        if is_english(description):
            return True
        elif len(string_split(description)) <= LANGUAGE_DETECTION_ACCURACY_THRESHOLD:
            return is_english(submission.subreddit.display_name)
        else:
            return False
    else:
        return False

def sanitize_sentence(sentence):
    # Keep only lowercase letters and spaces.
    return ''.join([i for i in sentence.lower() if i in 'abcdefghijklmnopqrstuvwxyz '])

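# e.g. sanitize_sentence("Hello, World! 123") returns "hello world " (digits
# and punctuation dropped, the trailing space kept).
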
def contains_banned_words(sentence):
    sanitized_sentence = sanitize_sentence(sentence)
    return bool(set(BANNED_WORDS_IN_POST).intersection(sanitized_sentence.split(" ")))

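# e.g. contains_banned_words("Top comment thread") returns True, because
# "comment" is in BANNED_WORDS_IN_POST.
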
def has_banned_submitter(submission: Submission):
    if submission.author is None:
        return False
    return submission.author.name == "AutoModerator"

def get_based_submissions(subreddit, time_frame, limit):
    subscriber_cache = {}
    subreddit_name_cache = {}

    submissions = []
    most_based_score = 0
    most_relatively_based_score = 0
    for submission in reddit.subreddit(subreddit).controversial(time_frame, limit=limit):
        try:
            # "Basedness" is the downvoted share of votes scaled by comment volume.
            basedness = (1 - submission.upvote_ratio) * submission.num_comments
            if has_banned_submitter(submission) or contains_banned_words(submission.title):
                continue
            if not is_submission_english(submission):
                print(f"Disregarding \"{submission.title}\" ({submission.id}): Not english")
                continue
            if submission.subreddit not in subscriber_cache:
                subscriber_cache[submission.subreddit] = submission.subreddit.subscribers
            if submission.subreddit not in subreddit_name_cache:
                subreddit_name_cache[submission.subreddit] = submission.subreddit.display_name
            if subreddit_name_cache[submission.subreddit] in BANNED_SUBREDDITS:
                print(f"Disregarding \"{submission.title}\" ({submission.id}): Banned subreddit")
                continue
            # Normalize by subscriber count: basedness per 100,000 subscribers.
            relative_basedness = (basedness / subscriber_cache[submission.subreddit]) * 100000
            if basedness > most_based_score:
                most_based_score = basedness
                most_based_submission = submission
            if relative_basedness > most_relatively_based_score:
                most_relatively_based_score = relative_basedness
                most_relatively_based_submission = submission
            submissions.append((basedness, relative_basedness, submission))
            print(f"(B: {basedness} RB: {relative_basedness}) {submission.title}")
        except Exception as e:
            print(f"Error while processing {submission}: {e}")
    return submissions

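# Usage sketch (live network calls; arguments are illustrative):
#   results = get_based_submissions("all", "hour", 25)
# returns a list of (basedness, relative_basedness, Submission) tuples.
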
def strip_markdown(markdown_string):
    # Render the markdown to HTML, then pull out only the visible text.
    html = markdown(markdown_string)
    soup = BeautifulSoup(html, "html.parser")
    return ''.join(soup.find_all(string=True))

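# e.g. strip_markdown("**bold** and [a link](https://example.com)") returns
# "bold and a link".
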
def analyze_comments(submission: 'Submission'):
    print(f"[{submission.id}] Retrieving Comments")
    comments = pushshift_api.search_comments(subreddit=submission.subreddit.display_name, link_id=submission.id)
    comment_list = list(comments)
    print(f"[{submission.id}] Creating Network")
    comment_map = {i.id: i for i in comment_list}
    child_map = {}
    for comment in comment_map.values():
        try:
            parent_id = comment.parent_id[3:]  # Strip the "t1_"/"t3_" fullname prefix.
            if parent_id not in child_map:
                child_map[parent_id] = []
            child_map[parent_id].append(comment)
        except Exception:
            print(f"Error matching {comment} to its parent.")

    sid_obj = SentimentIntensityAnalyzer()
    print(f"[{submission.id}] Classifying Comments")
    user_to_total_anger = {}
    redditors = {}
    ranked_comments = []
    angry_comments = []
    for comment in comment_map.values():
        try:
            comment_info = {
                'comment': comment
            }
            if comment.body == '[deleted]' or comment.author is None:
                continue
            if "t1" in comment.parent_id[0:2]:  # A reply to another comment
                parent = comment_map[comment.parent_id[3:]]
                comment_info['parent'] = parent
                parent_score = parent.score
                if comment.id in child_map:
                    child_scores = [i.score for i in child_map[comment.id] if isinstance(i, Comment)]
                else:
                    child_scores = []
                if len(child_scores) > 0:  # At least one child - not sure how to handle the no-child case
                    average_child_score = sum(child_scores) / len(child_scores)
                    if average_child_score > 0 and parent_score > 0:
                        comment_score = comment.score
                        if comment_score >= average_child_score and comment_score <= parent_score:
                            pass
                        else:
                            basedness = average_child_score - comment_score
                            ranked_comments.append((basedness, comment_info))
            else:
                # A top-level comment
                comment_info['parent'] = None
                if comment.id in child_map:
                    child_scores = [i.score for i in child_map[comment.id] if isinstance(i, Comment)]
                else:
                    child_scores = []
                if len(child_scores) > 0:  # At least one child - not sure how to handle the no-child case
                    average_child_score = sum(child_scores) / len(child_scores)
                    comment_score = comment.score
                    if comment_score >= average_child_score:
                        pass
                    else:
                        basedness = average_child_score - comment_score
                        ranked_comments.append((basedness, comment_info))
            # Add to angriness (VADER compound score; below -0.5 counts as angry).
            score = sid_obj.polarity_scores(remove_quoted_text(comment.body))['compound']
            if score < -0.5:
                angry_comments.append((sid_obj.polarity_scores(comment.body)['compound'], comment_info))

            if comment.author not in user_to_total_anger:
                user_to_total_anger[comment.author] = 0.0
                redditors[comment.author] = {}
                redditors[comment.author]['comments'] = []
                redditors[comment.author]['angriness'] = 0
                redditors[comment.author]['author'] = comment.author

            user_to_total_anger[comment.author] += score
            redditors[comment.author]['comments'].append((score, comment_info))
            redditors[comment.author]['angriness'] += score
        except Exception as e:
            print(f"Error while processing {comment}: {e}")

    print(f"[{submission.id}] Done")
    ranked_comments.sort(reverse=True, key=lambda a: a[0])
    angry_comments.sort(key=lambda a: a[0])
    lolcows = [(v, k) for k, v in user_to_total_anger.items()]
    lolcows.sort(key=lambda a: a[0])
    redditors_ranked = [(data['angriness'], data) for username, data in redditors.items()]
    redditors_ranked.sort(key=lambda a: a[0])
    return {
        'based': ranked_comments,
        'angry': angry_comments,
        'lolcows': lolcows,
        'redditors': redditors_ranked
    }

# get_based_submissions("all", "hour", 25, True)

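# The dict returned by analyze_comments is consumed by generate_submission_report
# below: 'based' and 'angry' are (score, comment_info) lists, 'lolcows' is a list
# of (total_anger, author) pairs, and 'redditors' is (angriness, data) pairs.
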
def remove_quoted_text(string):
    # Drop markdown blockquote lines so quoted text doesn't skew sentiment.
    return '\n'.join([i for i in string.split("\n") if i == "" or i[0] != ">"])

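# e.g. remove_quoted_text("> them\nmy reply") returns "my reply".
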
def generate_comment_display_section(submissions: 'Tuple[float, Submission]', section_title, detail_display, number_to_show, show_details=True, detail_func=lambda a: a, max_len=1000):
    markdown_lines = []
    if len(submissions) != 0:
        markdown_lines.append(f"## {section_title}")
        for comment_info in submissions[:number_to_show]:
            attribute = comment_info[0]
            parent = comment_info[1]['parent']
            comment = comment_info[1]['comment']
            if show_details:
                markdown_lines.append(f"{detail_display}: {detail_func(attribute)}")
            comment_indent = ""

            if parent is not None:
                parent_body = remove_quoted_text(parent.body)
                parent_body = strip_markdown(parent_body)
                parent_body = parent_body.replace("\n", "")

                if len(parent_body) > max_len:
                    parent_body = parent_body[0:max_len - 3] + "..."
                markdown_lines.append(f"> {parent_body} ({parent.score})")
                comment_indent = ">>"
            else:
                comment_indent = ">"

            comment_body = remove_quoted_text(comment.body)
            comment_body = strip_markdown(comment_body)
            comment_body = comment_body.replace("\n", "")
            if len(comment_body) > max_len:
                comment_body = comment_body[0:max_len - 3] + "..."
            markdown_lines.append(f"{comment_indent} [{comment_body}](https://reddit.com{comment.permalink}?context=3) ({comment.score})")
    return markdown_lines

def comment_basedness_score_string(basedness):
    if basedness > 1000:
        score = 5
    elif basedness > 500:
        score = 4
    elif basedness > 100:
        score = 3
    elif basedness > 50:
        score = 2
    elif basedness > 10:
        score = 1
    else:
        score = 0
    return get_score_string(score, "🔥", "🔘")

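# e.g. comment_basedness_score_string(120) returns "🔥🔥🔥🔘🔘".
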
def angriness_score_string(angriness):
    if angriness < -0.95:
        score = 5
    elif angriness < -0.9:
        score = 4
    elif angriness < -0.85:
        score = 3
    elif angriness < -0.75:
        score = 2
    elif angriness < -0.6:
        score = 1
    else:
        score = 0

    return get_score_string(score, "😡", "🔘")

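# e.g. a VADER compound score of -0.92 gives angriness_score_string(-0.92)
# -> "😡😡😡😡🔘".
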
def get_reddit_full_url(partial_url):
    return f"https://reddit.com{partial_url}"

def generate_lolcow_display_section(lolcows):
    markdown_lines = []

    # The list is sorted ascending by total anger, so the most negative
    # (angriest) redditor comes first.
    biggest_lolcow_info = lolcows[0]
    biggest_lolcow_score = biggest_lolcow_info[0]
    biggest_lolcow = biggest_lolcow_info[1]['author']
    number_of_comments = len(biggest_lolcow_info[1]['comments'])
    lolcow_comments = biggest_lolcow_info[1]['comments']
    lolcow_score_string = get_score_string(-1 * biggest_lolcow_score, "🐮", "🔘", allow_over=True)
    markdown_lines.append(f"# Biggest Lolcow: /u/{biggest_lolcow.name}")
    markdown_lines.append(f"Score: {lolcow_score_string}")
    markdown_lines.append(f"Number of comments: {number_of_comments}")
    # Keep the raw (negative) compound scores: angriness_score_string only
    # produces a nonzero rating for values below zero, so negating them here
    # would always display an empty meter.
    comment_angriness_scores = [a[0] for a in lolcow_comments]
    average_angriness = average(comment_angriness_scores)
    maximum_angry_info = min(lolcow_comments, key=lambda a: a[0])
    minimum_angry_info = max(lolcow_comments, key=lambda a: a[0])
    markdown_lines.append(f"Average angriness: {angriness_score_string(average_angriness)}")
    markdown_lines.append(f"Maximum angriness: [{angriness_score_string(maximum_angry_info[0])}]({get_reddit_full_url(maximum_angry_info[1]['comment'].permalink)}?context=3)")
    markdown_lines.append(f"Minimum angriness: [{angriness_score_string(minimum_angry_info[0])}]({get_reddit_full_url(minimum_angry_info[1]['comment'].permalink)}?context=3)")

    return markdown_lines

def generate_submission_report(submission: 'Submission', absolute: bool):
    markdown_lines = []
    comment_analysis_results = analyze_comments(submission)
    basedness_display_func = lambda a: get_comment_basedness_out_of_five(a, absolute)
    markdown_lines.extend(generate_comment_display_section(comment_analysis_results['based'], "Most Based Comments", "Basedness", 3, detail_func=basedness_display_func))
    markdown_lines.extend(generate_comment_display_section(comment_analysis_results['angry'], "Angriest Comments", "Angriness", 3, detail_func=angriness_score_string))
    markdown_lines.extend(generate_lolcow_display_section(comment_analysis_results['redditors']))
    markdown_lines.append("")
    markdown_lines.append("*:marppy: autodrama: automating away the jobs of dramautists. :marseycapitalistmanlet: Ping HeyMoon if there are any problems or you have a suggestion :marseyjamming:*")
    return "\n\n".join(markdown_lines)

def create_file_report(submission: 'Submission', absolute):
    submission_name = submission.title
    print(f"Generating submission for https://reddit.com{submission.permalink}")
    # Use the subreddit's display name (e.g. "AskReddit") rather than its
    # "t5_..." fullname in the generated filename.
    filename = "".join([i.lower() for i in submission_name if i.lower() in "abcdefghijklmnopqrstuvwxyz "])[:30].replace(" ", "_") + "_" + submission.subreddit.display_name + ".md"
    submission_report = generate_submission_report(submission, absolute)
    print(submission_report)
    with open(filename, "wb") as f:
        f.write(submission_report.encode("utf-8"))

def create_file_reports_for_list_of_submissions(submissions: 'list[Tuple[float, float, Submission]]', absolute: bool = True):
    for i in submissions:
        try:
            submission = i[2]
            # create_file_report requires the `absolute` flag, so pass it through.
            create_file_report(submission, absolute)
        except Exception as e:
            print(f"Yikes, had a bit of a fucky wucky: {e}")

def get_basedness_score_out_of_five(basedness: int) -> int:
    if basedness > 10000:
        return 5
    elif basedness > 5000:
        return 4
    elif basedness > 1000:
        return 3
    elif basedness > 100:
        return 2
    elif basedness > 10:
        return 1
    else:
        return 0

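# e.g. get_basedness_score_out_of_five(1200) returns 3.
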
def get_comment_basedness_out_of_five(basedness: int, absolute: bool):
    if absolute:
        if basedness > 1000:
            score = 5
        elif basedness > 500:
            score = 4
        elif basedness > 100:
            score = 3
        elif basedness > 50:
            score = 2
        elif basedness > 10:
            score = 1
        else:
            score = 0
    else:
        if basedness > 100:
            score = 5
        elif basedness > 50:
            score = 4
        elif basedness > 10:
            score = 3
        elif basedness > 5:
            score = 2
        elif basedness > 1:
            score = 1
        else:
            score = 0
    return get_score_string(score, "🔥", "🔘")

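# e.g. get_comment_basedness_out_of_five(60, absolute=True) returns "🔥🔥🔘🔘🔘",
# while get_comment_basedness_out_of_five(60, absolute=False) returns "🔥🔥🔥🔥🔘".
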
def get_score_string(score: int, filled_emoji, empty_emoji, allow_over=False) -> str:
    # Build a five-slot emoji meter; allow_over appends any overflow beyond 5.
    to_return = "".join([filled_emoji if (i + 1) <= score else empty_emoji for i in range(5)])
    if allow_over:
        if score > 5:
            to_return += f"(+{int(score) - 5}{filled_emoji})"
    return to_return

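# e.g. get_score_string(3, "🔥", "🔘") returns "🔥🔥🔥🔘🔘", and
# get_score_string(7, "🐮", "🔘", allow_over=True) returns "🐮🐮🐮🐮🐮(+2🐮)".
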
def create_rdrama_report(rdrama: RDramaAPIInterface, submission: 'Submission', basedness: int, absolute_basedness: bool):
    score = get_basedness_score_out_of_five(basedness)
    score_string = get_score_string(score, "🔥" if absolute_basedness else "🤓", "🔘")
    title = f"[{score_string}] {submission.title}"
    url = f"https://reddit.com{submission.permalink}"
    body = generate_submission_report(submission, absolute_basedness)
    if len(body) > 20000:
        # Truncate to stay within the 20,000-character post body limit.
        body = body[0:19997] + "..."
    try:
        rdrama.make_post(title, url, body)
    except Exception as e:
        print(f"Yikes, a fucky wucky occurred! {e}")

def get_first_unposted(rdrama: RDramaAPIInterface, submissions: 'list[Submission]'):
    for submission in submissions:
        if not rdrama.has_url_been_posted(f"https://www.reddit.com{submission.permalink}"):
            return submission
    return None

def daily_drama_post(rdrama: RDramaAPIInterface):
    print("Performing Daily Drama Post!")
    based_submissions = get_based_submissions("all", "day", 150)

    print("Posting the most relatively based submission for the day...")
    based_submissions.sort(reverse=True, key=lambda a: a[1])  # Sort by relative basedness
    most_relatively_based_submission = get_first_unposted(rdrama, [i[2] for i in based_submissions])
    if most_relatively_based_submission is not None:
        # Report the score of the submission actually chosen; the top-ranked
        # one may already have been posted.
        relative_basedness = next(rb for _, rb, s in based_submissions if s == most_relatively_based_submission)
        create_rdrama_report(rdrama, most_relatively_based_submission, relative_basedness, False)

    print("Posting the most based submission for the day...")
    based_submissions.sort(reverse=True, key=lambda a: a[0])  # Sort by basedness
    most_absolutely_based_submission = get_first_unposted(rdrama, [i[2] for i in based_submissions])
    if most_absolutely_based_submission is not None:
        basedness = next(b for b, _, s in based_submissions if s == most_absolutely_based_submission)
        create_rdrama_report(rdrama, most_absolutely_based_submission, basedness, True)
    print("Done!")

TEST_AUTH_TOKEN = "jU_k7alzoqfogYqQgcPJ3vIWILiDtI7UWdMTmKbvuttMih-YbhRCs8B3BBCRSKkdSJ0w_JfzJn2YBkdDEw5DIf3UXb3vGTRvLB_9BQ9zBiTz9opp3MFGSudH_s_C7keq"  # TODO: parameterize

if TEST_MODE:
    website = "localhost"
    auth = TEST_AUTH_TOKEN
    https = False
    timeout = 1
else:
    website = "rdrama.net"
    with open(get_real_filename("rdrama_auth_token"), "r") as f:
        auth = f.read()
    https = True
    timeout = 10

rdrama = RDramaAPIInterface(auth, website, https=https)

def create_report(submission: Submission, absolute):
    file_name = sanitize_sentence(submission.title).replace(" ", "_")
    with open(f"{file_name}.md", "w+", encoding="utf-8") as f:
        f.write(generate_submission_report(submission, absolute))

if __name__ == "__main__":
|
|
print(f"Starting at {datetime.datetime.now()}")
|
|
daily_drama_post(rdrama)
|
|
print(f"Ending at {datetime.datetime.now()}") |