app-MAIL-temp/app/handler/dmarc.py
Adrià Casajús 50c130a3a3
Store the latest email_log id in the alias to simplify dashboard query (#2022)
* Store the latest email_log id in the alias to simplify dashboard query

* Fix test

* Add script to migrate users last email_log_id to alias

* Always update the alias last_email_log_id automatically

* Only set the alias_id if it is set

* Fix test with randomization

* Fix notification test

* Also remove explicit set on tests

* Rate limit alias creation to prevent abuse (#2021)

* Rate limit alias creation to prevent abuse

* Limit in secs

* Calculate bucket time

* fix exception

* Tune limits

* Move rate limit config to configuration (#2023)

* Fix dropdown item in header (#2024)

* Add option for admin to stop trial (#2026)

* Fix: if redis is not configured do not enable rate limit (#2027)

* support product IDs for the new Mac app (#2028)

Co-authored-by: Son NK <son@simplelogin.io>

* Add metrics to rate limit (#2029)

* Order domains alphabetically when retrieving them (#2030)

* Removed unused import

* Remove debug info

---------

Co-authored-by: D-Bao <49440133+D-Bao@users.noreply.github.com>
Co-authored-by: Son Nguyen Kim <son.nguyen@proton.ch>
Co-authored-by: Son NK <son@simplelogin.io>
2024-02-15 15:48:02 +01:00

184 lines
6.3 KiB
Python

import uuid
from io import BytesIO
from typing import Optional, Tuple
from aiosmtpd.handlers import Message
from aiosmtpd.smtp import Envelope
from app import s3, config
from app.config import (
DMARC_CHECK_ENABLED,
ALERT_QUARANTINE_DMARC,
ALERT_DMARC_FAILED_REPLY_PHASE,
)
from app.email import headers, status
from app.email_utils import (
get_header_unicode,
send_email_with_rate_control,
render,
add_or_replace_header,
add_header,
)
from app.handler.spamd_result import SpamdResult, Phase, DmarcCheckResult
from app.log import LOG
from app.message_utils import message_to_bytes
from app.models import Alias, Contact, Notification, EmailLog, RefusedEmail
def apply_dmarc_policy_for_forward_phase(
alias: Alias, contact: Contact, envelope: Envelope, msg: Message
) -> Tuple[Message, Optional[str]]:
spam_result = SpamdResult.extract_from_headers(msg, Phase.forward)
if not DMARC_CHECK_ENABLED or not spam_result:
return msg, None
from_header = get_header_unicode(msg[headers.FROM])
warning_plain_text = """This email failed anti-phishing checks when it was received by SimpleLogin, be careful with its content.
More info on https://simplelogin.io/docs/getting-started/anti-phishing/
"""
warning_html = """
<p style="color:red">
This email failed anti-phishing checks when it was received by SimpleLogin, be careful with its content.
More info on <a href="https://simplelogin.io/docs/getting-started/anti-phishing/">anti-phishing measure</a>
</p>
"""
# do not quarantine an email if fails DMARC but has a small rspamd score
if (
config.MIN_RSPAMD_SCORE_FOR_FAILED_DMARC is not None
and spam_result.rspamd_score < config.MIN_RSPAMD_SCORE_FOR_FAILED_DMARC
and spam_result.dmarc
in (
DmarcCheckResult.quarantine,
DmarcCheckResult.reject,
)
):
LOG.w(
f"email fails DMARC but has a small rspamd score, from contact {contact.email} to alias {alias.email}."
f"mail_from:{envelope.mail_from}, from_header: {from_header}"
)
changed_msg = add_header(
msg,
warning_plain_text,
warning_html,
)
return changed_msg, None
if spam_result.dmarc == DmarcCheckResult.soft_fail:
LOG.w(
f"dmarc forward: soft_fail from contact {contact.email} to alias {alias.email}."
f"mail_from:{envelope.mail_from}, from_header: {from_header}"
)
changed_msg = add_header(
msg,
warning_plain_text,
warning_html,
)
return changed_msg, None
if spam_result.dmarc in (
DmarcCheckResult.quarantine,
DmarcCheckResult.reject,
):
LOG.w(
f"dmarc forward: put email from {contact} to {alias} to quarantine. {spam_result.event_data()}, "
f"mail_from:{envelope.mail_from}, from_header: {msg[headers.FROM]}"
)
email_log = quarantine_dmarc_failed_forward_email(alias, contact, envelope, msg)
Notification.create(
user_id=alias.user_id,
title=f"{alias.email} has a new mail in quarantine",
message=Notification.render(
"notification/message-quarantine.html", alias=alias
),
commit=True,
)
user = alias.user
send_email_with_rate_control(
user,
ALERT_QUARANTINE_DMARC,
user.email,
f"An email sent to {alias.email} has been quarantined",
render(
"transactional/message-quarantine-dmarc.txt.jinja2",
from_header=from_header,
alias=alias,
refused_email_url=email_log.get_dashboard_url(),
),
render(
"transactional/message-quarantine-dmarc.html",
from_header=from_header,
alias=alias,
refused_email_url=email_log.get_dashboard_url(),
),
max_nb_alert=10,
ignore_smtp_error=True,
)
return msg, status.E215
return msg, None
def quarantine_dmarc_failed_forward_email(alias, contact, envelope, msg) -> EmailLog:
add_or_replace_header(msg, headers.SL_DIRECTION, "Forward")
msg[headers.SL_ENVELOPE_FROM] = envelope.mail_from
random_name = str(uuid.uuid4())
s3_report_path = f"refused-emails/full-{random_name}.eml"
s3.upload_email_from_bytesio(
s3_report_path, BytesIO(message_to_bytes(msg)), f"full-{random_name}"
)
refused_email = RefusedEmail.create(
full_report_path=s3_report_path, user_id=alias.user_id, flush=True
)
email_log = EmailLog.create(
user_id=alias.user_id,
mailbox_id=alias.mailbox_id,
contact_id=contact.id,
alias_id=alias.id,
message_id=str(msg[headers.MESSAGE_ID]),
refused_email_id=refused_email.id,
is_spam=True,
blocked=True,
commit=True,
)
return email_log
def apply_dmarc_policy_for_reply_phase(
alias_from: Alias, contact_recipient: Contact, envelope: Envelope, msg: Message
) -> Optional[str]:
spam_result = SpamdResult.extract_from_headers(msg, Phase.reply)
if not DMARC_CHECK_ENABLED or not spam_result:
return None
if spam_result.dmarc not in (
DmarcCheckResult.quarantine,
DmarcCheckResult.reject,
DmarcCheckResult.soft_fail,
):
return None
LOG.w(
f"dmarc reply: Put email from {alias_from.email} to {contact_recipient} into quarantine. {spam_result.event_data()}, "
f"mail_from:{envelope.mail_from}, from_header: {msg[headers.FROM]}"
)
send_email_with_rate_control(
alias_from.user,
ALERT_DMARC_FAILED_REPLY_PHASE,
alias_from.user.email,
f"Attempt to send an email to your contact {contact_recipient.email} from {envelope.mail_from}",
render(
"transactional/spoof-reply.txt.jinja2",
contact=contact_recipient,
alias=alias_from,
sender=envelope.mail_from,
),
render(
"transactional/spoof-reply.html",
contact=contact_recipient,
alias=alias_from,
sender=envelope.mail_from,
),
)
return status.E215