From b7a01becb725fc6a5a624b765d2fccfca1a6bd7b Mon Sep 17 00:00:00 2001 From: j Date: Sat, 23 Mar 2024 00:59:33 -0400 Subject: [PATCH] Added Redis subscription, event, and graceful shutdown --- src/index.ts | 67 ++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 15 deletions(-) diff --git a/src/index.ts b/src/index.ts index 11f8ed8..3a441f8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,8 +5,9 @@ import WorkflowOrchestrator from './workflows/WorkflowOrchestrator'; import rDramaSession from './rdrama/session/SessionManager'; import redditSession from './reddit/session/SessionManager'; import { DatabaseInitializer } from './db/initializeDatabase'; -import { DatabaseService } from './db/services/Database'; -import { DatabaseMaintenanceService } from './db/services/DatabaseMaintenance'; +import RedisSessionManager from './redis/session/SessionManager'; +import { Comment } from './rdrama/models/Comment'; + // Import other necessary services or configurations async function startApplication() { @@ -16,12 +17,6 @@ async function startApplication() { if (!db) { throw new Error('Failed to initialize the database.'); } - const canSend = await DatabaseService.canSendNotification(); - const coolDownHours = process.env.NOTIFICATION_COOLDOWN_HOURS - if (!canSend) { - console.log(`Last Message Sent less than ${coolDownHours ? coolDownHours : 4} hours ago. Set NOTIFICATION_COOLDOWN_HOURS to change this`) - return; - } console.log('RDrama Session Start') // Initialize SessionManager or other global configurations @@ -31,18 +26,60 @@ async function startApplication() { } rDramaSessionManager.setAuthorizationToken(process.env.RDRAMA_API_KEY); - console.log('Database Maintenance Start') - const databaseMaintenance = new DatabaseMaintenanceService() - await databaseMaintenance.runMaintenanceTasks() + console.log('Redis Start'); + const redisManager = RedisSessionManager.getInstance(); + redisManager.client.subscribe('newCommentsChannel', (error, count) => { + if (error) throw new Error('Failed to subscribe to Redis channel.'); + console.log(`Subscribed to ${'newCommentsChannel'}. Listening for new comments.`); + }); console.log('Reddit Session Start') - await redditSession.getInstance() + const redditSessionManager = await redditSession.getInstance() - // Initialize and start your workflow const workflowOrchestrator = new WorkflowOrchestrator(); - await workflowOrchestrator.executeWorkflow(); - await rDramaSessionManager.shutdown() + redisManager.client.on('message', async (channel, message) => { + if (channel === 'newCommentsChannel') { + const comment: Comment = JSON.parse(message) + console.log(`New comment ${comment.id} received.`); + await workflowOrchestrator.executeWorkflow(comment); + } + }); + + setupProcessListeners(redisManager, redditSessionManager, rDramaSessionManager); +} + + +function setupProcessListeners(redisManager: RedisSessionManager, redditSessionManager: redditSession, rDramaSessionManager: rDramaSession) { + process.on('SIGINT', async () => { + console.log('SIGINT received. Shutting down gracefully...'); + await shutdownServices(redisManager, redditSessionManager, rDramaSessionManager); + }); + + process.on('SIGTERM', async () => { + console.log('SIGTERM received. Shutting down gracefully...'); + await shutdownServices(redisManager, redditSessionManager, rDramaSessionManager); + }); + + process.on('uncaughtException', async (error) => { + console.error('Uncaught Exception:', error); + await shutdownServices(redisManager, redditSessionManager, rDramaSessionManager); + process.exit(1); // Exit with error + }); + + process.on('unhandledRejection', async (reason, promise) => { + console.error('Unhandled Rejection at:', promise, 'reason:', reason); + await shutdownServices(redisManager, redditSessionManager, rDramaSessionManager); + process.exit(1); // Exit with error + }); +} + +async function shutdownServices(redisManager: RedisSessionManager, redditSessionManager: redditSession, rDramaSessionManager: rDramaSession) { + await redisManager.shutdown(); + await redditSessionManager.shutdown(); + await rDramaSessionManager.shutdown(); + console.log('Services shut down successfully.'); + process.exit(0); // Exit cleanly } startApplication()