Black
This commit is contained in:
parent
b3fa445250
commit
bee468e055
|
@ -375,5 +375,8 @@ ALIAS_LIMIT = os.environ.get("ALIAS_LIMIT") or "100/day;50/hour;5/minute"
|
|||
|
||||
ENABLE_SPAM_ASSASSIN = "ENABLE_SPAM_ASSASSIN" in os.environ
|
||||
|
||||
HIBP_SCAN_INTERVAL_DAYS = os.environ.get("HIBP_SCAN_INTERVAL_DAYS") or 7
|
||||
HIBP_API_KEYS = sl_getenv("HIBP_API_KEYS", list) or []
|
||||
try:
|
||||
HIBP_SCAN_INTERVAL_DAYS = int(os.environ.get("HIBP_SCAN_INTERVAL_DAYS"))
|
||||
except Exception:
|
||||
HIBP_SCAN_INTERVAL_DAYS = 7
|
||||
HIBP_API_KEYS = sl_getenv("HIBP_API_KEYS", list) or []
|
||||
|
|
|
@ -25,7 +25,7 @@ from app.config import (
|
|||
FIRST_ALIAS_DOMAIN,
|
||||
DISABLE_ONBOARDING,
|
||||
UNSUBSCRIBER,
|
||||
HIBP_SCAN_INTERVAL_DAYS
|
||||
HIBP_SCAN_INTERVAL_DAYS,
|
||||
)
|
||||
from app.errors import AliasInTrashError
|
||||
from app.extensions import db
|
||||
|
@ -1089,7 +1089,9 @@ class Alias(db.Model, ModelMixin):
|
|||
return False
|
||||
|
||||
def needs_hibp_scan(self):
|
||||
return self.hibp_last_check is None or (self.hibp_last_check < arrow.now().shift(days=-HIBP_SCAN_INTERVAL_DAYS))
|
||||
return self.hibp_last_check is None or (
|
||||
self.hibp_last_check < arrow.now().shift(days=-HIBP_SCAN_INTERVAL_DAYS)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def create(cls, **kw):
|
||||
|
@ -2021,15 +2023,17 @@ class AliasMailbox(db.Model, ModelMixin):
|
|||
class AliasHibp(db.Model, ModelMixin):
|
||||
__tablename__ = "alias_hibp"
|
||||
|
||||
__table_args__ = (
|
||||
db.UniqueConstraint("alias_id", "hibp_id", name="uq_alias_hibp"),
|
||||
)
|
||||
__table_args__ = (db.UniqueConstraint("alias_id", "hibp_id", name="uq_alias_hibp"),)
|
||||
|
||||
alias_id = db.Column(db.Integer(), db.ForeignKey("alias.id"))
|
||||
hibp_id = db.Column(db.String(), db.ForeignKey("hibp.id"))
|
||||
|
||||
alias = db.relationship("Alias", backref=db.backref("alias_hibp", cascade="all, delete-orphan"))
|
||||
hibp = db.relationship("Hibp", backref=db.backref("alias_hibp", cascade="all, delete-orphan"))
|
||||
alias = db.relationship(
|
||||
"Alias", backref=db.backref("alias_hibp", cascade="all, delete-orphan")
|
||||
)
|
||||
hibp = db.relationship(
|
||||
"Hibp", backref=db.backref("alias_hibp", cascade="all, delete-orphan")
|
||||
)
|
||||
|
||||
|
||||
class DirectoryMailbox(db.Model, ModelMixin):
|
||||
|
|
26
cron.py
26
cron.py
|
@ -17,7 +17,7 @@ from app.config import (
|
|||
EMAIL_SERVERS_WITH_PRIORITY,
|
||||
URL,
|
||||
AlERT_WRONG_MX_RECORD_CUSTOM_DOMAIN,
|
||||
HIBP_API_KEYS
|
||||
HIBP_API_KEYS,
|
||||
)
|
||||
from app.dns_utils import get_mx_domains
|
||||
from app.email_utils import (
|
||||
|
@ -53,7 +53,7 @@ from app.models import (
|
|||
SLDomain,
|
||||
DeletedAlias,
|
||||
DomainDeletedAlias,
|
||||
Hibp
|
||||
Hibp,
|
||||
)
|
||||
from app.utils import sanitize_email
|
||||
from server import create_app
|
||||
|
@ -776,7 +776,7 @@ def check_hibp():
|
|||
LOG.d("Updating list of known breaches")
|
||||
r = requests.get("https://haveibeenpwned.com/api/v3/breaches")
|
||||
for entry in r.json():
|
||||
Hibp.get_or_create(id=entry['Name'])
|
||||
Hibp.get_or_create(id=entry["Name"])
|
||||
|
||||
db.session.commit()
|
||||
|
||||
|
@ -787,26 +787,34 @@ def check_hibp():
|
|||
continue
|
||||
|
||||
# Only one request per API key per 1.5 seconds
|
||||
if (api_key_index >= len(HIBP_API_KEYS)):
|
||||
if api_key_index >= len(HIBP_API_KEYS):
|
||||
api_key_index = 0
|
||||
sleep(1.5)
|
||||
|
||||
request_headers = {
|
||||
"user-agent": "SimpleLogin",
|
||||
"hibp-api-key": HIBP_API_KEYS[api_key_index]
|
||||
"hibp-api-key": HIBP_API_KEYS[api_key_index],
|
||||
}
|
||||
r = requests.get(f"https://haveibeenpwned.com/api/v3/breachedaccount/{urllib.parse.quote(alias.email)}", headers=request_headers)
|
||||
r = requests.get(
|
||||
f"https://haveibeenpwned.com/api/v3/breachedaccount/{urllib.parse.quote(alias.email)}",
|
||||
headers=request_headers,
|
||||
)
|
||||
|
||||
api_key_index += 1
|
||||
|
||||
if r.status_code == 200:
|
||||
# Breaches found
|
||||
alias.hibp_breaches = [Hibp.get_by(id=entry['Name']) for entry in r.json()]
|
||||
alias.hibp_breaches = [Hibp.get_by(id=entry["Name"]) for entry in r.json()]
|
||||
elif r.status_code == 404:
|
||||
# No breaches found
|
||||
alias.hibp_breaches = []
|
||||
else:
|
||||
LOG.error("An error occured while checking alias %s: %s - %s", alias, r.status_code, r.text)
|
||||
LOG.error(
|
||||
"An error occured while checking alias %s: %s - %s",
|
||||
alias,
|
||||
r.status_code,
|
||||
r.text,
|
||||
)
|
||||
continue
|
||||
|
||||
alias.hibp_last_check = arrow.utcnow()
|
||||
|
@ -836,7 +844,7 @@ if __name__ == "__main__":
|
|||
"sanity_check",
|
||||
"delete_old_monitoring",
|
||||
"check_custom_domain",
|
||||
"check_hibp"
|
||||
"check_hibp",
|
||||
],
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
|
|
@ -299,9 +299,7 @@ def fake_data():
|
|||
db.session.commit()
|
||||
|
||||
# example@example.com is in a LOT of data breaches
|
||||
Alias.create(
|
||||
email="example@example.com", user_id=user.id, mailbox_id=m1.id
|
||||
)
|
||||
Alias.create(email="example@example.com", user_id=user.id, mailbox_id=m1.id)
|
||||
|
||||
for i in range(3):
|
||||
if i % 2 == 0:
|
||||
|
|
Loading…
Reference in New Issue