Allow sending messages in a background thread

This commit is contained in:
Adrià Casajús 2022-04-28 14:43:24 +02:00
parent 93ae82aa46
commit 9a04376894
No known key found for this signature in database
GPG Key ID: F0033226A5AFC9B9
8 changed files with 215 additions and 128 deletions

View File

@ -5,14 +5,14 @@ from email.message import Message
import aiospamc
from app.config import SPAMASSASSIN_HOST
from app.email_utils import to_bytes
from app.log import LOG
from app.message_utils import message_to_bytes
from app.models import EmailLog
from app.spamassassin_utils import SpamAssassin
async def get_spam_score_async(message: Message) -> float:
sa_input = to_bytes(message)
sa_input = message_to_bytes(message)
# Spamassassin requires to have an ending linebreak
if not sa_input.endswith(b"\n"):
@ -41,7 +41,7 @@ def get_spam_score(
Return the spam score and spam report
"""
LOG.d("get spam score for %s", email_log)
sa_input = to_bytes(message)
sa_input = message_to_bytes(message)
# Spamassassin requires to have an ending linebreak
if not sa_input.endswith(b"\n"):

View File

@ -14,12 +14,11 @@ from email.message import Message, EmailMessage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import make_msgid, formatdate
from smtplib import SMTP, SMTPServerDisconnected, SMTPException, SMTPRecipientsRefused
from smtplib import SMTP, SMTPException
from typing import Tuple, List, Optional, Union
import arrow
import dkim
import newrelic.agent
import re2 as re
import spf
from aiosmtpd.smtp import Envelope
@ -63,6 +62,8 @@ from app.db import Session
from app.dns_utils import get_mx_domains
from app.email import headers
from app.log import LOG
from app.mail_sender import sl_sendmail
from app.message_utils import message_to_bytes
from app.models import (
Mailbox,
User,
@ -477,7 +478,7 @@ def add_dkim_signature_with_header(
# Generate message signature
if DKIM_PRIVATE_KEY:
sig = dkim.sign(
to_bytes(msg),
message_to_bytes(msg),
DKIM_SELECTOR,
email_domain.encode(),
DKIM_PRIVATE_KEY.encode(),
@ -836,7 +837,7 @@ def copy(msg: Message) -> Message:
return message_from_string(msg.as_string())
except (UnicodeEncodeError, LookupError):
LOG.w("as_string() fails, try bytes parsing")
return message_from_bytes(to_bytes(msg))
return message_from_bytes(message_to_bytes(msg))
def to_bytes(msg: Message):
@ -1307,82 +1308,6 @@ def get_smtp_server():
return smtp
def sl_sendmail(
from_addr,
to_addr,
msg: Message,
mail_options=(),
rcpt_options=(),
is_forward: bool = False,
retries=2,
ignore_smtp_error=False,
):
"""replace smtp.sendmail"""
if NOT_SEND_EMAIL:
LOG.d(
"send email with subject '%s', from '%s' to '%s'",
msg[headers.SUBJECT],
msg[headers.FROM],
msg[headers.TO],
)
return
try:
start = time.time()
if POSTFIX_SUBMISSION_TLS:
smtp_port = 587
else:
smtp_port = POSTFIX_PORT
with SMTP(POSTFIX_SERVER, smtp_port) as smtp:
if POSTFIX_SUBMISSION_TLS:
smtp.starttls()
elapsed = time.time() - start
LOG.d("getting a smtp connection takes seconds %s", elapsed)
newrelic.agent.record_custom_metric("Custom/smtp_connection_time", elapsed)
# smtp.send_message has UnicodeEncodeError
# encode message raw directly instead
LOG.d(
"Sendmail mail_from:%s, rcpt_to:%s, header_from:%s, header_to:%s, header_cc:%s",
from_addr,
to_addr,
msg[headers.FROM],
msg[headers.TO],
msg[headers.CC],
)
smtp.sendmail(
from_addr,
to_addr,
to_bytes(msg),
mail_options,
rcpt_options,
)
except (SMTPServerDisconnected, SMTPRecipientsRefused) as e:
if retries > 0:
LOG.w(
"SMTPServerDisconnected or SMTPRecipientsRefused error %s, retry",
e,
exc_info=True,
)
time.sleep(0.3 * retries)
sl_sendmail(
from_addr,
to_addr,
msg,
mail_options,
rcpt_options,
is_forward,
retries=retries - 1,
)
else:
if ignore_smtp_error:
LOG.w("Ignore smtp error %s", e)
else:
raise
def get_queue_id(msg: Message) -> Optional[str]:
"""Get the Postfix queue-id from a message"""
header_values = msg.get_all(headers.RSPAMD_QUEUE_ID)

View File

@ -17,11 +17,11 @@ from app.email_utils import (
send_email_with_rate_control,
render,
add_or_replace_header,
to_bytes,
add_header,
)
from app.handler.spamd_result import SpamdResult, Phase, DmarcCheckResult
from app.log import LOG
from app.message_utils import message_to_bytes
from app.models import Alias, Contact, Notification, EmailLog, RefusedEmail
@ -102,7 +102,7 @@ def quarantine_dmarc_failed_forward_email(alias, contact, envelope, msg) -> Emai
random_name = str(uuid.uuid4())
s3_report_path = f"refused-emails/full-{random_name}.eml"
s3.upload_email_from_bytesio(
s3_report_path, BytesIO(to_bytes(msg)), f"full-{random_name}"
s3_report_path, BytesIO(message_to_bytes(msg)), f"full-{random_name}"
)
refused_email = RefusedEmail.create(
full_report_path=s3_report_path, user_id=alias.user_id, flush=True

131
app/mail_sender.py Normal file
View File

@ -0,0 +1,131 @@
import time
from concurrent.futures import ThreadPoolExecutor
from mailbox import Message
from smtplib import SMTP, SMTPServerDisconnected, SMTPRecipientsRefused
from typing import Optional, Dict
import newrelic
from attr import dataclass
from app.config import (
NOT_SEND_EMAIL,
POSTFIX_SUBMISSION_TLS,
POSTFIX_PORT,
POSTFIX_SERVER,
)
from app.email import headers
from app.log import LOG
from app.message_utils import message_to_bytes
@dataclass
class SendRequest:
from_address: str
to_address: str
msg: Message
mail_options: Dict = {}
rcpt_options: Dict = {}
is_forward: bool = False
ignore_smtp_errors: bool = False
class MailSender:
def __init__(self):
self._pool: Optional[ThreadPoolExecutor] = None
def enable_background_pool(self, max_workers=10):
self._pool = ThreadPoolExecutor(max_workers=max_workers)
def send(self, send_request: SendRequest, retries: int = 2):
"""replace smtp.sendmail"""
if NOT_SEND_EMAIL:
LOG.d(
"send email with subject '%s', from '%s' to '%s'",
send_request.msg[headers.SUBJECT],
send_request.msg[headers.FROM],
send_request.msg[headers.TO],
)
return
if not self._pool:
self._send_to_smtp(send_request, retries)
else:
self._pool.submit(self._send_to_smtp, (send_request, retries))
def _send_to_smtp(self, send_request: SendRequest, retries: int):
try:
start = time.time()
if POSTFIX_SUBMISSION_TLS:
smtp_port = 587
else:
smtp_port = POSTFIX_PORT
with SMTP(POSTFIX_SERVER, smtp_port) as smtp:
if POSTFIX_SUBMISSION_TLS:
smtp.starttls()
elapsed = time.time() - start
LOG.d("getting a smtp connection takes seconds %s", elapsed)
newrelic.agent.record_custom_metric(
"Custom/smtp_connection_time", elapsed
)
# smtp.send_message has UnicodeEncodeError
# encode message raw directly instead
LOG.d(
"Sendmail mail_from:%s, rcpt_to:%s, header_from:%s, header_to:%s, header_cc:%s",
send_request.from_address,
send_request.to_address,
send_request.msg[headers.FROM],
send_request.msg[headers.TO],
send_request.msg[headers.CC],
)
smtp.sendmail(
send_request.from_address,
send_request.to_address,
message_to_bytes(send_request.msg),
send_request.mail_options,
send_request.rcpt_options,
)
newrelic.agent.record_custom_metric(
"Custom/smtp_sending_time", time.time() - start
)
except (SMTPServerDisconnected, SMTPRecipientsRefused) as e:
if retries > 0:
LOG.w(
"SMTPServerDisconnected or SMTPRecipientsRefused error %s, retry",
e,
exc_info=True,
)
time.sleep(0.3 * send_request.retries)
self._send_to_smtp(send_request, retries - 1)
else:
if send_request.ignore_smtp_error:
LOG.w("Ignore smtp error %s", e)
else:
raise
mail_sender = MailSender()
def sl_sendmail(
from_address: str,
to_address: str,
msg: Message,
mail_options=(),
rcpt_options=(),
is_forward: bool = False,
retries=2,
ignore_smtp_error=False,
):
send_request = SendRequest(
from_address,
to_address,
msg,
mail_options,
rcpt_options,
is_forward,
ignore_smtp_error,
)
mail_sender.send(send_request, retries)

21
app/message_utils.py Normal file
View File

@ -0,0 +1,21 @@
from email import policy
from email.message import Message
from app.log import LOG
def message_to_bytes(msg: Message) -> bytes:
"""replace Message.as_bytes() method by trying different policies"""
for generator_policy in [None, policy.SMTP, policy.SMTPUTF8]:
try:
return msg.as_bytes(policy=generator_policy)
except:
LOG.w("as_bytes() fails with %s policy", policy, exc_info=True)
msg_string = msg.as_string()
try:
return msg_string.encode()
except:
LOG.w("as_string().encode() fails", exc_info=True)
return msg_string.encode(errors="replace")

View File

@ -104,7 +104,6 @@ from app.email_utils import (
send_email_with_rate_control,
get_email_domain_part,
copy,
to_bytes,
send_email_at_most_times,
is_valid_alias_address_domain,
should_add_dkim_signature,
@ -118,7 +117,6 @@ from app.email_utils import (
should_disable,
parse_id_from_bounce,
spf_pass,
sl_sendmail,
sanitize_header,
get_queue_id,
should_ignore_bounce,
@ -147,6 +145,8 @@ from app.handler.spamd_result import (
SPFCheckResult,
)
from app.log import LOG, set_message_id
from app.mail_sender import sl_sendmail
from app.message_utils import message_to_bytes
from app.models import (
Alias,
Contact,
@ -501,7 +501,7 @@ def prepare_pgp_message(
# encrypt
# use pgpy as fallback
msg_bytes = to_bytes(clone_msg)
msg_bytes = message_to_bytes(clone_msg)
try:
encrypted_data = pgp_utils.encrypt_file(BytesIO(msg_bytes), pgp_fingerprint)
second.set_payload(encrypted_data)
@ -527,11 +527,11 @@ def sign_msg(msg: Message) -> Message:
signature.add_header("Content-Disposition", 'attachment; filename="signature.asc"')
try:
signature.set_payload(sign_data(to_bytes(msg).replace(b"\n", b"\r\n")))
signature.set_payload(sign_data(message_to_bytes(msg).replace(b"\n", b"\r\n")))
except Exception:
LOG.e("Cannot sign, try using pgpy")
signature.set_payload(
sign_data_with_pgpy(to_bytes(msg).replace(b"\n", b"\r\n"))
sign_data_with_pgpy(message_to_bytes(msg).replace(b"\n", b"\r\n"))
)
container.attach(signature)
@ -543,7 +543,9 @@ def handle_email_sent_to_ourself(alias, from_addr: str, msg: Message, user):
# store the refused email
random_name = str(uuid.uuid4())
full_report_path = f"refused-emails/cycle-{random_name}.eml"
s3.upload_email_from_bytesio(full_report_path, BytesIO(to_bytes(msg)), random_name)
s3.upload_email_from_bytesio(
full_report_path, BytesIO(message_to_bytes(msg)), random_name
)
refused_email = RefusedEmail.create(
path=None, full_report_path=full_report_path, user_id=alias.user_id
)
@ -1394,7 +1396,7 @@ def handle_bounce_forward_phase(msg: Message, email_log: EmailLog):
full_report_path = f"refused-emails/full-{random_name}.eml"
s3.upload_email_from_bytesio(
full_report_path, BytesIO(to_bytes(msg)), f"full-{random_name}"
full_report_path, BytesIO(message_to_bytes(msg)), f"full-{random_name}"
)
file_path = None
@ -1413,7 +1415,7 @@ def handle_bounce_forward_phase(msg: Message, email_log: EmailLog):
else:
file_path = f"refused-emails/{random_name}.eml"
s3.upload_email_from_bytesio(
file_path, BytesIO(to_bytes(orig_msg)), random_name
file_path, BytesIO(message_to_bytes(orig_msg)), random_name
)
refused_email = RefusedEmail.create(
@ -1733,14 +1735,16 @@ def handle_bounce_reply_phase(envelope, msg: Message, email_log: EmailLog):
random_name = str(uuid.uuid4())
full_report_path = f"refused-emails/full-{random_name}.eml"
s3.upload_email_from_bytesio(full_report_path, BytesIO(to_bytes(msg)), random_name)
s3.upload_email_from_bytesio(
full_report_path, BytesIO(message_to_bytes(msg)), random_name
)
orig_msg = get_orig_message_from_bounce(msg)
file_path = None
if orig_msg:
file_path = f"refused-emails/{random_name}.eml"
s3.upload_email_from_bytesio(
file_path, BytesIO(to_bytes(orig_msg)), random_name
file_path, BytesIO(message_to_bytes(orig_msg)), random_name
)
refused_email = RefusedEmail.create(
@ -1809,13 +1813,15 @@ def handle_spam(
random_name = str(uuid.uuid4())
full_report_path = f"spams/full-{random_name}.eml"
s3.upload_email_from_bytesio(full_report_path, BytesIO(to_bytes(msg)), random_name)
s3.upload_email_from_bytesio(
full_report_path, BytesIO(message_to_bytes(msg)), random_name
)
file_path = None
if orig_msg:
file_path = f"spams/{random_name}.eml"
s3.upload_email_from_bytesio(
file_path, BytesIO(to_bytes(orig_msg)), random_name
file_path, BytesIO(message_to_bytes(orig_msg)), random_name
)
refused_email = RefusedEmail.create(

View File

@ -19,7 +19,6 @@ from app.email_utils import (
get_header_from_bounce,
is_valid_email,
add_header,
to_bytes,
generate_reply_email,
normalize_reply_email,
get_encoding,
@ -161,23 +160,6 @@ def test_send_email_with_rate_control(flask_client):
)
def test_copy():
email_str = """
From: abcd@gmail.com
To: hey@example.org
Subject: subject
Body
"""
msg = email.message_from_string(email_str)
msg2 = copy(msg)
assert to_bytes(msg) == to_bytes(msg2)
msg = email.message_from_string("👌")
msg2 = copy(msg)
assert to_bytes(msg) == to_bytes(msg2)
def test_get_spam_from_header():
is_spam, _ = get_spam_from_header(
"""No, score=-0.1 required=5.0 tests=DKIM_SIGNED,DKIM_VALID,
@ -476,19 +458,6 @@ Content-Type: text/html; charset=us-ascii
assert "old" not in new_msg.as_string()
def test_to_bytes():
msg = email.message_from_string("☕️ emoji")
assert to_bytes(msg)
# \n is appended when message is converted to bytes
assert to_bytes(msg).decode() == "\n☕️ emoji"
msg = email.message_from_string("ascii")
assert to_bytes(msg) == b"\nascii"
msg = email.message_from_string("éèà€")
assert to_bytes(msg).decode() == "\néèà€"
def test_generate_reply_email(flask_client):
user = create_new_user()
reply_email = generate_reply_email("test@example.org", user)

View File

@ -0,0 +1,35 @@
import email
from app.email_utils import (
copy,
)
from app.message_utils import message_to_bytes
def test_copy():
email_str = """
From: abcd@gmail.com
To: hey@example.org
Subject: subject
Body
"""
msg = email.message_from_string(email_str)
msg2 = copy(msg)
assert message_to_bytes(msg) == message_to_bytes(msg2)
msg = email.message_from_string("👌")
msg2 = copy(msg)
assert message_to_bytes(msg) == message_to_bytes(msg2)
def test_to_bytes():
msg = email.message_from_string("☕️ emoji")
assert message_to_bytes(msg)
# \n is appended when message is converted to bytes
assert message_to_bytes(msg).decode() == "\n☕️ emoji"
msg = email.message_from_string("ascii")
assert message_to_bytes(msg) == b"\nascii"
msg = email.message_from_string("éèà€")
assert message_to_bytes(msg).decode() == "\néèà€"