diff --git a/cron.py b/cron.py index fda5d4ef..436beca0 100644 --- a/cron.py +++ b/cron.py @@ -1,5 +1,7 @@ import argparse +import multiprocessing import urllib.parse +from queue import Empty from time import sleep from typing import List, Tuple @@ -761,47 +763,30 @@ def delete_old_monitoring(): LOG.d("delete monitoring records older than %s, nb row %s", max_time, nb_row) -def check_hibp(): +def _hibp_check(api_key, queue): """ - Check all aliases on the HIBP (Have I Been Pwned) API + Uses a single API key to check the queue as fast as possible. + + This function to be ran simultaneously (multiple _hibp_check functions with different keys on the same queue) to make maximum use of multiple API keys. """ - LOG.d("Checking HIBP API for aliases in breaches") + while True: + try: + alias_id = queue.get_nowait() + except Empty: + return - if len(HIBP_API_KEYS) == 0: - LOG.exception("No HIBP API keys") - return - - api_key_index = 0 - - LOG.d("Updating list of known breaches") - r = requests.get("https://haveibeenpwned.com/api/v3/breaches") - for entry in r.json(): - Hibp.get_or_create(id=entry["Name"]) - - db.session.commit() - - for alias in Alias.query.order_by(Alias.hibp_last_check.asc().nullsfirst()).all(): - LOG.d("Checking %s on HIBP", alias) - if not alias.needs_hibp_scan(): - LOG.d("Skipping %s, HIBP-checked too recently...", alias) - continue - - # Only one request per API key per 1.5 seconds - if api_key_index >= len(HIBP_API_KEYS): - api_key_index = 0 - sleep(1.5) + alias = Alias.get(alias_id) + LOG.d("Checking HIBP for %s", alias) request_headers = { "user-agent": "SimpleLogin", - "hibp-api-key": HIBP_API_KEYS[api_key_index], + "hibp-api-key": api_key, } r = requests.get( f"https://haveibeenpwned.com/api/v3/breachedaccount/{urllib.parse.quote(alias.email)}", headers=request_headers, ) - api_key_index += 1 - if r.status_code == 200: # Breaches found alias.hibp_breaches = [Hibp.get_by(id=entry["Name"]) for entry in r.json()] @@ -815,14 +800,66 @@ def check_hibp(): r.status_code, r.text, ) - continue - - alias.hibp_last_check = arrow.utcnow() + return + db.session.add(alias) db.session.commit() LOG.d("Updated breaches info for %s", alias) + sleep(1.5) + + +def check_hibp(): + """ + Check all aliases on the HIBP (Have I Been Pwned) API + """ + LOG.d("Checking HIBP API for aliases in breaches") + + if len(HIBP_API_KEYS) == 0: + LOG.exception("No HIBP API keys") + return + + LOG.d("Updating list of known breaches") + r = requests.get("https://haveibeenpwned.com/api/v3/breaches") + for entry in r.json(): + Hibp.get_or_create(id=entry["Name"]) + + db.session.commit() + LOG.d("Updated list of known breaches") + + LOG.d("Preparing list of aliases to check") + queue = multiprocessing.Queue() + for alias in Alias.query.order_by(Alias.hibp_last_check.asc().nullsfirst()).all(): + if not alias.needs_hibp_scan(): + queue.put(alias.id) + LOG.d("Need to check about %s aliases", queue.qsize()) + + # Start one checking process per API key + # Each checking process will take one alias from the queue, get the info + # and then sleep for 1.5 seconds (due to HIBP API request limits) + checkers = [] + for i in range(len(HIBP_API_KEYS)): + checker = multiprocessing.Process( + target=_hibp_check, + args=( + HIBP_API_KEYS[i], + queue, + ), + ) + checkers.append(checker) + checker.start() + + # Wait until all checking processes are done + while True: + sleep(5) + for checker in checkers: + if checker.is_alive(): + break + + # All are done + break + LOG.d("Done checking HIBP API for aliases in breaches")