handle UnicodeEncodeError in copy() and to_bytes()

This commit is contained in:
Son NK 2020-11-10 16:02:19 +01:00
parent c3f73b25b2
commit 632a5bbbc8
2 changed files with 29 additions and 5 deletions

View File

@ -634,10 +634,11 @@ def parseaddr_unicode(addr) -> (str, str):
def copy(msg: Message) -> Message:
"""return a copy of message"""
try:
return email.message_from_bytes(to_bytes(msg))
except UnicodeEncodeError:
LOG.warning("to_bytes() fails, try string")
# prefer the unicode way
return email.message_from_string(msg.as_string())
except UnicodeEncodeError:
LOG.warning("as_string() fails, try to_bytes")
return email.message_from_bytes(to_bytes(msg))
def to_bytes(msg: Message):
@ -650,7 +651,13 @@ def to_bytes(msg: Message):
return msg.as_bytes(policy=email.policy.SMTP)
except UnicodeEncodeError:
LOG.warning("as_bytes fails with SMTP policy, try SMTPUTF8 policy")
return msg.as_bytes(policy=email.policy.SMTPUTF8)
try:
return msg.as_bytes(policy=email.policy.SMTPUTF8)
except UnicodeEncodeError:
LOG.warning(
"as_bytes fails with SMTPUTF8 policy, try converting to string"
)
return msg.as_string().encode()
def should_add_dkim_signature(domain: str) -> bool:

View File

@ -15,6 +15,7 @@ from app.email_utils import (
get_header_from_bounce,
is_valid_email,
add_header,
to_bytes,
)
from app.extensions import db
from app.models import User, CustomDomain
@ -138,8 +139,11 @@ def test_copy():
"""
msg = email.message_from_string(email_str)
msg2 = copy(msg)
assert to_bytes(msg) == to_bytes(msg2)
assert msg.as_bytes() == msg2.as_bytes()
msg = email.message_from_string("👌")
msg2 = copy(msg)
assert to_bytes(msg) == to_bytes(msg2)
def test_get_spam_from_header():
@ -369,3 +373,16 @@ Content-Type: text/html;
assert "</table>" in new_msg.as_string()
assert "html header" in new_msg.as_string()
assert "text header" in new_msg.as_string()
def test_to_bytes():
    """Check to_bytes() on emoji, plain ASCII and accented message bodies.

    Every expected value starts with a newline: the serialized message
    carries a leading newline before the body text (presumably the blank
    header/body separator, since these inputs have no header lines --
    TODO confirm).
    """
    emoji_msg = email.message_from_string("☕️ emoji")
    # The serialized form must be non-empty ...
    assert to_bytes(emoji_msg)
    # ... and must decode back to the body, preceded by the leading newline.
    emoji_text = to_bytes(emoji_msg).decode()
    assert emoji_text == "\n☕️ emoji"

    ascii_msg = email.message_from_string("ascii")
    ascii_raw = to_bytes(ascii_msg)
    assert ascii_raw == b"\nascii"

    accented_msg = email.message_from_string("éèà€")
    accented_text = to_bytes(accented_msg).decode()
    assert accented_text == "\néèà€"