# automeme/BotModels.py
#
# 162 lines
# 5.4 KiB
# Python

from datetime import datetime
from email.policy import default
from sqlite3 import Date
import time
import openai
import sqlalchemy
from sqlalchemy.orm import declarative_base, Session
from sqlalchemy import Column, DateTime, String, ForeignKey, Integer, Boolean, Table, and_, or_
# Shared declarative base class; every ORM model in this module subclasses it.
Base = declarative_base()
class User(Base):
    """ORM row for the ``user`` table: one comment counter per user id.

    All helpers are static and take the SQLAlchemy session explicitly. Only
    ``reset_all_comments`` commits; the other helpers leave flushing and
    committing to the caller.
    """

    __tablename__ = "user"

    id = Column(Integer, primary_key=True)
    number_of_comments = Column(Integer, default=0)

    @staticmethod
    def get_user(user_id: int, session: Session) -> "User":
        """Return the row for ``user_id``, adding a new pending row if absent."""
        stmt = sqlalchemy.select(User).where(User.id == user_id)
        user = session.execute(stmt).scalar_one_or_none()
        if user is None:
            # The counter is set explicitly because the column default is
            # only applied when the row is inserted, not on construction.
            user = User(id=user_id, number_of_comments=0)
            session.add(user)
        return user

    @staticmethod
    def increase_number_of_comments(user_id: int, session: Session) -> None:
        """Add one to the comment counter of ``user_id``."""
        User.get_user(user_id, session).number_of_comments += 1

    @staticmethod
    def get_number_of_comments(user_id: int, session: Session) -> int:
        """Return the comment counter of ``user_id`` (0 for a new user)."""
        return User.get_user(user_id, session).number_of_comments

    @staticmethod
    def reset_number_of_comments(user_id: int, session: Session) -> None:
        """Set the comment counter of ``user_id`` back to zero."""
        User.get_user(user_id, session).number_of_comments = 0

    @staticmethod
    def reset_all_comments(session: Session) -> None:
        """Zero the comment counter of every user and commit the session."""
        stmt = sqlalchemy.select(User)
        for user in session.execute(stmt).scalars().all():
            user.number_of_comments = 0
        # commit() flushes pending changes itself, so no separate flush().
        session.commit()
class Comment(Base):
    """ORM row for the ``comment`` table, pairing two comment ids.

    Each row stores a ``user_comment_id`` and a ``bbbb_comment_id``
    (presumably a user's comment and the bot's reply to it -- confirm
    against the caller). None of the helpers commit the session.
    """

    __tablename__ = "comment"

    id = Column(Integer, primary_key=True)
    user_comment_id = Column(Integer)
    bbbb_comment_id = Column(Integer)

    @staticmethod
    def get_user_comment(user_comment_id: int, session: Session) -> "Comment | None":
        """Return the first row whose ``user_comment_id`` matches, or None."""
        stmt = sqlalchemy.select(Comment).where(
            Comment.user_comment_id == user_comment_id
        )
        # first() yields the first match or None without loading every row.
        return session.execute(stmt).scalars().first()

    @staticmethod
    def get_bbbb_comment(bbbb_comment_id: int, session: Session) -> "Comment | None":
        """Return the first row whose ``bbbb_comment_id`` matches, or None."""
        stmt = sqlalchemy.select(Comment).where(
            Comment.bbbb_comment_id == bbbb_comment_id
        )
        return session.execute(stmt).scalars().first()

    @staticmethod
    def get_comment(comment_id: int, session: Session) -> "Comment | None":
        """Look ``comment_id`` up in either id column.

        ``user_comment_id`` is checked first; ``bbbb_comment_id`` is only
        consulted when that finds nothing.
        """
        user_comment = Comment.get_user_comment(comment_id, session)
        if user_comment is not None:
            return user_comment
        return Comment.get_bbbb_comment(comment_id, session)

    @staticmethod
    def has_replied_to_comment(comment_id: int, session: Session) -> bool:
        """Return True when a row referencing ``comment_id`` exists.

        The previous implementation returned True when *no* row was found,
        which contradicted the method name.
        """
        return Comment.get_comment(comment_id, session) is not None

    @staticmethod
    def create_new_comment(
        user_comment_id: int, bbbb_comment_id: int, session: Session
    ) -> None:
        """Add a new row pairing the two comment ids to the session."""
        comment = Comment(
            user_comment_id=user_comment_id,
            bbbb_comment_id=bbbb_comment_id,
        )
        session.add(comment)
class Post(Base):
    """ORM row for the ``post`` table: reply bookkeeping per post id.

    ``has_replied`` records whether a reply was registered for the post and
    ``replies_to_post`` counts replies. None of the helpers commit the
    session.
    """

    __tablename__ = "post"

    id = Column(Integer, primary_key=True)
    has_replied = Column(Boolean, default=False)
    replies_to_post = Column(Integer, default=0)

    @staticmethod
    def get_post(post_id: int, session: Session) -> "Post":
        """Return the row for ``post_id``, adding a new pending row if absent."""
        stmt = sqlalchemy.select(Post).where(Post.id == post_id)
        post = session.execute(stmt).scalar_one_or_none()
        if post is None:
            # Column defaults are only applied when the row is inserted, so
            # set them here; otherwise a pending Post holds None and
            # ``replies_to_post += 1`` raises TypeError.
            post = Post(id=post_id, has_replied=False, replies_to_post=0)
            session.add(post)
        return post

    @staticmethod
    def has_replied_to_post(post_id: int, session: Session) -> bool:
        """Return whether a reply has been registered for ``post_id``."""
        # bool() maps a NULL stored in an existing row to False.
        return bool(Post.get_post(post_id, session).has_replied)

    @staticmethod
    def increment_replies(post_id: int, session: Session) -> None:
        """Add one to the reply counter of ``post_id``."""
        post = Post.get_post(post_id, session)
        # Treat a NULL counter as zero, matching get_number_of_replies().
        post.replies_to_post = (post.replies_to_post or 0) + 1

    @staticmethod
    def get_number_of_replies(post_id: int, session: Session) -> int:
        """Return the reply counter of ``post_id``, treating NULL as 0."""
        replies = Post.get_post(post_id, session).replies_to_post
        return 0 if replies is None else replies

    @staticmethod
    def register_post_reply(post_id: int, session: Session) -> None:
        """Mark ``post_id`` as having been replied to."""
        Post.get_post(post_id, session).has_replied = True
'''
Just for keeping a log of all comments we encounter
'''
class CommentString(Base):
    """ORM row for the ``comment_string`` table: a log of comment texts.

    Each row stores the comment id, the author's user id, a timestamp and
    the comment text.
    """

    __tablename__ = "comment_string"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    time = Column(DateTime)
    text = Column(String)

    @staticmethod
    def add_comment(
        comment_id: int, user_id: int, time: datetime, text: str, session: Session
    ) -> None:
        """Add the comment to the session unless its id is already stored.

        An existing row with the same ``comment_id`` is left untouched. The
        session is not committed here.
        """
        stmt = sqlalchemy.select(CommentString).where(
            CommentString.id == comment_id
        )
        existing = session.execute(stmt).scalar_one_or_none()
        if existing is not None:
            return
        session.add(
            CommentString(id=comment_id, user_id=user_id, time=time, text=text)
        )
'''
Mostly for keeping track of how long it's been since the last run, so we can run things without worrying about a set number of pages.
'''
class ScriptCall(Base):
    """ORM row for the ``script_call`` table: one timestamp per registered run."""

    __tablename__ = "script_call"

    id = Column(Integer, primary_key=True)
    time = Column(DateTime)

    @staticmethod
    def register(session: Session) -> None:
        """Add a row stamped with the current local time to the session.

        The session is not committed here.
        """
        session.add(ScriptCall(time=datetime.now()))

    @staticmethod
    def get_time_of_last_run(session: Session) -> datetime:
        """Return the most recent registered timestamp.

        When no run has been registered yet, the current time is returned
        instead of None.
        """
        stmt = (
            sqlalchemy.select(ScriptCall)
            .order_by(ScriptCall.time.desc())
            .limit(1)
        )
        last_call = session.execute(stmt).scalar_one_or_none()
        if last_call is None:
            return datetime.now()
        return last_call.time