cron, init app, job runner: wrap in an app context to benefit from app setup like database cleanup, sentry integration, etc
This commit is contained in:
parent
a0165d6381
commit
a99ac24b72
70
cron.py
70
cron.py
|
@ -59,6 +59,7 @@ from app.models import (
|
|||
HibpNotifiedAlias,
|
||||
)
|
||||
from app.utils import sanitize_email
|
||||
from server import create_light_app
|
||||
|
||||
|
||||
def notify_trial_end():
|
||||
|
@ -895,37 +896,38 @@ if __name__ == "__main__":
|
|||
],
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.job == "stats":
|
||||
LOG.d("Compute Stats")
|
||||
stats()
|
||||
elif args.job == "notify_trial_end":
|
||||
LOG.d("Notify users with trial ending soon")
|
||||
notify_trial_end()
|
||||
elif args.job == "notify_manual_subscription_end":
|
||||
LOG.d("Notify users with manual subscription ending soon")
|
||||
notify_manual_sub_end()
|
||||
elif args.job == "notify_premium_end":
|
||||
LOG.d("Notify users with premium ending soon")
|
||||
notify_premium_end()
|
||||
elif args.job == "delete_logs":
|
||||
LOG.d("Deleted Logs")
|
||||
delete_logs()
|
||||
elif args.job == "poll_apple_subscription":
|
||||
LOG.d("Poll Apple Subscriptions")
|
||||
poll_apple_subscription()
|
||||
elif args.job == "sanity_check":
|
||||
LOG.d("Check data consistency")
|
||||
sanity_check()
|
||||
elif args.job == "delete_old_monitoring":
|
||||
LOG.d("Delete old monitoring records")
|
||||
delete_old_monitoring()
|
||||
elif args.job == "check_custom_domain":
|
||||
LOG.d("Check custom domain")
|
||||
check_custom_domain()
|
||||
elif args.job == "check_hibp":
|
||||
LOG.d("Check HIBP")
|
||||
asyncio.run(check_hibp())
|
||||
elif args.job == "notify_hibp":
|
||||
LOG.d("Notify users about HIBP breaches")
|
||||
notify_hibp()
|
||||
# wrap in an app context to benefit from app setup like database cleanup, sentry integration, etc
|
||||
with create_light_app().app_context():
|
||||
if args.job == "stats":
|
||||
LOG.d("Compute Stats")
|
||||
stats()
|
||||
elif args.job == "notify_trial_end":
|
||||
LOG.d("Notify users with trial ending soon")
|
||||
notify_trial_end()
|
||||
elif args.job == "notify_manual_subscription_end":
|
||||
LOG.d("Notify users with manual subscription ending soon")
|
||||
notify_manual_sub_end()
|
||||
elif args.job == "notify_premium_end":
|
||||
LOG.d("Notify users with premium ending soon")
|
||||
notify_premium_end()
|
||||
elif args.job == "delete_logs":
|
||||
LOG.d("Deleted Logs")
|
||||
delete_logs()
|
||||
elif args.job == "poll_apple_subscription":
|
||||
LOG.d("Poll Apple Subscriptions")
|
||||
poll_apple_subscription()
|
||||
elif args.job == "sanity_check":
|
||||
LOG.d("Check data consistency")
|
||||
sanity_check()
|
||||
elif args.job == "delete_old_monitoring":
|
||||
LOG.d("Delete old monitoring records")
|
||||
delete_old_monitoring()
|
||||
elif args.job == "check_custom_domain":
|
||||
LOG.d("Check custom domain")
|
||||
check_custom_domain()
|
||||
elif args.job == "check_hibp":
|
||||
LOG.d("Check HIBP")
|
||||
asyncio.run(check_hibp())
|
||||
elif args.job == "notify_hibp":
|
||||
LOG.d("Notify users about HIBP breaches")
|
||||
notify_hibp()
|
||||
|
|
|
@ -150,19 +150,6 @@ if NEWRELIC_CONFIG_PATH:
|
|||
newrelic_app = newrelic.agent.register_application()
|
||||
|
||||
|
||||
# fix the database connection leak issue
|
||||
# use this method instead of create_app
|
||||
def new_app():
|
||||
app = create_light_app()
|
||||
|
||||
@app.teardown_appcontext
|
||||
def shutdown_session(response_or_exc):
|
||||
# same as shutdown_session() in flask-sqlalchemy but this is not enough
|
||||
Session.remove()
|
||||
|
||||
return app
|
||||
|
||||
|
||||
def get_or_create_contact(from_header: str, mail_from: str, alias: Alias) -> Contact:
|
||||
"""
|
||||
contact_from_header is the RFC 2047 format FROM header
|
||||
|
@ -2059,8 +2046,7 @@ class MailHandler:
|
|||
envelope.rcpt_tos,
|
||||
)
|
||||
|
||||
app = new_app()
|
||||
with app.app_context():
|
||||
with create_light_app().app_context():
|
||||
ret = handle(envelope)
|
||||
elapsed = time.time() - start
|
||||
LOG.i(
|
||||
|
|
|
@ -3,6 +3,7 @@ from app.db import Session
|
|||
from app.log import LOG
|
||||
from app.models import Mailbox, Contact, SLDomain
|
||||
from app.pgp_utils import load_public_key
|
||||
from server import create_light_app
|
||||
|
||||
|
||||
def load_pgp_public_keys():
|
||||
|
@ -50,5 +51,7 @@ def add_sl_domains():
|
|||
|
||||
|
||||
if __name__ == "__main__":
|
||||
load_pgp_public_keys()
|
||||
add_sl_domains()
|
||||
# wrap in an app context to benefit from app setup like database cleanup, sentry integration, etc
|
||||
with create_light_app().app_context():
|
||||
load_pgp_public_keys()
|
||||
add_sl_domains()
|
||||
|
|
213
job_runner.py
213
job_runner.py
|
@ -26,19 +26,6 @@ from app.models import User, Job, BatchImport, Mailbox, CustomDomain
|
|||
from server import create_light_app
|
||||
|
||||
|
||||
# fix the database connection leak issue
|
||||
# use this method instead of create_app
|
||||
def new_app():
|
||||
app = create_light_app()
|
||||
|
||||
@app.teardown_appcontext
|
||||
def shutdown_session(response_or_exc):
|
||||
# same as shutdown_session() in flask-sqlalchemy but this is not enough
|
||||
Session.remove()
|
||||
|
||||
return app
|
||||
|
||||
|
||||
def onboarding_send_from_alias(user):
|
||||
to_email, unsubscribe_link, via_email = user.get_communication_email()
|
||||
if not to_email:
|
||||
|
@ -101,116 +88,118 @@ def onboarding_mailbox(user):
|
|||
|
||||
if __name__ == "__main__":
|
||||
while True:
|
||||
# run a job 1h earlier or later is not a big deal ...
|
||||
min_dt = arrow.now().shift(hours=-1)
|
||||
max_dt = arrow.now().shift(hours=1)
|
||||
# wrap in an app context to benefit from app setup like database cleanup, sentry integration, etc
|
||||
with create_light_app().app_context():
|
||||
# run a job 1h earlier or later is not a big deal ...
|
||||
min_dt = arrow.now().shift(hours=-1)
|
||||
max_dt = arrow.now().shift(hours=1)
|
||||
|
||||
for job in Job.filter(
|
||||
Job.taken.is_(False), Job.run_at > min_dt, Job.run_at <= max_dt
|
||||
).all():
|
||||
LOG.d("Take job %s", job)
|
||||
for job in Job.filter(
|
||||
Job.taken.is_(False), Job.run_at > min_dt, Job.run_at <= max_dt
|
||||
).all():
|
||||
LOG.d("Take job %s", job)
|
||||
|
||||
# mark the job as taken, whether it will be executed successfully or not
|
||||
job.taken = True
|
||||
Session.commit()
|
||||
|
||||
if job.name == JOB_ONBOARDING_1:
|
||||
user_id = job.payload.get("user_id")
|
||||
user = User.get(user_id)
|
||||
|
||||
# user might delete their account in the meantime
|
||||
# or disable the notification
|
||||
if user and user.notification and user.activated:
|
||||
LOG.d("send onboarding send-from-alias email to user %s", user)
|
||||
onboarding_send_from_alias(user)
|
||||
elif job.name == JOB_ONBOARDING_2:
|
||||
user_id = job.payload.get("user_id")
|
||||
user = User.get(user_id)
|
||||
|
||||
# user might delete their account in the meantime
|
||||
# or disable the notification
|
||||
if user and user.notification and user.activated:
|
||||
LOG.d("send onboarding mailbox email to user %s", user)
|
||||
onboarding_mailbox(user)
|
||||
elif job.name == JOB_ONBOARDING_4:
|
||||
user_id = job.payload.get("user_id")
|
||||
user = User.get(user_id)
|
||||
|
||||
# user might delete their account in the meantime
|
||||
# or disable the notification
|
||||
if user and user.notification and user.activated:
|
||||
LOG.d("send onboarding pgp email to user %s", user)
|
||||
onboarding_pgp(user)
|
||||
|
||||
elif job.name == JOB_BATCH_IMPORT:
|
||||
batch_import_id = job.payload.get("batch_import_id")
|
||||
batch_import = BatchImport.get(batch_import_id)
|
||||
handle_batch_import(batch_import)
|
||||
elif job.name == JOB_DELETE_ACCOUNT:
|
||||
user_id = job.payload.get("user_id")
|
||||
user = User.get(user_id)
|
||||
|
||||
if not user:
|
||||
LOG.i("No user found for %s", user_id)
|
||||
continue
|
||||
|
||||
user_email = user.email
|
||||
LOG.w("Delete user %s", user)
|
||||
User.delete(user.id)
|
||||
# mark the job as taken, whether it will be executed successfully or not
|
||||
job.taken = True
|
||||
Session.commit()
|
||||
|
||||
send_email(
|
||||
user_email,
|
||||
"Your SimpleLogin account has been deleted",
|
||||
render("transactional/account-delete.txt"),
|
||||
render("transactional/account-delete.html"),
|
||||
)
|
||||
elif job.name == JOB_DELETE_MAILBOX:
|
||||
mailbox_id = job.payload.get("mailbox_id")
|
||||
mailbox = Mailbox.get(mailbox_id)
|
||||
if not mailbox:
|
||||
continue
|
||||
if job.name == JOB_ONBOARDING_1:
|
||||
user_id = job.payload.get("user_id")
|
||||
user = User.get(user_id)
|
||||
|
||||
mailbox_email = mailbox.email
|
||||
user = mailbox.user
|
||||
# user might delete their account in the meantime
|
||||
# or disable the notification
|
||||
if user and user.notification and user.activated:
|
||||
LOG.d("send onboarding send-from-alias email to user %s", user)
|
||||
onboarding_send_from_alias(user)
|
||||
elif job.name == JOB_ONBOARDING_2:
|
||||
user_id = job.payload.get("user_id")
|
||||
user = User.get(user_id)
|
||||
|
||||
Mailbox.delete(mailbox_id)
|
||||
Session.commit()
|
||||
LOG.d("Mailbox %s %s deleted", mailbox_id, mailbox_email)
|
||||
# user might delete their account in the meantime
|
||||
# or disable the notification
|
||||
if user and user.notification and user.activated:
|
||||
LOG.d("send onboarding mailbox email to user %s", user)
|
||||
onboarding_mailbox(user)
|
||||
elif job.name == JOB_ONBOARDING_4:
|
||||
user_id = job.payload.get("user_id")
|
||||
user = User.get(user_id)
|
||||
|
||||
send_email(
|
||||
user.email,
|
||||
f"Your mailbox {mailbox_email} has been deleted",
|
||||
f"""Mailbox {mailbox_email} along with its aliases are deleted successfully.
|
||||
Regards,
|
||||
SimpleLogin team.
|
||||
""",
|
||||
)
|
||||
# user might delete their account in the meantime
|
||||
# or disable the notification
|
||||
if user and user.notification and user.activated:
|
||||
LOG.d("send onboarding pgp email to user %s", user)
|
||||
onboarding_pgp(user)
|
||||
|
||||
elif job.name == JOB_DELETE_DOMAIN:
|
||||
custom_domain_id = job.payload.get("custom_domain_id")
|
||||
custom_domain = CustomDomain.get(custom_domain_id)
|
||||
if not custom_domain:
|
||||
continue
|
||||
elif job.name == JOB_BATCH_IMPORT:
|
||||
batch_import_id = job.payload.get("batch_import_id")
|
||||
batch_import = BatchImport.get(batch_import_id)
|
||||
handle_batch_import(batch_import)
|
||||
elif job.name == JOB_DELETE_ACCOUNT:
|
||||
user_id = job.payload.get("user_id")
|
||||
user = User.get(user_id)
|
||||
|
||||
domain_name = custom_domain.domain
|
||||
user = custom_domain.user
|
||||
if not user:
|
||||
LOG.i("No user found for %s", user_id)
|
||||
continue
|
||||
|
||||
CustomDomain.delete(custom_domain.id)
|
||||
Session.commit()
|
||||
user_email = user.email
|
||||
LOG.w("Delete user %s", user)
|
||||
User.delete(user.id)
|
||||
Session.commit()
|
||||
|
||||
LOG.d("Domain %s deleted", domain_name)
|
||||
send_email(
|
||||
user_email,
|
||||
"Your SimpleLogin account has been deleted",
|
||||
render("transactional/account-delete.txt"),
|
||||
render("transactional/account-delete.html"),
|
||||
)
|
||||
elif job.name == JOB_DELETE_MAILBOX:
|
||||
mailbox_id = job.payload.get("mailbox_id")
|
||||
mailbox = Mailbox.get(mailbox_id)
|
||||
if not mailbox:
|
||||
continue
|
||||
|
||||
send_email(
|
||||
user.email,
|
||||
f"Your domain {domain_name} has been deleted",
|
||||
f"""Domain {domain_name} along with its aliases are deleted successfully.
|
||||
Regards,
|
||||
SimpleLogin team.
|
||||
""",
|
||||
)
|
||||
mailbox_email = mailbox.email
|
||||
user = mailbox.user
|
||||
|
||||
else:
|
||||
LOG.e("Unknown job name %s", job.name)
|
||||
Mailbox.delete(mailbox_id)
|
||||
Session.commit()
|
||||
LOG.d("Mailbox %s %s deleted", mailbox_id, mailbox_email)
|
||||
|
||||
time.sleep(10)
|
||||
send_email(
|
||||
user.email,
|
||||
f"Your mailbox {mailbox_email} has been deleted",
|
||||
f"""Mailbox {mailbox_email} along with its aliases are deleted successfully.
|
||||
Regards,
|
||||
SimpleLogin team.
|
||||
""",
|
||||
)
|
||||
|
||||
elif job.name == JOB_DELETE_DOMAIN:
|
||||
custom_domain_id = job.payload.get("custom_domain_id")
|
||||
custom_domain = CustomDomain.get(custom_domain_id)
|
||||
if not custom_domain:
|
||||
continue
|
||||
|
||||
domain_name = custom_domain.domain
|
||||
user = custom_domain.user
|
||||
|
||||
CustomDomain.delete(custom_domain.id)
|
||||
Session.commit()
|
||||
|
||||
LOG.d("Domain %s deleted", domain_name)
|
||||
|
||||
send_email(
|
||||
user.email,
|
||||
f"Your domain {domain_name} has been deleted",
|
||||
f"""Domain {domain_name} along with its aliases are deleted successfully.
|
||||
Regards,
|
||||
SimpleLogin team.
|
||||
""",
|
||||
)
|
||||
|
||||
else:
|
||||
LOG.e("Unknown job name %s", job.name)
|
||||
|
||||
time.sleep(10)
|
||||
|
|
Loading…
Reference in New Issue