114 lines
3.5 KiB
Python
114 lines
3.5 KiB
Python
import logging
|
|
import math
|
|
import sys
|
|
import time
|
|
from collections import OrderedDict
|
|
|
|
import requests
|
|
|
|
|
|
class DramaClient:
    """Small HTTP client for the JSON API served at ``BASE_URL``.

    Every request carries the caller's token in the ``Authorization`` header
    and is preceded by a fixed pause (``GET_DELAY_SECONDS`` /
    ``POST_DELAY_SECONDS``).

    Attributes:
        session: The ``requests.Session`` used for all traffic.
        token: Value sent verbatim as the ``Authorization`` header.
        logger: Logger used for request and error messages.
        chud_phrase: The ``chud_phrase`` field of the ``/@me`` response, or
            ``""`` when the response has no such field.
    """

    BASE_URL = "https://rdrama.net"

    # Seconds to sleep before each request. Kept as class attributes so a
    # subclass or an instance can tune them without editing the methods.
    GET_DELAY_SECONDS = 1
    POST_DELAY_SECONDS = 5

    def __init__(self, token, logger=None):
        """Create the session and fetch the current user's ``chud_phrase``.

        Note that construction performs a network request (``GET /@me``).

        Args:
            token: Value to send as the ``Authorization`` header.
            logger: Optional logger; defaults to ``logging.getLogger(__name__)``.
        """
        self.session = requests.Session()
        self.token = token
        # Retry up to 5 times (backoff factor 10) on these server-side statuses.
        retries = requests.adapters.Retry(
            total=5, backoff_factor=10, status_forcelist=[500, 502, 503, 504, 521]
        )
        self.session.mount(
            "https://", requests.adapters.HTTPAdapter(max_retries=retries)
        )
        self.logger = logger or logging.getLogger(__name__)
        self.chud_phrase = self.get("/@me").get("chud_phrase", "")

    @staticmethod
    def _stringify_values(data):
        """Return a copy of ``data`` with every value converted via ``str``.

        ``None`` is passed through unchanged. The input mapping is never
        modified, so callers keep their original (non-string) values.
        """
        if data is None:
            return None
        return {key: str(value) for key, value in data.items()}

    def _check_response(self, r):
        """Log a non-200 response, raise on HTTP errors, and decode the JSON body.

        Args:
            r: A response object exposing ``status_code``, ``text``,
                ``raise_for_status()`` and ``json()``.

        Returns:
            The decoded JSON body.

        Raises:
            Whatever ``r.raise_for_status()`` raises for an error status.
        """
        if r.status_code != 200:
            # Must be an f-string: a plain literal would log the placeholders
            # themselves instead of the response details.
            self.logger.error(f"Error! {r}, {r.status_code}, {r.text}")
            r.raise_for_status()

        # Non-200 statuses that raise_for_status() does not reject still fall
        # through to here and are decoded like a normal response.
        return r.json()

    def get(self, endpoint):
        """Send ``GET BASE_URL + endpoint`` and return the decoded JSON body.

        Args:
            endpoint: Path (including any query string) appended to ``BASE_URL``.

        Returns:
            The decoded JSON response.

        Raises:
            Whatever ``raise_for_status()`` raises for an error status.
        """
        self.logger.info(f"GET {endpoint}")
        time.sleep(self.GET_DELAY_SECONDS)

        r = self.session.get(
            f"{self.BASE_URL}{endpoint}", headers={"Authorization": self.token}
        )
        return self._check_response(r)

    def post(self, endpoint, data=None, images=None):
        """Send ``POST BASE_URL + endpoint`` and return the decoded JSON body.

        Args:
            endpoint: Path appended to ``BASE_URL``.
            data: Optional mapping of form fields. Values are sent as strings;
                the mapping itself is left untouched.
            images: Optional value forwarded as the ``files`` argument of
                ``requests.Session.post``.

        Returns:
            The decoded JSON response.

        Raises:
            Whatever ``raise_for_status()`` raises for an error status.
        """
        self.logger.info(f"POST {endpoint}")
        time.sleep(self.POST_DELAY_SECONDS)

        r = self.session.post(
            f"{self.BASE_URL}{endpoint}",
            data=self._stringify_values(data),
            headers={"Authorization": self.token},
            files=images,
        )
        return self._check_response(r)

    def fetch_new_comments(self, after=0, limit=math.inf):
        """Return comments with an ID greater than ``after``, oldest first.

        Pages are fetched via ``fetch_page`` starting at page 1 until a page
        is empty, a fetched comment has an ID ``<= after``, or at least
        ``limit`` comments have been fetched.

        ``limit`` is compared against the raw number of fetched comments
        (before filtering and de-duplication) and only between pages; the
        result is not truncated, so it may hold more than ``limit`` items.

        Args:
            after: Only comments with a strictly greater ID are returned.
            limit: Stop requesting further pages once this many comments
                have been fetched.

        Returns:
            A ``(comments, newest_id)`` tuple. ``newest_id`` is the largest ID
            among the returned comments, or ``0`` when there are none.
        """
        comments = []
        oldest_seen = math.inf  # running minimum ID over everything fetched
        page = 1

        # Fetch /comments?page=x until we've reached `after` or satisfied `limit`.
        while oldest_seen > after and len(comments) < limit:
            page_comments = self.fetch_page(page)
            if not page_comments:
                break

            comments.extend(page_comments)
            oldest_seen = min(oldest_seen, min(c["id"] for c in page_comments))
            page += 1

        # Filter for new comments.
        comments = [c for c in comments if c["id"] > after]
        # Deduplicate comments in case one was pushed to the next page while fetching.
        comments = list(OrderedDict((c["id"], c) for c in comments).values())
        # Oldest first (this reverses the order in which pages delivered them).
        comments.reverse()

        newest = max((c["id"] for c in comments), default=0)
        return comments, newest

    def fetch_notifications(self):
        """Return the entries of ``/unread`` whose ``body`` is truthy."""
        notifs = self.get("/unread")["data"]
        return [n for n in notifs if n["body"]]

    def fetch_context(self, comment):
        """Return the post and the chain of parent comments for ``comment``.

        Parents are fetched one at a time by following ``parent_comment_id``
        until a comment has no (or a falsy) parent ID.

        Args:
            comment: Comment dict with at least ``post_id``; ``parent_comment_id``
                is optional.

        Returns:
            A ``(post, comments)`` tuple where ``comments`` starts at the
            top-level ancestor and ends with ``comment`` itself.
        """
        post = self.get(f"/post/{comment['post_id']}")

        comments = [comment]
        while parent_id := comments[-1].get("parent_comment_id", None):
            comments.append(self.get(f"/comment/{parent_id}"))

        # Make the top-level comment be first.
        comments.reverse()

        return post, comments

    def fetch_page(self, page):
        """Return the ``data`` list of ``/comments?page=<page>``."""
        return self.get(f"/comments?page={page}")["data"]

    def reply(self, comment, body, images=None):
        """Post ``body`` as a reply to ``comment`` and return the API response.

        Args:
            comment: Comment dict; its ``id`` identifies the parent.
            body: Text of the reply.
            images: Optional value forwarded to ``post`` as ``images``.
        """
        data = {"parent_fullname": f"c_{comment['id']}", "body": body}
        return self.post("/comment", data=data, images=images)
|