162 lines
5.4 KiB
Python
162 lines
5.4 KiB
Python
from datetime import datetime
|
|
from email.policy import default
|
|
from sqlite3 import Date
|
|
import time
|
|
import openai
|
|
import sqlalchemy
|
|
from sqlalchemy.orm import declarative_base, Session
|
|
from sqlalchemy import Column, DateTime, String, ForeignKey, Integer, Boolean, Table, and_, or_
|
|
|
|
# Declarative base class that every ORM model defined in this module inherits from.
Base = declarative_base()
|
|
|
|
class User(Base):
    """A user row holding a running comment counter.

    Every helper is a static method: it receives the user id and an open
    ``Session`` instead of operating on an instance. None of them commit,
    except ``reset_all_comments``.
    """

    __tablename__ = "user"

    id = Column(Integer, primary_key=True)
    # Counter read and written by the helpers below.
    number_of_comments = Column(Integer, default=0)

    @staticmethod
    def get_user(user_id: int, session: Session):
        """Return the ``User`` with ``user_id``, creating it when missing.

        A newly created user starts with a zero counter and is added to
        ``session``; it is not flushed or committed here.
        """
        stmt = sqlalchemy.select(User).where(User.id == user_id)
        user = session.execute(stmt).scalar_one_or_none()
        if user is None:
            user = User(id=user_id, number_of_comments=0)
            session.add(user)
        return user

    @staticmethod
    def increase_number_of_comments(user_id: int, session: Session):
        """Add one to the user's comment counter."""
        user = User.get_user(user_id, session)
        # The column is nullable; treat a NULL counter as zero instead of
        # failing with ``None + 1``.
        user.number_of_comments = (user.number_of_comments or 0) + 1

    @staticmethod
    def get_number_of_comments(user_id: int, session: Session) -> int:
        """Return the user's comment counter (``0`` when it is NULL)."""
        return User.get_user(user_id, session).number_of_comments or 0

    @staticmethod
    def reset_number_of_comments(user_id: int, session: Session):
        """Set the user's comment counter back to zero."""
        User.get_user(user_id, session).number_of_comments = 0

    @staticmethod
    def reset_all_comments(session: Session):
        """Zero the counter of every user and commit the session."""
        users = session.execute(sqlalchemy.select(User)).scalars().all()
        for user in users:
            user.number_of_comments = 0

        # commit() flushes pending changes itself, so no separate flush()
        # call is required.
        session.commit()
|
|
|
|
class Comment(Base):
    """Pairing of a user comment id with a ``bbbb`` comment id.

    NOTE(review): presumably the ``bbbb`` comment is the reply that was
    posted to the user comment -- confirm against the caller.
    """

    __tablename__ = "comment"

    id = Column(Integer, primary_key=True)
    user_comment_id = Column(Integer)
    bbbb_comment_id = Column(Integer)

    @staticmethod
    def get_user_comment(user_comment_id: int, session: Session):
        """Return a record whose ``user_comment_id`` matches, else ``None``."""
        stmt = sqlalchemy.select(Comment).where(
            Comment.user_comment_id == user_comment_id
        )
        # first() yields the first row or None without materialising every
        # matching row into a list.
        return session.execute(stmt).scalars().first()

    @staticmethod
    def get_bbbb_comment(bbbb_comment_id: int, session: Session):
        """Return a record whose ``bbbb_comment_id`` matches, else ``None``."""
        stmt = sqlalchemy.select(Comment).where(
            Comment.bbbb_comment_id == bbbb_comment_id
        )
        return session.execute(stmt).scalars().first()

    @staticmethod
    def get_comment(comment_id: int, session: Session):
        """Look ``comment_id`` up as a user comment id, then as a bbbb one.

        Returns the matching record, or ``None`` when neither column holds
        the id.
        """
        record = Comment.get_user_comment(comment_id, session)
        if record is not None:
            return record
        return Comment.get_bbbb_comment(comment_id, session)

    @staticmethod
    def has_replied_to_comment(comment_id: int, session: Session):
        """Return ``True`` when a record exists for ``comment_id``.

        NOTE(review): this previously returned ``True`` when *no* record
        existed, which contradicts the method name. Callers that worked
        around the inverted result must be updated.
        """
        return Comment.get_comment(comment_id, session) is not None

    @staticmethod
    def create_new_comment(user_comment_id: int, bbbb_comment_id: int, session: Session):
        """Add a new user/bbbb comment pairing to ``session`` (no commit)."""
        session.add(
            Comment(
                user_comment_id=user_comment_id,
                bbbb_comment_id=bbbb_comment_id,
            )
        )
|
|
|
|
class Post(Base):
    """A post row holding a replied flag and a reply counter.

    Every helper is a static method taking the post id and an open
    ``Session``; none of them commit.
    """

    __tablename__ = "post"

    id = Column(Integer, primary_key=True)
    has_replied = Column(Boolean, default=False)
    replies_to_post = Column(Integer, default=0)

    @staticmethod
    def get_post(post_id: int, session: Session):
        """Return the ``Post`` with ``post_id``, creating it when missing.

        A newly created post is added to ``session`` but not flushed or
        committed here.
        """
        stmt = sqlalchemy.select(Post).where(Post.id == post_id)
        post = session.execute(stmt).scalar_one_or_none()
        if post is None:
            # Column defaults are only applied when the INSERT is emitted,
            # so a pending object would expose None for both fields. Set
            # them explicitly so the object is usable straight away.
            post = Post(id=post_id, has_replied=False, replies_to_post=0)
            session.add(post)
        return post

    @staticmethod
    def has_replied_to_post(post_id: int, session: Session):
        """Return whether the post's ``has_replied`` flag is set."""
        # bool() maps a NULL flag on an existing row to False.
        return bool(Post.get_post(post_id, session).has_replied)

    @staticmethod
    def increment_replies(post_id: int, session: Session):
        """Add one to the post's reply counter."""
        post = Post.get_post(post_id, session)
        # Treat a NULL counter as zero instead of failing with ``None + 1``.
        post.replies_to_post = (post.replies_to_post or 0) + 1

    @staticmethod
    def get_number_of_replies(post_id: int, session: Session):
        """Return the post's reply counter (``0`` when it is NULL)."""
        return Post.get_post(post_id, session).replies_to_post or 0

    @staticmethod
    def register_post_reply(post_id: int, session: Session):
        """Set the post's ``has_replied`` flag."""
        Post.get_post(post_id, session).has_replied = True
|
|
|
|
'''
|
|
Just for keeping a log of all comments we encounter
|
|
'''
|
|
class CommentString(Base):
    """Stored copy of a comment: its id, author id, timestamp and text."""

    __tablename__ = "comment_string"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    time = Column(DateTime)
    text = Column(String)

    @staticmethod
    def add_comment(comment_id: int, user_id: int, time: datetime, text: str, session: Session):
        """Store a comment unless one with ``comment_id`` already exists.

        An existing row is left untouched. A new row is added to
        ``session`` but not committed.
        """
        stmt = sqlalchemy.select(CommentString).where(CommentString.id == comment_id)
        if session.execute(stmt).scalar_one_or_none() is not None:
            # Already logged; nothing to do.
            return

        session.add(
            CommentString(
                id=comment_id,
                user_id=user_id,
                time=time,
                text=text,
            )
        )
|
|
|
|
'''
|
|
Mostly for keeping track of how long it's been since the last run, so we can run things without worrying about a set number of pages
|
|
'''
|
|
class ScriptCall(Base):
    """One row per registered script run, holding the time of the call."""

    __tablename__ = "script_call"

    id = Column(Integer, primary_key=True)
    time = Column(DateTime)

    @staticmethod
    def register(session: Session):
        """Add a row stamped with the current local time (no commit)."""
        session.add(ScriptCall(time=datetime.now()))

    @staticmethod
    def get_time_of_last_run(session: Session) -> datetime:
        """Return the most recent registered call time.

        When no call has been registered yet, the current local time is
        returned instead.
        """
        stmt = (
            sqlalchemy.select(ScriptCall)
            .order_by(ScriptCall.time.desc())
            .limit(1)
        )
        last_call = session.execute(stmt).scalar_one_or_none()
        if last_call is None:
            return datetime.now()
        return last_call.time
|
|
|
|
|
|
|