diff --git a/app/config.py b/app/config.py index be424cf2..9d08083e 100644 --- a/app/config.py +++ b/app/config.py @@ -375,5 +375,8 @@ ALIAS_LIMIT = os.environ.get("ALIAS_LIMIT") or "100/day;50/hour;5/minute" ENABLE_SPAM_ASSASSIN = "ENABLE_SPAM_ASSASSIN" in os.environ -HIBP_SCAN_INTERVAL_DAYS = os.environ.get("HIBP_SCAN_INTERVAL_DAYS") or 7 -HIBP_API_KEYS = sl_getenv("HIBP_API_KEYS", list) or [] \ No newline at end of file +try: + HIBP_SCAN_INTERVAL_DAYS = int(os.environ.get("HIBP_SCAN_INTERVAL_DAYS")) +except Exception: + HIBP_SCAN_INTERVAL_DAYS = 7 +HIBP_API_KEYS = sl_getenv("HIBP_API_KEYS", list) or [] diff --git a/app/models.py b/app/models.py index b38dfde8..ff776ff3 100644 --- a/app/models.py +++ b/app/models.py @@ -25,7 +25,7 @@ from app.config import ( FIRST_ALIAS_DOMAIN, DISABLE_ONBOARDING, UNSUBSCRIBER, - HIBP_SCAN_INTERVAL_DAYS + HIBP_SCAN_INTERVAL_DAYS, ) from app.errors import AliasInTrashError from app.extensions import db @@ -1089,7 +1089,9 @@ class Alias(db.Model, ModelMixin): return False def needs_hibp_scan(self): - return self.hibp_last_check is None or (self.hibp_last_check < arrow.now().shift(days=-HIBP_SCAN_INTERVAL_DAYS)) + return self.hibp_last_check is None or ( + self.hibp_last_check < arrow.now().shift(days=-HIBP_SCAN_INTERVAL_DAYS) + ) @classmethod def create(cls, **kw): @@ -2021,15 +2023,17 @@ class AliasMailbox(db.Model, ModelMixin): class AliasHibp(db.Model, ModelMixin): __tablename__ = "alias_hibp" - __table_args__ = ( - db.UniqueConstraint("alias_id", "hibp_id", name="uq_alias_hibp"), - ) + __table_args__ = (db.UniqueConstraint("alias_id", "hibp_id", name="uq_alias_hibp"),) alias_id = db.Column(db.Integer(), db.ForeignKey("alias.id")) hibp_id = db.Column(db.String(), db.ForeignKey("hibp.id")) - alias = db.relationship("Alias", backref=db.backref("alias_hibp", cascade="all, delete-orphan")) - hibp = db.relationship("Hibp", backref=db.backref("alias_hibp", cascade="all, delete-orphan")) + alias = db.relationship( + "Alias", backref=db.backref("alias_hibp", cascade="all, delete-orphan") + ) + hibp = db.relationship( + "Hibp", backref=db.backref("alias_hibp", cascade="all, delete-orphan") + ) class DirectoryMailbox(db.Model, ModelMixin): diff --git a/cron.py b/cron.py index b6b64a82..fda5d4ef 100644 --- a/cron.py +++ b/cron.py @@ -17,7 +17,7 @@ from app.config import ( EMAIL_SERVERS_WITH_PRIORITY, URL, AlERT_WRONG_MX_RECORD_CUSTOM_DOMAIN, - HIBP_API_KEYS + HIBP_API_KEYS, ) from app.dns_utils import get_mx_domains from app.email_utils import ( @@ -53,7 +53,7 @@ from app.models import ( SLDomain, DeletedAlias, DomainDeletedAlias, - Hibp + Hibp, ) from app.utils import sanitize_email from server import create_app @@ -776,7 +776,7 @@ def check_hibp(): LOG.d("Updating list of known breaches") r = requests.get("https://haveibeenpwned.com/api/v3/breaches") for entry in r.json(): - Hibp.get_or_create(id=entry['Name']) + Hibp.get_or_create(id=entry["Name"]) db.session.commit() @@ -787,26 +787,34 @@ def check_hibp(): continue # Only one request per API key per 1.5 seconds - if (api_key_index >= len(HIBP_API_KEYS)): + if api_key_index >= len(HIBP_API_KEYS): api_key_index = 0 sleep(1.5) request_headers = { "user-agent": "SimpleLogin", - "hibp-api-key": HIBP_API_KEYS[api_key_index] + "hibp-api-key": HIBP_API_KEYS[api_key_index], } - r = requests.get(f"https://haveibeenpwned.com/api/v3/breachedaccount/{urllib.parse.quote(alias.email)}", headers=request_headers) + r = requests.get( + f"https://haveibeenpwned.com/api/v3/breachedaccount/{urllib.parse.quote(alias.email)}", + headers=request_headers, + ) api_key_index += 1 if r.status_code == 200: # Breaches found - alias.hibp_breaches = [Hibp.get_by(id=entry['Name']) for entry in r.json()] + alias.hibp_breaches = [Hibp.get_by(id=entry["Name"]) for entry in r.json()] elif r.status_code == 404: # No breaches found alias.hibp_breaches = [] else: - LOG.error("An error occured while checking alias %s: %s - %s", alias, r.status_code, r.text) + LOG.error( + "An error occured while checking alias %s: %s - %s", + alias, + r.status_code, + r.text, + ) continue alias.hibp_last_check = arrow.utcnow() @@ -836,7 +844,7 @@ if __name__ == "__main__": "sanity_check", "delete_old_monitoring", "check_custom_domain", - "check_hibp" + "check_hibp", ], ) args = parser.parse_args() diff --git a/server.py b/server.py index 2028ffdc..841a3f57 100644 --- a/server.py +++ b/server.py @@ -299,9 +299,7 @@ def fake_data(): db.session.commit() # example@example.com is in a LOT of data breaches - Alias.create( - email="example@example.com", user_id=user.id, mailbox_id=m1.id - ) + Alias.create(email="example@example.com", user_id=user.id, mailbox_id=m1.id) for i in range(3): if i % 2 == 0: