From 40d0dee88f08dc91f0ce304f03f4f39775deb0fd Mon Sep 17 00:00:00 2001 From: Sylvia van Os Date: Tue, 18 May 2021 21:18:07 +0200 Subject: [PATCH] asyncio-ify --- cron.py | 35 +++++++++++++---------------------- 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/cron.py b/cron.py index 99ba1c2c..a0316ea3 100644 --- a/cron.py +++ b/cron.py @@ -1,7 +1,6 @@ import argparse -import multiprocessing +import asyncio import urllib.parse -from queue import Empty from time import sleep from typing import List, Tuple @@ -764,7 +763,7 @@ def delete_old_monitoring(): LOG.d("delete monitoring records older than %s, nb row %s", max_time, nb_row) -def _hibp_check(api_key, queue): +async def _hibp_check(api_key, queue): """ Uses a single API key to check the queue as fast as possible. @@ -773,7 +772,7 @@ def _hibp_check(api_key, queue): while True: try: alias_id = queue.get_nowait() - except Empty: + except asyncio.QueueEmpty: return alias = Alias.get(alias_id) @@ -810,10 +809,10 @@ def _hibp_check(api_key, queue): LOG.d("Updated breaches info for %s", alias) - sleep(1.5) + await asyncio.sleep(1.5) -def check_hibp(): +async def check_hibp(): """ Check all aliases on the HIBP (Have I Been Pwned) API """ @@ -832,7 +831,7 @@ def check_hibp(): LOG.d("Updated list of known breaches") LOG.d("Preparing list of aliases to check") - queue = multiprocessing.Queue() + queue = asyncio.Queue() max_date = arrow.now().shift(days=-HIBP_SCAN_INTERVAL_DAYS) for alias in ( Alias.query.filter( @@ -841,7 +840,7 @@ def check_hibp(): .order_by(Alias.hibp_last_check.asc().nullsfirst()) .all() ): - queue.put(alias.id) + await queue.put(alias.id) LOG.d("Need to check about %s aliases", queue.qsize()) @@ -850,25 +849,17 @@ def check_hibp(): # and then sleep for 1.5 seconds (due to HIBP API request limits) checkers = [] for i in range(len(HIBP_API_KEYS)): - checker = multiprocessing.Process( - target=_hibp_check, - args=( + checker = asyncio.create_task( + _hibp_check( HIBP_API_KEYS[i], queue, - ), + ) ) checkers.append(checker) - checker.start() # Wait until all checking processes are done - while True: - sleep(5) - for checker in checkers: - if checker.is_alive(): - break - - # All are done - break + for checker in checkers: + await checker LOG.d("Done checking HIBP API for aliases in breaches") @@ -928,4 +919,4 @@ if __name__ == "__main__": check_custom_domain() elif args.job == "check_hibp": LOG.d("Check HIBP") - check_hibp() + asyncio.run(check_hibp())