From 0da1811311aedc5c1698fa2d6a5d012332f3d203 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Adri=C3=A0=20Casaj=C3=BAs?=
Date: Mon, 18 Mar 2024 16:00:21 +0100
Subject: [PATCH] Cleanup old data (#2066)

* Cleanup tasks

* Update

* Added tests

* Create cron job

* Delete old data cron

* Fix import

* import fix

* Added delete + script to disable pgp for proton mboxes
---
 app/config.py                                 |  2 +
 app/s3.py                                     | 59 +++++++--------
 cron.py                                       | 14 ++++
 crontab.yml                                   |  6 ++
 tasks/__init__.py                             |  0
 tasks/cleanup_old_imports.py                  | 19 +++++
 tasks/cleanup_old_jobs.py                     | 24 +++++++
 tasks/cleanup_old_notifications.py            | 12 ++++
 tests/cron/test_get_alias_for_hibp.py         | 28 +++++++-
 tests/tasks/__init__.py                       |  0
 tests/tasks/test_cleanup_old_imports.py       | 35 +++++++++
 tests/tasks/test_cleanup_old_jobs.py          | 72 +++++++++++++++++++
 tests/tasks/test_cleanup_old_notifications.py | 26 +++++++
 13 files changed, 262 insertions(+), 35 deletions(-)
 create mode 100644 tasks/__init__.py
 create mode 100644 tasks/cleanup_old_imports.py
 create mode 100644 tasks/cleanup_old_jobs.py
 create mode 100644 tasks/cleanup_old_notifications.py
 create mode 100644 tests/tasks/__init__.py
 create mode 100644 tests/tasks/test_cleanup_old_imports.py
 create mode 100644 tests/tasks/test_cleanup_old_jobs.py
 create mode 100644 tests/tasks/test_cleanup_old_notifications.py

diff --git a/app/config.py b/app/config.py
index c55c555f..7a59e91b 100644
--- a/app/config.py
+++ b/app/config.py
@@ -434,6 +434,8 @@ HIBP_MAX_ALIAS_CHECK = 10_000
 HIBP_RPM = 100
 HIBP_SKIP_PARTNER_ALIAS = os.environ.get("HIBP_SKIP_PARTNER_ALIAS")
 
+KEEP_OLD_DATA_DAYS = 30
+
 POSTMASTER = os.environ.get("POSTMASTER")
 
 # store temporary files, especially for debugging
diff --git a/app/s3.py b/app/s3.py
index 32d80bbd..47f9cf99 100644
--- a/app/s3.py
+++ b/app/s3.py
@@ -5,19 +5,9 @@
 from typing import Optional
 
 import boto3
 import requests
 
-from app.config import (
-    AWS_REGION,
-    BUCKET,
-    AWS_ACCESS_KEY_ID,
-    AWS_SECRET_ACCESS_KEY,
-    LOCAL_FILE_UPLOAD,
-    UPLOAD_DIR,
-    URL,
-    AWS_ENDPOINT_URL,
-)
+from app import config
 from app.log import LOG
 
-
 _s3_client = None
 
@@ -25,12 +15,12 @@ def _get_s3client():
     global _s3_client
     if _s3_client is None:
         args = {
-            "aws_access_key_id": AWS_ACCESS_KEY_ID,
-            "aws_secret_access_key": AWS_SECRET_ACCESS_KEY,
-            "region_name": AWS_REGION,
+            "aws_access_key_id": config.AWS_ACCESS_KEY_ID,
+            "aws_secret_access_key": config.AWS_SECRET_ACCESS_KEY,
+            "region_name": config.AWS_REGION,
         }
-        if AWS_ENDPOINT_URL:
-            args["endpoint_url"] = AWS_ENDPOINT_URL
+        if config.AWS_ENDPOINT_URL:
+            args["endpoint_url"] = config.AWS_ENDPOINT_URL
         _s3_client = boto3.client("s3", **args)
     return _s3_client
 
@@ -38,8 +28,8 @@
 def upload_from_bytesio(key: str, bs: BytesIO, content_type="application/octet-stream"):
     bs.seek(0)
 
-    if LOCAL_FILE_UPLOAD:
-        file_path = os.path.join(UPLOAD_DIR, key)
+    if config.LOCAL_FILE_UPLOAD:
+        file_path = os.path.join(config.UPLOAD_DIR, key)
         file_dir = os.path.dirname(file_path)
         os.makedirs(file_dir, exist_ok=True)
         with open(file_path, "wb") as f:
@@ -47,7 +37,7 @@ def upload_from_bytesio(key: str, bs: BytesIO, content_type="application/octet-s
     else:
         _get_s3client().put_object(
-            Bucket=BUCKET,
+            Bucket=config.BUCKET,
             Key=key,
             Body=bs,
             ContentType=content_type,
         )
@@ -57,8 +47,8 @@ def upload_from_bytesio(key: str, bs: BytesIO, content_type="application/octet-s
 def upload_email_from_bytesio(path: str, bs: BytesIO, filename):
     bs.seek(0)
 
-    if LOCAL_FILE_UPLOAD:
-        file_path = os.path.join(UPLOAD_DIR, path)
+    if config.LOCAL_FILE_UPLOAD:
+        file_path = os.path.join(config.UPLOAD_DIR, path)
         file_dir = os.path.dirname(file_path)
         os.makedirs(file_dir, exist_ok=True)
         with open(file_path, "wb") as f:
@@ -66,7 +56,7 @@ def upload_email_from_bytesio(path: str, bs: BytesIO, filename):
     else:
         _get_s3client().put_object(
-            Bucket=BUCKET,
+            Bucket=config.BUCKET,
             Key=path,
             Body=bs,
             # Support saving a remote file using Http header
@@ -77,12 +67,12 @@ def upload_email_from_bytesio(path: str, bs: BytesIO, filename):
 
 
 def download_email(path: str) -> Optional[str]:
-    if LOCAL_FILE_UPLOAD:
-        file_path = os.path.join(UPLOAD_DIR, path)
+    if config.LOCAL_FILE_UPLOAD:
+        file_path = os.path.join(config.UPLOAD_DIR, path)
         with open(file_path, "rb") as f:
             return f.read()
     resp = _get_s3client().get_object(
-        Bucket=BUCKET,
+        Bucket=config.BUCKET,
         Key=path,
     )
     if not resp or "Body" not in resp:
@@ -96,29 +86,30 @@ def upload_from_url(url: str, upload_path):
 
 
 def get_url(key: str, expires_in=3600) -> str:
-    if LOCAL_FILE_UPLOAD:
-        return URL + "/static/upload/" + key
+    if config.LOCAL_FILE_UPLOAD:
+        return config.URL + "/static/upload/" + key
     else:
         return _get_s3client().generate_presigned_url(
             ExpiresIn=expires_in,
             ClientMethod="get_object",
-            Params={"Bucket": BUCKET, "Key": key},
+            Params={"Bucket": config.BUCKET, "Key": key},
         )
 
 
 def delete(path: str):
-    if LOCAL_FILE_UPLOAD:
-        os.remove(os.path.join(UPLOAD_DIR, path))
+    if config.LOCAL_FILE_UPLOAD:
+        file_path = os.path.join(config.UPLOAD_DIR, path)
+        os.remove(file_path)
     else:
-        _get_s3client().delete_object(Bucket=BUCKET, Key=path)
+        _get_s3client().delete_object(Bucket=config.BUCKET, Key=path)
 
 
 def create_bucket_if_not_exists():
     s3client = _get_s3client()
     buckets = s3client.list_buckets()
     for bucket in buckets["Buckets"]:
-        if bucket["Name"] == BUCKET:
+        if bucket["Name"] == config.BUCKET:
             LOG.i("Bucket already exists")
             return
-    s3client.create_bucket(Bucket=BUCKET)
-    LOG.i(f"Bucket {BUCKET} created")
+    s3client.create_bucket(Bucket=config.BUCKET)
+    LOG.i(f"Bucket {config.BUCKET} created")
diff --git a/cron.py b/cron.py
index b84aba79..7466af78 100644
--- a/cron.py
+++ b/cron.py
@@ -61,6 +61,9 @@ from app.pgp_utils import load_public_key_and_check, PGPException
 from app.proton.utils import get_proton_partner
 from app.utils import sanitize_email
 from server import create_light_app
+from tasks.cleanup_old_imports import cleanup_old_imports
+from tasks.cleanup_old_jobs import cleanup_old_jobs
+from tasks.cleanup_old_notifications import cleanup_old_notifications
 
 DELETE_GRACE_DAYS = 30
 
@@ -1221,6 +1224,13 @@ def clear_users_scheduled_to_be_deleted(dry_run=False):
         Session.commit()
 
 
+def delete_old_data():
+    oldest_valid = arrow.now().shift(days=-config.KEEP_OLD_DATA_DAYS)
+    cleanup_old_imports(oldest_valid)
+    cleanup_old_jobs(oldest_valid)
+    cleanup_old_notifications(oldest_valid)
+
+
 if __name__ == "__main__":
     LOG.d("Start running cronjob")
     parser = argparse.ArgumentParser()
@@ -1235,6 +1245,7 @@ if __name__ == "__main__":
             "notify_manual_subscription_end",
             "notify_premium_end",
             "delete_logs",
+            "delete_old_data",
             "poll_apple_subscription",
             "sanity_check",
             "delete_old_monitoring",
@@ -1263,6 +1274,9 @@ if __name__ == "__main__":
     elif args.job == "delete_logs":
         LOG.d("Deleted Logs")
         delete_logs()
+    elif args.job == "delete_old_data":
+        LOG.d("Delete old data")
+        delete_old_data()
     elif args.job == "poll_apple_subscription":
         LOG.d("Poll Apple Subscriptions")
         poll_apple_subscription()
diff --git a/crontab.yml b/crontab.yml
index 75a5d201..e780ac80 100644
--- a/crontab.yml
+++ b/crontab.yml
@@ -37,6 +37,12 @@ jobs:
     schedule: "15 5 * * *"
"15 5 * * *" captureStderr: true + - name: SimpleLogin Delete Old data + command: python /code/cron.py -j delete_old_data + shell: /bin/bash + schedule: "30 5 * * *" + captureStderr: true + - name: SimpleLogin Poll Apple Subscriptions command: python /code/cron.py -j poll_apple_subscription shell: /bin/bash diff --git a/tasks/__init__.py b/tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tasks/cleanup_old_imports.py b/tasks/cleanup_old_imports.py new file mode 100644 index 00000000..8a056d05 --- /dev/null +++ b/tasks/cleanup_old_imports.py @@ -0,0 +1,19 @@ +import arrow + +from app import s3 +from app.log import LOG +from app.models import BatchImport + + +def cleanup_old_imports(oldest_allowed: arrow.Arrow): + LOG.i(f"Deleting imports older than {oldest_allowed}") + for batch_import in ( + BatchImport.filter(BatchImport.created_at < oldest_allowed).yield_per(500).all() + ): + LOG.i( + f"Deleting batch import {batch_import} with file {batch_import.file.path}" + ) + file = batch_import.file + if file is not None: + s3.delete(file.path) + BatchImport.delete(batch_import.id, commit=True) diff --git a/tasks/cleanup_old_jobs.py b/tasks/cleanup_old_jobs.py new file mode 100644 index 00000000..cdb7910d --- /dev/null +++ b/tasks/cleanup_old_jobs.py @@ -0,0 +1,24 @@ +import arrow +from sqlalchemy import or_, and_ + +from app import config +from app.db import Session +from app.log import LOG +from app.models import Job, JobState + + +def cleanup_old_jobs(oldest_allowed: arrow.Arrow): + LOG.i(f"Deleting jobs older than {oldest_allowed}") + count = Job.filter( + or_( + Job.state == JobState.done.value, + Job.state == JobState.error.value, + and_( + Job.state == JobState.taken.value, + Job.attempts >= config.JOB_MAX_ATTEMPTS, + ), + ), + Job.updated_at < oldest_allowed, + ).delete() + Session.commit() + LOG.i(f"Deleted {count} jobs") diff --git a/tasks/cleanup_old_notifications.py b/tasks/cleanup_old_notifications.py new file mode 100644 index 00000000..2b1689a7 --- /dev/null +++ b/tasks/cleanup_old_notifications.py @@ -0,0 +1,12 @@ +import arrow + +from app.db import Session +from app.log import LOG +from app.models import Notification + + +def cleanup_old_notifications(oldest_allowed: arrow.Arrow): + LOG.i(f"Deleting notifications older than {oldest_allowed}") + count = Notification.filter(Notification.created_at < oldest_allowed).delete() + Session.commit() + LOG.i(f"Deleted {count} notifications") diff --git a/tests/cron/test_get_alias_for_hibp.py b/tests/cron/test_get_alias_for_hibp.py index ca7d82ae..ba41666c 100644 --- a/tests/cron/test_get_alias_for_hibp.py +++ b/tests/cron/test_get_alias_for_hibp.py @@ -28,7 +28,7 @@ def test_get_alias_for_free_user_has_no_alias(): assert len(aliases) == 0 -def test_get_alias_for_lifetime(): +def test_get_alias_for_lifetime_with_null_hibp_date(): user = create_new_user() user.lifetime = True alias_id = Alias.create_new_random(user).id @@ -39,6 +39,19 @@ def test_get_alias_for_lifetime(): assert alias_id == aliases[0].id +def test_get_alias_for_lifetime_with_old_hibp_date(): + user = create_new_user() + user.lifetime = True + alias = Alias.create_new_random(user) + alias.hibp_last_check = arrow.now().shift(days=-1) + alias_id = alias.id + Session.commit() + aliases = list( + cron.get_alias_to_check_hibp(arrow.now(), [], alias_id, alias_id + 1) + ) + assert alias_id == aliases[0].id + + def create_partner_sub(user: User): pu = PartnerUser.create( partner_id=get_proton_partner().id, @@ -114,3 +127,16 @@ def 
         cron.get_alias_to_check_hibp(arrow.now(), [user.id], alias_id, alias_id + 1)
     )
     assert len(aliases) == 0
+
+
+def test_already_checked_is_not_checked():
+    user = create_new_user()
+    user.lifetime = True
+    alias = Alias.create_new_random(user)
+    alias.hibp_last_check = arrow.now().shift(days=1)
+    alias_id = alias.id
+    Session.commit()
+    aliases = list(
+        cron.get_alias_to_check_hibp(arrow.now(), [user.id], alias_id, alias_id + 1)
+    )
+    assert len(aliases) == 0
diff --git a/tests/tasks/__init__.py b/tests/tasks/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/tasks/test_cleanup_old_imports.py b/tests/tasks/test_cleanup_old_imports.py
new file mode 100644
index 00000000..14deff51
--- /dev/null
+++ b/tests/tasks/test_cleanup_old_imports.py
@@ -0,0 +1,35 @@
+import tempfile
+from io import BytesIO
+
+import arrow
+
+from app import s3, config
+from app.models import File, BatchImport
+from tasks.cleanup_old_imports import cleanup_old_imports
+from tests.utils import random_token, create_new_user
+
+
+def test_cleanup_old_imports():
+    BatchImport.filter().delete()
+    with tempfile.TemporaryDirectory() as tmpdir:
+        config.UPLOAD_DIR = tmpdir
+        user = create_new_user()
+        path = random_token()
+        s3.upload_from_bytesio(path, BytesIO("data".encode("utf-8")))
+        file = File.create(path=path, commit=True)  # noqa: F821
+        now = arrow.now()
+        delete_batch_import_id = BatchImport.create(
+            user_id=user.id,
+            file_id=file.id,
+            created_at=now.shift(minutes=-1),
+            flush=True,
+        ).id
+        keep_batch_import_id = BatchImport.create(
+            user_id=user.id,
+            file_id=file.id,
+            created_at=now.shift(minutes=+1),
+            commit=True,
+        ).id
+        cleanup_old_imports(now)
+        assert BatchImport.get(id=delete_batch_import_id) is None
+        assert BatchImport.get(id=keep_batch_import_id) is not None
diff --git a/tests/tasks/test_cleanup_old_jobs.py b/tests/tasks/test_cleanup_old_jobs.py
new file mode 100644
index 00000000..81c86d37
--- /dev/null
+++ b/tests/tasks/test_cleanup_old_jobs.py
@@ -0,0 +1,72 @@
+import arrow
+
+from app import config
+from app.models import Job, JobState
+from tasks.cleanup_old_jobs import cleanup_old_jobs
+
+
+def test_cleanup_old_jobs():
+    Job.filter().delete()
+    now = arrow.now()
+    delete_ids = [
+        Job.create(
+            updated_at=now.shift(minutes=-1),
+            state=JobState.done.value,
+            name="",
+            payload="",
+            flush=True,
+        ).id,
+        Job.create(
+            updated_at=now.shift(minutes=-1),
+            state=JobState.error.value,
+            name="",
+            payload="",
+            flush=True,
+        ).id,
+        Job.create(
+            updated_at=now.shift(minutes=-1),
+            state=JobState.taken.value,
+            attempts=config.JOB_MAX_ATTEMPTS,
+            name="",
+            payload="",
+            flush=True,
+        ).id,
+    ]
+
+    keep_ids = [
+        Job.create(
+            updated_at=now.shift(minutes=+1),
+            state=JobState.done.value,
+            name="",
+            payload="",
+            flush=True,
+        ).id,
+        Job.create(
+            updated_at=now.shift(minutes=+1),
+            state=JobState.error.value,
+            name="",
+            payload="",
+            flush=True,
+        ).id,
+        Job.create(
+            updated_at=now.shift(minutes=+1),
+            state=JobState.taken.value,
+            attempts=config.JOB_MAX_ATTEMPTS,
+            name="",
+            payload="",
+            flush=True,
+        ).id,
+        Job.create(
+            updated_at=now.shift(minutes=-1),
+            state=JobState.taken.value,
+            attempts=config.JOB_MAX_ATTEMPTS - 1,
+            name="",
+            payload="",
+            flush=True,
+        ).id,
+    ]
+    cleanup_old_jobs(now)
+    for delete_id in delete_ids:
+        assert Job.get(id=delete_id) is None
+    for keep_id in keep_ids:
+        assert Job.get(id=keep_id) is not None
diff --git a/tests/tasks/test_cleanup_old_notifications.py b/tests/tasks/test_cleanup_old_notifications.py
new file mode 100644
index 00000000..68381cbc
--- /dev/null
+++ b/tests/tasks/test_cleanup_old_notifications.py
@@ -0,0 +1,26 @@
+import arrow
+
+from app.models import Notification
+from tasks.cleanup_old_notifications import cleanup_old_notifications
+from tests.utils import create_new_user
+
+
+def test_cleanup_old_notifications():
+    Notification.filter().delete()
+    user = create_new_user()
+    now = arrow.now()
+    delete_id = Notification.create(
+        user_id=user.id,
+        created_at=now.shift(minutes=-1),
+        message="",
+        flush=True,
+    ).id
+    keep_id = Notification.create(
+        user_id=user.id,
+        created_at=now.shift(minutes=+1),
+        message="",
+        flush=True,
+    ).id
+    cleanup_old_notifications(now)
+    assert Notification.get(id=delete_id) is None
+    assert Notification.get(id=keep_id) is not None
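
Reviewer note (editorial addition, not part of the patch): the pieces above chain together as follows. The new crontab.yml entry runs `python /code/cron.py -j delete_old_data` daily at 05:30; cron.py maps that job name to the new `delete_old_data()` function, which computes a single cutoff from `KEEP_OLD_DATA_DAYS` and fans out to the three tasks/ modules. A minimal sketch of the equivalent call sequence, using only names introduced in this patch:

    # Sketch of what `python cron.py -j delete_old_data` executes end to end.
    import arrow

    from app import config
    from tasks.cleanup_old_imports import cleanup_old_imports
    from tasks.cleanup_old_jobs import cleanup_old_jobs
    from tasks.cleanup_old_notifications import cleanup_old_notifications

    # Anything strictly older than KEEP_OLD_DATA_DAYS (30 by default) is deleted.
    # All three tasks compare with `<`, so a row timestamped exactly at the cutoff
    # survives; imports and notifications cut on created_at, jobs on updated_at.
    oldest_valid = arrow.now().shift(days=-config.KEEP_OLD_DATA_DAYS)

    cleanup_old_imports(oldest_valid)        # BatchImport rows plus their uploaded files
    cleanup_old_jobs(oldest_valid)           # done/error jobs, and taken jobs out of attempts
    cleanup_old_notifications(oldest_valid)  # Notification rows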
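
A second reviewer note: `cleanup_old_imports` deletes row by row (with `yield_per(500)`) because each BatchImport may own an uploaded file that must be removed from S3 or local storage along with the database row, whereas jobs and notifications can use a single bulk DELETE. One caveat: the log line reads `batch_import.file.path` before the `if file is not None` guard, so an import whose file is already gone would raise AttributeError and abort the cron run. A guarded variant (a sketch; behavior is otherwise identical to the task above):

    import arrow

    from app import s3
    from app.log import LOG
    from app.models import BatchImport


    def cleanup_old_imports(oldest_allowed: arrow.Arrow):
        LOG.i(f"Deleting imports older than {oldest_allowed}")
        for batch_import in (
            BatchImport.filter(BatchImport.created_at < oldest_allowed).yield_per(500).all()
        ):
            file = batch_import.file
            if file is not None:
                # Only touch storage when the import still has a file attached.
                LOG.i(f"Deleting batch import {batch_import} with file {file.path}")
                s3.delete(file.path)
            else:
                LOG.i(f"Deleting batch import {batch_import} with no file")
            BatchImport.delete(batch_import.id, commit=True)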
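
Last note: `download_email` is annotated `-> Optional[str]`, but the local branch returns `f.read()` from a file opened with "rb", i.e. bytes, and the S3 branch presumably returns the bytes of the response Body as well. The annotation predates this patch, but since the function is touched here anyway, tightening it would cost one line (sketch):

    from typing import Optional


    def download_email(path: str) -> Optional[bytes]:  # was: Optional[str]
        ...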