From 38d377acb316e0ead04b475c90620cc27534440d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Casaj=C3=BAs?= Date: Fri, 20 Sep 2024 10:11:57 +0200 Subject: [PATCH] Extract contact creation logic to an external function (#2228) * Extract contact creation logic to an external function * PR comments --- app/contact_utils.py | 89 +++++++++++++++++++++++++++ email_handler.py | 113 ++++------------------------------ tests/test_contact_utils.py | 117 ++++++++++++++++++++++++++++++++++++ 3 files changed, 218 insertions(+), 101 deletions(-) create mode 100644 app/contact_utils.py create mode 100644 tests/test_contact_utils.py diff --git a/app/contact_utils.py b/app/contact_utils.py new file mode 100644 index 00000000..b0b6bd1c --- /dev/null +++ b/app/contact_utils.py @@ -0,0 +1,89 @@ +from dataclasses import dataclass +from enum import Enum +from typing import Optional + +from sqlalchemy.exc import IntegrityError + +from app.db import Session +from app.email_utils import generate_reply_email +from app.email_validation import is_valid_email +from app.log import LOG +from app.models import Contact, Alias +from app.utils import sanitize_email + + +class ContactCreateError(Enum): + InvalidEmail = "Invalid email" + + +@dataclass +class ContactCreateResult: + contact: Optional[Contact] + error: Optional[ContactCreateError] + + +def __update_contact_if_needed( + contact: Contact, name: Optional[str], mail_from: Optional[str] +) -> ContactCreateResult: + if name and contact.name != name: + LOG.d(f"Setting {contact} name to {name}") + contact.name = name + Session.commit() + if mail_from and contact.mail_from is None: + LOG.d(f"Setting {contact} mail_from to {mail_from}") + contact.mail_from = mail_from + Session.commit() + return ContactCreateResult(contact, None) + + +def create_contact( + email: str, + name: Optional[str], + alias: Alias, + mail_from: Optional[str] = None, + allow_empty_email: bool = False, + automatic_created: bool = False, + from_partner: bool = False, +) -> ContactCreateResult: + if name is not None: + name = name[: Contact.MAX_NAME_LENGTH] + if name is not None and "\x00" in name: + LOG.w("Cannot use contact name because has \\x00") + name = "" + if not is_valid_email(email): + LOG.w(f"invalid contact email {email}") + if not allow_empty_email: + return ContactCreateResult(None, ContactCreateError.InvalidEmail) + LOG.d("Create a contact with invalid email for %s", alias) + # either reuse a contact with empty email or create a new contact with empty email + email = "" + email = sanitize_email(email, not_lower=True) + contact = Contact.get_by(alias_id=alias.id, website_email=email) + if contact is not None: + return __update_contact_if_needed(contact, name, mail_from) + reply_email = generate_reply_email(email, alias) + try: + flags = Contact.FLAG_PARTNER_CREATED if from_partner else 0 + contact = Contact.create( + user_id=alias.user_id, + alias_id=alias.id, + website_email=email, + name=name, + reply_email=reply_email, + mail_from=mail_from, + automatic_created=automatic_created, + flags=flags, + invalid_email=email == "", + commit=True, + ) + LOG.d( + f"Created contact {contact} for alias {alias} with email {email} invalid_email={contact.invalid_email}" + ) + except IntegrityError: + Session.rollback() + LOG.info( + f"Contact with email {email} for alias_id {alias.id} already existed, fetching from DB" + ) + contact = Contact.get_by(alias_id=alias.id, website_email=email) + return __update_contact_if_needed(contact, name, mail_from) + return ContactCreateResult(contact, None) diff --git a/email_handler.py b/email_handler.py index 69f3785d..39acfe18 100644 --- a/email_handler.py +++ b/email_handler.py @@ -52,7 +52,7 @@ from flanker.addresslib import address from flanker.addresslib.address import EmailAddress from sqlalchemy.exc import IntegrityError -from app import pgp_utils, s3, config +from app import pgp_utils, s3, config, contact_utils from app.alias_utils import try_auto_create, change_alias_status from app.config import ( EMAIL_DOMAIN, @@ -195,79 +195,16 @@ def get_or_create_contact(from_header: str, mail_from: str, alias: Alias) -> Con mail_from, ) contact_email = mail_from - - if not is_valid_email(contact_email): - LOG.w( - "invalid contact email %s. Parse from %s %s", - contact_email, - from_header, - mail_from, - ) - # either reuse a contact with empty email or create a new contact with empty email - contact_email = "" - - contact_email = sanitize_email(contact_email, not_lower=True) - - if contact_name and "\x00" in contact_name: - LOG.w("issue with contact name %s", contact_name) - contact_name = "" - - contact = Contact.get_by(alias_id=alias.id, website_email=contact_email) - if contact: - if contact.name != contact_name: - LOG.d( - "Update contact %s name %s to %s", - contact, - contact.name, - contact_name, - ) - contact.name = contact_name - Session.commit() - - # contact created in the past does not have mail_from and from_header field - if not contact.mail_from and mail_from: - LOG.d( - "Set contact mail_from %s: %s to %s", - contact, - contact.mail_from, - mail_from, - ) - contact.mail_from = mail_from - Session.commit() - else: - alias_id = alias.id - try: - contact_email_for_reply = ( - contact_email if is_valid_email(contact_email) else "" - ) - contact = Contact.create( - user_id=alias.user_id, - alias_id=alias_id, - website_email=contact_email, - name=contact_name, - mail_from=mail_from, - reply_email=generate_reply_email(contact_email_for_reply, alias), - automatic_created=True, - ) - if not contact_email: - LOG.d("Create a contact with invalid email for %s", alias) - contact.invalid_email = True - - LOG.d( - "create contact %s for %s, reverse alias:%s", - contact_email, - alias, - contact.reply_email, - ) - - Session.commit() - except IntegrityError: - LOG.info( - f"Contact with email {contact_email} for alias_id {alias_id} already existed, fetching from DB" - ) - contact = Contact.get_by(alias_id=alias_id, website_email=contact_email) - - return contact + contact_result = contact_utils.create_contact( + email=contact_email, + name=contact_name, + alias=alias, + mail_from=mail_from, + allow_empty_email=True, + automatic_created=True, + from_partner=False, + ) + return contact_result.contact def get_or_create_reply_to_contact( @@ -292,33 +229,7 @@ def get_or_create_reply_to_contact( ) return None - contact = Contact.get_by(alias_id=alias.id, website_email=contact_address) - if contact: - return contact - else: - LOG.d( - "create contact %s for alias %s via reply-to header %s", - contact_address, - alias, - reply_to_header, - ) - - try: - contact = Contact.create( - user_id=alias.user_id, - alias_id=alias.id, - website_email=contact_address, - name=contact_name, - reply_email=generate_reply_email(contact_address, alias), - automatic_created=True, - ) - Session.commit() - except IntegrityError: - LOG.w("Contact %s %s already exist", alias, contact_address) - Session.rollback() - contact = Contact.get_by(alias_id=alias.id, website_email=contact_address) - - return contact + return contact_utils.create_contact(contact_address, contact_name, alias).contact def replace_header_when_forward(msg: Message, alias: Alias, header: str): diff --git a/tests/test_contact_utils.py b/tests/test_contact_utils.py new file mode 100644 index 00000000..f2be004f --- /dev/null +++ b/tests/test_contact_utils.py @@ -0,0 +1,117 @@ +from typing import Optional + +import pytest +from app.contact_utils import create_contact, ContactCreateError +from app.db import Session +from app.models import ( + Alias, + Contact, +) +from tests.utils import create_new_user, random_email, random_token + + +def create_provider(): + # name auto_created from_partner + yield ["name", "a@b.c", True, True] + yield [None, None, True, True] + yield [None, None, False, True] + yield [None, None, True, False] + yield [None, None, False, False] + + +@pytest.mark.parametrize( + "name, mail_from, automatic_created, from_partner", create_provider() +) +def test_create_contact( + name: Optional[str], + mail_from: Optional[str], + automatic_created: bool, + from_partner: bool, +): + user = create_new_user() + alias = Alias.create_new_random(user) + Session.commit() + email = random_email() + contact_result = create_contact( + email, + name, + alias, + mail_from=mail_from, + automatic_created=automatic_created, + from_partner=from_partner, + ) + assert contact_result.error is None + contact = contact_result.contact + assert contact.user_id == user.id + assert contact.alias_id == alias.id + assert contact.website_email == email + assert contact.name == name + assert contact.mail_from == mail_from + assert contact.automatic_created == automatic_created + assert not contact.invalid_email + expected_flags = Contact.FLAG_PARTNER_CREATED if from_partner else 0 + assert contact.flags == expected_flags + + +def test_create_contact_email_email_not_allowed(): + user = create_new_user() + alias = Alias.create_new_random(user) + Session.commit() + contact_result = create_contact("", "", alias) + assert contact_result.contact is None + assert contact_result.error == ContactCreateError.InvalidEmail + + +def test_create_contact_email_email_allowed(): + user = create_new_user() + alias = Alias.create_new_random(user) + Session.commit() + contact_result = create_contact("", "", alias, allow_empty_email=True) + assert contact_result.error is None + assert contact_result.contact is not None + assert contact_result.contact.website_email == "" + assert contact_result.contact.invalid_email + + +def test_do_not_allow_invalid_email(): + user = create_new_user() + alias = Alias.create_new_random(user) + Session.commit() + contact_result = create_contact("potato", "", alias) + assert contact_result.contact is None + assert contact_result.error == ContactCreateError.InvalidEmail + contact_result = create_contact("asdf\x00@gmail.com", "", alias) + assert contact_result.contact is None + assert contact_result.error == ContactCreateError.InvalidEmail + + +def test_update_name_for_existing(): + user = create_new_user() + alias = Alias.create_new_random(user) + Session.commit() + email = random_email() + contact_result = create_contact(email, "", alias) + assert contact_result.error is None + assert contact_result.contact is not None + assert contact_result.contact.name == "" + name = random_token() + contact_result = create_contact(email, name, alias) + assert contact_result.error is None + assert contact_result.contact is not None + assert contact_result.contact.name == name + + +def test_update_mail_from_for_existing(): + user = create_new_user() + alias = Alias.create_new_random(user) + Session.commit() + email = random_email() + contact_result = create_contact(email, "", alias) + assert contact_result.error is None + assert contact_result.contact is not None + assert contact_result.contact.mail_from is None + mail_from = random_email() + contact_result = create_contact(email, "", alias, mail_from=mail_from) + assert contact_result.error is None + assert contact_result.contact is not None + assert contact_result.contact.mail_from == mail_from