97 lines
3.6 KiB
TypeScript
97 lines
3.6 KiB
TypeScript
const async = require('async');
|
|
import dotenv from 'dotenv';
|
|
dotenv.config();
|
|
|
|
import WorkflowOrchestrator from './workflows/WorkflowOrchestrator';
|
|
import rDramaSession from './rdrama/session/SessionManager';
|
|
import { DatabaseInitializer } from './db/initializeDatabase';
|
|
import RedisSessionManager from './redis/session/SessionManager';
|
|
import { Comment } from './rdrama/models/Comment';
|
|
|
|
// Import other necessary services or configurations
|
|
|
|
async function startApplication() {
|
|
console.log('Database Start')
|
|
const databaseInitializer = DatabaseInitializer.getInstance();
|
|
const db = await databaseInitializer.getDbInstance()
|
|
if (!db) {
|
|
throw new Error('Failed to initialize the database.');
|
|
}
|
|
|
|
console.log('RDrama Session Start')
|
|
// Initialize SessionManager or other global configurations
|
|
const rDramaSessionManager = rDramaSession.getInstance();
|
|
if (!process.env.RDRAMA_API_KEY) {
|
|
throw new Error('RDRAMA_API_KEY is undefined. Please set this environment variable.');
|
|
}
|
|
rDramaSessionManager.setAuthorizationToken(process.env.RDRAMA_API_KEY);
|
|
|
|
console.log('Redis Start');
|
|
const redisManager = RedisSessionManager.getInstance();
|
|
redisManager.client.subscribe('newCommentsChannel', (error, count) => {
|
|
if (error) throw new Error('Failed to subscribe to Redis channel.');
|
|
console.log(`Subscribed to ${'newCommentsChannel'}. Listening for new comments.`);
|
|
});
|
|
|
|
const workflowOrchestrator = new WorkflowOrchestrator();
|
|
|
|
|
|
|
|
const commentQueue = async.queue(async (comment: Comment, callback: () => void) => {
|
|
try {
|
|
await workflowOrchestrator.executeWorkflow(comment);
|
|
console.log(`Processed comment ${comment.id}`);
|
|
} catch (error) {
|
|
console.error(`Error processing comment ${comment.id}`, error);
|
|
}
|
|
callback();
|
|
}, 1);
|
|
|
|
redisManager.client.on('message', (channel, message) => {
|
|
if (channel === 'newCommentsChannel') {
|
|
const comment = JSON.parse(message);
|
|
console.log(`New comment ${comment.id} received.`);
|
|
// Push the comment to the queue instead of processing it directly
|
|
commentQueue.push(comment);
|
|
}
|
|
});
|
|
|
|
setupProcessListeners(redisManager, rDramaSessionManager);
|
|
}
|
|
|
|
|
|
function setupProcessListeners(redisManager: RedisSessionManager, rDramaSessionManager: rDramaSession) {
|
|
process.on('SIGINT', async () => {
|
|
console.log('SIGINT received. Shutting down gracefully...');
|
|
await shutdownServices(redisManager, rDramaSessionManager);
|
|
});
|
|
|
|
process.on('SIGTERM', async () => {
|
|
console.log('SIGTERM received. Shutting down gracefully...');
|
|
await shutdownServices(redisManager, rDramaSessionManager);
|
|
});
|
|
|
|
process.on('uncaughtException', async (error) => {
|
|
console.error('Uncaught Exception:', error);
|
|
await shutdownServices(redisManager, rDramaSessionManager);
|
|
process.exit(1); // Exit with error
|
|
});
|
|
|
|
process.on('unhandledRejection', async (reason, promise) => {
|
|
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
|
|
await shutdownServices(redisManager, rDramaSessionManager);
|
|
process.exit(1); // Exit with error
|
|
});
|
|
}
|
|
|
|
async function shutdownServices(redisManager: RedisSessionManager, rDramaSessionManager: rDramaSession) {
|
|
await redisManager.shutdown();
|
|
await rDramaSessionManager.shutdown();
|
|
console.log('Services shut down successfully.');
|
|
process.exit(0); // Exit cleanly
|
|
}
|
|
|
|
startApplication()
|
|
.then(() => console.log('Application started successfully.'))
|
|
.catch((error) => console.error('Application failed to start:', error));
|