app-MAIL-temp/app/handler/dmarc.py

189 lines
6.4 KiB
Python
Raw Normal View History

2022-04-08 11:28:14 +02:00
import uuid
from io import BytesIO
2022-04-11 10:19:25 +02:00
from typing import Optional, Tuple
2022-04-08 11:28:14 +02:00
from aiosmtpd.handlers import Message
from aiosmtpd.smtp import Envelope
from app import s3, config
2022-04-08 11:28:14 +02:00
from app.config import (
DMARC_CHECK_ENABLED,
ALERT_QUARANTINE_DMARC,
ALERT_DMARC_FAILED_REPLY_PHASE,
)
from app.email import headers, status
from app.email_utils import (
get_header_unicode,
send_email_with_rate_control,
render,
add_or_replace_header,
add_header,
)
from app.handler.spamd_result import SpamdResult, Phase, DmarcCheckResult
from app.log import LOG
from app.message_utils import message_to_bytes
2022-04-08 11:28:14 +02:00
from app.models import Alias, Contact, Notification, EmailLog, RefusedEmail
def apply_dmarc_policy_for_forward_phase(
    alias: Alias, contact: Contact, envelope: Envelope, msg: Message
) -> Tuple[Message, Optional[str]]:
    """Act on the DMARC verdict of an email that is being forwarded to an alias.

    The verdict is read from the message headers through
    ``SpamdResult.extract_from_headers`` for ``Phase.forward``.

    Args:
        alias: alias the email is addressed to.
        contact: contact the email comes from.
        envelope: SMTP envelope of the incoming email.
        msg: the incoming email.

    Returns:
        A ``(message, status)`` pair:

        * ``(msg, None)`` when ``DMARC_CHECK_ENABLED`` is falsy, when no spamd
          result could be extracted, or when the verdict calls for no action.
        * ``(result of add_header, None)`` when the verdict is ``soft_fail``,
          or when it is ``quarantine``/``reject`` but the rspamd score is below
          ``config.MIN_RSPAMD_SCORE_FOR_FAILED_DMARC``.
        * ``(msg, status.E215)`` when the verdict is ``quarantine``/``reject``:
          the email is handed to ``quarantine_dmarc_failed_forward_email``, a
          ``Notification`` is created and an alert email is sent to the user.
    """
    spam_result = SpamdResult.extract_from_headers(msg, Phase.forward)
    if not (DMARC_CHECK_ENABLED and spam_result):
        LOG.i("DMARC check disabled")
        return msg, None

    LOG.i(f"Spam check result in {spam_result}")
    from_header = get_header_unicode(msg[headers.FROM])

    plain_warning = """This email failed anti-phishing checks when it was received by SimpleLogin, be careful with its content.
More info on https://simplelogin.io/docs/getting-started/anti-phishing/
"""
    html_warning = """
<p style="color:red">
This email failed anti-phishing checks when it was received by SimpleLogin, be careful with its content.
More info on <a href="https://simplelogin.io/docs/getting-started/anti-phishing/">anti-phishing measure</a>
</p>
"""

    def warn_and_deliver() -> Tuple[Message, Optional[str]]:
        # Let the email through, with both warning texts passed to add_header.
        return add_header(msg, plain_warning, html_warning), None

    hard_failures = (DmarcCheckResult.quarantine, DmarcCheckResult.reject)
    score_floor = config.MIN_RSPAMD_SCORE_FOR_FAILED_DMARC

    # A hard DMARC failure with a small rspamd score is only warned about,
    # not quarantined. The score comparison is deliberately kept ahead of the
    # verdict test, as in the original condition.
    if (
        score_floor is not None
        and spam_result.rspamd_score < score_floor
        and spam_result.dmarc in hard_failures
    ):
        LOG.w(
            f"email fails DMARC but has a small rspamd score, from contact {contact.email} to alias {alias.email}."
            f"mail_from:{envelope.mail_from}, from_header: {from_header}"
        )
        return warn_and_deliver()

    if spam_result.dmarc == DmarcCheckResult.soft_fail:
        LOG.w(
            f"dmarc forward: soft_fail from contact {contact.email} to alias {alias.email}."
            f"mail_from:{envelope.mail_from}, from_header: {from_header}"
        )
        return warn_and_deliver()

    if spam_result.dmarc not in hard_failures:
        # Any other verdict: forward the email untouched.
        return msg, None

    LOG.w(
        f"dmarc forward: put email from {contact} to {alias} to quarantine. {spam_result.event_data()}, "
        f"mail_from:{envelope.mail_from}, from_header: {msg[headers.FROM]}"
    )
    quarantined_log = quarantine_dmarc_failed_forward_email(
        alias, contact, envelope, msg
    )

    # In-app notification first, then the alert email.
    Notification.create(
        user_id=alias.user_id,
        title=f"{alias.email} has a new mail in quarantine",
        message=Notification.render(
            "notification/message-quarantine.html", alias=alias
        ),
        commit=True,
    )

    def render_alert(template: str):
        # Plain-text and HTML bodies share the same template variables.
        return render(
            template,
            from_header=from_header,
            alias=alias,
            refused_email_url=quarantined_log.get_dashboard_url(),
        )

    owner = alias.user
    send_email_with_rate_control(
        owner,
        ALERT_QUARANTINE_DMARC,
        owner.email,
        f"An email sent to {alias.email} has been quarantined",
        render_alert("transactional/message-quarantine-dmarc.txt.jinja2"),
        render_alert("transactional/message-quarantine-dmarc.html"),
        max_nb_alert=10,
        ignore_smtp_error=True,
    )
    return msg, status.E215
2022-04-08 11:28:14 +02:00
def quarantine_dmarc_failed_forward_email(
    alias: Alias, contact: Contact, envelope: Envelope, msg: Message
) -> EmailLog:
    """Store a DMARC-failed forwarded email and record it as blocked.

    The message is tagged with the ``SL_DIRECTION`` and ``SL_ENVELOPE_FROM``
    headers, serialized with ``message_to_bytes`` and uploaded through
    ``s3.upload_email_from_bytesio`` under ``refused-emails/``. A
    ``RefusedEmail`` row pointing at that upload and an ``EmailLog`` row
    (``is_spam=True``, ``blocked=True``) are then created.

    Note that ``msg`` is modified in place (the two headers are set on it).

    Args:
        alias: alias the email was addressed to.
        contact: contact the email came from.
        envelope: SMTP envelope; its ``mail_from`` is stored in the
            ``SL_ENVELOPE_FROM`` header.
        msg: the email to store.

    Returns:
        The ``EmailLog`` created for the refused email.
    """
    add_or_replace_header(msg, headers.SL_DIRECTION, "Forward")
    # Plain item assignment on an email message appends a new header line and
    # keeps any existing one, so a message that already carries this header
    # would be stored with two of them. Use the same helper as for
    # SL_DIRECTION so both headers are set consistently.
    add_or_replace_header(msg, headers.SL_ENVELOPE_FROM, envelope.mail_from)

    # A random name keeps each stored report at its own path.
    random_name = str(uuid.uuid4())
    s3_report_path = f"refused-emails/full-{random_name}.eml"
    s3.upload_email_from_bytesio(
        s3_report_path, BytesIO(message_to_bytes(msg)), f"full-{random_name}"
    )

    # flush=True: presumably so refused_email.id is populated before it is
    # referenced below — confirm against the model's create().
    refused_email = RefusedEmail.create(
        full_report_path=s3_report_path, user_id=alias.user_id, flush=True
    )
    email_log = EmailLog.create(
        user_id=alias.user_id,
        mailbox_id=alias.mailbox_id,
        contact_id=contact.id,
        alias_id=alias.id,
        message_id=str(msg[headers.MESSAGE_ID]),
        refused_email_id=refused_email.id,
        is_spam=True,
        blocked=True,
        commit=True,
    )
    return email_log
2022-04-08 11:28:14 +02:00
def apply_dmarc_policy_for_reply_phase(
    alias_from: Alias, contact_recipient: Contact, envelope: Envelope, msg: Message
) -> Optional[str]:
    """Act on the DMARC verdict of an email sent from an alias to a contact.

    The verdict is read from the message headers through
    ``SpamdResult.extract_from_headers`` for ``Phase.reply``.

    Args:
        alias_from: alias the email is sent from.
        contact_recipient: contact the email is addressed to.
        envelope: SMTP envelope of the email.
        msg: the email being sent.

    Returns:
        ``None`` when ``DMARC_CHECK_ENABLED`` is falsy, when no spamd result
        could be extracted, or when the verdict is not one of
        ``quarantine``/``reject``/``soft_fail``. Otherwise an alert email is
        sent to the alias owner and ``status.E215`` is returned.
    """
    spam_result = SpamdResult.extract_from_headers(msg, Phase.reply)
    if not (DMARC_CHECK_ENABLED and spam_result):
        LOG.i("DMARC check disabled")
        return None

    LOG.i(f"Spam check result is {spam_result}")

    # Unlike the forward phase, soft_fail is treated like a hard failure here.
    failing_verdicts = (
        DmarcCheckResult.quarantine,
        DmarcCheckResult.reject,
        DmarcCheckResult.soft_fail,
    )
    if spam_result.dmarc not in failing_verdicts:
        return None

    LOG.w(
        f"dmarc reply: Put email from {alias_from.email} to {contact_recipient} into quarantine. {spam_result.event_data()}, "
        f"mail_from:{envelope.mail_from}, from_header: {msg[headers.FROM]}"
    )

    def render_spoof_alert(template: str):
        # Plain-text and HTML bodies share the same template variables.
        return render(
            template,
            contact=contact_recipient,
            alias=alias_from,
            sender=envelope.mail_from,
        )

    owner = alias_from.user
    send_email_with_rate_control(
        owner,
        ALERT_DMARC_FAILED_REPLY_PHASE,
        owner.email,
        f"Attempt to send an email to your contact {contact_recipient.email} from {envelope.mail_from}",
        render_spoof_alert("transactional/spoof-reply.txt.jinja2"),
        render_spoof_alert("transactional/spoof-reply.html"),
    )
    return status.E215