diff --git a/app/email/spam.py b/app/email/spam.py index c8a481be..656fcffe 100644 --- a/app/email/spam.py +++ b/app/email/spam.py @@ -5,14 +5,14 @@ from email.message import Message import aiospamc from app.config import SPAMASSASSIN_HOST -from app.email_utils import to_bytes from app.log import LOG +from app.message_utils import message_to_bytes from app.models import EmailLog from app.spamassassin_utils import SpamAssassin async def get_spam_score_async(message: Message) -> float: - sa_input = to_bytes(message) + sa_input = message_to_bytes(message) # Spamassassin requires to have an ending linebreak if not sa_input.endswith(b"\n"): @@ -41,7 +41,7 @@ def get_spam_score( Return the spam score and spam report """ LOG.d("get spam score for %s", email_log) - sa_input = to_bytes(message) + sa_input = message_to_bytes(message) # Spamassassin requires to have an ending linebreak if not sa_input.endswith(b"\n"): diff --git a/app/email_utils.py b/app/email_utils.py index 8978b67a..f1486f69 100644 --- a/app/email_utils.py +++ b/app/email_utils.py @@ -14,12 +14,11 @@ from email.message import Message, EmailMessage from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.utils import make_msgid, formatdate -from smtplib import SMTP, SMTPServerDisconnected, SMTPException, SMTPRecipientsRefused +from smtplib import SMTP, SMTPException from typing import Tuple, List, Optional, Union import arrow import dkim -import newrelic.agent import re2 as re import spf from aiosmtpd.smtp import Envelope @@ -63,6 +62,8 @@ from app.db import Session from app.dns_utils import get_mx_domains from app.email import headers from app.log import LOG +from app.mail_sender import sl_sendmail +from app.message_utils import message_to_bytes from app.models import ( Mailbox, User, @@ -477,7 +478,7 @@ def add_dkim_signature_with_header( # Generate message signature if DKIM_PRIVATE_KEY: sig = dkim.sign( - to_bytes(msg), + message_to_bytes(msg), DKIM_SELECTOR, email_domain.encode(), DKIM_PRIVATE_KEY.encode(), @@ -836,7 +837,7 @@ def copy(msg: Message) -> Message: return message_from_string(msg.as_string()) except (UnicodeEncodeError, LookupError): LOG.w("as_string() fails, try bytes parsing") - return message_from_bytes(to_bytes(msg)) + return message_from_bytes(message_to_bytes(msg)) def to_bytes(msg: Message): @@ -1307,82 +1308,6 @@ def get_smtp_server(): return smtp -def sl_sendmail( - from_addr, - to_addr, - msg: Message, - mail_options=(), - rcpt_options=(), - is_forward: bool = False, - retries=2, - ignore_smtp_error=False, -): - """replace smtp.sendmail""" - if NOT_SEND_EMAIL: - LOG.d( - "send email with subject '%s', from '%s' to '%s'", - msg[headers.SUBJECT], - msg[headers.FROM], - msg[headers.TO], - ) - return - - try: - start = time.time() - if POSTFIX_SUBMISSION_TLS: - smtp_port = 587 - else: - smtp_port = POSTFIX_PORT - - with SMTP(POSTFIX_SERVER, smtp_port) as smtp: - if POSTFIX_SUBMISSION_TLS: - smtp.starttls() - - elapsed = time.time() - start - LOG.d("getting a smtp connection takes seconds %s", elapsed) - newrelic.agent.record_custom_metric("Custom/smtp_connection_time", elapsed) - - # smtp.send_message has UnicodeEncodeError - # encode message raw directly instead - LOG.d( - "Sendmail mail_from:%s, rcpt_to:%s, header_from:%s, header_to:%s, header_cc:%s", - from_addr, - to_addr, - msg[headers.FROM], - msg[headers.TO], - msg[headers.CC], - ) - smtp.sendmail( - from_addr, - to_addr, - to_bytes(msg), - mail_options, - rcpt_options, - ) - except (SMTPServerDisconnected, SMTPRecipientsRefused) as e: - if retries > 0: - LOG.w( - "SMTPServerDisconnected or SMTPRecipientsRefused error %s, retry", - e, - exc_info=True, - ) - time.sleep(0.3 * retries) - sl_sendmail( - from_addr, - to_addr, - msg, - mail_options, - rcpt_options, - is_forward, - retries=retries - 1, - ) - else: - if ignore_smtp_error: - LOG.w("Ignore smtp error %s", e) - else: - raise - - def get_queue_id(msg: Message) -> Optional[str]: """Get the Postfix queue-id from a message""" header_values = msg.get_all(headers.RSPAMD_QUEUE_ID) diff --git a/app/handler/dmarc.py b/app/handler/dmarc.py index 62eed243..b5fb52fa 100644 --- a/app/handler/dmarc.py +++ b/app/handler/dmarc.py @@ -17,11 +17,11 @@ from app.email_utils import ( send_email_with_rate_control, render, add_or_replace_header, - to_bytes, add_header, ) from app.handler.spamd_result import SpamdResult, Phase, DmarcCheckResult from app.log import LOG +from app.message_utils import message_to_bytes from app.models import Alias, Contact, Notification, EmailLog, RefusedEmail @@ -102,7 +102,7 @@ def quarantine_dmarc_failed_forward_email(alias, contact, envelope, msg) -> Emai random_name = str(uuid.uuid4()) s3_report_path = f"refused-emails/full-{random_name}.eml" s3.upload_email_from_bytesio( - s3_report_path, BytesIO(to_bytes(msg)), f"full-{random_name}" + s3_report_path, BytesIO(message_to_bytes(msg)), f"full-{random_name}" ) refused_email = RefusedEmail.create( full_report_path=s3_report_path, user_id=alias.user_id, flush=True diff --git a/app/mail_sender.py b/app/mail_sender.py new file mode 100644 index 00000000..aaafbfa8 --- /dev/null +++ b/app/mail_sender.py @@ -0,0 +1,131 @@ +import time +from concurrent.futures import ThreadPoolExecutor +from mailbox import Message +from smtplib import SMTP, SMTPServerDisconnected, SMTPRecipientsRefused +from typing import Optional, Dict + +import newrelic +from attr import dataclass + +from app.config import ( + NOT_SEND_EMAIL, + POSTFIX_SUBMISSION_TLS, + POSTFIX_PORT, + POSTFIX_SERVER, +) +from app.email import headers +from app.log import LOG +from app.message_utils import message_to_bytes + + +@dataclass +class SendRequest: + from_address: str + to_address: str + msg: Message + mail_options: Dict = {} + rcpt_options: Dict = {} + is_forward: bool = False + ignore_smtp_errors: bool = False + + +class MailSender: + def __init__(self): + self._pool: Optional[ThreadPoolExecutor] = None + + def enable_background_pool(self, max_workers=10): + self._pool = ThreadPoolExecutor(max_workers=max_workers) + + def send(self, send_request: SendRequest, retries: int = 2): + """replace smtp.sendmail""" + if NOT_SEND_EMAIL: + LOG.d( + "send email with subject '%s', from '%s' to '%s'", + send_request.msg[headers.SUBJECT], + send_request.msg[headers.FROM], + send_request.msg[headers.TO], + ) + return + if not self._pool: + self._send_to_smtp(send_request, retries) + else: + self._pool.submit(self._send_to_smtp, (send_request, retries)) + + def _send_to_smtp(self, send_request: SendRequest, retries: int): + try: + start = time.time() + if POSTFIX_SUBMISSION_TLS: + smtp_port = 587 + else: + smtp_port = POSTFIX_PORT + + with SMTP(POSTFIX_SERVER, smtp_port) as smtp: + if POSTFIX_SUBMISSION_TLS: + smtp.starttls() + + elapsed = time.time() - start + LOG.d("getting a smtp connection takes seconds %s", elapsed) + newrelic.agent.record_custom_metric( + "Custom/smtp_connection_time", elapsed + ) + + # smtp.send_message has UnicodeEncodeError + # encode message raw directly instead + LOG.d( + "Sendmail mail_from:%s, rcpt_to:%s, header_from:%s, header_to:%s, header_cc:%s", + send_request.from_address, + send_request.to_address, + send_request.msg[headers.FROM], + send_request.msg[headers.TO], + send_request.msg[headers.CC], + ) + smtp.sendmail( + send_request.from_address, + send_request.to_address, + message_to_bytes(send_request.msg), + send_request.mail_options, + send_request.rcpt_options, + ) + + newrelic.agent.record_custom_metric( + "Custom/smtp_sending_time", time.time() - start + ) + except (SMTPServerDisconnected, SMTPRecipientsRefused) as e: + if retries > 0: + LOG.w( + "SMTPServerDisconnected or SMTPRecipientsRefused error %s, retry", + e, + exc_info=True, + ) + time.sleep(0.3 * send_request.retries) + self._send_to_smtp(send_request, retries - 1) + else: + if send_request.ignore_smtp_error: + LOG.w("Ignore smtp error %s", e) + else: + raise + + +mail_sender = MailSender() + + +def sl_sendmail( + from_address: str, + to_address: str, + msg: Message, + mail_options=(), + rcpt_options=(), + is_forward: bool = False, + retries=2, + ignore_smtp_error=False, +): + send_request = SendRequest( + from_address, + to_address, + msg, + mail_options, + rcpt_options, + is_forward, + ignore_smtp_error, + ) + mail_sender.send(send_request, retries) diff --git a/app/message_utils.py b/app/message_utils.py new file mode 100644 index 00000000..a5a199bb --- /dev/null +++ b/app/message_utils.py @@ -0,0 +1,21 @@ +from email import policy +from email.message import Message + +from app.log import LOG + + +def message_to_bytes(msg: Message) -> bytes: + """replace Message.as_bytes() method by trying different policies""" + for generator_policy in [None, policy.SMTP, policy.SMTPUTF8]: + try: + return msg.as_bytes(policy=generator_policy) + except: + LOG.w("as_bytes() fails with %s policy", policy, exc_info=True) + + msg_string = msg.as_string() + try: + return msg_string.encode() + except: + LOG.w("as_string().encode() fails", exc_info=True) + + return msg_string.encode(errors="replace") diff --git a/email_handler.py b/email_handler.py index 25be0ba1..43ef991f 100644 --- a/email_handler.py +++ b/email_handler.py @@ -104,7 +104,6 @@ from app.email_utils import ( send_email_with_rate_control, get_email_domain_part, copy, - to_bytes, send_email_at_most_times, is_valid_alias_address_domain, should_add_dkim_signature, @@ -118,7 +117,6 @@ from app.email_utils import ( should_disable, parse_id_from_bounce, spf_pass, - sl_sendmail, sanitize_header, get_queue_id, should_ignore_bounce, @@ -147,6 +145,8 @@ from app.handler.spamd_result import ( SPFCheckResult, ) from app.log import LOG, set_message_id +from app.mail_sender import sl_sendmail +from app.message_utils import message_to_bytes from app.models import ( Alias, Contact, @@ -501,7 +501,7 @@ def prepare_pgp_message( # encrypt # use pgpy as fallback - msg_bytes = to_bytes(clone_msg) + msg_bytes = message_to_bytes(clone_msg) try: encrypted_data = pgp_utils.encrypt_file(BytesIO(msg_bytes), pgp_fingerprint) second.set_payload(encrypted_data) @@ -527,11 +527,11 @@ def sign_msg(msg: Message) -> Message: signature.add_header("Content-Disposition", 'attachment; filename="signature.asc"') try: - signature.set_payload(sign_data(to_bytes(msg).replace(b"\n", b"\r\n"))) + signature.set_payload(sign_data(message_to_bytes(msg).replace(b"\n", b"\r\n"))) except Exception: LOG.e("Cannot sign, try using pgpy") signature.set_payload( - sign_data_with_pgpy(to_bytes(msg).replace(b"\n", b"\r\n")) + sign_data_with_pgpy(message_to_bytes(msg).replace(b"\n", b"\r\n")) ) container.attach(signature) @@ -543,7 +543,9 @@ def handle_email_sent_to_ourself(alias, from_addr: str, msg: Message, user): # store the refused email random_name = str(uuid.uuid4()) full_report_path = f"refused-emails/cycle-{random_name}.eml" - s3.upload_email_from_bytesio(full_report_path, BytesIO(to_bytes(msg)), random_name) + s3.upload_email_from_bytesio( + full_report_path, BytesIO(message_to_bytes(msg)), random_name + ) refused_email = RefusedEmail.create( path=None, full_report_path=full_report_path, user_id=alias.user_id ) @@ -1394,7 +1396,7 @@ def handle_bounce_forward_phase(msg: Message, email_log: EmailLog): full_report_path = f"refused-emails/full-{random_name}.eml" s3.upload_email_from_bytesio( - full_report_path, BytesIO(to_bytes(msg)), f"full-{random_name}" + full_report_path, BytesIO(message_to_bytes(msg)), f"full-{random_name}" ) file_path = None @@ -1413,7 +1415,7 @@ def handle_bounce_forward_phase(msg: Message, email_log: EmailLog): else: file_path = f"refused-emails/{random_name}.eml" s3.upload_email_from_bytesio( - file_path, BytesIO(to_bytes(orig_msg)), random_name + file_path, BytesIO(message_to_bytes(orig_msg)), random_name ) refused_email = RefusedEmail.create( @@ -1733,14 +1735,16 @@ def handle_bounce_reply_phase(envelope, msg: Message, email_log: EmailLog): random_name = str(uuid.uuid4()) full_report_path = f"refused-emails/full-{random_name}.eml" - s3.upload_email_from_bytesio(full_report_path, BytesIO(to_bytes(msg)), random_name) + s3.upload_email_from_bytesio( + full_report_path, BytesIO(message_to_bytes(msg)), random_name + ) orig_msg = get_orig_message_from_bounce(msg) file_path = None if orig_msg: file_path = f"refused-emails/{random_name}.eml" s3.upload_email_from_bytesio( - file_path, BytesIO(to_bytes(orig_msg)), random_name + file_path, BytesIO(message_to_bytes(orig_msg)), random_name ) refused_email = RefusedEmail.create( @@ -1809,13 +1813,15 @@ def handle_spam( random_name = str(uuid.uuid4()) full_report_path = f"spams/full-{random_name}.eml" - s3.upload_email_from_bytesio(full_report_path, BytesIO(to_bytes(msg)), random_name) + s3.upload_email_from_bytesio( + full_report_path, BytesIO(message_to_bytes(msg)), random_name + ) file_path = None if orig_msg: file_path = f"spams/{random_name}.eml" s3.upload_email_from_bytesio( - file_path, BytesIO(to_bytes(orig_msg)), random_name + file_path, BytesIO(message_to_bytes(orig_msg)), random_name ) refused_email = RefusedEmail.create( diff --git a/tests/test_email_utils.py b/tests/test_email_utils.py index 40f1f66d..513ddb2c 100644 --- a/tests/test_email_utils.py +++ b/tests/test_email_utils.py @@ -19,7 +19,6 @@ from app.email_utils import ( get_header_from_bounce, is_valid_email, add_header, - to_bytes, generate_reply_email, normalize_reply_email, get_encoding, @@ -161,23 +160,6 @@ def test_send_email_with_rate_control(flask_client): ) -def test_copy(): - email_str = """ - From: abcd@gmail.com - To: hey@example.org - Subject: subject - - Body - """ - msg = email.message_from_string(email_str) - msg2 = copy(msg) - assert to_bytes(msg) == to_bytes(msg2) - - msg = email.message_from_string("👌") - msg2 = copy(msg) - assert to_bytes(msg) == to_bytes(msg2) - - def test_get_spam_from_header(): is_spam, _ = get_spam_from_header( """No, score=-0.1 required=5.0 tests=DKIM_SIGNED,DKIM_VALID, @@ -476,19 +458,6 @@ Content-Type: text/html; charset=us-ascii assert "old" not in new_msg.as_string() -def test_to_bytes(): - msg = email.message_from_string("☕️ emoji") - assert to_bytes(msg) - # \n is appended when message is converted to bytes - assert to_bytes(msg).decode() == "\n☕️ emoji" - - msg = email.message_from_string("ascii") - assert to_bytes(msg) == b"\nascii" - - msg = email.message_from_string("éèà€") - assert to_bytes(msg).decode() == "\néèà€" - - def test_generate_reply_email(flask_client): user = create_new_user() reply_email = generate_reply_email("test@example.org", user) diff --git a/tests/test_message_utils.py b/tests/test_message_utils.py new file mode 100644 index 00000000..9c368923 --- /dev/null +++ b/tests/test_message_utils.py @@ -0,0 +1,35 @@ +import email +from app.email_utils import ( + copy, +) +from app.message_utils import message_to_bytes + + +def test_copy(): + email_str = """ + From: abcd@gmail.com + To: hey@example.org + Subject: subject + + Body + """ + msg = email.message_from_string(email_str) + msg2 = copy(msg) + assert message_to_bytes(msg) == message_to_bytes(msg2) + + msg = email.message_from_string("👌") + msg2 = copy(msg) + assert message_to_bytes(msg) == message_to_bytes(msg2) + + +def test_to_bytes(): + msg = email.message_from_string("☕️ emoji") + assert message_to_bytes(msg) + # \n is appended when message is converted to bytes + assert message_to_bytes(msg).decode() == "\n☕️ emoji" + + msg = email.message_from_string("ascii") + assert message_to_bytes(msg) == b"\nascii" + + msg = email.message_from_string("éèà€") + assert message_to_bytes(msg).decode() == "\néèà€"