From eba9feb856771d7c85be9d5da807a16bdc1d954c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Adri=C3=A0=20Casaj=C3=BAs?=
Date: Fri, 15 Mar 2024 13:44:24 +0100
Subject: [PATCH] Cleanup tasks

---
 app/s3.py                               | 59 +++++++++++--------------
 tasks/__init__.py                       |  0
 tasks/cleanup_old_imports.py            | 18 ++++++++
 tasks/cleanup_old_jobs.py               | 18 ++++++++
 tasks/cleanup_old_notifications.py      | 11 +++++
 tests/cron/test_get_alias_for_hibp.py   | 28 +++++++++++-
 tests/tasks/__init__.py                 |  0
 tests/tasks/test_cleanup_old_imports.py | 35 +++++++++++++++
 8 files changed, 134 insertions(+), 35 deletions(-)
 create mode 100644 tasks/__init__.py
 create mode 100644 tasks/cleanup_old_imports.py
 create mode 100644 tasks/cleanup_old_jobs.py
 create mode 100644 tasks/cleanup_old_notifications.py
 create mode 100644 tests/tasks/__init__.py
 create mode 100644 tests/tasks/test_cleanup_old_imports.py

diff --git a/app/s3.py b/app/s3.py
index 32d80bbd..47f9cf99 100644
--- a/app/s3.py
+++ b/app/s3.py
@@ -5,19 +5,9 @@ from typing import Optional
 import boto3
 import requests
 
-from app.config import (
-    AWS_REGION,
-    BUCKET,
-    AWS_ACCESS_KEY_ID,
-    AWS_SECRET_ACCESS_KEY,
-    LOCAL_FILE_UPLOAD,
-    UPLOAD_DIR,
-    URL,
-    AWS_ENDPOINT_URL,
-)
+from app import config
 from app.log import LOG
 
-
 _s3_client = None
 
 
@@ -25,12 +15,12 @@ def _get_s3client():
     global _s3_client
     if _s3_client is None:
         args = {
-            "aws_access_key_id": AWS_ACCESS_KEY_ID,
-            "aws_secret_access_key": AWS_SECRET_ACCESS_KEY,
-            "region_name": AWS_REGION,
+            "aws_access_key_id": config.AWS_ACCESS_KEY_ID,
+            "aws_secret_access_key": config.AWS_SECRET_ACCESS_KEY,
+            "region_name": config.AWS_REGION,
         }
-        if AWS_ENDPOINT_URL:
-            args["endpoint_url"] = AWS_ENDPOINT_URL
+        if config.AWS_ENDPOINT_URL:
+            args["endpoint_url"] = config.AWS_ENDPOINT_URL
         _s3_client = boto3.client("s3", **args)
     return _s3_client
 
@@ -38,8 +28,8 @@ def upload_from_bytesio(key: str, bs: BytesIO, content_type="application/octet-stream"):
     bs.seek(0)
 
-    if LOCAL_FILE_UPLOAD:
-        file_path = os.path.join(UPLOAD_DIR, key)
+    if config.LOCAL_FILE_UPLOAD:
+        file_path = os.path.join(config.UPLOAD_DIR, key)
         file_dir = os.path.dirname(file_path)
         os.makedirs(file_dir, exist_ok=True)
         with open(file_path, "wb") as f:
@@ -47,7 +37,7 @@ def upload_from_bytesio(key: str, bs: BytesIO, content_type="application/octet-s
 
     else:
         _get_s3client().put_object(
-            Bucket=BUCKET,
+            Bucket=config.BUCKET,
             Key=key,
             Body=bs,
             ContentType=content_type,
@@ -57,8 +47,8 @@ def upload_email_from_bytesio(path: str, bs: BytesIO, filename):
     bs.seek(0)
 
-    if LOCAL_FILE_UPLOAD:
-        file_path = os.path.join(UPLOAD_DIR, path)
+    if config.LOCAL_FILE_UPLOAD:
+        file_path = os.path.join(config.UPLOAD_DIR, path)
         file_dir = os.path.dirname(file_path)
         os.makedirs(file_dir, exist_ok=True)
         with open(file_path, "wb") as f:
@@ -66,7 +56,7 @@ def upload_email_from_bytesio(path: str, bs: BytesIO, filename):
 
     else:
         _get_s3client().put_object(
-            Bucket=BUCKET,
+            Bucket=config.BUCKET,
             Key=path,
             Body=bs,
             # Support saving a remote file using Http header
@@ -77,12 +67,12 @@ def download_email(path: str) -> Optional[str]:
-    if LOCAL_FILE_UPLOAD:
-        file_path = os.path.join(UPLOAD_DIR, path)
+    if config.LOCAL_FILE_UPLOAD:
+        file_path = os.path.join(config.UPLOAD_DIR, path)
         with open(file_path, "rb") as f:
             return f.read()
     resp = _get_s3client().get_object(
-        Bucket=BUCKET,
+        Bucket=config.BUCKET,
         Key=path,
     )
     if not resp or "Body" not in resp:
@@ -96,29 +86,30 @@ def upload_from_url(url: str, upload_path):
 
 
 def get_url(key: str, expires_in=3600) -> str:
-    if LOCAL_FILE_UPLOAD:
-        return URL + "/static/upload/" + key
+    if config.LOCAL_FILE_UPLOAD:
+        return config.URL + "/static/upload/" + key
     else:
         return _get_s3client().generate_presigned_url(
             ExpiresIn=expires_in,
             ClientMethod="get_object",
-            Params={"Bucket": BUCKET, "Key": key},
+            Params={"Bucket": config.BUCKET, "Key": key},
         )
 
 
 def delete(path: str):
-    if LOCAL_FILE_UPLOAD:
-        os.remove(os.path.join(UPLOAD_DIR, path))
+    if config.LOCAL_FILE_UPLOAD:
+        file_path = os.path.join(config.UPLOAD_DIR, path)
+        os.remove(file_path)
     else:
-        _get_s3client().delete_object(Bucket=BUCKET, Key=path)
+        _get_s3client().delete_object(Bucket=config.BUCKET, Key=path)
 
 
 def create_bucket_if_not_exists():
     s3client = _get_s3client()
     buckets = s3client.list_buckets()
     for bucket in buckets["Buckets"]:
-        if bucket["Name"] == BUCKET:
+        if bucket["Name"] == config.BUCKET:
             LOG.i("Bucket already exists")
             return
-    s3client.create_bucket(Bucket=BUCKET)
-    LOG.i(f"Bucket {BUCKET} created")
+    s3client.create_bucket(Bucket=config.BUCKET)
+    LOG.i(f"Bucket {config.BUCKET} created")
diff --git a/tasks/__init__.py b/tasks/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tasks/cleanup_old_imports.py b/tasks/cleanup_old_imports.py
new file mode 100644
index 00000000..fcd01960
--- /dev/null
+++ b/tasks/cleanup_old_imports.py
@@ -0,0 +1,18 @@
+import arrow
+
+from app import s3
+from app.log import LOG
+from app.models import BatchImport
+
+
+def cleanup_imports(oldest_allowed: arrow.Arrow):
+    for batch_import in (
+        BatchImport.filter(BatchImport.created_at < oldest_allowed).yield_per(500).all()
+    ):
+        LOG.i(
+            f"Deleting batch import {batch_import} with file {batch_import.file.path}"
+        )
+        file = batch_import.file
+        if file is not None:
+            s3.delete(file.path)
+        BatchImport.delete(batch_import.id)
diff --git a/tasks/cleanup_old_jobs.py b/tasks/cleanup_old_jobs.py
new file mode 100644
index 00000000..7a5679de
--- /dev/null
+++ b/tasks/cleanup_old_jobs.py
@@ -0,0 +1,18 @@
+import arrow
+from sqlalchemy import or_, and_
+
+from app import config
+from app.log import LOG
+from app.models import Job, JobState
+
+
+def cleanup_old_jobs():
+    count = Job.filter(
+        or_(
+            Job.state == JobState.done,
+            Job.state == JobState.error,
+            and_(Job.state == JobState.taken, Job.attempts >= config.JOB_MAX_ATTEMPTS),
+        ),
+        Job.updated_at < arrow.now().shift(days=-15),
+    ).delete()
+    LOG.i(f"Deleted {count} jobs")
diff --git a/tasks/cleanup_old_notifications.py b/tasks/cleanup_old_notifications.py
new file mode 100644
index 00000000..68a1925d
--- /dev/null
+++ b/tasks/cleanup_old_notifications.py
@@ -0,0 +1,11 @@
+import arrow
+
+from app.log import LOG
+from app.models import Notification
+
+
+def cleanup_old_notifications():
+    count = Notification.filter(
+        Notification.created_at < arrow.now().shift(days=-15)
+    ).delete()
+    LOG.i(f"Deleted {count} notifications")
diff --git a/tests/cron/test_get_alias_for_hibp.py b/tests/cron/test_get_alias_for_hibp.py
index ca7d82ae..ba41666c 100644
--- a/tests/cron/test_get_alias_for_hibp.py
+++ b/tests/cron/test_get_alias_for_hibp.py
@@ -28,7 +28,7 @@ def test_get_alias_for_free_user_has_no_alias():
     assert len(aliases) == 0
 
 
-def test_get_alias_for_lifetime():
+def test_get_alias_for_lifetime_with_null_hibp_date():
     user = create_new_user()
     user.lifetime = True
     alias_id = Alias.create_new_random(user).id
@@ -39,6 +39,19 @@ def test_get_alias_for_lifetime():
     assert alias_id == aliases[0].id
 
 
+def test_get_alias_for_lifetime_with_old_hibp_date():
+    user = create_new_user()
+    user.lifetime = True
+    alias = Alias.create_new_random(user)
+    alias.hibp_last_check = arrow.now().shift(days=-1)
+    alias_id = alias.id
+    Session.commit()
+    aliases = list(
+        cron.get_alias_to_check_hibp(arrow.now(), [], alias_id, alias_id + 1)
+    )
+    assert alias_id == aliases[0].id
+
+
 def create_partner_sub(user: User):
     pu = PartnerUser.create(
         partner_id=get_proton_partner().id,
@@ -114,3 +127,16 @@ def test_skipped_user_is_not_checked():
         cron.get_alias_to_check_hibp(arrow.now(), [user.id], alias_id, alias_id + 1)
     )
     assert len(aliases) == 0
+
+
+def test_already_checked_is_not_checked():
+    user = create_new_user()
+    user.lifetime = True
+    alias = Alias.create_new_random(user)
+    alias.hibp_last_check = arrow.now().shift(days=1)
+    alias_id = alias.id
+    Session.commit()
+    aliases = list(
+        cron.get_alias_to_check_hibp(arrow.now(), [user.id], alias_id, alias_id + 1)
+    )
+    assert len(aliases) == 0
diff --git a/tests/tasks/__init__.py b/tests/tasks/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/tasks/test_cleanup_old_imports.py b/tests/tasks/test_cleanup_old_imports.py
new file mode 100644
index 00000000..0de65f56
--- /dev/null
+++ b/tests/tasks/test_cleanup_old_imports.py
@@ -0,0 +1,35 @@
+import tempfile
+from io import BytesIO
+
+import arrow
+
+from app import s3, config
+from app.models import File, BatchImport
+from tasks.cleanup_old_imports import cleanup_imports
+from utils import random_token, create_new_user
+
+
+def test_cleanup_old_imports():
+    BatchImport.filter().delete()
+    with tempfile.TemporaryDirectory() as tmpdir:
+        config.UPLOAD_DIR = tmpdir
+        user = create_new_user()
+        path = random_token()
+        s3.upload_from_bytesio(path, BytesIO("data".encode("utf-8")))
+        file = File.create(path=path, commit=True)  # noqa: F821
+        now = arrow.now()
+        delete_batch_import_id = BatchImport.create(
+            user_id=user.id,
+            file_id=file.id,
+            created_at=now.shift(minutes=-1),
+            flush=True,
+        ).id
+        keep_batch_import_id = BatchImport.create(
+            user_id=user.id,
+            file_id=file.id,
+            created_at=now.shift(minutes=+1),
+            commit=True,
+        ).id
+        cleanup_imports(now)
+        assert BatchImport.get(id=delete_batch_import_id) is None
+        assert BatchImport.get(id=keep_batch_import_id) is not None
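
Note (reviewer sketch, not part of the patch): the three new task modules are standalone functions, and the patch does not show where they are invoked. A minimal sketch of how they could be wired into a single periodic entrypoint follows; the module name run_cleanup.py and the 7-day retention window for batch imports are assumptions for illustration only, not something this patch adds.

# run_cleanup.py -- hypothetical entrypoint, NOT added by this patch.
# Calls the three cleanup functions introduced above; the 7-day import
# retention window below is an assumption for illustration only.
import arrow

from tasks.cleanup_old_imports import cleanup_imports
from tasks.cleanup_old_jobs import cleanup_old_jobs
from tasks.cleanup_old_notifications import cleanup_old_notifications


def run_cleanup():
    # Batch imports keep their uploaded file in S3 or local storage, so
    # cleanup_imports also removes the underlying file via s3.delete().
    cleanup_imports(arrow.now().shift(days=-7))
    # Jobs and notifications use the 15-day cut-off hard-coded in their modules.
    cleanup_old_jobs()
    cleanup_old_notifications()


if __name__ == "__main__":
    run_cleanup()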