Check there are not duplicate emails

This commit is contained in:
Adrià Casajús 2023-04-11 17:13:17 +02:00
parent 1e07cd2365
commit ad11c194b7
No known key found for this signature in database
GPG Key ID: F0033226A5AFC9B9
3 changed files with 25 additions and 19 deletions

View File

@ -54,6 +54,7 @@ from app.models import (
IgnoreBounceSender, IgnoreBounceSender,
InvalidMailboxDomain, InvalidMailboxDomain,
VerpType, VerpType,
available_sl_email,
) )
from app.utils import ( from app.utils import (
random_string, random_string,
@ -1085,9 +1086,7 @@ def generate_reply_email(contact_email: str, alias: Alias) -> str:
# reply_email = f"ra+{random_string(random_length)}@{config.EMAIL_DOMAIN}" # reply_email = f"ra+{random_string(random_length)}@{config.EMAIL_DOMAIN}"
reply_email = f"{random_string(random_length)}@{reply_domain}" reply_email = f"{random_string(random_length)}@{reply_domain}"
if not Contact.get_by(reply_email=reply_email) and not Alias.get_by( if available_sl_email(reply_email):
email=reply_email
):
return reply_email return reply_email
raise Exception("Cannot generate reply email") raise Exception("Cannot generate reply email")

View File

@ -1293,15 +1293,24 @@ class OauthToken(Base, ModelMixin):
return self.expired < arrow.now() return self.expired < arrow.now()
def generate_email( def available_sl_email(email: str, check_trash: bool = True) -> bool:
if Alias.get_by(email=email) or Contact.get_by(reply_email=email):
return False
if check_trash and DeletedAlias.get_by(email=email):
return False
return True
def generate_random_alias_email(
scheme: int = AliasGeneratorEnum.word.value, scheme: int = AliasGeneratorEnum.word.value,
in_hex: bool = False, in_hex: bool = False,
alias_domain=config.FIRST_ALIAS_DOMAIN, alias_domain: str = config.FIRST_ALIAS_DOMAIN,
retries: int = 5, retries: int = 10,
) -> str: ) -> str:
"""generate an email address that does not exist before """generate an email address that does not exist before
:param alias_domain: the domain used to generate the alias. :param alias_domain: the domain used to generate the alias.
:param scheme: int, value of AliasGeneratorEnum, indicate how the email is generated :param scheme: int, value of AliasGeneratorEnum, indicate how the email is generated
:param retries: int, How many times we can try to generate an alias in case of collision
:type in_hex: bool, if the generate scheme is uuid, is hex favorable? :type in_hex: bool, if the generate scheme is uuid, is hex favorable?
""" """
if retries <= 0: if retries <= 0:
@ -1315,17 +1324,15 @@ def generate_email(
random_email = random_email.lower().strip() random_email = random_email.lower().strip()
# check that the client does not exist yet # check that the client does not exist yet
if ( if available_sl_email(random_email):
not Alias.get_by(email=random_email)
and not DeletedAlias.get_by(email=random_email)
and not Contact.get_by(reply_email=random_email)
):
LOG.d("generate email %s", random_email) LOG.d("generate email %s", random_email)
return random_email return random_email
# Rerun the function # Rerun the function
LOG.w("email %s already exists, generate a new email", random_email) LOG.w("email %s already exists, generate a new email", random_email)
return generate_email(scheme=scheme, in_hex=in_hex, retries=retries - 1) return generate_random_alias_email(
scheme=scheme, in_hex=in_hex, retries=retries - 1
)
class Alias(Base, ModelMixin): class Alias(Base, ModelMixin):
@ -1524,7 +1531,7 @@ class Alias(Base, ModelMixin):
suffix = user.get_random_alias_suffix() suffix = user.get_random_alias_suffix()
email = f"{prefix}.{suffix}@{config.FIRST_ALIAS_DOMAIN}" email = f"{prefix}.{suffix}@{config.FIRST_ALIAS_DOMAIN}"
if not cls.get_by(email=email) and not DeletedAlias.get_by(email=email): if available_sl_email(email, check_trash=False):
break break
return Alias.create( return Alias.create(
@ -1553,7 +1560,7 @@ class Alias(Base, ModelMixin):
if user.default_alias_custom_domain_id: if user.default_alias_custom_domain_id:
custom_domain = CustomDomain.get(user.default_alias_custom_domain_id) custom_domain = CustomDomain.get(user.default_alias_custom_domain_id)
random_email = generate_email( random_email = generate_random_alias_email(
scheme=scheme, in_hex=in_hex, alias_domain=custom_domain.domain scheme=scheme, in_hex=in_hex, alias_domain=custom_domain.domain
) )
elif user.default_alias_public_domain_id: elif user.default_alias_public_domain_id:
@ -1561,12 +1568,12 @@ class Alias(Base, ModelMixin):
if sl_domain.premium_only and not user.is_premium(): if sl_domain.premium_only and not user.is_premium():
LOG.w("%s not premium, cannot use %s", user, sl_domain) LOG.w("%s not premium, cannot use %s", user, sl_domain)
else: else:
random_email = generate_email( random_email = generate_random_alias_email(
scheme=scheme, in_hex=in_hex, alias_domain=sl_domain.domain scheme=scheme, in_hex=in_hex, alias_domain=sl_domain.domain
) )
if not random_email: if not random_email:
random_email = generate_email(scheme=scheme, in_hex=in_hex) random_email = generate_random_alias_email(scheme=scheme, in_hex=in_hex)
alias = Alias.create( alias = Alias.create(
user_id=user.id, user_id=user.id,

View File

@ -8,7 +8,7 @@ from app.config import EMAIL_DOMAIN, MAX_NB_EMAIL_FREE_PLAN, NOREPLY
from app.db import Session from app.db import Session
from app.email_utils import parse_full_address, generate_reply_email from app.email_utils import parse_full_address, generate_reply_email
from app.models import ( from app.models import (
generate_email, generate_random_alias_email,
Alias, Alias,
Contact, Contact,
Mailbox, Mailbox,
@ -22,13 +22,13 @@ from tests.utils import login, create_new_user, random_token
def test_generate_email(flask_client): def test_generate_email(flask_client):
email = generate_email() email = generate_random_alias_email()
assert email.endswith("@" + EMAIL_DOMAIN) assert email.endswith("@" + EMAIL_DOMAIN)
with pytest.raises(ValueError): with pytest.raises(ValueError):
UUID(email.split("@")[0], version=4) UUID(email.split("@")[0], version=4)
email_uuid = generate_email(scheme=2) email_uuid = generate_random_alias_email(scheme=2)
assert UUID(email_uuid.split("@")[0], version=4) assert UUID(email_uuid.split("@")[0], version=4)