make sure a contact with website_email=reverse alias of another contact can't be created

This commit is contained in:
Son 2022-01-07 10:04:12 +01:00
parent bb6aec8b80
commit ad622df071
4 changed files with 61 additions and 12 deletions

View File

@ -220,7 +220,12 @@ def alias_contact_manager(alias_id):
reply_email=generate_reply_email(contact_email, current_user), reply_email=generate_reply_email(contact_email, current_user),
) )
LOG.d("create reverse-alias for %s", contact_addr) LOG.d(
"create reverse-alias for %s %s, reverse alias:%s",
contact_addr,
alias,
contact.reply_email,
)
Session.commit() Session.commit()
flash(f"Reverse alias for {contact_addr} is created", "success") flash(f"Reverse alias for {contact_addr} is created", "success")

View File

@ -14,3 +14,9 @@ class SubdomainInTrashError(Exception):
"""raised when a subdomain is deleted before """ """raised when a subdomain is deleted before """
pass pass
class CannotCreateContactForReverseAlias(Exception):
"""raised when a contact is created that has website_email=reverse_alias of another contact"""
pass

View File

@ -35,7 +35,12 @@ from app.config import (
MAX_NB_DIRECTORY, MAX_NB_DIRECTORY,
) )
from app.db import Session from app.db import Session
from app.errors import AliasInTrashError, DirectoryInTrashError, SubdomainInTrashError from app.errors import (
AliasInTrashError,
DirectoryInTrashError,
SubdomainInTrashError,
CannotCreateContactForReverseAlias,
)
from app.log import LOG from app.log import LOG
from app.oauth_models import Scope from app.oauth_models import Scope
from app.pw_models import PasswordOracle from app.pw_models import PasswordOracle
@ -109,7 +114,7 @@ class ModelMixin(object):
@classmethod @classmethod
def create(cls, **kw): def create(cls, **kw):
# whether should call Session.commit # whether to call Session.commit
commit = kw.pop("commit", False) commit = kw.pop("commit", False)
flush = kw.pop("flush", False) flush = kw.pop("flush", False)
@ -1264,8 +1269,9 @@ class Alias(Base, ModelMixin):
@classmethod @classmethod
def create(cls, **kw): def create(cls, **kw):
commit = kw.pop("commit", False) commit = kw.pop("commit", False)
flush = kw.pop("flush", False)
r = cls(**kw) new_alias = cls(**kw)
email = kw["email"] email = kw["email"]
# make sure email is lowercase and doesn't have any whitespace # make sure email is lowercase and doesn't have any whitespace
@ -1282,12 +1288,17 @@ class Alias(Base, ModelMixin):
if "custom_domain_id" not in kw: if "custom_domain_id" not in kw:
custom_domain = Alias.get_custom_domain(email) custom_domain = Alias.get_custom_domain(email)
if custom_domain: if custom_domain:
r.custom_domain_id = custom_domain.id new_alias.custom_domain_id = custom_domain.id
Session.add(new_alias)
Session.add(r)
if commit: if commit:
Session.commit() Session.commit()
return r
if flush:
Session.flush()
return new_alias
@classmethod @classmethod
def create_new(cls, user, prefix, note=None, mailbox_id=None): def create_new(cls, user, prefix, note=None, mailbox_id=None):
@ -1534,6 +1545,31 @@ class Contact(Base, ModelMixin):
def email(self): def email(self):
return self.website_email return self.website_email
@classmethod
def create(cls, **kw):
commit = kw.pop("commit", False)
flush = kw.pop("flush", False)
new_contact = cls(**kw)
website_email = kw["website_email"]
# make sure email is lowercase and doesn't have any whitespace
website_email = sanitize_email(website_email)
# make sure alias is not in global trash, i.e. DeletedAlias table
if Contact.get_by(reply_email=website_email):
raise CannotCreateContactForReverseAlias
Session.add(new_contact)
if commit:
Session.commit()
if flush:
Session.flush()
return new_contact
def website_send_to(self): def website_send_to(self):
"""return the email address with name. """return the email address with name.
to use when user wants to send an email from the alias to use when user wants to send an email from the alias

View File

@ -204,11 +204,6 @@ def get_or_create_contact(from_header: str, mail_from: str, alias: Alias) -> Con
contact.mail_from = mail_from contact.mail_from = mail_from
Session.commit() Session.commit()
else: else:
LOG.d(
"create contact %s for alias %s",
contact_email,
alias,
)
try: try:
contact = Contact.create( contact = Contact.create(
@ -225,6 +220,13 @@ def get_or_create_contact(from_header: str, mail_from: str, alias: Alias) -> Con
LOG.d("Create a contact with invalid email for %s", alias) LOG.d("Create a contact with invalid email for %s", alias)
contact.invalid_email = True contact.invalid_email = True
LOG.d(
"create contact %s for %s, reverse alias:%s",
contact_email,
alias,
contact.reply_email,
)
Session.commit() Session.commit()
except IntegrityError: except IntegrityError:
LOG.w("Contact %s %s already exist", alias, contact_email) LOG.w("Contact %s %s already exist", alias, contact_email)