'distributor' -> 'batcher' + slow down new ID checks

master
float-trip 2023-07-11 15:37:58 +00:00
parent 70bad811bb
commit 4937f625d2
3 changed files with 11 additions and 3 deletions

@@ -27,6 +27,11 @@ with open(os.path.join(DIR, "batcher_config.yaml")) as f:
 class IDBatcher:
+    """
+    Periodically gets the newest comment/post ID from reddit, compares it to the
+    last fetched one, and adds all intermediate values into a queue.
+    """
     def __init__(self):
         self.lock = threading.Lock()
         self.ids_queue = deque()
@@ -43,7 +48,7 @@ class IDBatcher:
         self.fetch_new_ids()
-        schedule.every(5).seconds.do(self.fetch_new_ids)
+        schedule.every(30).seconds.do(self.fetch_new_ids)
         thread = threading.Thread(target=self.start_scheduler)
         thread.start()

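The new docstring is the only description of the batcher's behavior in this diff; fetch_new_ids itself is not shown. For orientation only, here is a minimal sketch (not the repository's code) of how a batcher like this could enumerate the intermediate base36 IDs and enqueue them on the new 30-second schedule, assuming reddit's public comment listing and the requests/schedule libraries:

import threading
import time
from collections import deque

import requests
import schedule


class IDBatcherSketch:
    """Illustrative sketch only; field names and endpoint use are assumptions."""

    def __init__(self):
        self.lock = threading.Lock()
        self.ids_queue = deque()
        self.last_id = self._newest_comment_id()
        schedule.every(30).seconds.do(self.fetch_new_ids)
        threading.Thread(target=self.start_scheduler, daemon=True).start()

    def _newest_comment_id(self):
        # Assumed source: the most recent comment's base36 id from reddit's public listing.
        resp = requests.get(
            "https://www.reddit.com/comments.json?limit=1",
            headers={"User-Agent": "id-batcher-sketch"},
        )
        return resp.json()["data"]["children"][0]["data"]["id"]

    def fetch_new_ids(self):
        newest = self._newest_comment_id()
        start, end = int(self.last_id, 36), int(newest, 36)
        with self.lock:
            # Every intermediate value becomes an id to hand out later.
            self.ids_queue.extend(self._to_base36(n) for n in range(start + 1, end + 1))
        self.last_id = newest

    def start_scheduler(self):
        # Run pending jobs forever; the real class starts this in a thread too.
        while True:
            schedule.run_pending()
            time.sleep(1)

    @staticmethod
    def _to_base36(n):
        digits = "0123456789abcdefghijklmnopqrstuvwxyz"
        out = ""
        while n:
            n, r = divmod(n, 36)
            out = digits[r] + out
        return out or "0"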

@@ -126,11 +126,15 @@ class RedditClient:
 class FetcherClient:
+    """
+    Retrieves lists of IDs from the batcher, fetches them from reddit, and submits the JSON back.
+    """
     def __init__(self, client_id, reddit_client):
         self.client_id = client_id
         self.reddit_client = reddit_client
         self.channel = grpc.insecure_channel(
-            config["distributor_uri"],
+            config["batcher_uri"],
             options=[
                 ("grpc.max_send_message_length", 100 * 10**6),
                 ("grpc.max_receive_message_length", 1 * 10**6),

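The FetcherClient docstring describes the round trip: pull an ID batch over gRPC, fetch the items from reddit, push the JSON back. A rough sketch of that loop under assumed names (the generated stub modules, a GetBatch rpc, and the ids/data/client_id fields are not confirmed by this commit; only SubmitBatch, BatchRequest, and SubmitRequest are visible):

import json

import grpc

# Hypothetical module names for the generated stubs; the real proto output may differ.
import id_service_pb2
import id_service_pb2_grpc


def run_once(batcher_uri, client_id, reddit_client):
    # Same channel options as the diff: large sends for JSON batches,
    # small receives since ID lists are tiny.
    channel = grpc.insecure_channel(
        batcher_uri,
        options=[
            ("grpc.max_send_message_length", 100 * 10**6),
            ("grpc.max_receive_message_length", 1 * 10**6),
        ],
    )
    stub = id_service_pb2_grpc.IDServiceStub(channel)

    # GetBatch, batch.ids, and the SubmitRequest fields are assumptions.
    batch = stub.GetBatch(id_service_pb2.BatchRequest(client_id=client_id))
    payload = json.dumps(reddit_client.fetch(batch.ids))  # fetch() is an assumed helper
    stub.SubmitBatch(id_service_pb2.SubmitRequest(client_id=client_id, data=payload))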

@@ -10,7 +10,6 @@ service IDService {
     rpc SubmitBatch (SubmitRequest) returns (SubmitResponse) {}
 }

-// The BatchRequest message contains the client id.
 message BatchRequest {
     string client_id = 1;
 }
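
On the batcher side, the IDService definition above would be exposed through a generated servicer. A brief sketch under the same hypothetical module names, with SubmitBatch handing the payload to an assumed store_batch hook (the request fields and storage call are assumptions, not repository code):

from concurrent import futures

import grpc

# Same hypothetical generated-module names as above.
import id_service_pb2
import id_service_pb2_grpc


class IDService(id_service_pb2_grpc.IDServiceServicer):
    def __init__(self, batcher):
        self.batcher = batcher

    def SubmitBatch(self, request, context):
        # store_batch and the request fields are assumptions.
        self.batcher.store_batch(request.client_id, request.data)
        return id_service_pb2.SubmitResponse()


def serve(batcher, port=50051):
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
    id_service_pb2_grpc.add_IDServiceServicer_to_server(IDService(batcher), server)
    server.add_insecure_port(f"[::]:{port}")
    server.start()
    server.wait_for_termination()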