asyncio-ify

This commit is contained in:
Sylvia van Os 2021-05-18 21:18:07 +02:00
parent a08b0c05cc
commit 40d0dee88f
1 changed files with 13 additions and 22 deletions

35
cron.py
View File

@ -1,7 +1,6 @@
import argparse import argparse
import multiprocessing import asyncio
import urllib.parse import urllib.parse
from queue import Empty
from time import sleep from time import sleep
from typing import List, Tuple from typing import List, Tuple
@ -764,7 +763,7 @@ def delete_old_monitoring():
LOG.d("delete monitoring records older than %s, nb row %s", max_time, nb_row) LOG.d("delete monitoring records older than %s, nb row %s", max_time, nb_row)
def _hibp_check(api_key, queue): async def _hibp_check(api_key, queue):
""" """
Uses a single API key to check the queue as fast as possible. Uses a single API key to check the queue as fast as possible.
@ -773,7 +772,7 @@ def _hibp_check(api_key, queue):
while True: while True:
try: try:
alias_id = queue.get_nowait() alias_id = queue.get_nowait()
except Empty: except asyncio.QueueEmpty:
return return
alias = Alias.get(alias_id) alias = Alias.get(alias_id)
@ -810,10 +809,10 @@ def _hibp_check(api_key, queue):
LOG.d("Updated breaches info for %s", alias) LOG.d("Updated breaches info for %s", alias)
sleep(1.5) await asyncio.sleep(1.5)
def check_hibp(): async def check_hibp():
""" """
Check all aliases on the HIBP (Have I Been Pwned) API Check all aliases on the HIBP (Have I Been Pwned) API
""" """
@ -832,7 +831,7 @@ def check_hibp():
LOG.d("Updated list of known breaches") LOG.d("Updated list of known breaches")
LOG.d("Preparing list of aliases to check") LOG.d("Preparing list of aliases to check")
queue = multiprocessing.Queue() queue = asyncio.Queue()
max_date = arrow.now().shift(days=-HIBP_SCAN_INTERVAL_DAYS) max_date = arrow.now().shift(days=-HIBP_SCAN_INTERVAL_DAYS)
for alias in ( for alias in (
Alias.query.filter( Alias.query.filter(
@ -841,7 +840,7 @@ def check_hibp():
.order_by(Alias.hibp_last_check.asc().nullsfirst()) .order_by(Alias.hibp_last_check.asc().nullsfirst())
.all() .all()
): ):
queue.put(alias.id) await queue.put(alias.id)
LOG.d("Need to check about %s aliases", queue.qsize()) LOG.d("Need to check about %s aliases", queue.qsize())
@ -850,25 +849,17 @@ def check_hibp():
# and then sleep for 1.5 seconds (due to HIBP API request limits) # and then sleep for 1.5 seconds (due to HIBP API request limits)
checkers = [] checkers = []
for i in range(len(HIBP_API_KEYS)): for i in range(len(HIBP_API_KEYS)):
checker = multiprocessing.Process( checker = asyncio.create_task(
target=_hibp_check, _hibp_check(
args=(
HIBP_API_KEYS[i], HIBP_API_KEYS[i],
queue, queue,
), )
) )
checkers.append(checker) checkers.append(checker)
checker.start()
# Wait until all checking processes are done # Wait until all checking processes are done
while True: for checker in checkers:
sleep(5) await checker
for checker in checkers:
if checker.is_alive():
break
# All are done
break
LOG.d("Done checking HIBP API for aliases in breaches") LOG.d("Done checking HIBP API for aliases in breaches")
@ -928,4 +919,4 @@ if __name__ == "__main__":
check_custom_domain() check_custom_domain()
elif args.job == "check_hibp": elif args.job == "check_hibp":
LOG.d("Check HIBP") LOG.d("Check HIBP")
check_hibp() asyncio.run(check_hibp())