Compare commits

...

4 Commits

Author SHA1 Message Date
j 1e07f50fc4 addtl consoling 2024-03-31 00:23:00 -04:00
j fbdd22909a addtl consoling 2024-03-31 00:22:49 -04:00
j 1f8ad214db rdrama reservoir with OVERFLOW_PRIORITY highwater strategy, asserted that host would be present in env 2024-03-31 00:21:35 -04:00
j e2eea38644 addtl consoling 2024-03-31 00:21:04 -04:00
4 changed files with 14 additions and 9 deletions

View File

@ -22,7 +22,7 @@ async function startApplication() {
// Initialize and start workflow
try {
await CommentProcessor.processComments();
console.log('Workflow executed successfully.');
console.log(`${new Date()} Workflow executed successfully.`);
} catch (error) {
console.error('An error occurred during workflow execution:', error);
}
@ -31,7 +31,7 @@ async function startApplication() {
setInterval(async () => {
try {
await CommentProcessor.processComments();
console.log('Workflow executed successfully.');
console.log(`${new Date()} Workflow executed successfully.`);
} catch (error) {
console.error('An error occurred during workflow execution:', error);
}

View File

@ -15,7 +15,7 @@ export class CommentFetcher {
* @throws {Error} Throws an error if there is a failure in fetching comments from the API.
*/
static async fetchComments(page: number): Promise<Comment[]> {
console.time(`Fetching page: ${page}`);
console.time(`${new Date()} Fetching page: ${page}`);
try {
const axiosInstance = SessionManager.getInstance().axiosInstance;
const response = await axiosInstance.get(`/comments?page=${page}`);

View File

@ -32,7 +32,7 @@ export class CommentProcessor {
const exists = await RedisCommentService.retrieveComment(comment.id.toString());
if (exists) {
console.log(`Comment ${comment.id} exists`)
console.log(`${new Date()} Comment ${comment.id} exists`)
stopFetching = true;
break; // Stop processing this batch of comments
}
@ -41,7 +41,7 @@ export class CommentProcessor {
// Optionally publish the comment to a Redis channel for subscribers
await RedisCommentService.publishComment(comment);
console.log(`Published Comment ${comment.id}`)
console.log(`${new Date()} Published Comment ${comment.id}`)
// prevent accidental recognition of a previous comment
comments.push(comment);
}

View File

@ -20,13 +20,18 @@ class SessionManager {
datastore: "ioredis",
clearDatastore: false,
clientOptions: {
host: process.env.REDIS_HOST,
host: process.env.REDIS_HOST!,
port: Number(process.env.REDIS_PORT),
password: process.env.REDIS_PASSWORD || undefined, // Use undefined if no password is set
enableOfflineQueue: true
},
maxConcurrent: 1, // Maximum number of concurrent requests
minTime: 1000 // Minimum time between dispatches of requests in milliseconds
minTime: 1000, // Minimum time between dispatches of requests in milliseconds
reservoir: 41, // Initial reservoir value for the first hour
reservoirRefreshAmount: 41, // Reservoir value to reset to every hour
reservoirRefreshInterval: 60 * 60 * 1000, // Interval to reset the reservoir (1 hour in milliseconds)
highWater: 5, // Maximum number of queued jobs.
strategy: Bottleneck.strategy.OVERFLOW_PRIORITY // Strategy to drop the oldest jobs in the queue when highWater is reached
});
this.axiosInstance = axios.create({
@ -82,13 +87,13 @@ class SessionManager {
// Wrap the post method
const originalPost = instance.post;
instance.post = <T = any, R = AxiosResponse<T>>(url: string, data?: any, config?: AxiosRequestConfig): Promise<R> => {
return this.limiter.schedule(() => originalPost.apply(instance, [url, data, config])) as Promise<R>;
return this.limiter.schedule({ priority: 1 }, () => originalPost.apply(instance, [url, data, config])) as Promise<R>;
};
// Wrap the put method
const originalPut = instance.put;
instance.put = <T = any, R = AxiosResponse<T>>(url: string, data?: any, config?: AxiosRequestConfig): Promise<R> => {
return this.limiter.schedule(() => originalPut.apply(instance, [url, data, config])) as Promise<R>;
return this.limiter.schedule({ priority: 1 }, () => originalPut.apply(instance, [url, data, config])) as Promise<R>;
};
// Wrap the delete method