mirror of
https://github.com/simple-login/app.git
synced 2024-09-28 20:51:29 +02:00
Merge pull request #940 from simple-login/ac-check-auto-create
Add methods to check if an alias will be auto-created
This commit is contained in:
commit
04399e827e
@ -1,18 +1,23 @@
|
||||
import re
|
||||
from typing import Optional
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from email_validator import validate_email, EmailNotValidError
|
||||
from sqlalchemy.exc import IntegrityError, DataError
|
||||
|
||||
from app.config import BOUNCE_PREFIX_FOR_REPLY_PHASE, BOUNCE_PREFIX, BOUNCE_SUFFIX
|
||||
from app.config import (
|
||||
BOUNCE_PREFIX_FOR_REPLY_PHASE,
|
||||
BOUNCE_PREFIX,
|
||||
BOUNCE_SUFFIX,
|
||||
VERP_PREFIX,
|
||||
)
|
||||
from app.db import Session
|
||||
from app.email_utils import (
|
||||
get_email_domain_part,
|
||||
send_cannot_create_directory_alias,
|
||||
send_cannot_create_domain_alias,
|
||||
can_create_directory_for_address,
|
||||
send_cannot_create_directory_alias_disabled,
|
||||
get_email_local_part,
|
||||
send_cannot_create_domain_alias,
|
||||
)
|
||||
from app.errors import AliasInTrashError
|
||||
from app.log import LOG
|
||||
@ -27,10 +32,132 @@ from app.models import (
|
||||
Mailbox,
|
||||
EmailLog,
|
||||
Contact,
|
||||
AutoCreateRule,
|
||||
)
|
||||
from app.regex_utils import regex_match
|
||||
|
||||
|
||||
def get_user_if_alias_would_auto_create(
|
||||
address: str, notify_user: bool = False
|
||||
) -> Optional[User]:
|
||||
banned_prefix = f"{VERP_PREFIX}."
|
||||
if address.startswith(banned_prefix):
|
||||
LOG.w("alias %s can't start with %s", address, banned_prefix)
|
||||
return None
|
||||
|
||||
try:
|
||||
# Prevent addresses with unicode characters (🤯) in them for now.
|
||||
validate_email(address, check_deliverability=False, allow_smtputf8=False)
|
||||
except EmailNotValidError:
|
||||
return None
|
||||
|
||||
domain_and_rule = check_if_alias_can_be_auto_created_for_custom_domain(
|
||||
address, notify_user=notify_user
|
||||
)
|
||||
if domain_and_rule:
|
||||
return domain_and_rule[0].user
|
||||
directory = check_if_alias_can_be_auto_created_for_a_directory(
|
||||
address, notify_user=notify_user
|
||||
)
|
||||
if directory:
|
||||
return directory.user
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def check_if_alias_can_be_auto_created_for_custom_domain(
|
||||
address: str, notify_user: bool = True
|
||||
) -> Optional[Tuple[CustomDomain, Optional[AutoCreateRule]]]:
|
||||
"""
|
||||
Check if this address would generate an auto created alias.
|
||||
If that's the case return the domain that would create it and the rule that triggered it.
|
||||
If there's no rule it's a catchall creation
|
||||
"""
|
||||
alias_domain = get_email_domain_part(address)
|
||||
custom_domain: CustomDomain = CustomDomain.get_by(domain=alias_domain)
|
||||
|
||||
if not custom_domain:
|
||||
return None
|
||||
|
||||
user: User = custom_domain.user
|
||||
if user.disabled:
|
||||
LOG.i("Disabled user %s can't create new alias via custom domain", user)
|
||||
return None
|
||||
|
||||
if not user.can_create_new_alias():
|
||||
if notify_user:
|
||||
send_cannot_create_domain_alias(custom_domain.user, address, alias_domain)
|
||||
return None
|
||||
|
||||
if not custom_domain.catch_all:
|
||||
if len(custom_domain.auto_create_rules) == 0:
|
||||
return None
|
||||
local = get_email_local_part(address)
|
||||
|
||||
for rule in custom_domain.auto_create_rules:
|
||||
if regex_match(rule.regex, local):
|
||||
LOG.d(
|
||||
"%s passes %s on %s",
|
||||
address,
|
||||
rule.regex,
|
||||
custom_domain,
|
||||
)
|
||||
return custom_domain, rule
|
||||
else: # no rule passes
|
||||
LOG.d("no rule passed to create %s", local)
|
||||
return None
|
||||
LOG.d("Create alias via catchall")
|
||||
|
||||
return custom_domain, None
|
||||
|
||||
|
||||
def check_if_alias_can_be_auto_created_for_a_directory(
|
||||
address: str, notify_user: bool = True
|
||||
) -> Optional[Directory]:
|
||||
"""
|
||||
Try to create an alias with directory
|
||||
If an alias would be created, return the dictionary that would trigger the creation. Otherwise, return None.
|
||||
"""
|
||||
# check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format
|
||||
if not can_create_directory_for_address(address):
|
||||
return None
|
||||
|
||||
# alias contains one of the 3 special directory separator: "/", "+" or "#"
|
||||
if "/" in address:
|
||||
sep = "/"
|
||||
elif "+" in address:
|
||||
sep = "+"
|
||||
elif "#" in address:
|
||||
sep = "#"
|
||||
else:
|
||||
# if there's no directory separator in the alias, no way to auto-create it
|
||||
return None
|
||||
|
||||
directory_name = address[: address.find(sep)]
|
||||
LOG.d("directory_name %s", directory_name)
|
||||
|
||||
directory = Directory.get_by(name=directory_name)
|
||||
if not directory:
|
||||
return None
|
||||
|
||||
user: User = directory.user
|
||||
if user.disabled:
|
||||
LOG.i("Disabled %s can't create new alias with directory", user)
|
||||
return None
|
||||
|
||||
if not user.can_create_new_alias():
|
||||
if notify_user:
|
||||
send_cannot_create_directory_alias(user, address, directory_name)
|
||||
return None
|
||||
|
||||
if directory.disabled:
|
||||
if notify_user:
|
||||
send_cannot_create_directory_alias_disabled(user, address, directory_name)
|
||||
return None
|
||||
|
||||
return directory
|
||||
|
||||
|
||||
def try_auto_create(address: str) -> Optional[Alias]:
|
||||
"""Try to auto-create the alias using directory or catch-all domain"""
|
||||
# VERP for reply phase is {BOUNCE_PREFIX_FOR_REPLY_PHASE}+{email_log.id}+@{alias_domain}
|
||||
@ -60,124 +187,72 @@ def try_auto_create_directory(address: str) -> Optional[Alias]:
|
||||
"""
|
||||
Try to create an alias with directory
|
||||
"""
|
||||
# check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format
|
||||
if can_create_directory_for_address(address):
|
||||
# if there's no directory separator in the alias, no way to auto-create it
|
||||
if "/" not in address and "+" not in address and "#" not in address:
|
||||
return None
|
||||
directory = check_if_alias_can_be_auto_created_for_a_directory(
|
||||
address, notify_user=True
|
||||
)
|
||||
if not directory:
|
||||
return None
|
||||
|
||||
# alias contains one of the 3 special directory separator: "/", "+" or "#"
|
||||
if "/" in address:
|
||||
sep = "/"
|
||||
elif "+" in address:
|
||||
sep = "+"
|
||||
else:
|
||||
sep = "#"
|
||||
try:
|
||||
LOG.d("create alias %s for directory %s", address, directory)
|
||||
|
||||
directory_name = address[: address.find(sep)]
|
||||
LOG.d("directory_name %s", directory_name)
|
||||
mailboxes = directory.mailboxes
|
||||
|
||||
directory = Directory.get_by(name=directory_name)
|
||||
if not directory:
|
||||
return None
|
||||
|
||||
user: User = directory.user
|
||||
if user.disabled:
|
||||
LOG.i("Disabled %s can't create new alias with directory", user)
|
||||
return None
|
||||
|
||||
if not user.can_create_new_alias():
|
||||
send_cannot_create_directory_alias(user, address, directory_name)
|
||||
return None
|
||||
|
||||
if directory.disabled:
|
||||
send_cannot_create_directory_alias_disabled(user, address, directory_name)
|
||||
return None
|
||||
|
||||
try:
|
||||
LOG.d("create alias %s for directory %s", address, directory)
|
||||
|
||||
mailboxes = directory.mailboxes
|
||||
|
||||
alias = Alias.create(
|
||||
email=address,
|
||||
user_id=directory.user_id,
|
||||
directory_id=directory.id,
|
||||
mailbox_id=mailboxes[0].id,
|
||||
alias = Alias.create(
|
||||
email=address,
|
||||
user_id=directory.user_id,
|
||||
directory_id=directory.id,
|
||||
mailbox_id=mailboxes[0].id,
|
||||
)
|
||||
if not directory.user.disable_automatic_alias_note:
|
||||
alias.note = f"Created by directory {directory.name}"
|
||||
Session.flush()
|
||||
for i in range(1, len(mailboxes)):
|
||||
AliasMailbox.create(
|
||||
alias_id=alias.id,
|
||||
mailbox_id=mailboxes[i].id,
|
||||
)
|
||||
if not user.disable_automatic_alias_note:
|
||||
alias.note = f"Created by directory {directory.name}"
|
||||
Session.flush()
|
||||
for i in range(1, len(mailboxes)):
|
||||
AliasMailbox.create(
|
||||
alias_id=alias.id,
|
||||
mailbox_id=mailboxes[i].id,
|
||||
)
|
||||
|
||||
Session.commit()
|
||||
return alias
|
||||
except AliasInTrashError:
|
||||
LOG.w(
|
||||
"Alias %s was deleted before, cannot auto-create using directory %s, user %s",
|
||||
address,
|
||||
directory_name,
|
||||
user,
|
||||
)
|
||||
return None
|
||||
except IntegrityError:
|
||||
LOG.w("Alias %s already exists", address)
|
||||
Session.rollback()
|
||||
alias = Alias.get_by(email=address)
|
||||
return alias
|
||||
Session.commit()
|
||||
return alias
|
||||
except AliasInTrashError:
|
||||
LOG.w(
|
||||
"Alias %s was deleted before, cannot auto-create using directory %s, user %s",
|
||||
address,
|
||||
directory.name,
|
||||
directory.user,
|
||||
)
|
||||
return None
|
||||
except IntegrityError:
|
||||
LOG.w("Alias %s already exists", address)
|
||||
Session.rollback()
|
||||
alias = Alias.get_by(email=address)
|
||||
return alias
|
||||
|
||||
|
||||
def try_auto_create_via_domain(address: str) -> Optional[Alias]:
|
||||
"""Try to create an alias with catch-all or auto-create rules on custom domain"""
|
||||
|
||||
# try to create alias on-the-fly with custom-domain catch-all feature
|
||||
# check if alias is custom-domain alias and if the custom-domain has catch-all enabled
|
||||
alias_domain = get_email_domain_part(address)
|
||||
custom_domain: CustomDomain = CustomDomain.get_by(domain=alias_domain)
|
||||
|
||||
if not custom_domain:
|
||||
can_create = check_if_alias_can_be_auto_created_for_custom_domain(address)
|
||||
if not can_create:
|
||||
return None
|
||||
custom_domain, rule = can_create
|
||||
|
||||
domain_user: User = custom_domain.user
|
||||
if domain_user.disabled:
|
||||
LOG.i("Disabled user %s can't create new alias via custom domain", domain_user)
|
||||
return None
|
||||
|
||||
if not custom_domain.catch_all and len(custom_domain.auto_create_rules) == 0:
|
||||
return None
|
||||
elif not custom_domain.catch_all and len(custom_domain.auto_create_rules) > 0:
|
||||
local = get_email_local_part(address)
|
||||
|
||||
for rule in custom_domain.auto_create_rules:
|
||||
if regex_match(rule.regex, local):
|
||||
LOG.d(
|
||||
"%s passes %s on %s",
|
||||
address,
|
||||
rule.regex,
|
||||
custom_domain,
|
||||
)
|
||||
alias_note = f"Created by rule {rule.order} with regex {rule.regex}"
|
||||
mailboxes = rule.mailboxes
|
||||
break
|
||||
else: # no rule passes
|
||||
LOG.d("no rule passed to create %s", local)
|
||||
return
|
||||
else: # catch-all is enabled
|
||||
if rule:
|
||||
alias_note = f"Created by rule {rule.order} with regex {rule.regex}"
|
||||
mailboxes = rule.mailboxes
|
||||
else:
|
||||
alias_note = "Created by catchall option"
|
||||
mailboxes = custom_domain.mailboxes
|
||||
alias_note = "Created by catch-all option"
|
||||
|
||||
if not domain_user.can_create_new_alias():
|
||||
send_cannot_create_domain_alias(domain_user, address, alias_domain)
|
||||
return None
|
||||
|
||||
# a rule can have 0 mailboxes. Happened when a mailbox is deleted
|
||||
if not mailboxes:
|
||||
LOG.d("use %s default mailbox for %s %s", domain_user, address, custom_domain)
|
||||
mailboxes = [domain_user.default_mailbox]
|
||||
LOG.d(
|
||||
"use %s default mailbox for %s %s",
|
||||
custom_domain.user,
|
||||
address,
|
||||
custom_domain,
|
||||
)
|
||||
mailboxes = [custom_domain.user.default_mailbox]
|
||||
|
||||
try:
|
||||
LOG.d("create alias %s for domain %s", address, custom_domain)
|
||||
@ -203,7 +278,7 @@ def try_auto_create_via_domain(address: str) -> Optional[Alias]:
|
||||
"Alias %s was deleted before, cannot auto-create using domain catch-all %s, user %s",
|
||||
address,
|
||||
custom_domain,
|
||||
domain_user,
|
||||
custom_domain.user,
|
||||
)
|
||||
return None
|
||||
except IntegrityError:
|
||||
|
@ -1,7 +1,23 @@
|
||||
from app.alias_utils import delete_alias, check_alias_prefix
|
||||
from typing import List
|
||||
|
||||
from app.alias_utils import (
|
||||
delete_alias,
|
||||
check_alias_prefix,
|
||||
get_user_if_alias_would_auto_create,
|
||||
try_auto_create,
|
||||
)
|
||||
from app.config import ALIAS_DOMAINS
|
||||
from app.db import Session
|
||||
from app.models import Alias, DeletedAlias
|
||||
from tests.utils import create_new_user
|
||||
from app.models import (
|
||||
Alias,
|
||||
DeletedAlias,
|
||||
CustomDomain,
|
||||
AutoCreateRule,
|
||||
Directory,
|
||||
DirectoryMailbox,
|
||||
User,
|
||||
)
|
||||
from tests.utils import create_new_user, random_domain, random_token
|
||||
|
||||
|
||||
def test_delete_alias(flask_client):
|
||||
@ -44,3 +60,75 @@ def test_check_alias_prefix(flask_client):
|
||||
assert not check_alias_prefix("a b")
|
||||
assert not check_alias_prefix("+👌")
|
||||
assert not check_alias_prefix("too-long" * 10)
|
||||
|
||||
|
||||
def get_auto_create_alias_tests(user: User) -> List:
|
||||
user.lifetime = True
|
||||
catchall = CustomDomain.create(
|
||||
user_id=user.id,
|
||||
catch_all=True,
|
||||
domain=random_domain(),
|
||||
verified=True,
|
||||
flush=True,
|
||||
)
|
||||
no_catchall = CustomDomain.create(
|
||||
user_id=user.id,
|
||||
catch_all=False,
|
||||
domain=random_domain(),
|
||||
verified=True,
|
||||
flush=True,
|
||||
)
|
||||
no_catchall_with_rule = CustomDomain.create(
|
||||
user_id=user.id,
|
||||
catch_all=False,
|
||||
domain=random_domain(),
|
||||
verified=True,
|
||||
flush=True,
|
||||
)
|
||||
AutoCreateRule.create(
|
||||
custom_domain_id=no_catchall_with_rule.id,
|
||||
order=0,
|
||||
regex="ok-.*",
|
||||
flush=True,
|
||||
)
|
||||
dir_name = random_token()
|
||||
directory = Directory.create(name=dir_name, user_id=user.id, flush=True)
|
||||
DirectoryMailbox.create(
|
||||
directory_id=directory.id, mailbox_id=user.default_mailbox_id, flush=True
|
||||
)
|
||||
Session.commit()
|
||||
|
||||
return [
|
||||
(f"nonexistant@{catchall.domain}", True),
|
||||
(f"nonexistant@{no_catchall.domain}", False),
|
||||
(f"nonexistant@{no_catchall_with_rule.domain}", False),
|
||||
(f"ok-nonexistant@{no_catchall_with_rule.domain}", True),
|
||||
(f"{dir_name}+something@nowhere.net", False),
|
||||
(f"{dir_name}#something@nowhere.net", False),
|
||||
(f"{dir_name}/something@nowhere.net", False),
|
||||
(f"{dir_name}+something@{ALIAS_DOMAINS[0]}", True),
|
||||
(f"{dir_name}#something@{ALIAS_DOMAINS[0]}", True),
|
||||
(f"{dir_name}/something@{ALIAS_DOMAINS[0]}", True),
|
||||
]
|
||||
|
||||
|
||||
def test_get_user_if_alias_would_auto_create(flask_client):
|
||||
user = create_new_user()
|
||||
for test_id, (address, expected_ok) in enumerate(get_auto_create_alias_tests(user)):
|
||||
result = get_user_if_alias_would_auto_create(address)
|
||||
if expected_ok:
|
||||
assert (
|
||||
isinstance(result, User) and result.id == user.id
|
||||
), f"Case {test_id} - Failed address {address}"
|
||||
else:
|
||||
assert not result, f"Case {test_id} - Failed address {address}"
|
||||
|
||||
|
||||
def test_auto_create_alias(flask_client):
|
||||
user = create_new_user()
|
||||
for test_id, (address, expected_ok) in enumerate(get_auto_create_alias_tests(user)):
|
||||
result = try_auto_create(address)
|
||||
if expected_ok:
|
||||
assert result, f"Case {test_id} - Failed address {address}"
|
||||
else:
|
||||
assert result is None, f"Case {test_id} - Failed address {address}"
|
||||
|
Loading…
Reference in New Issue
Block a user