diff --git a/app/config.py b/app/config.py index 6e660f8f..e8824f5a 100644 --- a/app/config.py +++ b/app/config.py @@ -421,6 +421,8 @@ try: except Exception: HIBP_SCAN_INTERVAL_DAYS = 7 HIBP_API_KEYS = sl_getenv("HIBP_API_KEYS", list) or [] +HIBP_MAX_ALIAS_CHECK = 10_000 +HIBP_RPM = 100 POSTMASTER = os.environ.get("POSTMASTER") diff --git a/cron.py b/cron.py index 3626d699..503046e8 100644 --- a/cron.py +++ b/cron.py @@ -5,7 +5,7 @@ from typing import List, Tuple import arrow import requests -from sqlalchemy import func, desc, or_, and_ +from sqlalchemy import func, desc, or_, and_, nullsfirst from sqlalchemy.ext.compiler import compiles from sqlalchemy.orm import joinedload from sqlalchemy.orm.exc import ObjectDeletedError @@ -962,6 +962,9 @@ async def _hibp_check(api_key, queue): This function to be ran simultaneously (multiple _hibp_check functions with different keys on the same queue) to make maximum use of multiple API keys. """ + default_rate_sleep = (60.0 / config.HIBP_RPM) + 0.1 + rate_sleep = default_rate_sleep + rate_hit_counter = 0 while True: try: alias_id = queue.get_nowait() @@ -969,9 +972,11 @@ async def _hibp_check(api_key, queue): return alias = Alias.get(alias_id) - # an alias can be deleted in the meantime if not alias: - return + continue + user = alias.user + if user.disabled or not user.is_paid(): + continue LOG.d("Checking HIBP for %s", alias) @@ -983,7 +988,6 @@ async def _hibp_check(api_key, queue): f"https://haveibeenpwned.com/api/v3/breachedaccount/{urllib.parse.quote(alias.email)}", headers=request_headers, ) - if r.status_code == 200: # Breaches found alias.hibp_breaches = [ @@ -991,20 +995,27 @@ async def _hibp_check(api_key, queue): ] if len(alias.hibp_breaches) > 0: LOG.w("%s appears in HIBP breaches %s", alias, alias.hibp_breaches) + if rate_hit_counter > 0: + rate_hit_counter -= 1 elif r.status_code == 404: # No breaches found alias.hibp_breaches = [] elif r.status_code == 429: # rate limited LOG.w("HIBP rate limited, check alias 
%s in the next run", alias) - await asyncio.sleep(1.6) - return + rate_hit_counter += 1 + rate_sleep = default_rate_sleep + (0.2 * rate_hit_counter) + if rate_hit_counter > 10: + LOG.w(f"HIBP rate limited too many times stopping with alias {alias}") + return + # Back off before the next request (must be awaited to actually pause) + await asyncio.sleep(5) elif r.status_code > 500: LOG.w("HIBP server 5** error %s", r.status_code) return else: LOG.error( - "An error occured while checking alias %s: %s - %s", + "An error occurred while checking alias %s: %s - %s", alias, r.status_code, r.text, @@ -1015,9 +1026,8 @@ async def _hibp_check(api_key, queue): Session.add(alias) Session.commit() - LOG.d("Updated breaches info for %s", alias) - - await asyncio.sleep(1.6) + LOG.d("Updated breach info for %s", alias) + await asyncio.sleep(rate_sleep) async def check_hibp(): @@ -1040,15 +1050,22 @@ async def check_hibp(): Session.commit() LOG.d("Updated list of known breaches") + LOG.d("Getting the list of users to skip") + query = "select u.id, count(a.id) from users u, alias a where a.user_id=u.id group by u.id having count(a.id) > :max_alias" + rows = Session.execute(query, {"max_alias": config.HIBP_MAX_ALIAS_CHECK}) + user_ids = [row[0] for row in rows] + LOG.d("Got %d users to skip" % len(user_ids)) + LOG.d("Preparing list of aliases to check") queue = asyncio.Queue() max_date = arrow.now().shift(days=-config.HIBP_SCAN_INTERVAL_DAYS) for alias in ( Alias.filter( - or_(Alias.hibp_last_check.is_(None), Alias.hibp_last_check < max_date) + or_(Alias.hibp_last_check.is_(None), Alias.hibp_last_check < max_date), + Alias.user_id.notin_(user_ids), ) .filter(Alias.enabled) - .order_by(Alias.hibp_last_check.asc()) + .order_by(nullsfirst(Alias.hibp_last_check.asc()), Alias.id.asc()) .yield_per(500) .enable_eagerloads(False) ):