From f3ca74d9b356e96d349296d76913d2eb690cbfda Mon Sep 17 00:00:00 2001 From: Carlos Quintana <74399022+cquintana92@users.noreply.github.com> Date: Mon, 14 Oct 2024 20:25:17 +0200 Subject: [PATCH] chore: unify change alias mailboxes logic (#2262) --- app/alias_mailbox_utils.py | 54 ++++++++++++++++++++++++++ app/api/views/alias.py | 32 ++++------------ app/api/views/custom_domain.py | 27 +++++-------- tests/test_alias_mailbox_utils.py | 63 +++++++++++++++++++++++++++++++ 4 files changed, 134 insertions(+), 42 deletions(-) create mode 100644 app/alias_mailbox_utils.py create mode 100644 tests/test_alias_mailbox_utils.py diff --git a/app/alias_mailbox_utils.py b/app/alias_mailbox_utils.py new file mode 100644 index 00000000..ac23aa68 --- /dev/null +++ b/app/alias_mailbox_utils.py @@ -0,0 +1,54 @@ +from dataclasses import dataclass +from enum import Enum +from typing import List, Optional + +from app.db import Session +from app.models import Alias, AliasMailbox, Mailbox + +_MAX_MAILBOXES_PER_ALIAS = 20 + + +class CannotSetMailboxesForAliasCause(Enum): + Forbidden = "Forbidden" + EmptyMailboxes = "Must choose at least one mailbox" + TooManyMailboxes = "Too many mailboxes" + + +@dataclass +class SetMailboxesForAliasResult: + performed_change: bool + reason: Optional[CannotSetMailboxesForAliasCause] + + +def set_mailboxes_for_alias( + user_id: int, alias: Alias, mailbox_ids: List[int] +) -> Optional[CannotSetMailboxesForAliasCause]: + if len(mailbox_ids) == 0: + return CannotSetMailboxesForAliasCause.EmptyMailboxes + if len(mailbox_ids) > _MAX_MAILBOXES_PER_ALIAS: + return CannotSetMailboxesForAliasCause.TooManyMailboxes + + mailboxes = ( + Session.query(Mailbox) + .filter( + Mailbox.id.in_(mailbox_ids), + Mailbox.user_id == user_id, + Mailbox.verified == True, # noqa: E712 + ) + .all() + ) + if len(mailboxes) != len(mailbox_ids): + return CannotSetMailboxesForAliasCause.Forbidden + + # first remove all existing alias-mailboxes links + AliasMailbox.filter_by(alias_id=alias.id).delete() + Session.flush() + + # then add all new mailboxes, being the first the one associated with the alias + for i, mailbox in enumerate(mailboxes): + if i == 0: + alias.mailbox_id = mailboxes[0].id + else: + AliasMailbox.create(alias_id=alias.id, mailbox_id=mailbox.id) + + return None diff --git a/app/api/views/alias.py b/app/api/views/alias.py index 572ef21b..e0c73526 100644 --- a/app/api/views/alias.py +++ b/app/api/views/alias.py @@ -5,6 +5,7 @@ from flask import request from app import alias_utils from app.alias_audit_log_utils import emit_alias_audit_log, AliasAuditLogAction +from app.alias_mailbox_utils import set_mailboxes_for_alias from app.api.base import api_bp, require_api_auth from app.api.serializer import ( AliasInfo, @@ -27,7 +28,7 @@ from app.errors import ( ) from app.extensions import limiter from app.log import LOG -from app.models import Alias, Contact, Mailbox, AliasMailbox, AliasDeleteReason +from app.models import Alias, Contact, Mailbox, AliasDeleteReason @deprecated @@ -297,30 +298,11 @@ def update_alias(alias_id): if "mailbox_ids" in data: mailbox_ids = [int(m_id) for m_id in data.get("mailbox_ids")] - mailboxes: [Mailbox] = [] - - # check if all mailboxes belong to user - for mailbox_id in mailbox_ids: - mailbox = Mailbox.get(mailbox_id) - if not mailbox or mailbox.user_id != user.id or not mailbox.verified: - return jsonify(error="Forbidden"), 400 - mailboxes.append(mailbox) - - if not mailboxes: - return jsonify(error="Must choose at least one mailbox"), 400 - - # <<< update alias mailboxes >>> - # first remove all existing alias-mailboxes links - AliasMailbox.filter_by(alias_id=alias.id).delete() - Session.flush() - - # then add all new mailboxes - for i, mailbox in enumerate(mailboxes): - if i == 0: - alias.mailbox_id = mailboxes[0].id - else: - AliasMailbox.create(alias_id=alias.id, mailbox_id=mailbox.id) - # <<< END update alias mailboxes >>> + err = set_mailboxes_for_alias( + user_id=user.id, alias=alias, mailbox_ids=mailbox_ids + ) + if err: + return jsonify(error=err.value), 400 mailbox_ids_string = ",".join(map(str, mailbox_ids)) changed_fields.append(f"mailbox_ids ({mailbox_ids_string})") diff --git a/app/api/views/custom_domain.py b/app/api/views/custom_domain.py index b42c957e..522377cc 100644 --- a/app/api/views/custom_domain.py +++ b/app/api/views/custom_domain.py @@ -2,8 +2,10 @@ from flask import g, request from flask import jsonify from app.api.base import api_bp, require_api_auth +from app.custom_domain_utils import set_custom_domain_mailboxes from app.db import Session -from app.models import CustomDomain, DomainDeletedAlias, Mailbox, DomainMailbox +from app.log import LOG +from app.models import CustomDomain, DomainDeletedAlias def custom_domain_to_dict(custom_domain: CustomDomain): @@ -100,23 +102,14 @@ def update_custom_domain(custom_domain_id): if "mailbox_ids" in data: mailbox_ids = [int(m_id) for m_id in data.get("mailbox_ids")] - if mailbox_ids: - # check if mailbox is not tempered with - mailboxes = [] - for mailbox_id in mailbox_ids: - mailbox = Mailbox.get(mailbox_id) - if not mailbox or mailbox.user_id != user.id or not mailbox.verified: - return jsonify(error="Forbidden"), 400 - mailboxes.append(mailbox) - - # first remove all existing domain-mailboxes links - DomainMailbox.filter_by(domain_id=custom_domain.id).delete() - Session.flush() - - for mailbox in mailboxes: - DomainMailbox.create(domain_id=custom_domain.id, mailbox_id=mailbox.id) - + result = set_custom_domain_mailboxes(user.id, custom_domain, mailbox_ids) + if result.success: changed = True + else: + LOG.info( + f"Prevented from updating mailboxes [custom_domain_id={custom_domain.id}]: {result.reason.value}" + ) + return jsonify(error="Forbidden"), 400 if changed: Session.commit() diff --git a/tests/test_alias_mailbox_utils.py b/tests/test_alias_mailbox_utils.py new file mode 100644 index 00000000..79394625 --- /dev/null +++ b/tests/test_alias_mailbox_utils.py @@ -0,0 +1,63 @@ +from typing import Tuple + +from app.alias_mailbox_utils import ( + set_mailboxes_for_alias, + CannotSetMailboxesForAliasCause, +) +from app.models import Alias, Mailbox, User, AliasMailbox +from tests.utils import create_new_user, random_email + + +def setup() -> Tuple[User, Alias]: + user = create_new_user() + alias = Alias.create( + user_id=user.id, + email=random_email(), + mailbox_id=user.default_mailbox_id, + commit=True, + ) + return user, alias + + +def test_set_mailboxes_for_alias_empty_list(): + user, alias = setup() + err = set_mailboxes_for_alias(user.id, alias, []) + assert err is CannotSetMailboxesForAliasCause.EmptyMailboxes + + +def test_set_mailboxes_for_alias_mailbox_for_other_user(): + user, alias = setup() + another_user = create_new_user() + err = set_mailboxes_for_alias(user.id, alias, [another_user.default_mailbox_id]) + assert err is CannotSetMailboxesForAliasCause.Forbidden + + +def test_set_mailboxes_for_alias_mailbox_not_exists(): + user, alias = setup() + err = set_mailboxes_for_alias(user.id, alias, [9999999]) + assert err is CannotSetMailboxesForAliasCause.Forbidden + + +def test_set_mailboxes_for_alias_mailbox_success(): + user, alias = setup() + mb1 = Mailbox.create( + user_id=user.id, + email=random_email(), + verified=True, + ) + mb2 = Mailbox.create( + user_id=user.id, + email=random_email(), + verified=True, + commit=True, + ) + err = set_mailboxes_for_alias(user.id, alias, [mb1.id, mb2.id]) + assert err is None + + db_alias = Alias.get_by(id=alias.id) + assert db_alias is not None + assert db_alias.mailbox_id == mb1.id + + alias_mailboxes = AliasMailbox.filter_by(alias_id=alias.id).all() + assert len(alias_mailboxes) == 1 + assert alias_mailboxes[0].mailbox_id == mb2.id