From 5fa41d6ccf320ee57516d529c089459b58160470 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Casaj=C3=BAs?= Date: Tue, 28 Jun 2022 09:22:48 +0200 Subject: [PATCH] Add state management to job (#1113) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add state management to job * Add migration Co-authored-by: Adrià Casajús --- app/models.py | 15 ++ job_runner.py | 237 +++++++++--------- ...22_062714_bfebc2d5c719_add_state_to_job.py | 33 +++ 3 files changed, 172 insertions(+), 113 deletions(-) create mode 100644 migrations/versions/2022_062714_bfebc2d5c719_add_state_to_job.py diff --git a/app/models.py b/app/models.py index 6156756b..b63dc7ac 100644 --- a/app/models.py +++ b/app/models.py @@ -262,6 +262,13 @@ class VerpType(EnumE): transactional = 2 +class JobState(EnumE): + ready = 0 + taken = 1 + done = 2 + error = 3 + + class Hibp(Base, ModelMixin): __tablename__ = "hibp" name = sa.Column(sa.String(), nullable=False, unique=True, index=True) @@ -2370,6 +2377,14 @@ class Job(Base, ModelMixin): # whether the job has been taken by the job runner taken = sa.Column(sa.Boolean, default=False, nullable=False) run_at = sa.Column(ArrowType) + state = sa.Column( + sa.Integer, + nullable=False, + server_default=str(JobState.ready.value), + default=JobState.ready.value, + ) + attempts = sa.Column(sa.Integer, nullable=False, server_default="0", default=0) + taken_at = sa.Column(ArrowType, nullable=True) def __repr__(self): return f"" diff --git a/job_runner.py b/job_runner.py index 24773e43..c8fd5b97 100644 --- a/job_runner.py +++ b/job_runner.py @@ -15,7 +15,7 @@ from app.email_utils import ( from app.import_utils import handle_batch_import from app.jobs.export_user_data_job import ExportUserDataJob from app.log import LOG -from app.models import User, Job, BatchImport, Mailbox, CustomDomain +from app.models import User, Job, BatchImport, Mailbox, CustomDomain, JobState from server import create_light_app @@ -106,6 +106,120 @@ def welcome_proton(user): ) +def process_job(job: Job): + if job.name == config.JOB_ONBOARDING_1: + user_id = job.payload.get("user_id") + user = User.get(user_id) + + # user might delete their account in the meantime + # or disable the notification + if user and user.notification and user.activated: + LOG.d("send onboarding send-from-alias email to user %s", user) + onboarding_send_from_alias(user) + elif job.name == config.JOB_ONBOARDING_2: + user_id = job.payload.get("user_id") + user = User.get(user_id) + + # user might delete their account in the meantime + # or disable the notification + if user and user.notification and user.activated: + LOG.d("send onboarding mailbox email to user %s", user) + onboarding_mailbox(user) + elif job.name == config.JOB_ONBOARDING_4: + user_id = job.payload.get("user_id") + user = User.get(user_id) + + # user might delete their account in the meantime + # or disable the notification + if user and user.notification and user.activated: + LOG.d("send onboarding pgp email to user %s", user) + onboarding_pgp(user) + + elif job.name == config.JOB_BATCH_IMPORT: + batch_import_id = job.payload.get("batch_import_id") + batch_import = BatchImport.get(batch_import_id) + handle_batch_import(batch_import) + elif job.name == config.JOB_DELETE_ACCOUNT: + user_id = job.payload.get("user_id") + user = User.get(user_id) + + if not user: + LOG.i("No user found for %s", user_id) + return + + user_email = user.email + LOG.w("Delete user %s", user) + User.delete(user.id) + Session.commit() + + send_email( + user_email, + "Your SimpleLogin account has been deleted", + render("transactional/account-delete.txt"), + render("transactional/account-delete.html"), + retries=3, + ) + elif job.name == config.JOB_DELETE_MAILBOX: + mailbox_id = job.payload.get("mailbox_id") + mailbox = Mailbox.get(mailbox_id) + if not mailbox: + return + + mailbox_email = mailbox.email + user = mailbox.user + + Mailbox.delete(mailbox_id) + Session.commit() + LOG.d("Mailbox %s %s deleted", mailbox_id, mailbox_email) + + send_email( + user.email, + f"Your mailbox {mailbox_email} has been deleted", + f"""Mailbox {mailbox_email} along with its aliases are deleted successfully. +Regards, +SimpleLogin team. +""", + retries=3, + ) + + elif job.name == config.JOB_DELETE_DOMAIN: + custom_domain_id = job.payload.get("custom_domain_id") + custom_domain = CustomDomain.get(custom_domain_id) + if not custom_domain: + return + + domain_name = custom_domain.domain + user = custom_domain.user + + CustomDomain.delete(custom_domain.id) + Session.commit() + + LOG.d("Domain %s deleted", domain_name) + + send_email( + user.email, + f"Your domain {domain_name} has been deleted", + f"""Domain {domain_name} along with its aliases are deleted successfully. + +Regards, +SimpleLogin team. +""", + retries=3, + ) + elif job.name == config.JOB_SEND_USER_REPORT: + export_job = ExportUserDataJob.create_from_job(job) + if export_job: + export_job.run() + elif job.name == config.JOB_SEND_PROTON_WELCOME_1: + user_id = job.payload.get("user_id") + user = User.get(user_id) + if user and user.activated: + LOG.d("send proton welcome email to user %s", user) + welcome_proton(user) + else: + LOG.e("Unknown job name %s", job.name) + + if __name__ == "__main__": while True: # wrap in an app context to benefit from app setup like database cleanup, sentry integration, etc @@ -114,6 +228,8 @@ if __name__ == "__main__": min_dt = arrow.now().shift(hours=-1) max_dt = arrow.now().shift(hours=1) + # TODO: Change This condition after deploying this MR + # to Job.state == ready or (Job.state == taken and job.taken_at < arrow.now.shift(minutes=-10)) for job in Job.filter( Job.taken.is_(False), Job.run_at > min_dt, Job.run_at <= max_dt ).all(): @@ -121,118 +237,13 @@ if __name__ == "__main__": # mark the job as taken, whether it will be executed successfully or not job.taken = True + job.taken_at = arrow.now() + job.state = JobState.taken.value + job.attempts += 1 + Session.commit() + process_job(job) + + job.state = JobState.done.value Session.commit() - if job.name == config.JOB_ONBOARDING_1: - user_id = job.payload.get("user_id") - user = User.get(user_id) - - # user might delete their account in the meantime - # or disable the notification - if user and user.notification and user.activated: - LOG.d("send onboarding send-from-alias email to user %s", user) - onboarding_send_from_alias(user) - elif job.name == config.JOB_ONBOARDING_2: - user_id = job.payload.get("user_id") - user = User.get(user_id) - - # user might delete their account in the meantime - # or disable the notification - if user and user.notification and user.activated: - LOG.d("send onboarding mailbox email to user %s", user) - onboarding_mailbox(user) - elif job.name == config.JOB_ONBOARDING_4: - user_id = job.payload.get("user_id") - user = User.get(user_id) - - # user might delete their account in the meantime - # or disable the notification - if user and user.notification and user.activated: - LOG.d("send onboarding pgp email to user %s", user) - onboarding_pgp(user) - - elif job.name == config.JOB_BATCH_IMPORT: - batch_import_id = job.payload.get("batch_import_id") - batch_import = BatchImport.get(batch_import_id) - handle_batch_import(batch_import) - elif job.name == config.JOB_DELETE_ACCOUNT: - user_id = job.payload.get("user_id") - user = User.get(user_id) - - if not user: - LOG.i("No user found for %s", user_id) - continue - - user_email = user.email - LOG.w("Delete user %s", user) - User.delete(user.id) - Session.commit() - - send_email( - user_email, - "Your SimpleLogin account has been deleted", - render("transactional/account-delete.txt"), - render("transactional/account-delete.html"), - retries=3, - ) - elif job.name == config.JOB_DELETE_MAILBOX: - mailbox_id = job.payload.get("mailbox_id") - mailbox = Mailbox.get(mailbox_id) - if not mailbox: - continue - - mailbox_email = mailbox.email - user = mailbox.user - - Mailbox.delete(mailbox_id) - Session.commit() - LOG.d("Mailbox %s %s deleted", mailbox_id, mailbox_email) - - send_email( - user.email, - f"Your mailbox {mailbox_email} has been deleted", - f"""Mailbox {mailbox_email} along with its aliases are deleted successfully. -Regards, -SimpleLogin team. -""", - retries=3, - ) - - elif job.name == config.JOB_DELETE_DOMAIN: - custom_domain_id = job.payload.get("custom_domain_id") - custom_domain = CustomDomain.get(custom_domain_id) - if not custom_domain: - continue - - domain_name = custom_domain.domain - user = custom_domain.user - - CustomDomain.delete(custom_domain.id) - Session.commit() - - LOG.d("Domain %s deleted", domain_name) - - send_email( - user.email, - f"Your domain {domain_name} has been deleted", - f"""Domain {domain_name} along with its aliases are deleted successfully. - -Regards, -SimpleLogin team. -""", - retries=3, - ) - elif job.name == config.JOB_SEND_USER_REPORT: - export_job = ExportUserDataJob.create_from_job(job) - if export_job: - export_job.run() - elif job.name == config.JOB_SEND_PROTON_WELCOME_1: - user_id = job.payload.get("user_id") - user = User.get(user_id) - if user and user.activated: - LOG.d("send proton welcome email to user %s", user) - welcome_proton(user) - else: - LOG.e("Unknown job name %s", job.name) - time.sleep(10) diff --git a/migrations/versions/2022_062714_bfebc2d5c719_add_state_to_job.py b/migrations/versions/2022_062714_bfebc2d5c719_add_state_to_job.py new file mode 100644 index 00000000..87a6e77b --- /dev/null +++ b/migrations/versions/2022_062714_bfebc2d5c719_add_state_to_job.py @@ -0,0 +1,33 @@ +"""Add state to job + +Revision ID: bfebc2d5c719 +Revises: d1fb679f7eec +Create Date: 2022-06-27 14:56:58.797121 + +""" +import sqlalchemy_utils +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'bfebc2d5c719' +down_revision = 'd1fb679f7eec' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('job', sa.Column('attempts', sa.Integer(), server_default='0', nullable=False)) + op.add_column('job', sa.Column('state', sa.Integer(), server_default='0', nullable=False)) + op.add_column('job', sa.Column('taken_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('job', 'taken_at') + op.drop_column('job', 'state') + op.drop_column('job', 'attempts') + # ### end Alembic commands ###