diff --git a/cron.py b/cron.py index b3446961..fc8caade 100644 --- a/cron.py +++ b/cron.py @@ -976,6 +976,9 @@ async def _hibp_check(api_key, queue): continue user = alias.user if user.disabled or not user.is_paid(): + # Mark it as hibp done to skip it as if it had been checked + alias.hibp_last_check = arrow.utcnow() + Session.commit() continue LOG.d("Checking HIBP for %s", alias) @@ -1056,43 +1059,51 @@ async def check_hibp(): user_ids = [row[0] for row in rows] LOG.d("Got %d users to skip" % len(user_ids)) - LOG.d("Preparing list of aliases to check") + LOG.d("Checking aliases") queue = asyncio.Queue() + min_alias_id = 0 + max_alias_id = Session.query(func.max(Alias.id)).scalar() + step = 500 max_date = arrow.now().shift(days=-config.HIBP_SCAN_INTERVAL_DAYS) - alias_query = Alias.filter( - or_(Alias.hibp_last_check.is_(None), Alias.hibp_last_check < max_date), - Alias.user_id.notin_(user_ids), - Alias.enabled, - ) - if config.HIBP_SKIP_PARTNER_ALIAS: - alias_query = alias_query(Alias.flags.op("&")(Alias.FLAG_PARTNER_CREATED) == 0) - for alias in ( - alias_query.order_by(nullsfirst(Alias.hibp_last_check.asc()), Alias.id.asc()) - .yield_per(500) - .enable_eagerloads(False) - ): - await queue.put(alias.id) - - LOG.d("Need to check about %s aliases", queue.qsize()) - - # Start one checking process per API key - # Each checking process will take one alias from the queue, get the info - # and then sleep for 1.5 seconds (due to HIBP API request limits) - checkers = [] - for i in range(len(config.HIBP_API_KEYS)): - checker = asyncio.create_task( - _hibp_check( - config.HIBP_API_KEYS[i], - queue, - ) + alias_checked = 0 + for alias_batch_id in range(min_alias_id, max_alias_id, step): + alias_query = Alias.filter( + or_(Alias.hibp_last_check.is_(None), Alias.hibp_last_check < max_date), + Alias.user_id.notin_(user_ids), + Alias.enabled, + Alias.id >= alias_batch_id, + Alias.id < alias_batch_id + step, ) - checkers.append(checker) + if config.HIBP_SKIP_PARTNER_ALIAS: + alias_query = alias_query( + Alias.flags.op("&")(Alias.FLAG_PARTNER_CREATED) == 0 + ) + for alias in alias_query.order_by( + nullsfirst(Alias.hibp_last_check.asc()), Alias.id.asc() + ).enable_eagerloads(False): + await queue.put(alias.id) - # Wait until all checking processes are done - for checker in checkers: - await checker + alias_checked += queue.qsize() + LOG.d("Need to check about %s aliases in this loop", queue.qsize()) - LOG.d("Done checking HIBP API for aliases in breaches") + # Start one checking process per API key + # Each checking process will take one alias from the queue, get the info + # and then sleep for 1.5 seconds (due to HIBP API request limits) + checkers = [] + for i in range(len(config.HIBP_API_KEYS)): + checker = asyncio.create_task( + _hibp_check( + config.HIBP_API_KEYS[i], + queue, + ) + ) + checkers.append(checker) + + # Wait until all checking processes are done + for checker in checkers: + await checker + + LOG.d(f"Done checking {alias_checked} HIBP API for aliases in breaches") def notify_hibp():