mirror of
https://github.com/simple-login/app.git
synced 2024-09-30 05:31:30 +02:00
Merge pull request #337 from simple-login/header-encoding
Fix the encoding issue when adding header
This commit is contained in:
commit
84e64d4c4f
@ -1,5 +1,7 @@
|
|||||||
|
import base64
|
||||||
import email
|
import email
|
||||||
import os
|
import os
|
||||||
|
import quopri
|
||||||
import random
|
import random
|
||||||
import re
|
import re
|
||||||
from email.header import decode_header
|
from email.header import decode_header
|
||||||
@ -671,28 +673,70 @@ def is_valid_email(email_address: str) -> bool:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def get_encoding(msg: Message) -> str:
|
||||||
|
"""
|
||||||
|
Return the message encoding, possible values:
|
||||||
|
- quoted-printable
|
||||||
|
- base64
|
||||||
|
- 7bit: default if unknown or empty
|
||||||
|
"""
|
||||||
|
cte = str(msg.get("content-transfer-encoding", "")).lower()
|
||||||
|
if cte in ("", "7bit"):
|
||||||
|
return "7bit"
|
||||||
|
|
||||||
|
if cte in ("quoted-printable", "base64"):
|
||||||
|
return cte
|
||||||
|
|
||||||
|
LOG.exception("Unknown encoding %s", cte)
|
||||||
|
|
||||||
|
return "7bit"
|
||||||
|
|
||||||
|
|
||||||
|
def encode_text(text: str, encoding: str = "7bit") -> str:
|
||||||
|
if encoding == "quoted-printable":
|
||||||
|
encoded = quopri.encodestring(text.encode("utf-8"))
|
||||||
|
return str(encoded, "utf-8")
|
||||||
|
elif encoding == "base64":
|
||||||
|
encoded = base64.b64encode(text.encode("utf-8"))
|
||||||
|
return str(encoded, "utf-8")
|
||||||
|
else: # 7bit - no encoding
|
||||||
|
return text
|
||||||
|
|
||||||
|
|
||||||
def add_header(msg: Message, text_header, html_header) -> Message:
|
def add_header(msg: Message, text_header, html_header) -> Message:
|
||||||
if msg.get_content_type() == "text/plain":
|
if msg.get_content_type() == "text/plain":
|
||||||
|
encoding = get_encoding(msg)
|
||||||
payload = msg.get_payload()
|
payload = msg.get_payload()
|
||||||
if type(payload) is str:
|
if type(payload) is str:
|
||||||
clone_msg = copy(msg)
|
clone_msg = copy(msg)
|
||||||
payload = f"{text_header}\n---\n{payload}"
|
to_append = encode_text(f"{text_header}\n---\n", encoding)
|
||||||
|
payload = f"{to_append}{payload}"
|
||||||
clone_msg.set_payload(payload)
|
clone_msg.set_payload(payload)
|
||||||
return clone_msg
|
return clone_msg
|
||||||
elif msg.get_content_type() == "text/html":
|
elif msg.get_content_type() == "text/html":
|
||||||
|
encoding = get_encoding(msg)
|
||||||
payload = msg.get_payload()
|
payload = msg.get_payload()
|
||||||
if type(payload) is str:
|
if type(payload) is str:
|
||||||
|
new_payload = (
|
||||||
new_payload = f"""
|
encode_text(
|
||||||
|
f"""
|
||||||
<table width="100%" style="width: 100%; -premailer-width: 100%; -premailer-cellpadding: 0; -premailer-cellspacing: 0; margin: 0; padding: 0;">
|
<table width="100%" style="width: 100%; -premailer-width: 100%; -premailer-cellpadding: 0; -premailer-cellspacing: 0; margin: 0; padding: 0;">
|
||||||
<tr>
|
<tr>
|
||||||
<td style="border-bottom:1px dashed #5675E2; padding: 10px 0px">{html_header}</td>
|
<td style="border-bottom:1px dashed #5675E2; padding: 10px 0px">{html_header}</td>
|
||||||
</tr>
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
<td>{payload}</td>
|
<td>""",
|
||||||
|
encoding,
|
||||||
|
)
|
||||||
|
+ payload
|
||||||
|
+ encode_text(
|
||||||
|
"""</td>
|
||||||
</tr>
|
</tr>
|
||||||
</table>
|
</table>
|
||||||
"""
|
""",
|
||||||
|
encoding,
|
||||||
|
)
|
||||||
|
)
|
||||||
clone_msg = copy(msg)
|
clone_msg = copy(msg)
|
||||||
clone_msg.set_payload(new_payload)
|
clone_msg.set_payload(new_payload)
|
||||||
return clone_msg
|
return clone_msg
|
||||||
|
@ -18,6 +18,8 @@ from app.email_utils import (
|
|||||||
to_bytes,
|
to_bytes,
|
||||||
generate_reply_email,
|
generate_reply_email,
|
||||||
normalize_reply_email,
|
normalize_reply_email,
|
||||||
|
get_encoding,
|
||||||
|
encode_text,
|
||||||
)
|
)
|
||||||
from app.extensions import db
|
from app.extensions import db
|
||||||
from app.models import User, CustomDomain
|
from app.models import User, CustomDomain
|
||||||
@ -416,3 +418,28 @@ def test_generate_reply_email(flask_client):
|
|||||||
def test_normalize_reply_email(flask_client):
|
def test_normalize_reply_email(flask_client):
|
||||||
assert normalize_reply_email("re+abcd@sl.local") == "re+abcd@sl.local"
|
assert normalize_reply_email("re+abcd@sl.local") == "re+abcd@sl.local"
|
||||||
assert normalize_reply_email('re+"ab cd"@sl.local') == "re+_ab_cd_@sl.local"
|
assert normalize_reply_email('re+"ab cd"@sl.local') == "re+_ab_cd_@sl.local"
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_encoding():
|
||||||
|
msg = email.message_from_string("")
|
||||||
|
assert get_encoding(msg) == "7bit"
|
||||||
|
|
||||||
|
msg = email.message_from_string("Content-TRANSFER-encoding: Invalid")
|
||||||
|
assert get_encoding(msg) == "7bit"
|
||||||
|
|
||||||
|
msg = email.message_from_string("Content-TRANSFER-encoding: quoted-printable")
|
||||||
|
assert get_encoding(msg) == "quoted-printable"
|
||||||
|
|
||||||
|
msg = email.message_from_string("Content-TRANSFER-encoding: base64")
|
||||||
|
assert get_encoding(msg) == "base64"
|
||||||
|
|
||||||
|
|
||||||
|
def test_encode_text():
|
||||||
|
assert encode_text("") == ""
|
||||||
|
assert encode_text("ascii") == "ascii"
|
||||||
|
assert encode_text("ascii", "base64") == "YXNjaWk="
|
||||||
|
assert encode_text("ascii", "quoted-printable") == "ascii"
|
||||||
|
|
||||||
|
assert encode_text("mèo méo") == "mèo méo"
|
||||||
|
assert encode_text("mèo méo", "base64") == "bcOobyBtw6lv"
|
||||||
|
assert encode_text("mèo méo", "quoted-printable") == "m=C3=A8o m=C3=A9o"
|
||||||
|
Loading…
Reference in New Issue
Block a user