Add methods to check if alias will be auto-created

This commit is contained in:
Adrià Casajús 2022-04-28 14:43:24 +02:00
parent 93ae82aa46
commit 8e35a09788
No known key found for this signature in database
GPG Key ID: F0033226A5AFC9B9
2 changed files with 272 additions and 110 deletions

View File

@ -1,18 +1,23 @@
import re
from typing import Optional
from typing import Optional, Tuple
from email_validator import validate_email, EmailNotValidError
from sqlalchemy.exc import IntegrityError, DataError
from app.config import BOUNCE_PREFIX_FOR_REPLY_PHASE, BOUNCE_PREFIX, BOUNCE_SUFFIX
from app.config import (
BOUNCE_PREFIX_FOR_REPLY_PHASE,
BOUNCE_PREFIX,
BOUNCE_SUFFIX,
VERP_PREFIX,
)
from app.db import Session
from app.email_utils import (
get_email_domain_part,
send_cannot_create_directory_alias,
send_cannot_create_domain_alias,
can_create_directory_for_address,
send_cannot_create_directory_alias_disabled,
get_email_local_part,
send_cannot_create_domain_alias,
)
from app.errors import AliasInTrashError
from app.log import LOG
@ -27,10 +32,131 @@ from app.models import (
Mailbox,
EmailLog,
Contact,
AutoCreateRule,
)
from app.regex_utils import regex_match
def get_user_if_alias_would_auto_create(
address: str, notify_user: bool = False
) -> Optional[User]:
banned_prefix = f"{VERP_PREFIX}."
if address.startswith(banned_prefix):
LOG.w("alias %s can't start with %s", address, banned_prefix)
return None
try:
# Prevent addresses with unicode characters (🤯) in them for now.
validate_email(address, check_deliverability=False, allow_smtputf8=False)
except EmailNotValidError:
return None
will_create = check_if_alias_can_be_auto_created_for_custom_domain(
address, notify_user=notify_user
)
if will_create:
return will_create[0].user
directory = check_if_alias_can_be_auto_created_for_a_directory(
address, notify_user=notify_user
)
if directory:
return directory.user
return None
def check_if_alias_can_be_auto_created_for_custom_domain(
address: str, notify_user: bool = True
) -> Optional[Tuple[CustomDomain, Optional[AutoCreateRule]]]:
"""
Check if this address would generate an auto created alias.
If that's the case return the domain that would create it and the rule that triggered it.
If there's no rule it's a catchall creation
"""
alias_domain = get_email_domain_part(address)
custom_domain: CustomDomain = CustomDomain.get_by(domain=alias_domain)
if not custom_domain:
return None
user: User = custom_domain.user
if user.disabled:
LOG.i("Disabled user %s can't create new alias via custom domain", user)
return None
if not user.can_create_new_alias():
if notify_user:
send_cannot_create_domain_alias(custom_domain.user, address, alias_domain)
return None
if not custom_domain.catch_all:
if len(custom_domain.auto_create_rules) == 0:
return None
local = get_email_local_part(address)
for rule in custom_domain.auto_create_rules:
if regex_match(rule.regex, local):
LOG.d(
"%s passes %s on %s",
address,
rule.regex,
custom_domain,
)
return custom_domain, rule
else: # no rule passes
LOG.d("no rule passed to create %s", local)
return None
LOG.d("Create alias via catchall")
return custom_domain, None
def check_if_alias_can_be_auto_created_for_a_directory(
address: str, notify_user: bool = True
) -> Optional[Directory]:
"""
Try to create an alias with directory
"""
# check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format
if not can_create_directory_for_address(address):
return None
# alias contains one of the 3 special directory separator: "/", "+" or "#"
if "/" in address:
sep = "/"
elif "+" in address:
sep = "+"
elif "#" in address:
sep = "#"
else:
# if there's no directory separator in the alias, no way to auto-create it
return None
directory_name = address[: address.find(sep)]
LOG.d("directory_name %s", directory_name)
directory = Directory.get_by(name=directory_name)
if not directory:
return None
user: User = directory.user
if user.disabled:
LOG.i("Disabled %s can't create new alias with directory", user)
return None
if not user.can_create_new_alias():
if notify_user:
send_cannot_create_directory_alias(user, address, directory_name)
return None
if directory.disabled:
if notify_user:
send_cannot_create_directory_alias_disabled(user, address, directory_name)
return None
return directory
def try_auto_create(address: str) -> Optional[Alias]:
"""Try to auto-create the alias using directory or catch-all domain"""
# VERP for reply phase is {BOUNCE_PREFIX_FOR_REPLY_PHASE}+{email_log.id}+@{alias_domain}
@ -60,124 +186,72 @@ def try_auto_create_directory(address: str) -> Optional[Alias]:
"""
Try to create an alias with directory
"""
# check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format
if can_create_directory_for_address(address):
# if there's no directory separator in the alias, no way to auto-create it
if "/" not in address and "+" not in address and "#" not in address:
return None
directory = check_if_alias_can_be_auto_created_for_a_directory(
address, notify_user=True
)
if not directory:
return None
# alias contains one of the 3 special directory separator: "/", "+" or "#"
if "/" in address:
sep = "/"
elif "+" in address:
sep = "+"
else:
sep = "#"
try:
LOG.d("create alias %s for directory %s", address, directory)
directory_name = address[: address.find(sep)]
LOG.d("directory_name %s", directory_name)
mailboxes = directory.mailboxes
directory = Directory.get_by(name=directory_name)
if not directory:
return None
user: User = directory.user
if user.disabled:
LOG.i("Disabled %s can't create new alias with directory", user)
return None
if not user.can_create_new_alias():
send_cannot_create_directory_alias(user, address, directory_name)
return None
if directory.disabled:
send_cannot_create_directory_alias_disabled(user, address, directory_name)
return None
try:
LOG.d("create alias %s for directory %s", address, directory)
mailboxes = directory.mailboxes
alias = Alias.create(
email=address,
user_id=directory.user_id,
directory_id=directory.id,
mailbox_id=mailboxes[0].id,
alias = Alias.create(
email=address,
user_id=directory.user_id,
directory_id=directory.id,
mailbox_id=mailboxes[0].id,
)
if not directory.user.disable_automatic_alias_note:
alias.note = f"Created by directory {directory.name}"
Session.flush()
for i in range(1, len(mailboxes)):
AliasMailbox.create(
alias_id=alias.id,
mailbox_id=mailboxes[i].id,
)
if not user.disable_automatic_alias_note:
alias.note = f"Created by directory {directory.name}"
Session.flush()
for i in range(1, len(mailboxes)):
AliasMailbox.create(
alias_id=alias.id,
mailbox_id=mailboxes[i].id,
)
Session.commit()
return alias
except AliasInTrashError:
LOG.w(
"Alias %s was deleted before, cannot auto-create using directory %s, user %s",
address,
directory_name,
user,
)
return None
except IntegrityError:
LOG.w("Alias %s already exists", address)
Session.rollback()
alias = Alias.get_by(email=address)
return alias
Session.commit()
return alias
except AliasInTrashError:
LOG.w(
"Alias %s was deleted before, cannot auto-create using directory %s, user %s",
address,
directory.name,
directory.user,
)
return None
except IntegrityError:
LOG.w("Alias %s already exists", address)
Session.rollback()
alias = Alias.get_by(email=address)
return alias
def try_auto_create_via_domain(address: str) -> Optional[Alias]:
"""Try to create an alias with catch-all or auto-create rules on custom domain"""
# try to create alias on-the-fly with custom-domain catch-all feature
# check if alias is custom-domain alias and if the custom-domain has catch-all enabled
alias_domain = get_email_domain_part(address)
custom_domain: CustomDomain = CustomDomain.get_by(domain=alias_domain)
if not custom_domain:
can_create = check_if_alias_can_be_auto_created_for_custom_domain(address)
if not can_create:
return None
custom_domain, rule = can_create
domain_user: User = custom_domain.user
if domain_user.disabled:
LOG.i("Disabled user %s can't create new alias via custom domain", domain_user)
return None
if not custom_domain.catch_all and len(custom_domain.auto_create_rules) == 0:
return None
elif not custom_domain.catch_all and len(custom_domain.auto_create_rules) > 0:
local = get_email_local_part(address)
for rule in custom_domain.auto_create_rules:
if regex_match(rule.regex, local):
LOG.d(
"%s passes %s on %s",
address,
rule.regex,
custom_domain,
)
alias_note = f"Created by rule {rule.order} with regex {rule.regex}"
mailboxes = rule.mailboxes
break
else: # no rule passes
LOG.d("no rule passed to create %s", local)
return
else: # catch-all is enabled
if rule:
alias_note = f"Created by rule {rule.order} with regex {rule.regex}"
mailboxes = rule.mailboxes
else:
alias_note = "Created by catchall option"
mailboxes = custom_domain.mailboxes
alias_note = "Created by catch-all option"
if not domain_user.can_create_new_alias():
send_cannot_create_domain_alias(domain_user, address, alias_domain)
return None
# a rule can have 0 mailboxes. Happened when a mailbox is deleted
if not mailboxes:
LOG.d("use %s default mailbox for %s %s", domain_user, address, custom_domain)
mailboxes = [domain_user.default_mailbox]
LOG.d(
"use %s default mailbox for %s %s",
custom_domain.user,
address,
custom_domain,
)
mailboxes = [custom_domain.user.default_mailbox]
try:
LOG.d("create alias %s for domain %s", address, custom_domain)
@ -203,7 +277,7 @@ def try_auto_create_via_domain(address: str) -> Optional[Alias]:
"Alias %s was deleted before, cannot auto-create using domain catch-all %s, user %s",
address,
custom_domain,
domain_user,
custom_domain.user,
)
return None
except IntegrityError:

View File

@ -1,7 +1,23 @@
from app.alias_utils import delete_alias, check_alias_prefix
from typing import List
from app.alias_utils import (
delete_alias,
check_alias_prefix,
get_user_if_alias_would_auto_create,
try_auto_create,
)
from app.config import ALIAS_DOMAINS
from app.db import Session
from app.models import Alias, DeletedAlias
from tests.utils import create_new_user
from app.models import (
Alias,
DeletedAlias,
CustomDomain,
AutoCreateRule,
Directory,
DirectoryMailbox,
User,
)
from tests.utils import create_new_user, random_domain, random_token
def test_delete_alias(flask_client):
@ -44,3 +60,75 @@ def test_check_alias_prefix(flask_client):
assert not check_alias_prefix("a b")
assert not check_alias_prefix("+👌")
assert not check_alias_prefix("too-long" * 10)
def get_auto_create_alias_tests(user: User) -> List:
user.lifetime = True
catchall = CustomDomain.create(
user_id=user.id,
catch_all=True,
domain=random_domain(),
verified=True,
flush=True,
)
no_catchall = CustomDomain.create(
user_id=user.id,
catch_all=False,
domain=random_domain(),
verified=True,
flush=True,
)
no_catchall_with_rule = CustomDomain.create(
user_id=user.id,
catch_all=False,
domain=random_domain(),
verified=True,
flush=True,
)
AutoCreateRule.create(
custom_domain_id=no_catchall_with_rule.id,
order=0,
regex="ok-.*",
flush=True,
)
dir_name = random_token()
directory = Directory.create(name=dir_name, user_id=user.id, flush=True)
DirectoryMailbox.create(
directory_id=directory.id, mailbox_id=user.default_mailbox_id, flush=True
)
Session.commit()
return [
(f"nonexistant@{catchall.domain}", True),
(f"nonexistant@{no_catchall.domain}", False),
(f"nonexistant@{no_catchall_with_rule.domain}", False),
(f"ok-nonexistant@{no_catchall_with_rule.domain}", True),
(f"{dir_name}+something@nowhere.net", False),
(f"{dir_name}#something@nowhere.net", False),
(f"{dir_name}/something@nowhere.net", False),
(f"{dir_name}+something@{ALIAS_DOMAINS[0]}", True),
(f"{dir_name}#something@{ALIAS_DOMAINS[0]}", True),
(f"{dir_name}/something@{ALIAS_DOMAINS[0]}", True),
]
def test_get_user_if_alias_would_auto_create(flask_client):
user = create_new_user()
for test_id, (address, expected_ok) in enumerate(get_auto_create_alias_tests(user)):
result = get_user_if_alias_would_auto_create(address)
if expected_ok:
assert (
isinstance(result, User) and result.id == user.id
), f"Case {test_id} - Failed address {address}"
else:
assert not result, f"Case {test_id} - Failed address {address}"
def test_auto_create_alias(flask_client):
user = create_new_user()
for test_id, (address, expected_ok) in enumerate(get_auto_create_alias_tests(user)):
result = try_auto_create(address)
if expected_ok:
assert result, f"Case {test_id} - Failed address {address}"
else:
assert result is None, f"Case {test_id} - Failed address {address}"