Merge pull request #856 from simple-login/feature/dmarc-email-notif

Send email when an email is put to quarantine and do not put soft_fail email to quarantine
This commit is contained in:
Son Nguyen Kim 2022-03-25 18:26:30 +01:00 committed by GitHub
commit af85b3a997
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 110 additions and 26 deletions

View File

@ -340,6 +340,8 @@ ALERT_HOTMAIL_COMPLAINT_REPLY_PHASE = "alert_hotmail_complaint_reply_phase"
ALERT_HOTMAIL_COMPLAINT_TRANSACTIONAL = "alert_hotmail_complaint_transactional"
ALERT_YAHOO_COMPLAINT = "alert_yahoo_complaint"
ALERT_QUARANTINE_DMARC = "alert_quarantine_dmarc"
# <<<<< END ALERT EMAIL >>>>
# Disable onboarding emails

View File

@ -1819,6 +1819,9 @@ class EmailLog(Base, ModelMixin):
else:
return "forward"
def get_dashboard_url(self):
return f"{URL}/dashboard/refused_email?highlight_id={self.id}"
def __repr__(self):
return f"<EmailLog {self.id}>"

View File

@ -88,6 +88,7 @@ from app.config import (
ALERT_FROM_ADDRESS_IS_REVERSE_ALIAS,
ALERT_TO_NOREPLY,
DMARC_CHECK_ENABLED,
ALERT_QUARANTINE_DMARC,
)
from app.db import Session
from app.email import status, headers
@ -540,7 +541,7 @@ def handle_email_sent_to_ourself(alias, from_addr: str, msg: Message, user):
def apply_dmarc_policy(
alias: Alias, contact: Contact, envelope: Envelope, msg: Message
alias: Alias, contact: Contact, envelope: Envelope, msg: Message, from_header
) -> Optional[str]:
dmarc_result = get_dmarc_status(msg)
if dmarc_result:
@ -554,13 +555,14 @@ def apply_dmarc_policy(
if dmarc_result in (
DmarcCheckResult.quarantine,
DmarcCheckResult.reject,
DmarcCheckResult.soft_fail,
# todo: disable soft_fail for now
# DmarcCheckResult.soft_fail,
):
LOG.w(
f"put email from {contact} to {alias} to quarantine. {dmarc_result}, "
f"mail_from:{envelope.mail_from}, from_header: {msg[headers.FROM]}"
)
quarantine_dmarc_failed_email(alias, contact, envelope, msg)
email_log = quarantine_dmarc_failed_email(alias, contact, envelope, msg)
Notification.create(
user_id=alias.user_id,
title=f"{alias.email} has a new mail in quarantine",
@ -569,12 +571,39 @@ def apply_dmarc_policy(
),
commit=True,
)
user = alias.user
send_email_with_rate_control(
user,
ALERT_QUARANTINE_DMARC,
user.email,
f"An email sent to {alias.email} has been quarantined",
render(
"transactional/message-quarantine-dmarc.txt.jinja2",
from_header=from_header,
alias=alias,
refused_email_url=email_log.get_dashboard_url(),
),
render(
"transactional/message-quarantine-dmarc.html",
from_header=from_header,
alias=alias,
refused_email_url=email_log.get_dashboard_url(),
),
max_nb_alert=10,
ignore_smtp_error=True,
)
return status.E215
# todo: remove when soft_fail email is put into quarantine
elif dmarc_result == DmarcCheckResult.soft_fail:
LOG.w(
f"might put email from {contact} to {alias} to quarantine. {dmarc_result}, "
f"mail_from:{envelope.mail_from}, from_header: {msg[headers.FROM]}"
)
return None
def quarantine_dmarc_failed_email(alias, contact, envelope, msg):
def quarantine_dmarc_failed_email(alias, contact, envelope, msg) -> EmailLog:
add_or_replace_header(msg, headers.SL_DIRECTION, "Forward")
msg[headers.SL_ENVELOPE_TO] = alias.email
msg[headers.SL_ENVELOPE_FROM] = envelope.mail_from
@ -586,6 +615,7 @@ def quarantine_dmarc_failed_email(alias, contact, envelope, msg):
except CannotCreateContactForReverseAlias:
Session.commit()
raise
random_name = str(uuid.uuid4())
s3_report_path = f"refused-emails/full-{random_name}.eml"
s3.upload_email_from_bytesio(
@ -594,7 +624,7 @@ def quarantine_dmarc_failed_email(alias, contact, envelope, msg):
refused_email = RefusedEmail.create(
full_report_path=s3_report_path, user_id=alias.user_id, flush=True
)
EmailLog.create(
return EmailLog.create(
user_id=alias.user_id,
mailbox_id=alias.mailbox_id,
contact_id=contact.id,
@ -688,7 +718,9 @@ def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str
return [(True, res_status)]
# Check if we need to reject or quarantine based on dmarc
dmarc_delivery_status = apply_dmarc_policy(alias, contact, envelope, msg)
dmarc_delivery_status = apply_dmarc_policy(
alias, contact, envelope, msg, from_header
)
if dmarc_delivery_status is not None:
return [(False, dmarc_delivery_status)]

View File

@ -0,0 +1,28 @@
{% extends "base.html" %}
{% block content %}
{% call text() %}
<h1>
An email from {{ from_header }} to {{ alias.email }} is put into Quarantine
</h1>
{% endcall %}
{% call text() %}
An email from {{ from_header }} to {{ alias.email }} is put into Quarantine as it fails DMARC check.
DMARC is an email authentication protocol designed for detecting phishing.
{% endcall %}
{{ render_button("View the original email", refused_email_url) }}
{% call text() %}
This email is automatically deleted in 7 days.
{% endcall %}
{% call text() %}
Best, <br/>
SimpleLogin Team.
{% endcall %}
{% endblock %}

View File

@ -0,0 +1,8 @@
{% extends "base.txt.jinja2" %}
{% block content %}
An email from {{ from_header }} to {{ alias.email }} is put into Quarantine as it fails DMARC check.
You can view the email at {{ refused_email_url }}.
This email is automatically deleted in 7 days.
{% endblock %}

View File

@ -1,5 +1,15 @@
<div>
There is a new quarantined message for <b>{{ alias.email }}</b>.
Please go to your <a href="/dashboard/refused_email">quarantined emails</a> to review it.
An email from {{ from_header }} to {{ alias.email }} is put into Quarantine as it fails DMARC check.
<br>
<a href="https://en.wikipedia.org/wiki/DMARC">DMARC</a> is an email authentication protocol designed for detecting
phishing. When an email fails DMARC, it can be considered as suspicious.
</div>
<a href="/dashboard/refused_email" class="btn btn-primary">
View quarantined emails
</a>
<div>
The email is automatically deleted in 7 days.
</div>

View File

@ -97,21 +97,22 @@ def test_dmarc_quarantine(flask_client):
assert f"{alias.email} has a new mail in quarantine" == notifications[0].title
def test_gmail_dmarc_softfail(flask_client):
user = create_random_user()
alias = Alias.create_new_random(user)
msg = load_eml_file("dmarc_gmail_softfail.eml", {"alias_email": alias.email})
envelope = Envelope()
envelope.mail_from = msg["from"]
envelope.rcpt_tos = [msg["to"]]
result = email_handler.handle(envelope, msg)
assert result == status.E215
email_logs = (
EmailLog.filter_by(user_id=user.id, alias_id=alias.id)
.order_by(EmailLog.id.desc())
.all()
)
assert len(email_logs) == 1
email_log = email_logs[0]
assert email_log.blocked
assert email_log.refused_email_id
# todo: re-enable test when softfail is quarantined
# def test_gmail_dmarc_softfail(flask_client):
# user = create_random_user()
# alias = Alias.create_new_random(user)
# msg = load_eml_file("dmarc_gmail_softfail.eml", {"alias_email": alias.email})
# envelope = Envelope()
# envelope.mail_from = msg["from"]
# envelope.rcpt_tos = [msg["to"]]
# result = email_handler.handle(envelope, msg)
# assert result == status.E215
# email_logs = (
# EmailLog.filter_by(user_id=user.id, alias_id=alias.id)
# .order_by(EmailLog.id.desc())
# .all()
# )
# assert len(email_logs) == 1
# email_log = email_logs[0]
# assert email_log.blocked
# assert email_log.refused_email_id