From 01858ac452eacfa3d45f8bbfef2dba8cf8429853 Mon Sep 17 00:00:00 2001 From: Son NK <> Date: Mon, 11 Jan 2021 12:27:02 +0100 Subject: [PATCH] sanitize contact email --- app/api/views/alias.py | 9 ++++++++- app/email_utils.py | 2 +- app/utils.py | 6 ++++++ email_handler.py | 14 ++++++++------ 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/app/api/views/alias.py b/app/api/views/alias.py index b78ea7eb..6a4610b8 100644 --- a/app/api/views/alias.py +++ b/app/api/views/alias.py @@ -15,7 +15,12 @@ from app.api.serializer import ( get_alias_infos_with_pagination_v3, ) from app.dashboard.views.alias_log import get_alias_log -from app.email_utils import parseaddr_unicode, is_valid_email, generate_reply_email +from app.email_utils import ( + parseaddr_unicode, + is_valid_email, + generate_reply_email, +) +from app.utils import sanitize_email from app.extensions import db from app.log import LOG from app.models import Alias, Contact, Mailbox, AliasMailbox @@ -396,6 +401,8 @@ def create_contact_route(alias_id): return jsonify(error="Contact cannot be empty"), 400 contact_name, contact_email = parseaddr_unicode(contact_addr) + contact_email = sanitize_email(contact_email) + if not is_valid_email(contact_email): return jsonify(error=f"invalid contact email {contact_email}"), 400 diff --git a/app/email_utils.py b/app/email_utils.py index 9af875f1..fdc67164 100644 --- a/app/email_utils.py +++ b/app/email_utils.py @@ -868,7 +868,7 @@ def generate_reply_email(contact_email: str, user: User) -> str: # make sure contact_email can be ascii-encoded contact_email = convert_to_id(contact_email) - contact_email = contact_email.lower().strip().replace(" ", "") + contact_email = sanitize_email(contact_email) contact_email = contact_email[:45] contact_email = contact_email.replace("@", ".at.") contact_email = convert_to_alphanumeric(contact_email) diff --git a/app/utils.py b/app/utils.py index 0d034a3f..61fd2ee7 100644 --- a/app/utils.py +++ b/app/utils.py @@ -59,3 +59,9 @@ def convert_to_alphanumeric(s: str) -> str: def encode_url(url): return urllib.parse.quote(url, safe="") + + +def sanitize_email(email_address: str) -> str: + if email_address: + return email_address.lower().strip().replace(" ", "") + return email_address diff --git a/email_handler.py b/email_handler.py index cf157159..b084d405 100644 --- a/email_handler.py +++ b/email_handler.py @@ -121,7 +121,7 @@ from app.models import ( ) from app.pgp_utils import PGPException, sign_data_with_pgpy, sign_data from app.spamassassin_utils import SpamAssassin -from app.utils import random_string +from app.utils import random_string, sanitize_email from init_app import load_pgp_public_keys from server import create_app, create_light_app @@ -182,6 +182,8 @@ def get_or_create_contact(from_header: str, mail_from: str, alias: Alias) -> Con # either reuse a contact with empty email or create a new contact with empty email contact_email = "" + contact_email = sanitize_email(contact_email) + contact = Contact.get_by(alias_id=alias.id, website_email=contact_email) if contact: if contact.name != contact_name: @@ -255,7 +257,9 @@ def replace_header_when_forward(msg: Message, alias: Alias, header: str): for contact_name, contact_email in getaddresses(headers): # convert back to original then parse again to make sure contact_name is unicode addr = formataddr((contact_name, contact_email)) - contact_name, contact = parseaddr_unicode(addr) + contact_name, _ = parseaddr_unicode(addr) + + contact_email = sanitize_email(contact_email) # no transformation when alias is already in the header if contact_email == alias.email: @@ -1561,10 +1565,8 @@ def handle(envelope: Envelope) -> str: """Return SMTP status""" # sanitize mail_from, rcpt_tos - mail_from = envelope.mail_from.lower().strip().replace(" ", "") - rcpt_tos = [ - rcpt_to.lower().strip().replace(" ", "") for rcpt_to in envelope.rcpt_tos - ] + mail_from = sanitize_email(envelope.mail_from) + rcpt_tos = [sanitize_email(rcpt_to) for rcpt_to in envelope.rcpt_tos] envelope.mail_from = mail_from envelope.rcpt_tos = rcpt_tos