from datetime import datetime
from email.policy import default
from sqlite3 import Date
import time

import openai
import sqlalchemy
from sqlalchemy.orm import declarative_base, Session
from sqlalchemy import (
    Column,
    DateTime,
    String,
    ForeignKey,
    Integer,
    Boolean,
    Table,
    and_,
    or_,
)

Base = declarative_base()


class User(Base):
    """Row in the ``user`` table holding a per-user comment counter.

    All helpers are static: they take the target id and a session and are
    meant to be called through the class, e.g. ``User.get_user(uid, session)``.
    None of them commit except ``reset_all_comments``.
    """

    __tablename__ = "user"

    id = Column(Integer, primary_key=True)
    number_of_comments = Column(Integer, default=0)

    @staticmethod
    def get_user(user_id: int, session: Session) -> "User":
        """Return the user with ``user_id``, creating it if it does not exist.

        A newly created user is added to ``session`` (not flushed or
        committed) with ``number_of_comments`` set to 0.
        """
        stmt = sqlalchemy.select(User).where(User.id == user_id)
        user = session.execute(stmt).scalar_one_or_none()
        if user is None:
            user = User(id=user_id, number_of_comments=0)
            session.add(user)
        return user

    @staticmethod
    def increase_number_of_comments(user_id: int, session: Session) -> None:
        """Add one to the user's comment counter (creating the user if needed)."""
        user = User.get_user(user_id, session)
        # A row loaded from the database may hold NULL; count that as 0.
        user.number_of_comments = (user.number_of_comments or 0) + 1

    @staticmethod
    def get_number_of_comments(user_id: int, session: Session) -> int:
        """Return the user's comment counter, treating a NULL value as 0."""
        return User.get_user(user_id, session).number_of_comments or 0

    @staticmethod
    def reset_number_of_comments(user_id: int, session: Session) -> None:
        """Set the user's comment counter back to 0."""
        User.get_user(user_id, session).number_of_comments = 0

    @staticmethod
    def reset_all_comments(session: Session) -> None:
        """Set every user's comment counter to 0, then flush and commit."""
        stmt = sqlalchemy.select(User)
        for user in session.execute(stmt).scalars().fetchall():
            user.number_of_comments = 0
        session.flush()
        session.commit()


class Comment(Base):
    """Row in the ``comment`` table pairing two comment ids.

    Each row stores a ``user_comment_id`` together with a ``bbbb_comment_id``.
    NOTE(review): presumably the latter is the id of the reply posted to the
    user's comment - confirm against the caller of ``create_new_comment``.
    """

    __tablename__ = "comment"

    id = Column(Integer, primary_key=True)
    user_comment_id = Column(Integer)
    bbbb_comment_id = Column(Integer)

    @staticmethod
    def get_user_comment(user_comment_id: int, session: Session):
        """Return the first row whose ``user_comment_id`` matches, else None."""
        stmt = sqlalchemy.select(Comment).where(
            Comment.user_comment_id == user_comment_id
        )
        # first() yields the first match or None and tolerates duplicates.
        return session.execute(stmt).scalars().first()

    @staticmethod
    def get_bbbb_comment(bbbb_comment_id: int, session: Session):
        """Return the first row whose ``bbbb_comment_id`` matches, else None."""
        stmt = sqlalchemy.select(Comment).where(
            Comment.bbbb_comment_id == bbbb_comment_id
        )
        return session.execute(stmt).scalars().first()

    @staticmethod
    def get_comment(comment_id: int, session: Session):
        """Look ``comment_id`` up in both id columns.

        The ``user_comment_id`` column is checked first; if nothing matches,
        the ``bbbb_comment_id`` column is checked. Returns the matching row
        or None.
        """
        user_comment = Comment.get_user_comment(comment_id, session)
        if user_comment is not None:
            return user_comment
        return Comment.get_bbbb_comment(comment_id, session)

    @staticmethod
    def has_replied_to_comment(comment_id: int, session: Session) -> bool:
        """Return True when a row is recorded for ``comment_id``.

        A row exists once ``create_new_comment`` has stored the id in either
        column. The previous implementation compared the lookup result with
        ``== None`` and therefore returned True only when NO row existed,
        which is the opposite of what the name says.
        """
        return Comment.get_comment(comment_id, session) is not None

    @staticmethod
    def create_new_comment(
        user_comment_id: int, bbbb_comment_id: int, session: Session
    ) -> None:
        """Add a new row pairing the two ids to ``session`` (no commit)."""
        comment = Comment(
            user_comment_id=user_comment_id,
            bbbb_comment_id=bbbb_comment_id,
        )
        session.add(comment)


class Post(Base):
    """Row in the ``post`` table tracking reply state for a post."""

    __tablename__ = "post"

    id = Column(Integer, primary_key=True)
    has_replied = Column(Boolean, default=False)
    replies_to_post = Column(Integer, default=0)

    @staticmethod
    def get_post(post_id: int, session: Session) -> "Post":
        """Return the post with ``post_id``, creating it if it does not exist.

        A newly created post is added to ``session`` (not flushed or
        committed). Its fields are set explicitly because column defaults
        are only applied when the row is INSERTed; until then the attributes
        would read as None and ``increment_replies`` would fail.
        """
        stmt = sqlalchemy.select(Post).where(Post.id == post_id)
        post = session.execute(stmt).scalar_one_or_none()
        if post is None:
            post = Post(id=post_id, has_replied=False, replies_to_post=0)
            session.add(post)
        return post

    @staticmethod
    def has_replied_to_post(post_id: int, session: Session) -> bool:
        """Return the post's ``has_replied`` flag, treating NULL as False."""
        return bool(Post.get_post(post_id, session).has_replied)

    @staticmethod
    def increment_replies(post_id: int, session: Session) -> None:
        """Add one to the post's ``replies_to_post`` counter."""
        post = Post.get_post(post_id, session)
        # Stored rows may hold NULL; count that as 0 instead of raising.
        post.replies_to_post = (post.replies_to_post or 0) + 1

    @staticmethod
    def get_number_of_replies(post_id: int, session: Session) -> int:
        """Return the post's ``replies_to_post`` counter, treating NULL as 0."""
        return Post.get_post(post_id, session).replies_to_post or 0

    @staticmethod
    def register_post_reply(post_id: int, session: Session) -> None:
        """Set the post's ``has_replied`` flag to True."""
        Post.get_post(post_id, session).has_replied = True


class CommentString(Base):
    """Just for keeping a log of all comments we encounter."""

    __tablename__ = "comment_string"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    time = Column(DateTime)
    text = Column(String)

    @staticmethod
    def add_comment(
        comment_id: int,
        user_id: int,
        time: datetime,
        text: str,
        session: Session,
    ) -> None:
        """Add the comment to ``session`` unless its id is already stored.

        An existing row with the same id is left untouched. Nothing is
        committed here.
        """
        # Check if it exists
        stmt = sqlalchemy.select(CommentString).where(
            CommentString.id == comment_id
        )
        if session.execute(stmt).scalar_one_or_none() is None:
            session.add(
                CommentString(
                    id=comment_id,
                    user_id=user_id,
                    time=time,
                    text=text,
                )
            )


class ScriptCall(Base):
    """Record of one script run.

    Mostly for keeping track of how long it's been since the last run, so we
    can run things without worrying about set number of pages.
    """

    __tablename__ = "script_call"

    id = Column(Integer, primary_key=True)
    time = Column(DateTime)

    @staticmethod
    def register(session: Session) -> None:
        """Add a row stamped with ``datetime.now()`` to ``session`` (no commit)."""
        session.add(ScriptCall(time=datetime.now()))

    @staticmethod
    def get_time_of_last_run(session: Session) -> datetime:
        """Return the ``time`` of the most recent row.

        Falls back to ``datetime.now()`` when no row has been stored yet.
        """
        stmt = (
            sqlalchemy.select(ScriptCall)
            .order_by(ScriptCall.time.desc())
            .limit(1)
        )
        last_call = session.execute(stmt).scalar_one_or_none()
        if last_call is None:
            return datetime.now()
        return last_call.time