Save unsent emails to disk to be resent later (#1022)

* Initial save to disk

* Store unsent messages to disk so they can be retried later

* Set back not sending emails

* Fixed decorator

* Add general exceptions to the catchall

* Have dummy server just to make sure

* Added several server test cases

* ADded tests for bounced and error status

* Moved dir creation to config parse time

* Set LOG.e

Co-authored-by: Adrià Casajús <adria.casajus@proton.ch>
This commit is contained in:
Adrià Casajús 2022-05-30 11:52:10 +02:00 committed by GitHub
parent 4a839d9a55
commit 7ba9bcb9e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 203 additions and 16 deletions

View File

@ -413,6 +413,14 @@ POSTMASTER = os.environ.get("POSTMASTER")
# store temporary files, especially for debugging
TEMP_DIR = os.environ.get("TEMP_DIR")
# Store unsent emails
SAVE_UNSENT_DIR = os.environ.get("SAVE_UNSENT_DIR")
if SAVE_UNSENT_DIR and not os.path.isdir(SAVE_UNSENT_DIR):
try:
os.makedirs(SAVE_UNSENT_DIR)
except FileExistsError:
pass
# enable the alias automation disable: an alias can be automatically disabled if it has too many bounces
ALIAS_AUTOMATIC_DISABLE = "ALIAS_AUTOMATIC_DISABLE" in os.environ

View File

@ -1,8 +1,15 @@
from __future__ import annotations
import base64
import email
import json
import os
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from mailbox import Message
from smtplib import SMTP, SMTPServerDisconnected, SMTPRecipientsRefused
from typing import Optional, Dict, List
from smtplib import SMTP, SMTPException
from typing import Optional, Dict, List, Callable
import newrelic.agent
from attr import dataclass
@ -23,6 +30,40 @@ class SendRequest:
is_forward: bool = False
ignore_smtp_errors: bool = False
def to_bytes(self) -> bytes:
if not config.SAVE_UNSENT_DIR:
LOG.d("Skipping saving unsent message because SAVE_UNSENT_DIR is not set")
return
serialized_message = message_to_bytes(self.msg)
data = {
"envelope_from": self.envelope_from,
"envelope_to": self.envelope_to,
"msg": base64.b64encode(serialized_message).decode("utf-8"),
"mail_options": self.mail_options,
"rcpt_options": self.rcpt_options,
"is_forward": self.is_forward,
}
return json.dumps(data).encode("utf-8")
@staticmethod
def load_from_file(file_path: str) -> SendRequest:
with open(file_path, "rb") as fd:
return SendRequest.load_from_bytes(fd.read())
@staticmethod
def load_from_bytes(data: bytes) -> SendRequest:
decoded_data = json.loads(data)
msg_data = base64.b64decode(decoded_data["msg"])
msg = email.message_from_bytes(msg_data)
return SendRequest(
envelope_from=decoded_data["envelope_from"],
envelope_to=decoded_data["envelope_to"],
msg=msg,
mail_options=decoded_data["mail_options"],
rcpt_options=decoded_data["rcpt_options"],
is_forward=decoded_data["is_forward"],
)
class MailSender:
def __init__(self):
@ -30,8 +71,8 @@ class MailSender:
self._store_emails = False
self._emails_sent: List[SendRequest] = []
def store_emails_instead_of_sending(self):
self._store_emails = True
def store_emails_instead_of_sending(self, store_emails: bool = True):
self._store_emails = store_emails
def purge_stored_emails(self):
self._emails_sent = []
@ -39,6 +80,18 @@ class MailSender:
def get_stored_emails(self) -> List[SendRequest]:
return self._emails_sent
def store_emails_test_decorator(self, fn: Callable) -> Callable:
@wraps(fn)
def wrapper(*args, **kwargs):
self.purge_stored_emails()
self.store_emails_instead_of_sending()
result = fn(*args, **kwargs)
self.purge_stored_emails()
self.store_emails_instead_of_sending(False)
return result
return wrapper
def enable_background_pool(self, max_workers=10):
self._pool = ThreadPoolExecutor(max_workers=max_workers)
@ -98,20 +151,30 @@ class MailSender:
newrelic.agent.record_custom_metric(
"Custom/smtp_sending_time", time.time() - start
)
except (SMTPServerDisconnected, SMTPRecipientsRefused) as e:
except (
SMTPException,
ConnectionRefusedError,
TimeoutError,
) as e:
if retries > 0:
LOG.w(
"SMTPServerDisconnected or SMTPRecipientsRefused error %s, retry",
e,
exc_info=True,
)
time.sleep(0.3 * send_request.retries)
self._send_to_smtp(send_request, retries - 1)
else:
if send_request.ignore_smtp_error:
LOG.w("Ignore smtp error %s", e)
else:
raise
if send_request.ignore_smtp_errors:
LOG.e(f"Ignore smtp error {e}")
return
LOG.e(
f"Could not send message to smtp server {config.POSTFIX_SERVER}:{smtp_port}"
)
self._save_request_to_unsent_dir(send_request)
def _save_request_to_unsent_dir(self, send_request: SendRequest):
file_name = f"DeliveryFail-{int(time.time())}-{uuid.uuid4()}.eml"
file_path = os.path.join(config.SAVE_UNSENT_DIR, file_name)
file_contents = send_request.to_bytes()
with open(file_path, "wb") as fd:
fd.write(file_contents)
LOG.i(f"Saved unsent message {file_path}")
mail_sender = MailSender()

View File

@ -60,10 +60,9 @@ def prepare_complaint(
)
@mail_sender.store_emails_test_decorator
@pytest.mark.parametrize("handle_ftor,provider", origins)
def test_provider_to_user(flask_client, handle_ftor, provider):
mail_sender.store_emails_instead_of_sending()
mail_sender.purge_stored_emails()
user = create_new_user()
alias = Alias.create_new_random(user)
Session.commit()
@ -92,6 +91,7 @@ def test_provider_forward_phase(flask_client, handle_ftor, provider):
assert alerts[0].alert_type == f"{ALERT_COMPLAINT_REPLY_PHASE}_{provider}"
@mail_sender.store_emails_test_decorator
@pytest.mark.parametrize("handle_ftor,provider", origins)
def test_provider_reply_phase(flask_client, handle_ftor, provider):
mail_sender.store_emails_instead_of_sending()

116
tests/test_mail_sender.py Normal file
View File

@ -0,0 +1,116 @@
import os
import tempfile
import threading
import socket
from email.message import Message
from random import random
from typing import Callable
import pytest
from aiosmtpd.controller import Controller
from app.email import headers
from app.mail_sender import mail_sender, SendRequest
from app import config
def create_dummy_send_request() -> SendRequest:
to_addr = f"to-{int(random())}@destination.com"
from_addr = f"from-{int(random())}@source.com"
msg = Message()
msg[headers.TO] = to_addr
msg[headers.FROM] = from_addr
msg[headers.SUBJECT] = f"Random subject {random()}"
msg.set_payload(f"Test content {random()}")
return SendRequest(
f"from-{int(random())}@envelope.com",
to_addr,
msg,
)
@mail_sender.store_emails_test_decorator
def test_mail_sender_save_to_mem():
send_request = create_dummy_send_request()
mail_sender.send(send_request, 0)
stored_emails = mail_sender.get_stored_emails()
assert len(stored_emails) == 1
assert stored_emails[0] == send_request
def close_on_connect_dummy_server() -> int:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("localhost", 0))
sock.listen()
port = sock.getsockname()[1]
def close_on_accept():
connection, _ = sock.accept()
connection.close()
sock.close()
threading.Thread(target=close_on_accept, daemon=True).start()
return port
def closed_dummy_server() -> int:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("localhost", 0))
sock.listen()
port = sock.getsockname()[1]
sock.close()
return port
def smtp_response_server(smtp_response: str) -> Callable[[], int]:
def inner():
empty_port = closed_dummy_server()
class ResponseHandler:
async def handle_DATA(self, server, session, envelope) -> str:
return smtp_response
controller = Controller(
ResponseHandler(), hostname="localhost", port=empty_port
)
controller.start()
return controller.server.sockets[0].getsockname()[1]
return inner
@pytest.mark.parametrize(
"server_fn",
[
close_on_connect_dummy_server,
closed_dummy_server,
smtp_response_server("421 Retry"),
smtp_response_server("500 error"),
],
)
def test_mail_sender_save_unsent_to_disk(server_fn):
original_postfix_server = config.POSTFIX_SERVER
config.POSTFIX_SERVER = "localhost"
config.NOT_SEND_EMAIL = False
config.POSTFIX_SUBMISSION_TLS = False
config.POSTFIX_PORT = server_fn()
with tempfile.TemporaryDirectory() as temp_dir:
config.SAVE_UNSENT_DIR = temp_dir
send_request = create_dummy_send_request()
mail_sender.send(send_request, 0)
found_files = os.listdir(temp_dir)
assert len(found_files) == 1
loaded_send_request = SendRequest.load_from_file(
os.path.join(temp_dir, found_files[0])
)
assert send_request.mail_options == loaded_send_request.mail_options
assert send_request.rcpt_options == loaded_send_request.rcpt_options
assert send_request.envelope_to == loaded_send_request.envelope_to
assert send_request.envelope_from == loaded_send_request.envelope_from
assert send_request.msg[headers.TO] == loaded_send_request.msg[headers.TO]
assert send_request.msg[headers.FROM] == loaded_send_request.msg[headers.FROM]
config.POSTFIX_SERVER = original_postfix_server
config.NOT_SEND_EMAIL = True