From 7ba9bcb9e281281c331d0bf545dd6dd328a6c56b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Casaj=C3=BAs?= Date: Mon, 30 May 2022 11:52:10 +0200 Subject: [PATCH] Save unsent emails to disk to be resent later (#1022) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Initial save to disk * Store unsent messages to disk so they can be retried later * Set back not sending emails * Fixed decorator * Add general exceptions to the catchall * Have dummy server just to make sure * Added several server test cases * ADded tests for bounced and error status * Moved dir creation to config parse time * Set LOG.e Co-authored-by: Adrià Casajús --- app/config.py | 8 ++ app/mail_sender.py | 91 ++++++++++++++--- tests/handler/test_provider_complaints.py | 4 +- tests/test_mail_sender.py | 116 ++++++++++++++++++++++ 4 files changed, 203 insertions(+), 16 deletions(-) create mode 100644 tests/test_mail_sender.py diff --git a/app/config.py b/app/config.py index b51ba660..770e6f77 100644 --- a/app/config.py +++ b/app/config.py @@ -413,6 +413,14 @@ POSTMASTER = os.environ.get("POSTMASTER") # store temporary files, especially for debugging TEMP_DIR = os.environ.get("TEMP_DIR") +# Store unsent emails +SAVE_UNSENT_DIR = os.environ.get("SAVE_UNSENT_DIR") +if SAVE_UNSENT_DIR and not os.path.isdir(SAVE_UNSENT_DIR): + try: + os.makedirs(SAVE_UNSENT_DIR) + except FileExistsError: + pass + # enable the alias automation disable: an alias can be automatically disabled if it has too many bounces ALIAS_AUTOMATIC_DISABLE = "ALIAS_AUTOMATIC_DISABLE" in os.environ diff --git a/app/mail_sender.py b/app/mail_sender.py index b8e38221..dcd719bc 100644 --- a/app/mail_sender.py +++ b/app/mail_sender.py @@ -1,8 +1,15 @@ +from __future__ import annotations +import base64 +import email +import json +import os import time +import uuid from concurrent.futures import ThreadPoolExecutor +from functools import wraps from mailbox import Message -from smtplib import SMTP, SMTPServerDisconnected, SMTPRecipientsRefused -from typing import Optional, Dict, List +from smtplib import SMTP, SMTPException +from typing import Optional, Dict, List, Callable import newrelic.agent from attr import dataclass @@ -23,6 +30,40 @@ class SendRequest: is_forward: bool = False ignore_smtp_errors: bool = False + def to_bytes(self) -> bytes: + if not config.SAVE_UNSENT_DIR: + LOG.d("Skipping saving unsent message because SAVE_UNSENT_DIR is not set") + return + serialized_message = message_to_bytes(self.msg) + data = { + "envelope_from": self.envelope_from, + "envelope_to": self.envelope_to, + "msg": base64.b64encode(serialized_message).decode("utf-8"), + "mail_options": self.mail_options, + "rcpt_options": self.rcpt_options, + "is_forward": self.is_forward, + } + return json.dumps(data).encode("utf-8") + + @staticmethod + def load_from_file(file_path: str) -> SendRequest: + with open(file_path, "rb") as fd: + return SendRequest.load_from_bytes(fd.read()) + + @staticmethod + def load_from_bytes(data: bytes) -> SendRequest: + decoded_data = json.loads(data) + msg_data = base64.b64decode(decoded_data["msg"]) + msg = email.message_from_bytes(msg_data) + return SendRequest( + envelope_from=decoded_data["envelope_from"], + envelope_to=decoded_data["envelope_to"], + msg=msg, + mail_options=decoded_data["mail_options"], + rcpt_options=decoded_data["rcpt_options"], + is_forward=decoded_data["is_forward"], + ) + class MailSender: def __init__(self): @@ -30,8 +71,8 @@ class MailSender: self._store_emails = False self._emails_sent: List[SendRequest] = [] - def store_emails_instead_of_sending(self): - self._store_emails = True + def store_emails_instead_of_sending(self, store_emails: bool = True): + self._store_emails = store_emails def purge_stored_emails(self): self._emails_sent = [] @@ -39,6 +80,18 @@ class MailSender: def get_stored_emails(self) -> List[SendRequest]: return self._emails_sent + def store_emails_test_decorator(self, fn: Callable) -> Callable: + @wraps(fn) + def wrapper(*args, **kwargs): + self.purge_stored_emails() + self.store_emails_instead_of_sending() + result = fn(*args, **kwargs) + self.purge_stored_emails() + self.store_emails_instead_of_sending(False) + return result + + return wrapper + def enable_background_pool(self, max_workers=10): self._pool = ThreadPoolExecutor(max_workers=max_workers) @@ -98,20 +151,30 @@ class MailSender: newrelic.agent.record_custom_metric( "Custom/smtp_sending_time", time.time() - start ) - except (SMTPServerDisconnected, SMTPRecipientsRefused) as e: + except ( + SMTPException, + ConnectionRefusedError, + TimeoutError, + ) as e: if retries > 0: - LOG.w( - "SMTPServerDisconnected or SMTPRecipientsRefused error %s, retry", - e, - exc_info=True, - ) time.sleep(0.3 * send_request.retries) self._send_to_smtp(send_request, retries - 1) else: - if send_request.ignore_smtp_error: - LOG.w("Ignore smtp error %s", e) - else: - raise + if send_request.ignore_smtp_errors: + LOG.e(f"Ignore smtp error {e}") + return + LOG.e( + f"Could not send message to smtp server {config.POSTFIX_SERVER}:{smtp_port}" + ) + self._save_request_to_unsent_dir(send_request) + + def _save_request_to_unsent_dir(self, send_request: SendRequest): + file_name = f"DeliveryFail-{int(time.time())}-{uuid.uuid4()}.eml" + file_path = os.path.join(config.SAVE_UNSENT_DIR, file_name) + file_contents = send_request.to_bytes() + with open(file_path, "wb") as fd: + fd.write(file_contents) + LOG.i(f"Saved unsent message {file_path}") mail_sender = MailSender() diff --git a/tests/handler/test_provider_complaints.py b/tests/handler/test_provider_complaints.py index 7df5879e..7f95249f 100644 --- a/tests/handler/test_provider_complaints.py +++ b/tests/handler/test_provider_complaints.py @@ -60,10 +60,9 @@ def prepare_complaint( ) +@mail_sender.store_emails_test_decorator @pytest.mark.parametrize("handle_ftor,provider", origins) def test_provider_to_user(flask_client, handle_ftor, provider): - mail_sender.store_emails_instead_of_sending() - mail_sender.purge_stored_emails() user = create_new_user() alias = Alias.create_new_random(user) Session.commit() @@ -92,6 +91,7 @@ def test_provider_forward_phase(flask_client, handle_ftor, provider): assert alerts[0].alert_type == f"{ALERT_COMPLAINT_REPLY_PHASE}_{provider}" +@mail_sender.store_emails_test_decorator @pytest.mark.parametrize("handle_ftor,provider", origins) def test_provider_reply_phase(flask_client, handle_ftor, provider): mail_sender.store_emails_instead_of_sending() diff --git a/tests/test_mail_sender.py b/tests/test_mail_sender.py new file mode 100644 index 00000000..0cee6091 --- /dev/null +++ b/tests/test_mail_sender.py @@ -0,0 +1,116 @@ +import os +import tempfile +import threading +import socket +from email.message import Message +from random import random +from typing import Callable + +import pytest +from aiosmtpd.controller import Controller + +from app.email import headers +from app.mail_sender import mail_sender, SendRequest +from app import config + + +def create_dummy_send_request() -> SendRequest: + to_addr = f"to-{int(random())}@destination.com" + from_addr = f"from-{int(random())}@source.com" + msg = Message() + msg[headers.TO] = to_addr + msg[headers.FROM] = from_addr + msg[headers.SUBJECT] = f"Random subject {random()}" + msg.set_payload(f"Test content {random()}") + + return SendRequest( + f"from-{int(random())}@envelope.com", + to_addr, + msg, + ) + + +@mail_sender.store_emails_test_decorator +def test_mail_sender_save_to_mem(): + send_request = create_dummy_send_request() + mail_sender.send(send_request, 0) + stored_emails = mail_sender.get_stored_emails() + assert len(stored_emails) == 1 + assert stored_emails[0] == send_request + + +def close_on_connect_dummy_server() -> int: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(("localhost", 0)) + sock.listen() + port = sock.getsockname()[1] + + def close_on_accept(): + connection, _ = sock.accept() + connection.close() + sock.close() + + threading.Thread(target=close_on_accept, daemon=True).start() + return port + + +def closed_dummy_server() -> int: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(("localhost", 0)) + sock.listen() + port = sock.getsockname()[1] + sock.close() + return port + + +def smtp_response_server(smtp_response: str) -> Callable[[], int]: + def inner(): + empty_port = closed_dummy_server() + + class ResponseHandler: + async def handle_DATA(self, server, session, envelope) -> str: + return smtp_response + + controller = Controller( + ResponseHandler(), hostname="localhost", port=empty_port + ) + controller.start() + return controller.server.sockets[0].getsockname()[1] + + return inner + + +@pytest.mark.parametrize( + "server_fn", + [ + close_on_connect_dummy_server, + closed_dummy_server, + smtp_response_server("421 Retry"), + smtp_response_server("500 error"), + ], +) +def test_mail_sender_save_unsent_to_disk(server_fn): + original_postfix_server = config.POSTFIX_SERVER + config.POSTFIX_SERVER = "localhost" + config.NOT_SEND_EMAIL = False + config.POSTFIX_SUBMISSION_TLS = False + config.POSTFIX_PORT = server_fn() + with tempfile.TemporaryDirectory() as temp_dir: + config.SAVE_UNSENT_DIR = temp_dir + send_request = create_dummy_send_request() + mail_sender.send(send_request, 0) + found_files = os.listdir(temp_dir) + assert len(found_files) == 1 + loaded_send_request = SendRequest.load_from_file( + os.path.join(temp_dir, found_files[0]) + ) + assert send_request.mail_options == loaded_send_request.mail_options + assert send_request.rcpt_options == loaded_send_request.rcpt_options + assert send_request.envelope_to == loaded_send_request.envelope_to + assert send_request.envelope_from == loaded_send_request.envelope_from + assert send_request.msg[headers.TO] == loaded_send_request.msg[headers.TO] + assert send_request.msg[headers.FROM] == loaded_send_request.msg[headers.FROM] + config.POSTFIX_SERVER = original_postfix_server + config.NOT_SEND_EMAIL = True