diff --git a/app/email_utils.py b/app/email_utils.py index f81dec7f..0e519b38 100644 --- a/app/email_utils.py +++ b/app/email_utils.py @@ -1,5 +1,7 @@ +import base64 import email import os +import quopri import random import re from email.header import decode_header @@ -671,28 +673,70 @@ def is_valid_email(email_address: str) -> bool: ) +def get_encoding(msg: Message) -> str: + """ + Return the message encoding, possible values: + - quoted-printable + - base64 + - 7bit: default if unknown or empty + """ + cte = str(msg.get("content-transfer-encoding", "")).lower() + if cte in ("", "7bit"): + return "7bit" + + if cte in ("quoted-printable", "base64"): + return cte + + LOG.exception("Unknown encoding %s", cte) + + return "7bit" + + +def encode_text(text: str, encoding: str = "7bit") -> str: + if encoding == "quoted-printable": + encoded = quopri.encodestring(text.encode("utf-8")) + return str(encoded, "utf-8") + elif encoding == "base64": + encoded = base64.b64encode(text.encode("utf-8")) + return str(encoded, "utf-8") + else: # 7bit - no encoding + return text + + def add_header(msg: Message, text_header, html_header) -> Message: if msg.get_content_type() == "text/plain": + encoding = get_encoding(msg) payload = msg.get_payload() if type(payload) is str: clone_msg = copy(msg) - payload = f"{text_header}\n---\n{payload}" + to_append = encode_text(f"{text_header}\n---\n", encoding) + payload = f"{to_append}{payload}" clone_msg.set_payload(payload) return clone_msg elif msg.get_content_type() == "text/html": + encoding = get_encoding(msg) payload = msg.get_payload() if type(payload) is str: - - new_payload = f""" + new_payload = ( + encode_text( + f"""
{html_header} | |
{payload} | +""", + encoding, + ) + + payload + + encode_text( + """ |