Compare commits
4 Commits
657b11e36b
...
1e07f50fc4
Author | SHA1 | Date |
---|---|---|
j | 1e07f50fc4 | |
j | fbdd22909a | |
j | 1f8ad214db | |
j | e2eea38644 |
|
@ -22,7 +22,7 @@ async function startApplication() {
|
||||||
// Initialize and start workflow
|
// Initialize and start workflow
|
||||||
try {
|
try {
|
||||||
await CommentProcessor.processComments();
|
await CommentProcessor.processComments();
|
||||||
console.log('Workflow executed successfully.');
|
console.log(`${new Date()} Workflow executed successfully.`);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('An error occurred during workflow execution:', error);
|
console.error('An error occurred during workflow execution:', error);
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,7 @@ async function startApplication() {
|
||||||
setInterval(async () => {
|
setInterval(async () => {
|
||||||
try {
|
try {
|
||||||
await CommentProcessor.processComments();
|
await CommentProcessor.processComments();
|
||||||
console.log('Workflow executed successfully.');
|
console.log(`${new Date()} Workflow executed successfully.`);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('An error occurred during workflow execution:', error);
|
console.error('An error occurred during workflow execution:', error);
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,7 +15,7 @@ export class CommentFetcher {
|
||||||
* @throws {Error} Throws an error if there is a failure in fetching comments from the API.
|
* @throws {Error} Throws an error if there is a failure in fetching comments from the API.
|
||||||
*/
|
*/
|
||||||
static async fetchComments(page: number): Promise<Comment[]> {
|
static async fetchComments(page: number): Promise<Comment[]> {
|
||||||
console.time(`Fetching page: ${page}`);
|
console.time(`${new Date()} Fetching page: ${page}`);
|
||||||
try {
|
try {
|
||||||
const axiosInstance = SessionManager.getInstance().axiosInstance;
|
const axiosInstance = SessionManager.getInstance().axiosInstance;
|
||||||
const response = await axiosInstance.get(`/comments?page=${page}`);
|
const response = await axiosInstance.get(`/comments?page=${page}`);
|
||||||
|
|
|
@ -32,7 +32,7 @@ export class CommentProcessor {
|
||||||
|
|
||||||
const exists = await RedisCommentService.retrieveComment(comment.id.toString());
|
const exists = await RedisCommentService.retrieveComment(comment.id.toString());
|
||||||
if (exists) {
|
if (exists) {
|
||||||
console.log(`Comment ${comment.id} exists`)
|
console.log(`${new Date()} Comment ${comment.id} exists`)
|
||||||
stopFetching = true;
|
stopFetching = true;
|
||||||
break; // Stop processing this batch of comments
|
break; // Stop processing this batch of comments
|
||||||
}
|
}
|
||||||
|
@ -41,7 +41,7 @@ export class CommentProcessor {
|
||||||
|
|
||||||
// Optionally publish the comment to a Redis channel for subscribers
|
// Optionally publish the comment to a Redis channel for subscribers
|
||||||
await RedisCommentService.publishComment(comment);
|
await RedisCommentService.publishComment(comment);
|
||||||
console.log(`Published Comment ${comment.id}`)
|
console.log(`${new Date()} Published Comment ${comment.id}`)
|
||||||
// prevent accidental recognition of a previous comment
|
// prevent accidental recognition of a previous comment
|
||||||
comments.push(comment);
|
comments.push(comment);
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,13 +20,18 @@ class SessionManager {
|
||||||
datastore: "ioredis",
|
datastore: "ioredis",
|
||||||
clearDatastore: false,
|
clearDatastore: false,
|
||||||
clientOptions: {
|
clientOptions: {
|
||||||
host: process.env.REDIS_HOST,
|
host: process.env.REDIS_HOST!,
|
||||||
port: Number(process.env.REDIS_PORT),
|
port: Number(process.env.REDIS_PORT),
|
||||||
password: process.env.REDIS_PASSWORD || undefined, // Use undefined if no password is set
|
password: process.env.REDIS_PASSWORD || undefined, // Use undefined if no password is set
|
||||||
enableOfflineQueue: true
|
enableOfflineQueue: true
|
||||||
},
|
},
|
||||||
maxConcurrent: 1, // Maximum number of concurrent requests
|
maxConcurrent: 1, // Maximum number of concurrent requests
|
||||||
minTime: 1000 // Minimum time between dispatches of requests in milliseconds
|
minTime: 1000, // Minimum time between dispatches of requests in milliseconds
|
||||||
|
reservoir: 41, // Initial reservoir value for the first hour
|
||||||
|
reservoirRefreshAmount: 41, // Reservoir value to reset to every hour
|
||||||
|
reservoirRefreshInterval: 60 * 60 * 1000, // Interval to reset the reservoir (1 hour in milliseconds)
|
||||||
|
highWater: 5, // Maximum number of queued jobs.
|
||||||
|
strategy: Bottleneck.strategy.OVERFLOW_PRIORITY // Strategy to drop the oldest jobs in the queue when highWater is reached
|
||||||
});
|
});
|
||||||
|
|
||||||
this.axiosInstance = axios.create({
|
this.axiosInstance = axios.create({
|
||||||
|
@ -82,13 +87,13 @@ class SessionManager {
|
||||||
// Wrap the post method
|
// Wrap the post method
|
||||||
const originalPost = instance.post;
|
const originalPost = instance.post;
|
||||||
instance.post = <T = any, R = AxiosResponse<T>>(url: string, data?: any, config?: AxiosRequestConfig): Promise<R> => {
|
instance.post = <T = any, R = AxiosResponse<T>>(url: string, data?: any, config?: AxiosRequestConfig): Promise<R> => {
|
||||||
return this.limiter.schedule(() => originalPost.apply(instance, [url, data, config])) as Promise<R>;
|
return this.limiter.schedule({ priority: 1 }, () => originalPost.apply(instance, [url, data, config])) as Promise<R>;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Wrap the put method
|
// Wrap the put method
|
||||||
const originalPut = instance.put;
|
const originalPut = instance.put;
|
||||||
instance.put = <T = any, R = AxiosResponse<T>>(url: string, data?: any, config?: AxiosRequestConfig): Promise<R> => {
|
instance.put = <T = any, R = AxiosResponse<T>>(url: string, data?: any, config?: AxiosRequestConfig): Promise<R> => {
|
||||||
return this.limiter.schedule(() => originalPut.apply(instance, [url, data, config])) as Promise<R>;
|
return this.limiter.schedule({ priority: 1 }, () => originalPut.apply(instance, [url, data, config])) as Promise<R>;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Wrap the delete method
|
// Wrap the delete method
|
||||||
|
|
Loading…
Reference in New Issue