mirror of
https://github.com/simple-login/app.git
synced 2024-11-16 17:08:30 +01:00
0da1811311
* Cleanup tasks * Update * Added tests * Create cron job * Delete old data cron * Fix import * import fix * Added delete + script to disable pgp for proton mboxes
35 lines
1.1 KiB
Python
35 lines
1.1 KiB
Python
import tempfile
|
|
from io import BytesIO
|
|
|
|
import arrow
|
|
|
|
from app import s3, config
|
|
from app.models import File, BatchImport
|
|
from tasks.cleanup_old_imports import cleanup_old_imports
|
|
from tests.utils import random_token, create_new_user
|
|
|
|
|
|
def test_cleanup_old_imports():
|
|
BatchImport.filter().delete()
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
config.UPLOAD_DIR = tmpdir
|
|
user = create_new_user()
|
|
path = random_token()
|
|
s3.upload_from_bytesio(path, BytesIO("data".encode("utf-8")))
|
|
file = File.create(path=path, commit=True) # noqa: F821
|
|
now = arrow.now()
|
|
delete_batch_import_id = BatchImport.create(
|
|
user_id=user.id,
|
|
file_id=file.id,
|
|
created_at=now.shift(minutes=-1),
|
|
flush=True,
|
|
).id
|
|
keep_batch_import_id = BatchImport.create(
|
|
user_id=user.id,
|
|
file_id=file.id,
|
|
created_at=now.shift(minutes=+1),
|
|
commit=True,
|
|
).id
|
|
cleanup_old_imports(now)
|
|
assert BatchImport.get(id=delete_batch_import_id) is None
|
|
assert BatchImport.get(id=keep_batch_import_id) is not None
|