From 8e35a0978809eb8a965951780f40e62cc7ae3752 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Casaj=C3=BAs?= Date: Thu, 28 Apr 2022 14:43:24 +0200 Subject: [PATCH] Add methods to check if alias will be auto-created --- app/alias_utils.py | 288 ++++++++++++++++++++++++-------------- tests/test_alias_utils.py | 94 ++++++++++++- 2 files changed, 272 insertions(+), 110 deletions(-) diff --git a/app/alias_utils.py b/app/alias_utils.py index b0a89182..69a49fdf 100644 --- a/app/alias_utils.py +++ b/app/alias_utils.py @@ -1,18 +1,23 @@ import re -from typing import Optional +from typing import Optional, Tuple from email_validator import validate_email, EmailNotValidError from sqlalchemy.exc import IntegrityError, DataError -from app.config import BOUNCE_PREFIX_FOR_REPLY_PHASE, BOUNCE_PREFIX, BOUNCE_SUFFIX +from app.config import ( + BOUNCE_PREFIX_FOR_REPLY_PHASE, + BOUNCE_PREFIX, + BOUNCE_SUFFIX, + VERP_PREFIX, +) from app.db import Session from app.email_utils import ( get_email_domain_part, send_cannot_create_directory_alias, - send_cannot_create_domain_alias, can_create_directory_for_address, send_cannot_create_directory_alias_disabled, get_email_local_part, + send_cannot_create_domain_alias, ) from app.errors import AliasInTrashError from app.log import LOG @@ -27,10 +32,131 @@ from app.models import ( Mailbox, EmailLog, Contact, + AutoCreateRule, ) from app.regex_utils import regex_match +def get_user_if_alias_would_auto_create( + address: str, notify_user: bool = False +) -> Optional[User]: + banned_prefix = f"{VERP_PREFIX}." + if address.startswith(banned_prefix): + LOG.w("alias %s can't start with %s", address, banned_prefix) + return None + + try: + # Prevent addresses with unicode characters (🤯) in them for now. + validate_email(address, check_deliverability=False, allow_smtputf8=False) + except EmailNotValidError: + return None + + will_create = check_if_alias_can_be_auto_created_for_custom_domain( + address, notify_user=notify_user + ) + if will_create: + return will_create[0].user + directory = check_if_alias_can_be_auto_created_for_a_directory( + address, notify_user=notify_user + ) + if directory: + return directory.user + + return None + + +def check_if_alias_can_be_auto_created_for_custom_domain( + address: str, notify_user: bool = True +) -> Optional[Tuple[CustomDomain, Optional[AutoCreateRule]]]: + """ + Check if this address would generate an auto created alias. + If that's the case return the domain that would create it and the rule that triggered it. + If there's no rule it's a catchall creation + """ + alias_domain = get_email_domain_part(address) + custom_domain: CustomDomain = CustomDomain.get_by(domain=alias_domain) + + if not custom_domain: + return None + + user: User = custom_domain.user + if user.disabled: + LOG.i("Disabled user %s can't create new alias via custom domain", user) + return None + + if not user.can_create_new_alias(): + if notify_user: + send_cannot_create_domain_alias(custom_domain.user, address, alias_domain) + return None + + if not custom_domain.catch_all: + if len(custom_domain.auto_create_rules) == 0: + return None + local = get_email_local_part(address) + + for rule in custom_domain.auto_create_rules: + if regex_match(rule.regex, local): + LOG.d( + "%s passes %s on %s", + address, + rule.regex, + custom_domain, + ) + return custom_domain, rule + else: # no rule passes + LOG.d("no rule passed to create %s", local) + return None + LOG.d("Create alias via catchall") + + return custom_domain, None + + +def check_if_alias_can_be_auto_created_for_a_directory( + address: str, notify_user: bool = True +) -> Optional[Directory]: + """ + Try to create an alias with directory + """ + # check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format + if not can_create_directory_for_address(address): + return None + + # alias contains one of the 3 special directory separator: "/", "+" or "#" + if "/" in address: + sep = "/" + elif "+" in address: + sep = "+" + elif "#" in address: + sep = "#" + else: + # if there's no directory separator in the alias, no way to auto-create it + return None + + directory_name = address[: address.find(sep)] + LOG.d("directory_name %s", directory_name) + + directory = Directory.get_by(name=directory_name) + if not directory: + return None + + user: User = directory.user + if user.disabled: + LOG.i("Disabled %s can't create new alias with directory", user) + return None + + if not user.can_create_new_alias(): + if notify_user: + send_cannot_create_directory_alias(user, address, directory_name) + return None + + if directory.disabled: + if notify_user: + send_cannot_create_directory_alias_disabled(user, address, directory_name) + return None + + return directory + + def try_auto_create(address: str) -> Optional[Alias]: """Try to auto-create the alias using directory or catch-all domain""" # VERP for reply phase is {BOUNCE_PREFIX_FOR_REPLY_PHASE}+{email_log.id}+@{alias_domain} @@ -60,124 +186,72 @@ def try_auto_create_directory(address: str) -> Optional[Alias]: """ Try to create an alias with directory """ - # check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format - if can_create_directory_for_address(address): - # if there's no directory separator in the alias, no way to auto-create it - if "/" not in address and "+" not in address and "#" not in address: - return None + directory = check_if_alias_can_be_auto_created_for_a_directory( + address, notify_user=True + ) + if not directory: + return None - # alias contains one of the 3 special directory separator: "/", "+" or "#" - if "/" in address: - sep = "/" - elif "+" in address: - sep = "+" - else: - sep = "#" + try: + LOG.d("create alias %s for directory %s", address, directory) - directory_name = address[: address.find(sep)] - LOG.d("directory_name %s", directory_name) + mailboxes = directory.mailboxes - directory = Directory.get_by(name=directory_name) - if not directory: - return None - - user: User = directory.user - if user.disabled: - LOG.i("Disabled %s can't create new alias with directory", user) - return None - - if not user.can_create_new_alias(): - send_cannot_create_directory_alias(user, address, directory_name) - return None - - if directory.disabled: - send_cannot_create_directory_alias_disabled(user, address, directory_name) - return None - - try: - LOG.d("create alias %s for directory %s", address, directory) - - mailboxes = directory.mailboxes - - alias = Alias.create( - email=address, - user_id=directory.user_id, - directory_id=directory.id, - mailbox_id=mailboxes[0].id, + alias = Alias.create( + email=address, + user_id=directory.user_id, + directory_id=directory.id, + mailbox_id=mailboxes[0].id, + ) + if not directory.user.disable_automatic_alias_note: + alias.note = f"Created by directory {directory.name}" + Session.flush() + for i in range(1, len(mailboxes)): + AliasMailbox.create( + alias_id=alias.id, + mailbox_id=mailboxes[i].id, ) - if not user.disable_automatic_alias_note: - alias.note = f"Created by directory {directory.name}" - Session.flush() - for i in range(1, len(mailboxes)): - AliasMailbox.create( - alias_id=alias.id, - mailbox_id=mailboxes[i].id, - ) - Session.commit() - return alias - except AliasInTrashError: - LOG.w( - "Alias %s was deleted before, cannot auto-create using directory %s, user %s", - address, - directory_name, - user, - ) - return None - except IntegrityError: - LOG.w("Alias %s already exists", address) - Session.rollback() - alias = Alias.get_by(email=address) - return alias + Session.commit() + return alias + except AliasInTrashError: + LOG.w( + "Alias %s was deleted before, cannot auto-create using directory %s, user %s", + address, + directory.name, + directory.user, + ) + return None + except IntegrityError: + LOG.w("Alias %s already exists", address) + Session.rollback() + alias = Alias.get_by(email=address) + return alias def try_auto_create_via_domain(address: str) -> Optional[Alias]: """Try to create an alias with catch-all or auto-create rules on custom domain""" - - # try to create alias on-the-fly with custom-domain catch-all feature - # check if alias is custom-domain alias and if the custom-domain has catch-all enabled - alias_domain = get_email_domain_part(address) - custom_domain: CustomDomain = CustomDomain.get_by(domain=alias_domain) - - if not custom_domain: + can_create = check_if_alias_can_be_auto_created_for_custom_domain(address) + if not can_create: return None + custom_domain, rule = can_create - domain_user: User = custom_domain.user - if domain_user.disabled: - LOG.i("Disabled user %s can't create new alias via custom domain", domain_user) - return None - - if not custom_domain.catch_all and len(custom_domain.auto_create_rules) == 0: - return None - elif not custom_domain.catch_all and len(custom_domain.auto_create_rules) > 0: - local = get_email_local_part(address) - - for rule in custom_domain.auto_create_rules: - if regex_match(rule.regex, local): - LOG.d( - "%s passes %s on %s", - address, - rule.regex, - custom_domain, - ) - alias_note = f"Created by rule {rule.order} with regex {rule.regex}" - mailboxes = rule.mailboxes - break - else: # no rule passes - LOG.d("no rule passed to create %s", local) - return - else: # catch-all is enabled + if rule: + alias_note = f"Created by rule {rule.order} with regex {rule.regex}" + mailboxes = rule.mailboxes + else: + alias_note = "Created by catchall option" mailboxes = custom_domain.mailboxes - alias_note = "Created by catch-all option" - - if not domain_user.can_create_new_alias(): - send_cannot_create_domain_alias(domain_user, address, alias_domain) - return None # a rule can have 0 mailboxes. Happened when a mailbox is deleted if not mailboxes: - LOG.d("use %s default mailbox for %s %s", domain_user, address, custom_domain) - mailboxes = [domain_user.default_mailbox] + LOG.d( + "use %s default mailbox for %s %s", + custom_domain.user, + address, + custom_domain, + ) + mailboxes = [custom_domain.user.default_mailbox] try: LOG.d("create alias %s for domain %s", address, custom_domain) @@ -203,7 +277,7 @@ def try_auto_create_via_domain(address: str) -> Optional[Alias]: "Alias %s was deleted before, cannot auto-create using domain catch-all %s, user %s", address, custom_domain, - domain_user, + custom_domain.user, ) return None except IntegrityError: diff --git a/tests/test_alias_utils.py b/tests/test_alias_utils.py index f5dcaa6e..4ac200bd 100644 --- a/tests/test_alias_utils.py +++ b/tests/test_alias_utils.py @@ -1,7 +1,23 @@ -from app.alias_utils import delete_alias, check_alias_prefix +from typing import List + +from app.alias_utils import ( + delete_alias, + check_alias_prefix, + get_user_if_alias_would_auto_create, + try_auto_create, +) +from app.config import ALIAS_DOMAINS from app.db import Session -from app.models import Alias, DeletedAlias -from tests.utils import create_new_user +from app.models import ( + Alias, + DeletedAlias, + CustomDomain, + AutoCreateRule, + Directory, + DirectoryMailbox, + User, +) +from tests.utils import create_new_user, random_domain, random_token def test_delete_alias(flask_client): @@ -44,3 +60,75 @@ def test_check_alias_prefix(flask_client): assert not check_alias_prefix("a b") assert not check_alias_prefix("+👌") assert not check_alias_prefix("too-long" * 10) + + +def get_auto_create_alias_tests(user: User) -> List: + user.lifetime = True + catchall = CustomDomain.create( + user_id=user.id, + catch_all=True, + domain=random_domain(), + verified=True, + flush=True, + ) + no_catchall = CustomDomain.create( + user_id=user.id, + catch_all=False, + domain=random_domain(), + verified=True, + flush=True, + ) + no_catchall_with_rule = CustomDomain.create( + user_id=user.id, + catch_all=False, + domain=random_domain(), + verified=True, + flush=True, + ) + AutoCreateRule.create( + custom_domain_id=no_catchall_with_rule.id, + order=0, + regex="ok-.*", + flush=True, + ) + dir_name = random_token() + directory = Directory.create(name=dir_name, user_id=user.id, flush=True) + DirectoryMailbox.create( + directory_id=directory.id, mailbox_id=user.default_mailbox_id, flush=True + ) + Session.commit() + + return [ + (f"nonexistant@{catchall.domain}", True), + (f"nonexistant@{no_catchall.domain}", False), + (f"nonexistant@{no_catchall_with_rule.domain}", False), + (f"ok-nonexistant@{no_catchall_with_rule.domain}", True), + (f"{dir_name}+something@nowhere.net", False), + (f"{dir_name}#something@nowhere.net", False), + (f"{dir_name}/something@nowhere.net", False), + (f"{dir_name}+something@{ALIAS_DOMAINS[0]}", True), + (f"{dir_name}#something@{ALIAS_DOMAINS[0]}", True), + (f"{dir_name}/something@{ALIAS_DOMAINS[0]}", True), + ] + + +def test_get_user_if_alias_would_auto_create(flask_client): + user = create_new_user() + for test_id, (address, expected_ok) in enumerate(get_auto_create_alias_tests(user)): + result = get_user_if_alias_would_auto_create(address) + if expected_ok: + assert ( + isinstance(result, User) and result.id == user.id + ), f"Case {test_id} - Failed address {address}" + else: + assert not result, f"Case {test_id} - Failed address {address}" + + +def test_auto_create_alias(flask_client): + user = create_new_user() + for test_id, (address, expected_ok) in enumerate(get_auto_create_alias_tests(user)): + result = try_auto_create(address) + if expected_ok: + assert result, f"Case {test_id} - Failed address {address}" + else: + assert result is None, f"Case {test_id} - Failed address {address}"