import os import tempfile import threading import socket from email.message import Message from random import random from typing import Callable import pytest from aiosmtpd.controller import Controller from app.email import headers from app.mail_sender import ( mail_sender, SendRequest, load_unsent_mails_from_fs_and_resend, ) from app import config def create_dummy_send_request() -> SendRequest: to_addr = f"to-{int(random())}@destination.com" from_addr = f"from-{int(random())}@source.com" msg = Message() msg[headers.TO] = to_addr msg[headers.FROM] = from_addr msg[headers.SUBJECT] = f"Random subject {random()}" msg.set_payload(f"Test content {random()}") return SendRequest( f"from-{int(random())}@envelope.com", to_addr, msg, ) @mail_sender.store_emails_test_decorator def test_mail_sender_save_to_mem(): send_request = create_dummy_send_request() mail_sender.send(send_request, 0) stored_emails = mail_sender.get_stored_emails() assert len(stored_emails) == 1 assert stored_emails[0] == send_request def close_on_connect_dummy_server() -> int: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(("localhost", 0)) sock.listen() port = sock.getsockname()[1] def close_on_accept(): connection, _ = sock.accept() connection.close() sock.close() threading.Thread(target=close_on_accept, daemon=True).start() return port def closed_dummy_server() -> int: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(("localhost", 0)) sock.listen() port = sock.getsockname()[1] sock.close() return port def smtp_response_server(smtp_response: str) -> Callable[[], int]: def inner(): empty_port = closed_dummy_server() class ResponseHandler: async def handle_DATA(self, server, session, envelope) -> str: return smtp_response controller = Controller( ResponseHandler(), hostname="localhost", port=empty_port ) controller.start() return controller.server.sockets[0].getsockname()[1] return inner def compare_send_requests(expected: SendRequest, request: SendRequest): assert request.mail_options == expected.mail_options assert request.rcpt_options == expected.rcpt_options assert request.envelope_to == expected.envelope_to assert request.envelope_from == expected.envelope_from assert request.msg[headers.TO] == expected.msg[headers.TO] assert request.msg[headers.FROM] == expected.msg[headers.FROM] @pytest.mark.parametrize( "server_fn", [ close_on_connect_dummy_server, closed_dummy_server, smtp_response_server("421 Retry"), smtp_response_server("500 error"), ], ) def test_mail_sender_save_unsent_to_disk(server_fn): original_postfix_server = config.POSTFIX_SERVER config.POSTFIX_SERVER = "localhost" config.NOT_SEND_EMAIL = False config.POSTFIX_SUBMISSION_TLS = False config.POSTFIX_PORT = server_fn() try: with tempfile.TemporaryDirectory() as temp_dir: config.SAVE_UNSENT_DIR = temp_dir send_request = create_dummy_send_request() assert not mail_sender.send(send_request, 0) found_files = os.listdir(temp_dir) assert len(found_files) == 1 loaded_send_request = SendRequest.load_from_file( os.path.join(temp_dir, found_files[0]) ) compare_send_requests(loaded_send_request, send_request) finally: config.POSTFIX_SERVER = original_postfix_server config.NOT_SEND_EMAIL = True @mail_sender.store_emails_test_decorator def test_send_unsent_email_from_fs(): original_postfix_server = config.POSTFIX_SERVER config.POSTFIX_SERVER = "localhost" config.NOT_SEND_EMAIL = False with tempfile.TemporaryDirectory() as temp_dir: try: config.SAVE_UNSENT_DIR = temp_dir send_request = create_dummy_send_request() assert not mail_sender.send(send_request, 1) finally: config.POSTFIX_SERVER = original_postfix_server config.NOT_SEND_EMAIL = True saved_files = os.listdir(config.SAVE_UNSENT_DIR) assert len(saved_files) == 1 mail_sender.purge_stored_emails() load_unsent_mails_from_fs_and_resend() sent_emails = mail_sender.get_stored_emails() assert len(sent_emails) == 1 compare_send_requests(send_request, sent_emails[0]) assert sent_emails[0].ignore_smtp_errors assert not os.path.exists(os.path.join(config.SAVE_UNSENT_DIR, saved_files[0])) saved_files = os.listdir(config.SAVE_UNSENT_DIR) assert len(saved_files) == 0 @mail_sender.store_emails_test_decorator def test_failed_resend_does_not_delete_file(): original_postfix_server = config.POSTFIX_SERVER config.POSTFIX_SERVER = "localhost" config.NOT_SEND_EMAIL = False try: with tempfile.TemporaryDirectory() as temp_dir: config.SAVE_UNSENT_DIR = temp_dir send_request = create_dummy_send_request() # Send and store email in disk assert not mail_sender.send(send_request, 1) saved_files = os.listdir(config.SAVE_UNSENT_DIR) assert len(saved_files) == 1 mail_sender.purge_stored_emails() # Send and keep email in disk load_unsent_mails_from_fs_and_resend() sent_emails = mail_sender.get_stored_emails() assert len(sent_emails) == 1 compare_send_requests(send_request, sent_emails[0]) assert sent_emails[0].ignore_smtp_errors assert os.path.exists(os.path.join(config.SAVE_UNSENT_DIR, saved_files[0])) # No more emails are stored in disk assert saved_files == os.listdir(config.SAVE_UNSENT_DIR) finally: config.POSTFIX_SERVER = original_postfix_server config.NOT_SEND_EMAIL = True @mail_sender.store_emails_test_decorator def test_ok_mail_does_not_generate_unsent_file(): with tempfile.TemporaryDirectory() as temp_dir: config.SAVE_UNSENT_DIR = temp_dir send_request = create_dummy_send_request() # Send and store email in disk assert mail_sender.send(send_request, 1) saved_files = os.listdir(config.SAVE_UNSENT_DIR) assert len(saved_files) == 0