From 10531405766fc5a3f2ef880e31fe101df5995f96 Mon Sep 17 00:00:00 2001 From: j Date: Sat, 6 Apr 2024 02:05:09 -0400 Subject: [PATCH] Serialize Processing to ensure all comments are processed in order recieved (may reverse later) --- package-lock.json | 11 +++++++++++ package.json | 3 ++- src/index.ts | 20 ++++++++++++++++---- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/package-lock.json b/package-lock.json index 5ec99ef..bb4c8a3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5,6 +5,7 @@ "packages": { "": { "dependencies": { + "async": "^3.2.5", "axios": "^1.6.7", "axios-request-throttle": "^1.0.0", "axios-retry": "^4.0.0", @@ -173,6 +174,11 @@ "node": "^12.13.0 || ^14.15.0 || >=16.0.0" } }, + "node_modules/async": { + "version": "3.2.5", + "resolved": "https://registry.npmjs.org/async/-/async-3.2.5.tgz", + "integrity": "sha512-baNZyqaaLhyLVKm/DlvdW051MSgO6b8eVfIezl9E5PqWxFgzLm/wQntEW4zOytVburDEr0JlALEpdOFwvErLsg==" + }, "node_modules/asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", @@ -2209,6 +2215,11 @@ "readable-stream": "^3.6.0" } }, + "async": { + "version": "3.2.5", + "resolved": "https://registry.npmjs.org/async/-/async-3.2.5.tgz", + "integrity": "sha512-baNZyqaaLhyLVKm/DlvdW051MSgO6b8eVfIezl9E5PqWxFgzLm/wQntEW4zOytVburDEr0JlALEpdOFwvErLsg==" + }, "asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", diff --git a/package.json b/package.json index 7e9790e..88320fb 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,6 @@ { "dependencies": { + "async": "^3.2.5", "axios": "^1.6.7", "axios-request-throttle": "^1.0.0", "axios-retry": "^4.0.0", @@ -25,4 +26,4 @@ "start": "npm run build && node dist/index.js", "test": "tsc -p tsconfig.tests.json" } -} \ No newline at end of file +} diff --git a/src/index.ts b/src/index.ts index f150819..59585d4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,4 @@ +const async = require('async'); import dotenv from 'dotenv'; dotenv.config(); @@ -36,11 +37,22 @@ async function startApplication() { - redisManager.client.on('message', async (channel, message) => { - if (channel === 'newCommentsChannel') { - const comment: Comment = JSON.parse(message) - console.log(`New comment ${comment.id} received.`); + const commentQueue = async.queue(async (comment: Comment, callback: () => void) => { + try { await workflowOrchestrator.executeWorkflow(comment); + console.log(`Processed comment ${comment.id}`); + } catch (error) { + console.error(`Error processing comment ${comment.id}`, error); + } + callback(); + }, 1); + + redisManager.client.on('message', (channel, message) => { + if (channel === 'newCommentsChannel') { + const comment = JSON.parse(message); + console.log(`New comment ${comment.id} received.`); + // Push the comment to the queue instead of processing it directly + commentQueue.push(comment); } });