use flanker instead of parseaddr_unicode

This commit is contained in:
Son Nguyen Kim 2021-09-10 17:06:38 +02:00
parent 500ff00c7c
commit 3ad4b6b76f
4 changed files with 37 additions and 28 deletions

View File

@ -1,3 +1,5 @@
from flanker.addresslib import address
from flanker.addresslib.address import EmailAddress
from flask import g from flask import g
from flask import jsonify from flask import jsonify
from flask import request from flask import request
@ -16,7 +18,6 @@ from app.api.serializer import (
) )
from app.dashboard.views.alias_log import get_alias_log from app.dashboard.views.alias_log import get_alias_log
from app.email_utils import ( from app.email_utils import (
parseaddr_unicode,
is_valid_email, is_valid_email,
generate_reply_email, generate_reply_email,
) )
@ -400,7 +401,8 @@ def create_contact_route(alias_id):
if not contact_addr: if not contact_addr:
return jsonify(error="Contact cannot be empty"), 400 return jsonify(error="Contact cannot be empty"), 400
contact_name, contact_email = parseaddr_unicode(contact_addr) full_address: EmailAddress = address.parse(contact_addr)
contact_name, contact_email = full_address.display_name, full_address.address
if not is_valid_email(contact_email): if not is_valid_email(contact_email):
return jsonify(error=f"invalid contact email {contact_email}"), 400 return jsonify(error=f"invalid contact email {contact_email}"), 400

View File

@ -1,6 +1,8 @@
from dataclasses import dataclass from dataclasses import dataclass
from operator import or_ from operator import or_
from flanker.addresslib import address
from flanker.addresslib.address import EmailAddress
from flask import render_template, request, redirect, flash from flask import render_template, request, redirect, flash
from flask import url_for from flask import url_for
from flask_login import login_required, current_user from flask_login import login_required, current_user
@ -11,7 +13,6 @@ from wtforms import StringField, validators, ValidationError
from app.config import PAGE_LIMIT from app.config import PAGE_LIMIT
from app.dashboard.base import dashboard_bp from app.dashboard.base import dashboard_bp
from app.email_utils import ( from app.email_utils import (
parseaddr_unicode,
is_valid_email, is_valid_email,
generate_reply_email, generate_reply_email,
) )
@ -182,7 +183,11 @@ def alias_contact_manager(alias_id):
contact_addr = new_contact_form.email.data.strip() contact_addr = new_contact_form.email.data.strip()
try: try:
contact_name, contact_email = parseaddr_unicode(contact_addr) full_address: EmailAddress = address.parse(contact_addr)
contact_name, contact_email = (
full_address.display_name,
full_address.address,
)
contact_email = sanitize_email(contact_email) contact_email = sanitize_email(contact_email)
except Exception: except Exception:
flash(f"{contact_addr} is invalid", "error") flash(f"{contact_addr} is invalid", "error")

View File

@ -7,6 +7,7 @@ from typing import List, Tuple, Optional
import arrow import arrow
import sqlalchemy as sa import sqlalchemy as sa
from arrow import Arrow from arrow import Arrow
from flanker.addresslib import address
from flask import url_for from flask import url_for
from flask_login import UserMixin from flask_login import UserMixin
from sqlalchemy import text, desc, CheckConstraint, Index, Column from sqlalchemy import text, desc, CheckConstraint, Index, Column
@ -1447,12 +1448,10 @@ class Contact(db.Model, ModelMixin):
# if no name, try to parse it from website_from # if no name, try to parse it from website_from
if not name and self.website_from: if not name and self.website_from:
try: try:
from app.email_utils import parseaddr_unicode name = address.parse(self.website_from).display_name
name, _ = parseaddr_unicode(self.website_from)
except Exception: except Exception:
# Skip if website_from is wrongly formatted # Skip if website_from is wrongly formatted
LOG.w( LOG.e(
"Cannot parse contact %s website_from %s", self, self.website_from "Cannot parse contact %s website_from %s", self, self.website_from
) )
name = "" name = ""

View File

@ -95,7 +95,6 @@ from app.email_utils import (
delete_all_headers_except, delete_all_headers_except,
get_spam_info, get_spam_info,
get_orig_message_from_spamassassin_report, get_orig_message_from_spamassassin_report,
parseaddr_unicode,
send_email_with_rate_control, send_email_with_rate_control,
get_email_domain_part, get_email_domain_part,
copy, copy,
@ -180,16 +179,17 @@ def get_or_create_contact(from_header: str, mail_from: str, alias: Alias) -> Con
""" """
contact_from_header is the RFC 2047 format FROM header contact_from_header is the RFC 2047 format FROM header
""" """
contact_name, contact_email = parseaddr_unicode(from_header) full_address: EmailAddress = address.parse(from_header)
contact_name, contact_email = full_address.display_name, full_address.address
if not is_valid_email(contact_email): if not is_valid_email(contact_email):
# From header is wrongly formatted, try with mail_from # From header is wrongly formatted, try with mail_from
if mail_from and mail_from != "<>": if mail_from and mail_from != "<>":
LOG.w( LOG.w(
"Cannot parse email from from_header %s, parse from mail_from %s", "Cannot parse email from from_header %s, use mail_from %s",
from_header, from_header,
mail_from, mail_from,
) )
_, contact_email = parseaddr_unicode(mail_from) contact_email = mail_from
if not is_valid_email(contact_email): if not is_valid_email(contact_email):
LOG.w( LOG.w(
@ -273,25 +273,23 @@ def get_or_create_reply_to_contact(
""" """
Get or create the contact for the Reply-To header Get or create the contact for the Reply-To header
""" """
name, address = parseaddr_unicode(reply_to_header) full_address: EmailAddress = address.parse(reply_to_header)
if not is_valid_email(address): if not is_valid_email(full_address.address):
LOG.w( LOG.w(
"invalid reply-to address %s. Parse from %s", "invalid reply-to address %s. Parse from %s",
address, full_address,
reply_to_header, reply_to_header,
) )
return None return None
address = sanitize_email(address) contact = Contact.get_by(alias_id=alias.id, website_email=full_address.address)
contact = Contact.get_by(alias_id=alias.id, website_email=address)
if contact: if contact:
return contact return contact
else: else:
LOG.d( LOG.d(
"create contact %s for alias %s via reply-to header", "create contact %s for alias %s via reply-to header",
address, full_address.address,
alias, alias,
reply_to_header, reply_to_header,
) )
@ -300,15 +298,17 @@ def get_or_create_reply_to_contact(
contact = Contact.create( contact = Contact.create(
user_id=alias.user_id, user_id=alias.user_id,
alias_id=alias.id, alias_id=alias.id,
website_email=address, website_email=full_address.address,
name=name, name=full_address.display_name,
reply_email=generate_reply_email(address, alias.user), reply_email=generate_reply_email(full_address.address, alias.user),
) )
db.session.commit() db.session.commit()
except IntegrityError: except IntegrityError:
LOG.w("Contact %s %s already exist", alias, address) LOG.w("Contact %s %s already exist", alias, full_address.address)
db.session.rollback() db.session.rollback()
contact = Contact.get_by(alias_id=alias.id, website_email=address) contact = Contact.get_by(
alias_id=alias.id, website_email=full_address.address
)
return contact return contact
@ -336,7 +336,9 @@ def replace_header_when_forward(msg: Message, alias: Alias, header: str):
try: try:
# NOT allow unicode for contact address # NOT allow unicode for contact address
validate_email(contact_email, check_deliverability=False, allow_smtputf8=False) validate_email(
contact_email, check_deliverability=False, allow_smtputf8=False
)
except EmailNotValidError: except EmailNotValidError:
LOG.w("invalid contact email %s. %s. Skip", contact_email, headers) LOG.w("invalid contact email %s. %s. Skip", contact_email, headers)
continue continue
@ -572,13 +574,13 @@ def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str
# handle_email_sent_to_ourself(alias, mb, msg, user) # handle_email_sent_to_ourself(alias, mb, msg, user)
# return [(True, "250 Message accepted for delivery")] # return [(True, "250 Message accepted for delivery")]
from_header = str(msg["From"]) from_header = get_header_unicode(msg["From"])
LOG.d("Create or get contact for from_header:%s", from_header) LOG.d("Create or get contact for from_header:%s", from_header)
contact = get_or_create_contact(from_header, envelope.mail_from, alias) contact = get_or_create_contact(from_header, envelope.mail_from, alias)
reply_to_contact = None reply_to_contact = None
if msg["Reply-To"]: if msg["Reply-To"]:
reply_to = str(msg["Reply-To"]) reply_to = get_header_unicode(msg["Reply-To"])
LOG.d("Create or get contact for from_header:%s", reply_to) LOG.d("Create or get contact for from_header:%s", reply_to)
# ignore when reply-to = alias # ignore when reply-to = alias
if reply_to == alias.email: if reply_to == alias.email:
@ -1299,7 +1301,8 @@ def handle_hotmail_complaint(msg: Message) -> bool:
LOG.e("cannot find the alias") LOG.e("cannot find the alias")
return False return False
_, alias_address = parseaddr_unicode(to_header) full_address: EmailAddress = address.parse(get_header_unicode(to_header))
alias_address = full_address.address
alias = Alias.get_by(email=alias_address) alias = Alias.get_by(email=alias_address)
if not alias: if not alias: