DramaHarvester/src/rdrama/services/CommentProcessor.ts

54 lines
2.4 KiB
TypeScript

import { Comment } from '../models/Comment';
import { CommentFetcher } from './CommentFetcher';
import RedisCommentService from '../../redis/services/CommentService';
/**
* CommentProcessor handles the retrieval and processing of comments from the r/Drama API.
* It manages API requests through CommentFetcher, including rate limiting and retries, and coordinates
* with DatabaseService for checking the existence and persisting new comments.
*/
export class CommentProcessor {
/**
* Fetches comments from the r/Drama API across multiple pages, up to the specified maximum.
* Iterates through pages starting from the first page until the maximum page limit is reached
* or there are no more comments to fetch. Each page's fetch operation is timed for performance analysis.
*
* @returns {Promise<void>} A promise that resolves void.
* @throws {Error} Throws an error if there is a failure in fetching comments from the API.
*/
static async processComments(maxPages: number = 10): Promise<void> {
let comments: Comment[] = [];
let stopFetching = false;
for (let page = 1; page <= maxPages && !stopFetching; page++) {
const newComments = await CommentFetcher.fetchComments(page)
// Check each new comment against the database and existing comments in this batch
for (const comment of newComments) {
// Check if the comment was already processed in this batch
if (comments.some(c => c.id === comment.id)) continue;
const exists = await RedisCommentService.retrieveComment(comment.id.toString());
if (exists) {
console.log(`Comment ${comment.id} exists`)
stopFetching = true;
break; // Stop processing this batch of comments
}
// Cache new comment with a 3600-second expiration
await RedisCommentService.storeComment(comment);
// Optionally publish the comment to a Redis channel for subscribers
await RedisCommentService.publishComment(comment);
console.log(`Published Comment ${comment.id}`)
// prevent accidental recognition of a previous comment
comments.push(comment);
}
if (newComments.length === 0) break; // No more comments to fetch
}
}
}