app-MAIL-temp/app/alias_utils.py

261 lines
8.0 KiB
Python
Raw Normal View History

import re
from typing import Optional
2021-09-17 17:42:16 +02:00
from email_validator import validate_email, EmailNotValidError
2021-01-11 15:45:41 +01:00
from sqlalchemy.exc import IntegrityError, DataError
from app.config import BOUNCE_PREFIX_FOR_REPLY_PHASE
from app.email_utils import (
get_email_domain_part,
send_cannot_create_directory_alias,
send_cannot_create_domain_alias,
2020-10-15 16:21:31 +02:00
can_create_directory_for_address,
send_cannot_create_directory_alias_disabled,
2021-09-17 17:42:16 +02:00
get_email_local_part,
)
from app.errors import AliasInTrashError
from app.extensions import db
from app.log import LOG
from app.models import (
Alias,
CustomDomain,
Directory,
User,
DeletedAlias,
DomainDeletedAlias,
AliasMailbox,
Mailbox,
EmailLog,
Contact,
)
def try_auto_create(address: str) -> Optional[Alias]:
2020-08-27 10:20:48 +02:00
"""Try to auto-create the alias using directory or catch-all domain"""
if address.startswith(f"{BOUNCE_PREFIX_FOR_REPLY_PHASE}+"):
2021-09-08 11:29:55 +02:00
LOG.e("alias %s can't start with %s", address, BOUNCE_PREFIX_FOR_REPLY_PHASE)
return None
2021-09-17 17:42:16 +02:00
try:
# NOT allow unicode for now
validate_email(address, check_deliverability=False, allow_smtputf8=False)
except EmailNotValidError:
return None
alias = try_auto_create_catch_all_domain(address)
if not alias:
alias = try_auto_create_directory(address)
return alias
def try_auto_create_directory(address: str) -> Optional[Alias]:
"""
Try to create an alias with directory
"""
# check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format
2020-10-15 16:21:31 +02:00
if can_create_directory_for_address(address):
# if there's no directory separator in the alias, no way to auto-create it
if "/" not in address and "+" not in address and "#" not in address:
return None
# alias contains one of the 3 special directory separator: "/", "+" or "#"
if "/" in address:
sep = "/"
elif "+" in address:
sep = "+"
else:
sep = "#"
directory_name = address[: address.find(sep)]
LOG.d("directory_name %s", directory_name)
directory = Directory.get_by(name=directory_name)
if not directory:
return None
dir_user: User = directory.user
if not dir_user.can_create_new_alias():
send_cannot_create_directory_alias(dir_user, address, directory_name)
return None
if directory.disabled:
send_cannot_create_directory_alias_disabled(
dir_user, address, directory_name
)
return None
try:
LOG.d("create alias %s for directory %s", address, directory)
mailboxes = directory.mailboxes
alias = Alias.create(
email=address,
user_id=directory.user_id,
directory_id=directory.id,
mailbox_id=mailboxes[0].id,
)
db.session.flush()
for i in range(1, len(mailboxes)):
AliasMailbox.create(
2020-08-27 10:20:48 +02:00
alias_id=alias.id,
mailbox_id=mailboxes[i].id,
)
db.session.commit()
return alias
except AliasInTrashError:
2021-09-08 11:29:55 +02:00
LOG.w(
"Alias %s was deleted before, cannot auto-create using directory %s, user %s",
address,
directory_name,
dir_user,
)
return None
except IntegrityError:
2021-09-08 11:29:55 +02:00
LOG.w("Alias %s already exists", address)
db.session.rollback()
alias = Alias.get_by(email=address)
return alias
def try_auto_create_catch_all_domain(address: str) -> Optional[Alias]:
"""Try to create an alias with catch-all domain"""
# try to create alias on-the-fly with custom-domain catch-all feature
# check if alias is custom-domain alias and if the custom-domain has catch-all enabled
alias_domain = get_email_domain_part(address)
custom_domain: CustomDomain = CustomDomain.get_by(domain=alias_domain)
if not custom_domain:
return None
# custom_domain exists
if not custom_domain.catch_all and not custom_domain.auto_create_regex:
return None
if custom_domain.auto_create_regex:
local = get_email_local_part(address)
regex = re.compile(custom_domain.auto_create_regex)
if not re.fullmatch(regex, local):
LOG.d(
"%s can't be auto created on %s as it fails regex %s",
address,
custom_domain,
custom_domain.auto_create_regex,
)
return None
# custom_domain has catch-all enabled or the address passes the regex
domain_user: User = custom_domain.user
if not domain_user.can_create_new_alias():
send_cannot_create_domain_alias(domain_user, address, alias_domain)
return None
try:
LOG.d("create alias %s for domain %s", address, custom_domain)
mailboxes = custom_domain.mailboxes
alias = Alias.create(
email=address,
user_id=custom_domain.user_id,
custom_domain_id=custom_domain.id,
automatic_creation=True,
mailbox_id=mailboxes[0].id,
)
db.session.flush()
for i in range(1, len(mailboxes)):
AliasMailbox.create(
2020-08-27 10:20:48 +02:00
alias_id=alias.id,
mailbox_id=mailboxes[i].id,
)
db.session.commit()
return alias
except AliasInTrashError:
2021-09-08 11:29:55 +02:00
LOG.w(
"Alias %s was deleted before, cannot auto-create using domain catch-all %s, user %s",
address,
custom_domain,
domain_user,
)
return None
except IntegrityError:
2021-09-08 11:29:55 +02:00
LOG.w("Alias %s already exists", address)
db.session.rollback()
alias = Alias.get_by(email=address)
return alias
2021-01-11 15:45:41 +01:00
except DataError:
2021-09-08 11:29:55 +02:00
LOG.w("Cannot create alias %s", address)
2021-01-11 15:45:41 +01:00
db.session.rollback()
return None
def delete_alias(alias: Alias, user: User):
"""
Delete an alias and add it to either global or domain trash
Should be used instead of Alias.delete, DomainDeletedAlias.create, DeletedAlias.create
"""
# save deleted alias to either global or domain trash
if alias.custom_domain_id:
2020-08-23 20:24:46 +02:00
if not DomainDeletedAlias.get_by(
email=alias.email, domain_id=alias.custom_domain_id
):
2021-09-08 11:29:55 +02:00
LOG.d("add %s to domain %s trash", alias, alias.custom_domain_id)
db.session.add(
DomainDeletedAlias(
user_id=user.id, email=alias.email, domain_id=alias.custom_domain_id
)
)
db.session.commit()
else:
if not DeletedAlias.get_by(email=alias.email):
LOG.d("add %s to global trash", alias)
db.session.add(DeletedAlias(email=alias.email))
db.session.commit()
Alias.query.filter(Alias.id == alias.id).delete()
db.session.commit()
def aliases_for_mailbox(mailbox: Mailbox) -> [Alias]:
"""
get list of aliases for a given mailbox
"""
ret = set(Alias.query.filter(Alias.mailbox_id == mailbox.id).all())
for alias in (
db.session.query(Alias)
.join(AliasMailbox, Alias.id == AliasMailbox.alias_id)
.filter(AliasMailbox.mailbox_id == mailbox.id)
):
ret.add(alias)
return list(ret)
def nb_email_log_for_mailbox(mailbox: Mailbox):
aliases = aliases_for_mailbox(mailbox)
alias_ids = [alias.id for alias in aliases]
return (
db.session.query(EmailLog)
.join(Contact, EmailLog.contact_id == Contact.id)
.filter(Contact.alias_id.in_(alias_ids))
.count()
)
2021-04-30 11:37:17 +02:00
# Only lowercase letters, numbers, dots (.), dashes (-) and underscores (_) are currently supported
_ALIAS_PREFIX_PATTERN = r"[0-9a-z-_.]{1,}"
def check_alias_prefix(alias_prefix) -> bool:
if len(alias_prefix) > 40:
return False
if re.fullmatch(_ALIAS_PREFIX_PATTERN, alias_prefix) is None:
return False
return True