create normalize_reply_email(): handle case where reply email contains space, quote, etc

This commit is contained in:
Son NK 2020-11-22 13:07:09 +01:00
parent a1d5b01143
commit bcdf522174
5 changed files with 54 additions and 6 deletions

View File

@ -39,7 +39,7 @@ from app.dns_utils import get_mx_domains
from app.extensions import db
from app.log import LOG
from app.models import Mailbox, User, SentAlert, CustomDomain, SLDomain, Contact
from app.utils import random_string, convert_to_id
from app.utils import random_string, convert_to_id, convert_to_alphanumeric
def render(template_name, **kwargs) -> str:
@ -727,6 +727,7 @@ def generate_reply_email(contact_email: str) -> str:
contact_email = contact_email.lower().strip().replace(" ", "")
contact_email = contact_email[:45]
contact_email = contact_email.replace("@", ".at.")
contact_email = convert_to_alphanumeric(contact_email)
# not use while to avoid infinite loop
for _ in range(1000):
@ -747,3 +748,23 @@ def generate_reply_email(contact_email: str) -> str:
def is_reply_email(address: str) -> bool:
return address.startswith("reply+") or address.startswith("ra+")
# allow also + and @ that are present in a reply address
_ALLOWED_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-.+@"
def normalize_reply_email(reply_email: str) -> str:
"""Handle the case where reply email contains *strange* char that was wrongly generated in the past"""
if not reply_email.isascii():
reply_email = convert_to_id(reply_email)
ret = []
# drop all control characters like shift, separator, etc
for c in reply_email:
if c not in _ALLOWED_CHARS:
ret.append("_")
else:
ret.append(c)
return "".join(ret)

View File

@ -38,8 +38,24 @@ def convert_to_id(s: str):
s = s.replace(" ", "")
s = s.lower()
s = unidecode(s)
return s
_ALLOWED_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-."
def convert_to_alphanumeric(s: str) -> str:
ret = []
# drop all control characters like shift, separator, etc
for c in s:
if c not in _ALLOWED_CHARS:
ret.append("_")
else:
ret.append(c)
return "".join(ret)
def encode_url(url):
return urllib.parse.quote(url, safe="")

View File

@ -24,6 +24,7 @@ from app.email_utils import (
render,
email_can_be_used_as_mailbox,
send_email_with_rate_control,
normalize_reply_email,
)
from app.extensions import db
from app.log import LOG
@ -392,8 +393,12 @@ def sanity_check():
LOG.exception("Mailbox %s address not sanitized", mailbox)
for contact in Contact.query.all():
if not contact.reply_email.isascii():
LOG.exception("Contact %s reply email is not ascii", contact)
if normalize_reply_email(contact.reply_email) != contact.reply_email:
LOG.exception(
"Contact %s reply email is not normalized %s",
contact,
contact.reply_email,
)
for domain in CustomDomain.query.all():
if domain.name and "\n" in domain.name:

View File

@ -103,6 +103,7 @@ from app.email_utils import (
get_header_unicode,
generate_reply_email,
is_reply_email,
normalize_reply_email,
)
from app.extensions import db
from app.greylisting import greylisting_needed
@ -777,9 +778,8 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
LOG.warning(f"Reply email {reply_email} has wrong domain")
return False, "550 SL E2"
# handle case where reply email is generated with non-ascii char
if not reply_email.isascii():
reply_email = convert_to_id(reply_email)
# handle case where reply email is generated with non-allowed char
reply_email = normalize_reply_email(reply_email)
contact = Contact.get_by(reply_email=reply_email)
if not contact:

View File

@ -17,6 +17,7 @@ from app.email_utils import (
add_header,
to_bytes,
generate_reply_email,
normalize_reply_email,
)
from app.extensions import db
from app.models import User, CustomDomain
@ -408,3 +409,8 @@ def test_generate_reply_email(flask_client):
# make sure reply_email only contain lowercase
reply_email = generate_reply_email("TEST@example.org")
assert reply_email.startswith("ra+test.at.example.org")
def test_normalize_reply_email(flask_client):
assert normalize_reply_email("re+abcd@sl.local") == "re+abcd@sl.local"
assert normalize_reply_email('re+"ab cd"@sl.local') == "re+_ab_cd_@sl.local"