From c1f5c07d86a79aa9b6f76c16fd16405fcf446308 Mon Sep 17 00:00:00 2001 From: Son NK <> Date: Sat, 4 Apr 2020 15:24:27 +0200 Subject: [PATCH] Move alias auto-creation to alias_utils --- app/alias_utils.py | 126 +++++++++++++++++++++++++++++++++++++++++++++ email_handler.py | 116 +---------------------------------------- 2 files changed, 127 insertions(+), 115 deletions(-) create mode 100644 app/alias_utils.py diff --git a/app/alias_utils.py b/app/alias_utils.py new file mode 100644 index 00000000..d99b2fa6 --- /dev/null +++ b/app/alias_utils.py @@ -0,0 +1,126 @@ +from typing import Optional + +from app.email_utils import ( + get_email_domain_part, + send_cannot_create_directory_alias, + send_cannot_create_domain_alias, + email_belongs_to_alias_domains, +) +from app.extensions import db +from app.log import LOG +from app.models import ( + Alias, + CustomDomain, + Directory, + User, + DeletedAlias, +) + + +def try_auto_create(address: str) -> Optional[Alias]: + """Try to auto-create the alias using directory or catch-all domain + """ + alias = try_auto_create_catch_all_domain(address) + if not alias: + alias = try_auto_create_directory(address) + + return alias + + +def try_auto_create_directory(address: str) -> Optional[Alias]: + """ + Try to create an alias with directory + """ + # check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format + if email_belongs_to_alias_domains(address): + # if there's no directory separator in the alias, no way to auto-create it + if "/" not in address and "+" not in address and "#" not in address: + return None + + # alias contains one of the 3 special directory separator: "/", "+" or "#" + if "/" in address: + sep = "/" + elif "+" in address: + sep = "+" + else: + sep = "#" + + directory_name = address[: address.find(sep)] + LOG.d("directory_name %s", directory_name) + + directory = Directory.get_by(name=directory_name) + if not directory: + return None + + dir_user: User = directory.user + + if not dir_user.can_create_new_alias(): + send_cannot_create_directory_alias(dir_user, address, directory_name) + return None + + # if alias has been deleted before, do not auto-create it + if DeletedAlias.get_by(email=address, user_id=directory.user_id): + LOG.warning( + "Alias %s was deleted before, cannot auto-create using directory %s, user %s", + address, + directory_name, + dir_user, + ) + return None + + LOG.d("create alias %s for directory %s", address, directory) + + alias = Alias.create( + email=address, + user_id=directory.user_id, + directory_id=directory.id, + mailbox_id=dir_user.default_mailbox_id, + ) + db.session.commit() + return alias + + +def try_auto_create_catch_all_domain(address: str) -> Optional[Alias]: + """Try to create an alias with catch-all domain""" + + # try to create alias on-the-fly with custom-domain catch-all feature + # check if alias is custom-domain alias and if the custom-domain has catch-all enabled + alias_domain = get_email_domain_part(address) + custom_domain = CustomDomain.get_by(domain=alias_domain) + + if not custom_domain: + return None + + # custom_domain exists + if not custom_domain.catch_all: + return None + + # custom_domain has catch-all enabled + domain_user: User = custom_domain.user + + if not domain_user.can_create_new_alias(): + send_cannot_create_domain_alias(domain_user, address, alias_domain) + return None + + # if alias has been deleted before, do not auto-create it + if DeletedAlias.get_by(email=address, user_id=custom_domain.user_id): + LOG.warning( + "Alias %s was deleted before, cannot auto-create using domain catch-all %s, user %s", + address, + custom_domain, + domain_user, + ) + return None + + LOG.d("create alias %s for domain %s", address, custom_domain) + + alias = Alias.create( + email=address, + user_id=custom_domain.user_id, + custom_domain_id=custom_domain.id, + automatic_creation=True, + mailbox_id=domain_user.default_mailbox_id, + ) + + db.session.commit() + return alias diff --git a/email_handler.py b/email_handler.py index b3bf1c81..8379e5bc 100644 --- a/email_handler.py +++ b/email_handler.py @@ -40,12 +40,12 @@ from email.mime.multipart import MIMEMultipart from email.utils import parseaddr from io import BytesIO from smtplib import SMTP -from typing import Optional from aiosmtpd.controller import Controller from aiosmtpd.smtp import Envelope from app import pgp_utils, s3 +from app.alias_utils import try_auto_create from app.config import ( EMAIL_DOMAIN, POSTFIX_SERVER, @@ -57,11 +57,8 @@ from app.config import ( from app.email_utils import ( send_email, add_dkim_signature, - get_email_domain_part, add_or_replace_header, delete_header, - send_cannot_create_directory_alias, - send_cannot_create_domain_alias, email_belongs_to_alias_domains, render, get_orig_message_from_bounce, @@ -78,9 +75,7 @@ from app.models import ( Contact, EmailLog, CustomDomain, - Directory, User, - DeletedAlias, RefusedEmail, ) from app.utils import random_string @@ -106,115 +101,6 @@ def new_app(): return app -def try_auto_create(address: str) -> Optional[Alias]: - """Try to auto-create the alias using directory or catch-all domain - """ - alias = try_auto_create_catch_all_domain(address) - if not alias: - alias = try_auto_create_directory(address) - - return alias - - -def try_auto_create_directory(address: str) -> Optional[Alias]: - """ - Try to create an alias with directory - """ - # check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format - if email_belongs_to_alias_domains(address): - # if there's no directory separator in the alias, no way to auto-create it - if "/" not in address and "+" not in address and "#" not in address: - return None - - # alias contains one of the 3 special directory separator: "/", "+" or "#" - if "/" in address: - sep = "/" - elif "+" in address: - sep = "+" - else: - sep = "#" - - directory_name = address[: address.find(sep)] - LOG.d("directory_name %s", directory_name) - - directory = Directory.get_by(name=directory_name) - if not directory: - return None - - dir_user: User = directory.user - - if not dir_user.can_create_new_alias(): - send_cannot_create_directory_alias(dir_user, address, directory_name) - return None - - # if alias has been deleted before, do not auto-create it - if DeletedAlias.get_by(email=address, user_id=directory.user_id): - LOG.warning( - "Alias %s was deleted before, cannot auto-create using directory %s, user %s", - address, - directory_name, - dir_user, - ) - return None - - LOG.d("create alias %s for directory %s", address, directory) - - alias = Alias.create( - email=address, - user_id=directory.user_id, - directory_id=directory.id, - mailbox_id=dir_user.default_mailbox_id, - ) - db.session.commit() - return alias - - -def try_auto_create_catch_all_domain(address: str) -> Optional[Alias]: - """Try to create an alias with catch-all domain""" - - # try to create alias on-the-fly with custom-domain catch-all feature - # check if alias is custom-domain alias and if the custom-domain has catch-all enabled - alias_domain = get_email_domain_part(address) - custom_domain = CustomDomain.get_by(domain=alias_domain) - - if not custom_domain: - return None - - # custom_domain exists - if not custom_domain.catch_all: - return None - - # custom_domain has catch-all enabled - domain_user: User = custom_domain.user - - if not domain_user.can_create_new_alias(): - send_cannot_create_domain_alias(domain_user, address, alias_domain) - return None - - # if alias has been deleted before, do not auto-create it - if DeletedAlias.get_by(email=address, user_id=custom_domain.user_id): - LOG.warning( - "Alias %s was deleted before, cannot auto-create using domain catch-all %s, user %s", - address, - custom_domain, - domain_user, - ) - return None - - LOG.d("create alias %s for domain %s", address, custom_domain) - - alias = Alias.create( - email=address, - user_id=custom_domain.user_id, - custom_domain_id=custom_domain.id, - automatic_creation=True, - mailbox_id=domain_user.default_mailbox_id, - ) - - db.session.commit() - return alias - - def get_or_create_contact(website_from_header: str, alias: Alias) -> Contact: """ website_from_header can be the full-form email, i.e. "First Last "