Added Redis subscription, event, and graceful shutdown

pull/2/head
j 2024-03-23 00:59:33 -04:00
parent 7ef10dbb0f
commit b7a01becb7
1 changed files with 52 additions and 15 deletions

View File

@ -5,8 +5,9 @@ import WorkflowOrchestrator from './workflows/WorkflowOrchestrator';
import rDramaSession from './rdrama/session/SessionManager';
import redditSession from './reddit/session/SessionManager';
import { DatabaseInitializer } from './db/initializeDatabase';
import { DatabaseService } from './db/services/Database';
import { DatabaseMaintenanceService } from './db/services/DatabaseMaintenance';
import RedisSessionManager from './redis/session/SessionManager';
import { Comment } from './rdrama/models/Comment';
// Import other necessary services or configurations
async function startApplication() {
@ -16,12 +17,6 @@ async function startApplication() {
if (!db) {
throw new Error('Failed to initialize the database.');
}
const canSend = await DatabaseService.canSendNotification();
const coolDownHours = process.env.NOTIFICATION_COOLDOWN_HOURS
if (!canSend) {
console.log(`Last Message Sent less than ${coolDownHours ? coolDownHours : 4} hours ago. Set NOTIFICATION_COOLDOWN_HOURS to change this`)
return;
}
console.log('RDrama Session Start')
// Initialize SessionManager or other global configurations
@ -31,18 +26,60 @@ async function startApplication() {
}
rDramaSessionManager.setAuthorizationToken(process.env.RDRAMA_API_KEY);
console.log('Database Maintenance Start')
const databaseMaintenance = new DatabaseMaintenanceService()
await databaseMaintenance.runMaintenanceTasks()
console.log('Redis Start');
const redisManager = RedisSessionManager.getInstance();
redisManager.client.subscribe('newCommentsChannel', (error, count) => {
if (error) throw new Error('Failed to subscribe to Redis channel.');
console.log(`Subscribed to ${'newCommentsChannel'}. Listening for new comments.`);
});
console.log('Reddit Session Start')
await redditSession.getInstance()
const redditSessionManager = await redditSession.getInstance()
// Initialize and start your workflow
const workflowOrchestrator = new WorkflowOrchestrator();
await workflowOrchestrator.executeWorkflow();
await rDramaSessionManager.shutdown()
redisManager.client.on('message', async (channel, message) => {
if (channel === 'newCommentsChannel') {
const comment: Comment = JSON.parse(message)
console.log(`New comment ${comment.id} received.`);
await workflowOrchestrator.executeWorkflow(comment);
}
});
setupProcessListeners(redisManager, redditSessionManager, rDramaSessionManager);
}
function setupProcessListeners(redisManager: RedisSessionManager, redditSessionManager: redditSession, rDramaSessionManager: rDramaSession) {
process.on('SIGINT', async () => {
console.log('SIGINT received. Shutting down gracefully...');
await shutdownServices(redisManager, redditSessionManager, rDramaSessionManager);
});
process.on('SIGTERM', async () => {
console.log('SIGTERM received. Shutting down gracefully...');
await shutdownServices(redisManager, redditSessionManager, rDramaSessionManager);
});
process.on('uncaughtException', async (error) => {
console.error('Uncaught Exception:', error);
await shutdownServices(redisManager, redditSessionManager, rDramaSessionManager);
process.exit(1); // Exit with error
});
process.on('unhandledRejection', async (reason, promise) => {
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
await shutdownServices(redisManager, redditSessionManager, rDramaSessionManager);
process.exit(1); // Exit with error
});
}
async function shutdownServices(redisManager: RedisSessionManager, redditSessionManager: redditSession, rDramaSessionManager: rDramaSession) {
await redisManager.shutdown();
await redditSessionManager.shutdown();
await rDramaSessionManager.shutdown();
console.log('Services shut down successfully.');
process.exit(0); // Exit cleanly
}
startApplication()