Serialize Processing to ensure all comments are processed in order recieved (may reverse later)

master
j 2024-04-06 02:05:09 -04:00
parent d779175b90
commit 1053140576
3 changed files with 29 additions and 5 deletions

11
package-lock.json generated
View File

@ -5,6 +5,7 @@
"packages": {
"": {
"dependencies": {
"async": "^3.2.5",
"axios": "^1.6.7",
"axios-request-throttle": "^1.0.0",
"axios-retry": "^4.0.0",
@ -173,6 +174,11 @@
"node": "^12.13.0 || ^14.15.0 || >=16.0.0"
}
},
"node_modules/async": {
"version": "3.2.5",
"resolved": "https://registry.npmjs.org/async/-/async-3.2.5.tgz",
"integrity": "sha512-baNZyqaaLhyLVKm/DlvdW051MSgO6b8eVfIezl9E5PqWxFgzLm/wQntEW4zOytVburDEr0JlALEpdOFwvErLsg=="
},
"node_modules/asynckit": {
"version": "0.4.0",
"resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz",
@ -2209,6 +2215,11 @@
"readable-stream": "^3.6.0"
}
},
"async": {
"version": "3.2.5",
"resolved": "https://registry.npmjs.org/async/-/async-3.2.5.tgz",
"integrity": "sha512-baNZyqaaLhyLVKm/DlvdW051MSgO6b8eVfIezl9E5PqWxFgzLm/wQntEW4zOytVburDEr0JlALEpdOFwvErLsg=="
},
"asynckit": {
"version": "0.4.0",
"resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz",

View File

@ -1,5 +1,6 @@
{
"dependencies": {
"async": "^3.2.5",
"axios": "^1.6.7",
"axios-request-throttle": "^1.0.0",
"axios-retry": "^4.0.0",
@ -25,4 +26,4 @@
"start": "npm run build && node dist/index.js",
"test": "tsc -p tsconfig.tests.json"
}
}
}

View File

@ -1,3 +1,4 @@
const async = require('async');
import dotenv from 'dotenv';
dotenv.config();
@ -36,11 +37,22 @@ async function startApplication() {
redisManager.client.on('message', async (channel, message) => {
if (channel === 'newCommentsChannel') {
const comment: Comment = JSON.parse(message)
console.log(`New comment ${comment.id} received.`);
const commentQueue = async.queue(async (comment: Comment, callback: () => void) => {
try {
await workflowOrchestrator.executeWorkflow(comment);
console.log(`Processed comment ${comment.id}`);
} catch (error) {
console.error(`Error processing comment ${comment.id}`, error);
}
callback();
}, 1);
redisManager.client.on('message', (channel, message) => {
if (channel === 'newCommentsChannel') {
const comment = JSON.parse(message);
console.log(`New comment ${comment.id} received.`);
// Push the comment to the queue instead of processing it directly
commentQueue.push(comment);
}
});