app-MAIL-temp/tests/test_email_handler.py

166 lines
5 KiB
Python
Raw Normal View History

from email.message import EmailMessage
2022-03-18 15:44:07 +01:00
from aiosmtpd.smtp import Envelope
import email_handler
from app.email import headers, status
2022-03-30 16:09:17 +02:00
from app.email_utils import generate_verp_email
from app.models import (
User,
Alias,
AuthorizedAddress,
IgnoredEmail,
EmailLog,
2022-03-30 16:29:38 +02:00
Notification,
VerpType,
)
from email_handler import (
get_mailbox_from_mail_from,
should_ignore,
is_automatic_out_of_office,
)
2022-03-21 12:03:11 +01:00
from tests.utils import load_eml_file, create_random_user
2020-09-28 17:41:16 +02:00
def test_get_mailbox_from_mail_from(flask_client):
    """Exercise get_mailbox_from_mail_from for known and unknown senders.

    The user's own address must resolve to a mailbox, an unrelated address
    must resolve to None, and that same address must resolve to the user's
    mailbox once it is registered as an AuthorizedAddress.
    """
    owner = User.create(
        email="a@b.c",
        password="password",
        name="Test User",
        activated=True,
        commit=True,
    )
    alias = Alias.create(
        user_id=owner.id,
        email="first@d1.test",
        mailbox_id=owner.default_mailbox_id,
        commit=True,
    )

    # Sending from the user's own address yields that mailbox.
    own_mailbox = get_mailbox_from_mail_from("a@b.c", alias)
    assert own_mailbox.email == "a@b.c"

    # An address with no relation to the mailbox yields nothing.
    unknown_mailbox = get_mailbox_from_mail_from("unauthorized@gmail.com", alias)
    assert unknown_mailbox is None

    # Once registered as an authorized address, the same sender is accepted.
    AuthorizedAddress.create(
        user_id=owner.id,
        mailbox_id=owner.default_mailbox_id,
        email="unauthorized@gmail.com",
        commit=True,
    )
    authorized_mailbox = get_mailbox_from_mail_from("unauthorized@gmail.com", alias)
    assert authorized_mailbox.email == "a@b.c"
def test_should_ignore(flask_client):
    """Check should_ignore before and after a matching IgnoredEmail exists."""
    sender = "mail_from"
    recipient = "rcpt_to"

    # Nothing is ignored while no IgnoredEmail row has been created.
    assert should_ignore(sender, []) is False
    assert not should_ignore(sender, [recipient])

    IgnoredEmail.create(mail_from=sender, rcpt_to=recipient, commit=True)

    # The exact sender/recipient pair is now reported as ignored.
    assert should_ignore(sender, [recipient])
def test_is_automatic_out_of_office():
    """Check is_automatic_out_of_office against the Auto-Submitted header.

    A message without the header is not flagged; setting the header to
    "auto-replied" or "auto-generated" flags it.
    """
    message = EmailMessage()
    auto_submitted = headers.AUTO_SUBMITTED

    # Header absent.
    assert not is_automatic_out_of_office(message)

    message[auto_submitted] = "auto-replied"
    assert is_automatic_out_of_office(message)

    # Removing the header clears the flag again.
    del message[auto_submitted]
    assert not is_automatic_out_of_office(message)

    message[auto_submitted] = "auto-generated"
    assert is_automatic_out_of_office(message)
2022-03-17 19:03:36 +01:00
2022-03-17 21:36:25 +01:00
2022-03-21 12:31:25 +01:00
def test_dmarc_quarantine(flask_client):
    """Run the handler on the dmarc_quarantine.eml fixture and check the outcome.

    Expects status.E215 from the handler, exactly one EmailLog for the alias
    that is blocked and linked to a refused email, and exactly one
    Notification for the user announcing a new mail in quarantine.
    """
    user = create_random_user()
    alias = Alias.create_new_random(user)
    msg = load_eml_file("dmarc_quarantine.eml", {"alias_email": alias.email})

    # Build the SMTP envelope from the message's own From/To headers.
    envelope = Envelope()
    envelope.mail_from = msg["from"]
    envelope.rcpt_tos = [msg["to"]]

    assert email_handler.handle(envelope, msg) == status.E215

    logs_for_alias = EmailLog.filter_by(user_id=user.id, alias_id=alias.id)
    email_logs = logs_for_alias.order_by(EmailLog.id.desc()).all()
    assert len(email_logs) == 1
    only_log = email_logs[0]
    assert only_log.blocked
    assert only_log.refused_email_id

    notifications = Notification.filter_by(user_id=user.id).all()
    assert len(notifications) == 1
    expected_title = f"{alias.email} has a new mail in quarantine"
    assert expected_title == notifications[0].title
2022-03-25 18:12:33 +01:00
# todo: re-enable test when softfail is quarantined
# def test_gmail_dmarc_softfail(flask_client):
# user = create_random_user()
# alias = Alias.create_new_random(user)
# msg = load_eml_file("dmarc_gmail_softfail.eml", {"alias_email": alias.email})
# envelope = Envelope()
# envelope.mail_from = msg["from"]
# envelope.rcpt_tos = [msg["to"]]
# result = email_handler.handle(envelope, msg)
# assert result == status.E215
# email_logs = (
# EmailLog.filter_by(user_id=user.id, alias_id=alias.id)
# .order_by(EmailLog.id.desc())
# .all()
# )
# assert len(email_logs) == 1
# email_log = email_logs[0]
# assert email_log.blocked
# assert email_log.refused_email_id
def test_prevent_5xx_from_spf(flask_client):
    """Expect status.E216 when the fixture is rendered with R_SPF_FAIL.

    The recipient is a bounce-forward VERP address built from an id chosen
    so that it does not match a valid email log.
    """
    user = create_random_user()
    alias = Alias.create_new_random(user)
    template_values = {"alias_email": alias.email, "spf_result": "R_SPF_FAIL"}
    msg = load_eml_file("5xx_overwrite_spf.eml", template_values)

    # Ensure invalid email log
    invalid_email_log_id = 99999999999999
    bounce_address = generate_verp_email(
        VerpType.bounce_forward, invalid_email_log_id
    )

    envelope = Envelope()
    envelope.mail_from = msg["from"]
    envelope.rcpt_tos = [bounce_address]

    handler = email_handler.MailHandler()
    result = handler._handle(envelope, msg)
    assert status.E216 == result
def test_preserve_5xx_with_valid_spf(flask_client):
    """Expect status.E512 when the fixture is rendered with R_SPF_ALLOW.

    The recipient is a bounce-forward VERP address built from an id chosen
    so that it does not match a valid email log.
    """
    user = create_random_user()
    alias = Alias.create_new_random(user)
    template_values = {"alias_email": alias.email, "spf_result": "R_SPF_ALLOW"}
    msg = load_eml_file("5xx_overwrite_spf.eml", template_values)

    # Ensure invalid email log
    invalid_email_log_id = 99999999999999
    bounce_address = generate_verp_email(
        VerpType.bounce_forward, invalid_email_log_id
    )

    envelope = Envelope()
    envelope.mail_from = msg["from"]
    envelope.rcpt_tos = [bounce_address]

    handler = email_handler.MailHandler()
    result = handler._handle(envelope, msg)
    assert status.E512 == result
2022-03-29 15:59:35 +02:00
def test_preserve_5xx_with_no_header(flask_client):
    """Expect status.E512 when handling the no_spamd_header.eml fixture.

    The recipient is a bounce-forward VERP address built from an id chosen
    so that it does not match a valid email log.
    """
    user = create_random_user()
    alias = Alias.create_new_random(user)
    template_values = {"alias_email": alias.email}
    msg = load_eml_file("no_spamd_header.eml", template_values)

    # Ensure invalid email log
    invalid_email_log_id = 99999999999999
    bounce_address = generate_verp_email(
        VerpType.bounce_forward, invalid_email_log_id
    )

    envelope = Envelope()
    envelope.mail_from = msg["from"]
    envelope.rcpt_tos = [bounce_address]

    handler = email_handler.MailHandler()
    result = handler._handle(envelope, msg)
    assert status.E512 == result