mirror of
https://github.com/simple-login/app.git
synced 2024-09-30 05:31:30 +02:00
use getaddresses to parse multiple address from To, CC header. Remove get_addrs_from_header()
This commit is contained in:
parent
8aed5ced3f
commit
f069d2f083
@ -538,24 +538,6 @@ def get_orig_message_from_spamassassin_report(msg: Message) -> Message:
|
|||||||
return part
|
return part
|
||||||
|
|
||||||
|
|
||||||
def get_addrs_from_header(msg: Message, header) -> [str]:
|
|
||||||
"""Get all addresses contained in `header`
|
|
||||||
Used for To or CC header.
|
|
||||||
"""
|
|
||||||
ret = []
|
|
||||||
header_content = msg.get_all(header)
|
|
||||||
if not header_content:
|
|
||||||
return ret
|
|
||||||
|
|
||||||
for addrs in header_content:
|
|
||||||
addrs = get_header_unicode(addrs)
|
|
||||||
for addr in addrs.split(","):
|
|
||||||
ret.append(addr.strip())
|
|
||||||
|
|
||||||
# do not return empty string
|
|
||||||
return [r for r in ret if r]
|
|
||||||
|
|
||||||
|
|
||||||
def get_spam_info(msg: Message, max_score=None) -> (bool, str):
|
def get_spam_info(msg: Message, max_score=None) -> (bool, str):
|
||||||
"""parse SpamAssassin header to detect whether a message is classified as spam.
|
"""parse SpamAssassin header to detect whether a message is classified as spam.
|
||||||
Return (is spam, spam status detail)
|
Return (is spam, spam status detail)
|
||||||
|
@ -41,7 +41,7 @@ from email.encoders import encode_noop
|
|||||||
from email.message import Message
|
from email.message import Message
|
||||||
from email.mime.application import MIMEApplication
|
from email.mime.application import MIMEApplication
|
||||||
from email.mime.multipart import MIMEMultipart
|
from email.mime.multipart import MIMEMultipart
|
||||||
from email.utils import formataddr, make_msgid, formatdate
|
from email.utils import formataddr, make_msgid, formatdate, getaddresses
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from smtplib import SMTP, SMTPRecipientsRefused
|
from smtplib import SMTP, SMTPRecipientsRefused
|
||||||
from typing import List, Tuple, Optional
|
from typing import List, Tuple, Optional
|
||||||
@ -88,7 +88,6 @@ from app.email_utils import (
|
|||||||
render,
|
render,
|
||||||
get_orig_message_from_bounce,
|
get_orig_message_from_bounce,
|
||||||
delete_all_headers_except,
|
delete_all_headers_except,
|
||||||
get_addrs_from_header,
|
|
||||||
get_spam_info,
|
get_spam_info,
|
||||||
get_orig_message_from_spamassassin_report,
|
get_orig_message_from_spamassassin_report,
|
||||||
parseaddr_unicode,
|
parseaddr_unicode,
|
||||||
@ -242,16 +241,12 @@ def replace_header_when_forward(msg: Message, alias: Alias, header: str):
|
|||||||
"""
|
"""
|
||||||
Replace CC or To header by Reply emails in forward phase
|
Replace CC or To header by Reply emails in forward phase
|
||||||
"""
|
"""
|
||||||
addrs = get_addrs_from_header(msg, header)
|
|
||||||
|
|
||||||
# Nothing to do
|
|
||||||
if not addrs:
|
|
||||||
return
|
|
||||||
|
|
||||||
new_addrs: [str] = []
|
new_addrs: [str] = []
|
||||||
|
|
||||||
for addr in addrs:
|
for contact_name, contact_email in getaddresses(msg.get_all(header, [])):
|
||||||
contact_name, contact_email = parseaddr_unicode(addr)
|
# convert back to original then parse again to make sure contact_name is unicode
|
||||||
|
addr = formataddr((contact_name, contact_email))
|
||||||
|
contact_name, contact = parseaddr_unicode(addr)
|
||||||
|
|
||||||
# no transformation when alias is already in the header
|
# no transformation when alias is already in the header
|
||||||
if contact_email == alias.email:
|
if contact_email == alias.email:
|
||||||
@ -309,17 +304,9 @@ def replace_header_when_reply(msg: Message, alias: Alias, header: str):
|
|||||||
"""
|
"""
|
||||||
Replace CC or To Reply emails by original emails
|
Replace CC or To Reply emails by original emails
|
||||||
"""
|
"""
|
||||||
addrs = get_addrs_from_header(msg, header)
|
|
||||||
|
|
||||||
# Nothing to do
|
|
||||||
if not addrs:
|
|
||||||
return
|
|
||||||
|
|
||||||
new_addrs: [str] = []
|
new_addrs: [str] = []
|
||||||
|
|
||||||
for addr in addrs:
|
for _, reply_email in getaddresses(msg.get_all(header, [])):
|
||||||
_, reply_email = parseaddr_unicode(addr)
|
|
||||||
|
|
||||||
# no transformation when alias is already in the header
|
# no transformation when alias is already in the header
|
||||||
if reply_email == alias.email:
|
if reply_email == alias.email:
|
||||||
continue
|
continue
|
||||||
@ -330,7 +317,7 @@ def replace_header_when_reply(msg: Message, alias: Alias, header: str):
|
|||||||
"%s email in reply phase %s must be reply emails", header, reply_email
|
"%s email in reply phase %s must be reply emails", header, reply_email
|
||||||
)
|
)
|
||||||
# still keep this email in header
|
# still keep this email in header
|
||||||
new_addrs.append(addr)
|
new_addrs.append(reply_email)
|
||||||
else:
|
else:
|
||||||
new_addrs.append(formataddr((contact.name, contact.website_email)))
|
new_addrs.append(formataddr((contact.name, contact.website_email)))
|
||||||
|
|
||||||
|
@ -17,7 +17,6 @@ from app.email_utils import (
|
|||||||
add_header,
|
add_header,
|
||||||
to_bytes,
|
to_bytes,
|
||||||
generate_reply_email,
|
generate_reply_email,
|
||||||
get_addrs_from_header,
|
|
||||||
)
|
)
|
||||||
from app.extensions import db
|
from app.extensions import db
|
||||||
from app.models import User, CustomDomain
|
from app.models import User, CustomDomain
|
||||||
@ -409,22 +408,3 @@ def test_generate_reply_email(flask_client):
|
|||||||
# make sure reply_email only contain lowercase
|
# make sure reply_email only contain lowercase
|
||||||
reply_email = generate_reply_email("TEST@example.org")
|
reply_email = generate_reply_email("TEST@example.org")
|
||||||
assert reply_email.startswith("ra+test.at.example.org")
|
assert reply_email.startswith("ra+test.at.example.org")
|
||||||
|
|
||||||
|
|
||||||
def test_get_addrs_from_header():
|
|
||||||
msg = email.message_from_string("""To: abcd@test.org""")
|
|
||||||
assert get_addrs_from_header(msg, "To") == ["abcd@test.org"]
|
|
||||||
|
|
||||||
msg = email.message_from_string("""To: abcd@test.org, xyz@test.org""")
|
|
||||||
assert get_addrs_from_header(msg, "To") == ["abcd@test.org", "xyz@test.org"]
|
|
||||||
|
|
||||||
msg = email.message_from_string("""To: ABCD <abcd@test.org>, XYZ <xyz@test.org>""")
|
|
||||||
assert get_addrs_from_header(msg, "To") == [
|
|
||||||
"ABCD <abcd@test.org>",
|
|
||||||
"XYZ <xyz@test.org>",
|
|
||||||
]
|
|
||||||
|
|
||||||
msg = email.message_from_string(
|
|
||||||
"""To: =?unknown-8bit?q?test=40example=2eorg=2c_xyz=40test=2eorg?="""
|
|
||||||
)
|
|
||||||
assert get_addrs_from_header(msg, "To") == ["test@example.org", "xyz@test.org"]
|
|
||||||
|
Loading…
Reference in New Issue
Block a user