From 313a928070abcceaaa19adfd9e4600598d2575f3 Mon Sep 17 00:00:00 2001 From: Son Nguyen Kim Date: Mon, 5 Sep 2022 08:40:24 +0200 Subject: [PATCH] Create sl_formataddr to handle unicode for built-in formataddr (#1265) * Create sl_formataddr to handle unicode for built-in formataddr * fix circular import --- app/email_utils.py | 8 +++++++- app/models.py | 7 ++++--- email_handler.py | 17 +++++++++-------- tests/test_email_utils.py | 9 +++++++++ 4 files changed, 29 insertions(+), 12 deletions(-) diff --git a/app/email_utils.py b/app/email_utils.py index 08a90bdc..b8769957 100644 --- a/app/email_utils.py +++ b/app/email_utils.py @@ -14,7 +14,7 @@ from email.header import decode_header, Header from email.message import Message, EmailMessage from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText -from email.utils import make_msgid, formatdate +from email.utils import make_msgid, formatdate, formataddr from smtplib import SMTP, SMTPException from typing import Tuple, List, Optional, Union @@ -1463,3 +1463,9 @@ def get_verp_info_from_email(email: str) -> Optional[Tuple[VerpType, int]]: if data[2] > (time.time() + VERP_MESSAGE_LIFETIME - VERP_TIME_START) / 60: return None return VerpType(data[0]), data[1] + + +def sl_formataddr(name_address_tuple: Tuple[str, str]): + """Same as formataddr but use utf-8 encoding by default""" + name, addr = name_address_tuple + return formataddr((name, Header(addr, "utf-8"))) diff --git a/app/models.py b/app/models.py index 0edcb858..77e112e7 100644 --- a/app/models.py +++ b/app/models.py @@ -8,7 +8,6 @@ import os import random import secrets import uuid -from email.utils import formataddr from typing import List, Tuple, Optional, Union import arrow @@ -27,8 +26,8 @@ from sqlalchemy.orm import deferred from sqlalchemy.sql import and_ from sqlalchemy_utils import ArrowType -from app import s3 from app import config +from app import s3 from app.db import Session from app.errors import ( AliasInTrashError, @@ -1807,7 +1806,9 @@ class Contact(Base, ModelMixin): else formatted_email ) - new_addr = formataddr((new_name, self.reply_email)).strip() + from app.email_utils import sl_formataddr + + new_addr = sl_formataddr((new_name, self.reply_email)).strip() return new_addr.strip() def last_reply(self) -> "EmailLog": diff --git a/email_handler.py b/email_handler.py index f7cb5ee1..b8522927 100644 --- a/email_handler.py +++ b/email_handler.py @@ -39,7 +39,7 @@ from email.encoders import encode_noop from email.message import Message from email.mime.application import MIMEApplication from email.mime.multipart import MIMEMultipart -from email.utils import formataddr, make_msgid, formatdate, getaddresses +from email.utils import make_msgid, formatdate, getaddresses from io import BytesIO from smtplib import SMTPRecipientsRefused, SMTPServerDisconnected from typing import List, Tuple, Optional @@ -122,6 +122,7 @@ from app.email_utils import ( save_envelope_for_debugging, get_verp_info_from_email, generate_verp_email, + sl_formataddr, ) from app.errors import ( NonReverseAliasInReplyPhase, @@ -134,14 +135,14 @@ from app.handler.dmarc import ( apply_dmarc_policy_for_reply_phase, apply_dmarc_policy_for_forward_phase, ) -from app.handler.spamd_result import ( - SpamdResult, - SPFCheckResult, -) from app.handler.provider_complaint import ( handle_hotmail_complaint, handle_yahoo_complaint, ) +from app.handler.spamd_result import ( + SpamdResult, + SPFCheckResult, +) from app.handler.unsubscribe_generator import UnsubscribeGenerator from app.handler.unsubscribe_handler import UnsubscribeHandler from app.log import LOG, set_message_id @@ -445,7 +446,7 @@ def replace_header_when_reply(msg: Message, alias: Alias, header: str): # still keep this email in header # new_addrs.append(reply_email) else: - new_addrs.append(formataddr((contact.name, contact.website_email))) + new_addrs.append(sl_formataddr((contact.name, contact.website_email))) if new_addrs: new_header = ",".join(new_addrs) @@ -1156,7 +1157,7 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str): # add alias name from alias if alias.name: LOG.d("Put alias name %s in from header", alias.name) - from_header = formataddr((alias.name, alias.email)) + from_header = sl_formataddr((alias.name, alias.email)) elif alias.custom_domain: # add alias name from domain if alias.custom_domain.name: @@ -1164,7 +1165,7 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str): "Put domain default alias name %s in from header", alias.custom_domain.name, ) - from_header = formataddr((alias.custom_domain.name, alias.email)) + from_header = sl_formataddr((alias.custom_domain.name, alias.email)) LOG.d("From header is %s", from_header) add_or_replace_header(msg, headers.FROM, from_header) diff --git a/tests/test_email_utils.py b/tests/test_email_utils.py index a1207b36..7ddf8360 100644 --- a/tests/test_email_utils.py +++ b/tests/test_email_utils.py @@ -1,6 +1,7 @@ import email import os from email.message import EmailMessage +from email.utils import formataddr import arrow import pytest @@ -37,6 +38,7 @@ from app.email_utils import ( is_invalid_mailbox_domain, generate_verp_email, get_verp_info_from_email, + sl_formataddr, ) from app.models import ( CustomDomain, @@ -777,3 +779,10 @@ def test_add_header_multipart_with_invalid_part(): assert part.get_payload().index("INJECT") > -1 else: assert part == "invalid" + + +def test_sl_formataddr(): + assert sl_formataddr(("é", "è@ç.à")) == "=?utf-8?b?w6k=?= <è@ç.à>" + # test that the same name-address can't be handled by the built-in formataddr + with pytest.raises(UnicodeEncodeError): + formataddr(("é", "è@ç.à"))