add_header()

This commit is contained in:
Son NK 2020-11-07 13:00:12 +01:00
parent f57f29a97b
commit e659680875
2 changed files with 113 additions and 0 deletions

View File

@ -653,3 +653,40 @@ def is_valid_email(email_address: str) -> bool:
return validate_email(
email_address=email_address, check_mx=False, use_blacklist=False
)
def add_header(msg: Message, text_header, html_header) -> Message:
if msg.get_content_type() == "text/plain":
payload = msg.get_payload()
if type(payload) is str:
clone_msg = copy(msg)
payload = f"{text_header}\n---\n{payload}"
clone_msg.set_payload(payload)
return clone_msg
elif msg.get_content_type() == "text/html":
payload = msg.get_payload()
if type(payload) is str:
new_payload = f"""
<table width="100%" style="width: 100%; -premailer-width: 100%; -premailer-cellpadding: 0; -premailer-cellspacing: 0; margin: 0; padding: 0;">
<tr>
<td style="border-bottom:1px dashed #5675E2; padding: 10px 0px">{html_header}</td>
</tr>
<tr>
<td>{payload}</td>
</tr>
</table>
"""
clone_msg = copy(msg)
clone_msg.set_payload(new_payload)
return clone_msg
elif msg.get_content_type() in ("multipart/alternative", "multipart/related"):
new_parts = []
for part in msg.get_payload():
new_parts.append(add_header(part, text_header, html_header))
clone_msg = copy(msg)
clone_msg.set_payload(new_parts)
return clone_msg
LOG.d("No header added for %s", msg.get_content_type())
return msg

View File

@ -14,6 +14,7 @@ from app.email_utils import (
get_spam_from_header,
get_header_from_bounce,
is_valid_email,
add_header,
)
from app.extensions import db
from app.models import User, CustomDomain
@ -293,3 +294,78 @@ def test_is_valid_email():
assert not is_valid_email("with space@gmail.com")
assert not is_valid_email("strange char !ç@gmail.com")
assert not is_valid_email("emoji👌@gmail.com")
def test_add_header_plain_text():
msg = email.message_from_string(
"""Content-Type: text/plain; charset=us-ascii
Content-Transfer-Encoding: 7bit
Test-Header: Test-Value
coucou
"""
)
new_msg = add_header(msg, "text header", "html header")
assert "text header" in new_msg.as_string()
assert "html header" not in new_msg.as_string()
def test_add_header_html():
msg = email.message_from_string(
"""Content-Type: text/html; charset=us-ascii
Content-Transfer-Encoding: 7bit
Test-Header: Test-Value
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=us-ascii">
</head>
<body style="word-wrap: break-word;" class="">
<b class="">bold</b>
</body>
</html>
"""
)
new_msg = add_header(msg, "text header", "html header")
assert "Test-Header: Test-Value" in new_msg.as_string()
assert "<table" in new_msg.as_string()
assert "</table>" in new_msg.as_string()
assert "html header" in new_msg.as_string()
assert "text header" not in new_msg.as_string()
def test_add_header_multipart_alternative():
msg = email.message_from_string(
"""Content-Type: multipart/alternative;
boundary="foo"
Content-Transfer-Encoding: 7bit
Test-Header: Test-Value
--foo
Content-Transfer-Encoding: 7bit
Content-Type: text/plain;
charset=us-ascii
bold
--foo
Content-Transfer-Encoding: 7bit
Content-Type: text/html;
charset=us-ascii
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=us-ascii">
</head>
<body style="word-wrap: break-word;" class="">
<b class="">bold</b>
</body>
</html>
"""
)
new_msg = add_header(msg, "text header", "html header")
assert "Test-Header: Test-Value" in new_msg.as_string()
assert "<table" in new_msg.as_string()
assert "</table>" in new_msg.as_string()
assert "html header" in new_msg.as_string()
assert "text header" in new_msg.as_string()