diff --git a/app/email_utils.py b/app/email_utils.py index 63f2dc74..ddbfbaab 100644 --- a/app/email_utils.py +++ b/app/email_utils.py @@ -54,6 +54,7 @@ from app.models import ( IgnoreBounceSender, InvalidMailboxDomain, VerpType, + available_sl_email, ) from app.utils import ( random_string, @@ -1085,9 +1086,7 @@ def generate_reply_email(contact_email: str, alias: Alias) -> str: # reply_email = f"ra+{random_string(random_length)}@{config.EMAIL_DOMAIN}" reply_email = f"{random_string(random_length)}@{reply_domain}" - if not Contact.get_by(reply_email=reply_email) and not Alias.get_by( - email=reply_email - ): + if available_sl_email(reply_email): return reply_email raise Exception("Cannot generate reply email") diff --git a/app/models.py b/app/models.py index da91c9e2..da43039c 100644 --- a/app/models.py +++ b/app/models.py @@ -1293,15 +1293,24 @@ class OauthToken(Base, ModelMixin): return self.expired < arrow.now() -def generate_email( +def available_sl_email(email: str, check_trash: bool = True) -> bool: + if Alias.get_by(email=email) or Contact.get_by(reply_email=email): + return False + if check_trash and DeletedAlias.get_by(email=email): + return False + return True + + +def generate_random_alias_email( scheme: int = AliasGeneratorEnum.word.value, in_hex: bool = False, - alias_domain=config.FIRST_ALIAS_DOMAIN, - retries: int = 5, + alias_domain: str = config.FIRST_ALIAS_DOMAIN, + retries: int = 10, ) -> str: """generate an email address that does not exist before :param alias_domain: the domain used to generate the alias. :param scheme: int, value of AliasGeneratorEnum, indicate how the email is generated + :param retries: int, How many times we can try to generate an alias in case of collision :type in_hex: bool, if the generate scheme is uuid, is hex favorable? """ if retries <= 0: @@ -1315,17 +1324,15 @@ def generate_email( random_email = random_email.lower().strip() # check that the client does not exist yet - if ( - not Alias.get_by(email=random_email) - and not DeletedAlias.get_by(email=random_email) - and not Contact.get_by(reply_email=random_email) - ): + if available_sl_email(random_email): LOG.d("generate email %s", random_email) return random_email # Rerun the function LOG.w("email %s already exists, generate a new email", random_email) - return generate_email(scheme=scheme, in_hex=in_hex, retries=retries - 1) + return generate_random_alias_email( + scheme=scheme, in_hex=in_hex, retries=retries - 1 + ) class Alias(Base, ModelMixin): @@ -1524,7 +1531,7 @@ class Alias(Base, ModelMixin): suffix = user.get_random_alias_suffix() email = f"{prefix}.{suffix}@{config.FIRST_ALIAS_DOMAIN}" - if not cls.get_by(email=email) and not DeletedAlias.get_by(email=email): + if available_sl_email(email, check_trash=False): break return Alias.create( @@ -1553,7 +1560,7 @@ class Alias(Base, ModelMixin): if user.default_alias_custom_domain_id: custom_domain = CustomDomain.get(user.default_alias_custom_domain_id) - random_email = generate_email( + random_email = generate_random_alias_email( scheme=scheme, in_hex=in_hex, alias_domain=custom_domain.domain ) elif user.default_alias_public_domain_id: @@ -1561,12 +1568,12 @@ class Alias(Base, ModelMixin): if sl_domain.premium_only and not user.is_premium(): LOG.w("%s not premium, cannot use %s", user, sl_domain) else: - random_email = generate_email( + random_email = generate_random_alias_email( scheme=scheme, in_hex=in_hex, alias_domain=sl_domain.domain ) if not random_email: - random_email = generate_email(scheme=scheme, in_hex=in_hex) + random_email = generate_random_alias_email(scheme=scheme, in_hex=in_hex) alias = Alias.create( user_id=user.id, diff --git a/tests/test_models.py b/tests/test_models.py index 1037afec..8b4d6b94 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -8,7 +8,7 @@ from app.config import EMAIL_DOMAIN, MAX_NB_EMAIL_FREE_PLAN, NOREPLY from app.db import Session from app.email_utils import parse_full_address, generate_reply_email from app.models import ( - generate_email, + generate_random_alias_email, Alias, Contact, Mailbox, @@ -22,13 +22,13 @@ from tests.utils import login, create_new_user, random_token def test_generate_email(flask_client): - email = generate_email() + email = generate_random_alias_email() assert email.endswith("@" + EMAIL_DOMAIN) with pytest.raises(ValueError): UUID(email.split("@")[0], version=4) - email_uuid = generate_email(scheme=2) + email_uuid = generate_random_alias_email(scheme=2) assert UUID(email_uuid.split("@")[0], version=4)