From 7ec7e06c2b1dcd98a088b14a4c8667a86c8127f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Casaj=C3=BAs?= Date: Thu, 31 Aug 2023 13:42:44 +0200 Subject: [PATCH] Move alias transfer util outside the views to make it importable (#1855) --- app/alias_utils.py | 59 ++++++++++++++++++++++++ app/dashboard/views/alias_transfer.py | 64 +------------------------- tests/dashboard/test_alias_transfer.py | 4 +- 3 files changed, 63 insertions(+), 64 deletions(-) diff --git a/app/alias_utils.py b/app/alias_utils.py index 3034ee88..b02cb1fc 100644 --- a/app/alias_utils.py +++ b/app/alias_utils.py @@ -21,6 +21,8 @@ from app.email_utils import ( send_cannot_create_directory_alias_disabled, get_email_local_part, send_cannot_create_domain_alias, + send_email, + render, ) from app.errors import AliasInTrashError from app.log import LOG @@ -36,6 +38,8 @@ from app.models import ( EmailLog, Contact, AutoCreateRule, + AliasUsedOn, + ClientUser, ) from app.regex_utils import regex_match @@ -399,3 +403,58 @@ def alias_export_csv(user, csv_direct_export=False): output.headers["Content-Disposition"] = "attachment; filename=aliases.csv" output.headers["Content-type"] = "text/csv" return output + + +def transfer_alias(alias, new_user, new_mailboxes: [Mailbox]): + # cannot transfer alias which is used for receiving newsletter + if User.get_by(newsletter_alias_id=alias.id): + raise Exception("Cannot transfer alias that's used to receive newsletter") + + # update user_id + Session.query(Contact).filter(Contact.alias_id == alias.id).update( + {"user_id": new_user.id} + ) + + Session.query(AliasUsedOn).filter(AliasUsedOn.alias_id == alias.id).update( + {"user_id": new_user.id} + ) + + Session.query(ClientUser).filter(ClientUser.alias_id == alias.id).update( + {"user_id": new_user.id} + ) + + # remove existing mailboxes from the alias + Session.query(AliasMailbox).filter(AliasMailbox.alias_id == alias.id).delete() + + # set mailboxes + alias.mailbox_id = new_mailboxes.pop().id + for mb in new_mailboxes: + AliasMailbox.create(alias_id=alias.id, mailbox_id=mb.id) + + # alias has never been transferred before + if not alias.original_owner_id: + alias.original_owner_id = alias.user_id + + # inform previous owner + old_user = alias.user + send_email( + old_user.email, + f"Alias {alias.email} has been received", + render( + "transactional/alias-transferred.txt", + alias=alias, + ), + render( + "transactional/alias-transferred.html", + alias=alias, + ), + ) + + # now the alias belongs to the new user + alias.user_id = new_user.id + + # set some fields back to default + alias.disable_pgp = False + alias.pinned = False + + Session.commit() diff --git a/app/dashboard/views/alias_transfer.py b/app/dashboard/views/alias_transfer.py index af8fab50..8b2e7676 100644 --- a/app/dashboard/views/alias_transfer.py +++ b/app/dashboard/views/alias_transfer.py @@ -7,79 +7,19 @@ from flask import render_template, redirect, url_for, flash, request from flask_login import login_required, current_user from app import config +from app.alias_utils import transfer_alias from app.dashboard.base import dashboard_bp from app.dashboard.views.enter_sudo import sudo_required from app.db import Session -from app.email_utils import send_email, render from app.extensions import limiter from app.log import LOG from app.models import ( Alias, - Contact, - AliasUsedOn, - AliasMailbox, - User, - ClientUser, ) from app.models import Mailbox from app.utils import CSRFValidationForm -def transfer(alias, new_user, new_mailboxes: [Mailbox]): - # cannot transfer alias which is used for receiving newsletter - if User.get_by(newsletter_alias_id=alias.id): - raise Exception("Cannot transfer alias that's used to receive newsletter") - - # update user_id - Session.query(Contact).filter(Contact.alias_id == alias.id).update( - {"user_id": new_user.id} - ) - - Session.query(AliasUsedOn).filter(AliasUsedOn.alias_id == alias.id).update( - {"user_id": new_user.id} - ) - - Session.query(ClientUser).filter(ClientUser.alias_id == alias.id).update( - {"user_id": new_user.id} - ) - - # remove existing mailboxes from the alias - Session.query(AliasMailbox).filter(AliasMailbox.alias_id == alias.id).delete() - - # set mailboxes - alias.mailbox_id = new_mailboxes.pop().id - for mb in new_mailboxes: - AliasMailbox.create(alias_id=alias.id, mailbox_id=mb.id) - - # alias has never been transferred before - if not alias.original_owner_id: - alias.original_owner_id = alias.user_id - - # inform previous owner - old_user = alias.user - send_email( - old_user.email, - f"Alias {alias.email} has been received", - render( - "transactional/alias-transferred.txt", - alias=alias, - ), - render( - "transactional/alias-transferred.html", - alias=alias, - ), - ) - - # now the alias belongs to the new user - alias.user_id = new_user.id - - # set some fields back to default - alias.disable_pgp = False - alias.pinned = False - - Session.commit() - - def hmac_alias_transfer_token(transfer_token: str) -> str: alias_hmac = hmac.new( config.ALIAS_TRANSFER_TOKEN_SECRET.encode("utf-8"), @@ -214,7 +154,7 @@ def alias_transfer_receive_route(): mailboxes, token, ) - transfer(alias, current_user, mailboxes) + transfer_alias(alias, current_user, mailboxes) # reset transfer token alias.transfer_token = None diff --git a/tests/dashboard/test_alias_transfer.py b/tests/dashboard/test_alias_transfer.py index 604c7074..32a063f6 100644 --- a/tests/dashboard/test_alias_transfer.py +++ b/tests/dashboard/test_alias_transfer.py @@ -1,4 +1,4 @@ -from app.dashboard.views import alias_transfer +import app.alias_utils from app.db import Session from app.models import ( Alias, @@ -29,7 +29,7 @@ def test_alias_transfer(flask_client): user_id=new_user.id, email="hey2@example.com", verified=True, commit=True ) - alias_transfer.transfer(alias, new_user, new_user.mailboxes()) + app.alias_utils.transfer_alias(alias, new_user, new_user.mailboxes()) # refresh from db alias = Alias.get(alias.id)