sanitize contact email

This commit is contained in:
Son NK 2021-01-11 12:27:02 +01:00
parent 2293c6d2e3
commit 01858ac452
4 changed files with 23 additions and 8 deletions

View File

@ -15,7 +15,12 @@ from app.api.serializer import (
get_alias_infos_with_pagination_v3, get_alias_infos_with_pagination_v3,
) )
from app.dashboard.views.alias_log import get_alias_log from app.dashboard.views.alias_log import get_alias_log
from app.email_utils import parseaddr_unicode, is_valid_email, generate_reply_email from app.email_utils import (
parseaddr_unicode,
is_valid_email,
generate_reply_email,
)
from app.utils import sanitize_email
from app.extensions import db from app.extensions import db
from app.log import LOG from app.log import LOG
from app.models import Alias, Contact, Mailbox, AliasMailbox from app.models import Alias, Contact, Mailbox, AliasMailbox
@ -396,6 +401,8 @@ def create_contact_route(alias_id):
return jsonify(error="Contact cannot be empty"), 400 return jsonify(error="Contact cannot be empty"), 400
contact_name, contact_email = parseaddr_unicode(contact_addr) contact_name, contact_email = parseaddr_unicode(contact_addr)
contact_email = sanitize_email(contact_email)
if not is_valid_email(contact_email): if not is_valid_email(contact_email):
return jsonify(error=f"invalid contact email {contact_email}"), 400 return jsonify(error=f"invalid contact email {contact_email}"), 400

View File

@ -868,7 +868,7 @@ def generate_reply_email(contact_email: str, user: User) -> str:
# make sure contact_email can be ascii-encoded # make sure contact_email can be ascii-encoded
contact_email = convert_to_id(contact_email) contact_email = convert_to_id(contact_email)
contact_email = contact_email.lower().strip().replace(" ", "") contact_email = sanitize_email(contact_email)
contact_email = contact_email[:45] contact_email = contact_email[:45]
contact_email = contact_email.replace("@", ".at.") contact_email = contact_email.replace("@", ".at.")
contact_email = convert_to_alphanumeric(contact_email) contact_email = convert_to_alphanumeric(contact_email)

View File

@ -59,3 +59,9 @@ def convert_to_alphanumeric(s: str) -> str:
def encode_url(url): def encode_url(url):
return urllib.parse.quote(url, safe="") return urllib.parse.quote(url, safe="")
def sanitize_email(email_address: str) -> str:
if email_address:
return email_address.lower().strip().replace(" ", "")
return email_address

View File

@ -121,7 +121,7 @@ from app.models import (
) )
from app.pgp_utils import PGPException, sign_data_with_pgpy, sign_data from app.pgp_utils import PGPException, sign_data_with_pgpy, sign_data
from app.spamassassin_utils import SpamAssassin from app.spamassassin_utils import SpamAssassin
from app.utils import random_string from app.utils import random_string, sanitize_email
from init_app import load_pgp_public_keys from init_app import load_pgp_public_keys
from server import create_app, create_light_app from server import create_app, create_light_app
@ -182,6 +182,8 @@ def get_or_create_contact(from_header: str, mail_from: str, alias: Alias) -> Con
# either reuse a contact with empty email or create a new contact with empty email # either reuse a contact with empty email or create a new contact with empty email
contact_email = "" contact_email = ""
contact_email = sanitize_email(contact_email)
contact = Contact.get_by(alias_id=alias.id, website_email=contact_email) contact = Contact.get_by(alias_id=alias.id, website_email=contact_email)
if contact: if contact:
if contact.name != contact_name: if contact.name != contact_name:
@ -255,7 +257,9 @@ def replace_header_when_forward(msg: Message, alias: Alias, header: str):
for contact_name, contact_email in getaddresses(headers): for contact_name, contact_email in getaddresses(headers):
# convert back to original then parse again to make sure contact_name is unicode # convert back to original then parse again to make sure contact_name is unicode
addr = formataddr((contact_name, contact_email)) addr = formataddr((contact_name, contact_email))
contact_name, contact = parseaddr_unicode(addr) contact_name, _ = parseaddr_unicode(addr)
contact_email = sanitize_email(contact_email)
# no transformation when alias is already in the header # no transformation when alias is already in the header
if contact_email == alias.email: if contact_email == alias.email:
@ -1561,10 +1565,8 @@ def handle(envelope: Envelope) -> str:
"""Return SMTP status""" """Return SMTP status"""
# sanitize mail_from, rcpt_tos # sanitize mail_from, rcpt_tos
mail_from = envelope.mail_from.lower().strip().replace(" ", "") mail_from = sanitize_email(envelope.mail_from)
rcpt_tos = [ rcpt_tos = [sanitize_email(rcpt_to) for rcpt_to in envelope.rcpt_tos]
rcpt_to.lower().strip().replace(" ", "") for rcpt_to in envelope.rcpt_tos
]
envelope.mail_from = mail_from envelope.mail_from = mail_from
envelope.rcpt_tos = rcpt_tos envelope.rcpt_tos = rcpt_tos