mirror of
https://github.com/simple-login/app.git
synced 2024-11-16 08:58:30 +01:00
chore: unify change alias mailboxes logic (#2262)
This commit is contained in:
parent
bdb0c8bd08
commit
f3ca74d9b3
4 changed files with 134 additions and 42 deletions
54
app/alias_mailbox_utils.py
Normal file
54
app/alias_mailbox_utils.py
Normal file
|
@ -0,0 +1,54 @@
|
|||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import List, Optional
|
||||
|
||||
from app.db import Session
|
||||
from app.models import Alias, AliasMailbox, Mailbox
|
||||
|
||||
_MAX_MAILBOXES_PER_ALIAS = 20
|
||||
|
||||
|
||||
class CannotSetMailboxesForAliasCause(Enum):
|
||||
Forbidden = "Forbidden"
|
||||
EmptyMailboxes = "Must choose at least one mailbox"
|
||||
TooManyMailboxes = "Too many mailboxes"
|
||||
|
||||
|
||||
@dataclass
|
||||
class SetMailboxesForAliasResult:
|
||||
performed_change: bool
|
||||
reason: Optional[CannotSetMailboxesForAliasCause]
|
||||
|
||||
|
||||
def set_mailboxes_for_alias(
|
||||
user_id: int, alias: Alias, mailbox_ids: List[int]
|
||||
) -> Optional[CannotSetMailboxesForAliasCause]:
|
||||
if len(mailbox_ids) == 0:
|
||||
return CannotSetMailboxesForAliasCause.EmptyMailboxes
|
||||
if len(mailbox_ids) > _MAX_MAILBOXES_PER_ALIAS:
|
||||
return CannotSetMailboxesForAliasCause.TooManyMailboxes
|
||||
|
||||
mailboxes = (
|
||||
Session.query(Mailbox)
|
||||
.filter(
|
||||
Mailbox.id.in_(mailbox_ids),
|
||||
Mailbox.user_id == user_id,
|
||||
Mailbox.verified == True, # noqa: E712
|
||||
)
|
||||
.all()
|
||||
)
|
||||
if len(mailboxes) != len(mailbox_ids):
|
||||
return CannotSetMailboxesForAliasCause.Forbidden
|
||||
|
||||
# first remove all existing alias-mailboxes links
|
||||
AliasMailbox.filter_by(alias_id=alias.id).delete()
|
||||
Session.flush()
|
||||
|
||||
# then add all new mailboxes, being the first the one associated with the alias
|
||||
for i, mailbox in enumerate(mailboxes):
|
||||
if i == 0:
|
||||
alias.mailbox_id = mailboxes[0].id
|
||||
else:
|
||||
AliasMailbox.create(alias_id=alias.id, mailbox_id=mailbox.id)
|
||||
|
||||
return None
|
|
@ -5,6 +5,7 @@ from flask import request
|
|||
|
||||
from app import alias_utils
|
||||
from app.alias_audit_log_utils import emit_alias_audit_log, AliasAuditLogAction
|
||||
from app.alias_mailbox_utils import set_mailboxes_for_alias
|
||||
from app.api.base import api_bp, require_api_auth
|
||||
from app.api.serializer import (
|
||||
AliasInfo,
|
||||
|
@ -27,7 +28,7 @@ from app.errors import (
|
|||
)
|
||||
from app.extensions import limiter
|
||||
from app.log import LOG
|
||||
from app.models import Alias, Contact, Mailbox, AliasMailbox, AliasDeleteReason
|
||||
from app.models import Alias, Contact, Mailbox, AliasDeleteReason
|
||||
|
||||
|
||||
@deprecated
|
||||
|
@ -297,30 +298,11 @@ def update_alias(alias_id):
|
|||
|
||||
if "mailbox_ids" in data:
|
||||
mailbox_ids = [int(m_id) for m_id in data.get("mailbox_ids")]
|
||||
mailboxes: [Mailbox] = []
|
||||
|
||||
# check if all mailboxes belong to user
|
||||
for mailbox_id in mailbox_ids:
|
||||
mailbox = Mailbox.get(mailbox_id)
|
||||
if not mailbox or mailbox.user_id != user.id or not mailbox.verified:
|
||||
return jsonify(error="Forbidden"), 400
|
||||
mailboxes.append(mailbox)
|
||||
|
||||
if not mailboxes:
|
||||
return jsonify(error="Must choose at least one mailbox"), 400
|
||||
|
||||
# <<< update alias mailboxes >>>
|
||||
# first remove all existing alias-mailboxes links
|
||||
AliasMailbox.filter_by(alias_id=alias.id).delete()
|
||||
Session.flush()
|
||||
|
||||
# then add all new mailboxes
|
||||
for i, mailbox in enumerate(mailboxes):
|
||||
if i == 0:
|
||||
alias.mailbox_id = mailboxes[0].id
|
||||
else:
|
||||
AliasMailbox.create(alias_id=alias.id, mailbox_id=mailbox.id)
|
||||
# <<< END update alias mailboxes >>>
|
||||
err = set_mailboxes_for_alias(
|
||||
user_id=user.id, alias=alias, mailbox_ids=mailbox_ids
|
||||
)
|
||||
if err:
|
||||
return jsonify(error=err.value), 400
|
||||
|
||||
mailbox_ids_string = ",".join(map(str, mailbox_ids))
|
||||
changed_fields.append(f"mailbox_ids ({mailbox_ids_string})")
|
||||
|
|
|
@ -2,8 +2,10 @@ from flask import g, request
|
|||
from flask import jsonify
|
||||
|
||||
from app.api.base import api_bp, require_api_auth
|
||||
from app.custom_domain_utils import set_custom_domain_mailboxes
|
||||
from app.db import Session
|
||||
from app.models import CustomDomain, DomainDeletedAlias, Mailbox, DomainMailbox
|
||||
from app.log import LOG
|
||||
from app.models import CustomDomain, DomainDeletedAlias
|
||||
|
||||
|
||||
def custom_domain_to_dict(custom_domain: CustomDomain):
|
||||
|
@ -100,23 +102,14 @@ def update_custom_domain(custom_domain_id):
|
|||
|
||||
if "mailbox_ids" in data:
|
||||
mailbox_ids = [int(m_id) for m_id in data.get("mailbox_ids")]
|
||||
if mailbox_ids:
|
||||
# check if mailbox is not tempered with
|
||||
mailboxes = []
|
||||
for mailbox_id in mailbox_ids:
|
||||
mailbox = Mailbox.get(mailbox_id)
|
||||
if not mailbox or mailbox.user_id != user.id or not mailbox.verified:
|
||||
return jsonify(error="Forbidden"), 400
|
||||
mailboxes.append(mailbox)
|
||||
|
||||
# first remove all existing domain-mailboxes links
|
||||
DomainMailbox.filter_by(domain_id=custom_domain.id).delete()
|
||||
Session.flush()
|
||||
|
||||
for mailbox in mailboxes:
|
||||
DomainMailbox.create(domain_id=custom_domain.id, mailbox_id=mailbox.id)
|
||||
|
||||
result = set_custom_domain_mailboxes(user.id, custom_domain, mailbox_ids)
|
||||
if result.success:
|
||||
changed = True
|
||||
else:
|
||||
LOG.info(
|
||||
f"Prevented from updating mailboxes [custom_domain_id={custom_domain.id}]: {result.reason.value}"
|
||||
)
|
||||
return jsonify(error="Forbidden"), 400
|
||||
|
||||
if changed:
|
||||
Session.commit()
|
||||
|
|
63
tests/test_alias_mailbox_utils.py
Normal file
63
tests/test_alias_mailbox_utils.py
Normal file
|
@ -0,0 +1,63 @@
|
|||
from typing import Tuple
|
||||
|
||||
from app.alias_mailbox_utils import (
|
||||
set_mailboxes_for_alias,
|
||||
CannotSetMailboxesForAliasCause,
|
||||
)
|
||||
from app.models import Alias, Mailbox, User, AliasMailbox
|
||||
from tests.utils import create_new_user, random_email
|
||||
|
||||
|
||||
def setup() -> Tuple[User, Alias]:
|
||||
user = create_new_user()
|
||||
alias = Alias.create(
|
||||
user_id=user.id,
|
||||
email=random_email(),
|
||||
mailbox_id=user.default_mailbox_id,
|
||||
commit=True,
|
||||
)
|
||||
return user, alias
|
||||
|
||||
|
||||
def test_set_mailboxes_for_alias_empty_list():
|
||||
user, alias = setup()
|
||||
err = set_mailboxes_for_alias(user.id, alias, [])
|
||||
assert err is CannotSetMailboxesForAliasCause.EmptyMailboxes
|
||||
|
||||
|
||||
def test_set_mailboxes_for_alias_mailbox_for_other_user():
|
||||
user, alias = setup()
|
||||
another_user = create_new_user()
|
||||
err = set_mailboxes_for_alias(user.id, alias, [another_user.default_mailbox_id])
|
||||
assert err is CannotSetMailboxesForAliasCause.Forbidden
|
||||
|
||||
|
||||
def test_set_mailboxes_for_alias_mailbox_not_exists():
|
||||
user, alias = setup()
|
||||
err = set_mailboxes_for_alias(user.id, alias, [9999999])
|
||||
assert err is CannotSetMailboxesForAliasCause.Forbidden
|
||||
|
||||
|
||||
def test_set_mailboxes_for_alias_mailbox_success():
|
||||
user, alias = setup()
|
||||
mb1 = Mailbox.create(
|
||||
user_id=user.id,
|
||||
email=random_email(),
|
||||
verified=True,
|
||||
)
|
||||
mb2 = Mailbox.create(
|
||||
user_id=user.id,
|
||||
email=random_email(),
|
||||
verified=True,
|
||||
commit=True,
|
||||
)
|
||||
err = set_mailboxes_for_alias(user.id, alias, [mb1.id, mb2.id])
|
||||
assert err is None
|
||||
|
||||
db_alias = Alias.get_by(id=alias.id)
|
||||
assert db_alias is not None
|
||||
assert db_alias.mailbox_id == mb1.id
|
||||
|
||||
alias_mailboxes = AliasMailbox.filter_by(alias_id=alias.id).all()
|
||||
assert len(alias_mailboxes) == 1
|
||||
assert alias_mailboxes[0].mailbox_id == mb2.id
|
Loading…
Reference in a new issue