2022-04-06 12:51:04 +02:00
|
|
|
import random
|
2022-01-04 18:06:08 +01:00
|
|
|
from email.message import EmailMessage
|
2022-04-06 12:51:04 +02:00
|
|
|
from typing import List
|
2022-01-04 18:06:08 +01:00
|
|
|
|
2022-04-06 12:51:04 +02:00
|
|
|
import pytest
|
2022-03-18 15:44:07 +01:00
|
|
|
from aiosmtpd.smtp import Envelope
|
|
|
|
|
|
|
|
import email_handler
|
2022-04-21 08:59:46 +02:00
|
|
|
from app import config
|
2022-04-21 09:26:44 +02:00
|
|
|
from app.config import EMAIL_DOMAIN, ALERT_DMARC_FAILED_REPLY_PHASE
|
2022-04-06 12:51:04 +02:00
|
|
|
from app.db import Session
|
2022-03-18 15:44:07 +01:00
|
|
|
from app.email import headers, status
|
2022-03-30 16:09:17 +02:00
|
|
|
from app.email_utils import generate_verp_email
|
2022-10-10 10:00:19 +02:00
|
|
|
from app.mail_sender import mail_sender
|
2022-03-21 18:33:18 +01:00
|
|
|
from app.models import (
|
|
|
|
Alias,
|
|
|
|
AuthorizedAddress,
|
|
|
|
IgnoredEmail,
|
|
|
|
EmailLog,
|
|
|
|
Notification,
|
2022-03-30 16:29:38 +02:00
|
|
|
VerpType,
|
2022-04-06 12:51:04 +02:00
|
|
|
Contact,
|
|
|
|
SentAlert,
|
2022-03-21 18:33:18 +01:00
|
|
|
)
|
2023-01-25 13:17:20 +01:00
|
|
|
from app.utils import random_string, canonicalize_email
|
2022-01-04 18:06:08 +01:00
|
|
|
from email_handler import (
|
|
|
|
get_mailbox_from_mail_from,
|
|
|
|
should_ignore,
|
|
|
|
is_automatic_out_of_office,
|
|
|
|
)
|
2022-10-10 10:00:19 +02:00
|
|
|
from tests.utils import load_eml_file, create_new_user, random_email
|
2020-09-28 17:41:16 +02:00
|
|
|
|
|
|
|
|
|
|
|
def test_get_mailbox_from_mail_from(flask_client):
    """Resolve the mailbox for the owner's address, an unknown sender, and an authorized address."""
    owner = create_new_user()
    alias = Alias.create_new_random(owner)
    Session.commit()

    # The owner's own address maps to a mailbox carrying that address.
    own_mailbox = get_mailbox_from_mail_from(owner.email, alias)
    assert own_mailbox.email == owner.email

    # An address with no link to the user yields no mailbox.
    outsider = "unauthorized@gmail.com"
    assert get_mailbox_from_mail_from(outsider, alias) is None

    # authorized address
    AuthorizedAddress.create(
        user_id=owner.id,
        mailbox_id=owner.default_mailbox_id,
        email=outsider,
        commit=True,
    )
    authorized_mailbox = get_mailbox_from_mail_from(outsider, alias)
    assert authorized_mailbox.email == owner.email
|
2021-06-22 17:52:24 +02:00
|
|
|
|
|
|
|
|
|
|
|
def test_should_ignore(flask_client):
    """should_ignore only becomes truthy after a matching IgnoredEmail row is created."""
    sender = "mail_from"
    recipient = "rcpt_to"

    # No recipients at all: explicitly False.
    assert should_ignore(sender, []) is False

    # No IgnoredEmail row exists yet for this sender/recipient pair.
    assert not should_ignore(sender, [recipient])

    IgnoredEmail.create(mail_from=sender, rcpt_to=recipient, commit=True)
    assert should_ignore(sender, [recipient])
|
2022-01-04 18:06:08 +01:00
|
|
|
|
|
|
|
|
|
|
|
def test_is_automatic_out_of_office():
    """The Auto-Submitted header values "auto-replied" and "auto-generated" are both detected."""
    auto_submitted = headers.AUTO_SUBMITTED
    message = EmailMessage()

    # A message without the header is not an automatic reply.
    assert not is_automatic_out_of_office(message)

    message[auto_submitted] = "auto-replied"
    assert is_automatic_out_of_office(message)

    # Removing the header reverts the detection.
    del message[auto_submitted]
    assert not is_automatic_out_of_office(message)

    message[auto_submitted] = "auto-generated"
    assert is_automatic_out_of_office(message)
|
2022-03-17 19:03:36 +01:00
|
|
|
|
2022-03-17 21:36:25 +01:00
|
|
|
|
2022-04-06 12:51:04 +02:00
|
|
|
def test_dmarc_forward_quarantine(flask_client):
    """Handling dmarc_quarantine.eml returns E215, logs one blocked email, and notifies the user."""
    user = create_new_user()
    alias = Alias.create_new_random(user)
    message = load_eml_file("dmarc_quarantine.eml", {"alias_email": alias.email})

    envelope = Envelope()
    envelope.mail_from = message["from"]
    envelope.rcpt_tos = [message["to"]]

    outcome = email_handler.handle(envelope, message)
    assert outcome == status.E215

    # Exactly one log entry, flagged as blocked and linked to a refused email.
    log_query = EmailLog.filter_by(user_id=user.id, alias_id=alias.id)
    email_logs = log_query.order_by(EmailLog.id.desc()).all()
    assert len(email_logs) == 1
    blocked_log = email_logs[0]
    assert blocked_log.blocked
    assert blocked_log.refused_email_id

    # Exactly one notification, whose title names the alias.
    notifications = Notification.filter_by(user_id=user.id).all()
    assert len(notifications) == 1
    assert f"{alias.email} has a new mail in quarantine" == notifications[0].title
|
2022-03-21 17:38:41 +01:00
|
|
|
|
|
|
|
|
2022-04-08 11:28:14 +02:00
|
|
|
def test_gmail_dmarc_softfail(flask_client):
    """Handling dmarc_gmail_softfail.eml returns status E200."""
    user = create_new_user()
    alias = Alias.create_new_random(user)
    message = load_eml_file("dmarc_gmail_softfail.eml", {"alias_email": alias.email})

    envelope = Envelope()
    envelope.mail_from = message["from"]
    envelope.rcpt_tos = [message["to"]]

    outcome = email_handler.handle(envelope, message)
    assert outcome == status.E200
    # Enable when we can verify that the actual message sent has this content
    # payload = msg.get_payload()
    # assert payload.find("failed anti-phishing checks") > -1
|
2022-03-29 15:09:10 +02:00
|
|
|
|
|
|
|
|
|
|
|
def test_prevent_5xx_from_spf(flask_client):
    """With spf_result R_SPF_FAIL, the handler returns E216 for the bounce message."""
    user = create_new_user()
    alias = Alias.create_new_random(user)
    substitutions = {"alias_email": alias.email, "spf_result": "R_SPF_FAIL"}
    message = load_eml_file("5xx_overwrite_spf.eml", substitutions)

    envelope = Envelope()
    envelope.mail_from = message["from"]
    # Ensure invalid email log
    bounce_address = generate_verp_email(VerpType.bounce_forward, 99999999999999)
    envelope.rcpt_tos = [bounce_address]

    handler = email_handler.MailHandler()
    result = handler._handle(envelope, message)
    assert status.E216 == result
|
2022-03-29 15:09:10 +02:00
|
|
|
|
|
|
|
|
|
|
|
def test_preserve_5xx_with_valid_spf(flask_client):
    """With spf_result R_SPF_ALLOW, the handler returns E512 for the bounce message."""
    user = create_new_user()
    alias = Alias.create_new_random(user)
    substitutions = {"alias_email": alias.email, "spf_result": "R_SPF_ALLOW"}
    message = load_eml_file("5xx_overwrite_spf.eml", substitutions)

    envelope = Envelope()
    envelope.mail_from = message["from"]
    # Ensure invalid email log
    bounce_address = generate_verp_email(VerpType.bounce_forward, 99999999999999)
    envelope.rcpt_tos = [bounce_address]

    handler = email_handler.MailHandler()
    result = handler._handle(envelope, message)
    assert status.E512 == result
|
2022-03-29 15:59:35 +02:00
|
|
|
|
|
|
|
|
|
|
|
def test_preserve_5xx_with_no_header(flask_client):
    """For no_spamd_header.eml, the handler returns E512 for the bounce message."""
    user = create_new_user()
    alias = Alias.create_new_random(user)
    substitutions = {"alias_email": alias.email}
    message = load_eml_file("no_spamd_header.eml", substitutions)

    envelope = Envelope()
    envelope.mail_from = message["from"]
    # Ensure invalid email log
    bounce_address = generate_verp_email(VerpType.bounce_forward, 99999999999999)
    envelope.rcpt_tos = [bounce_address]

    handler = email_handler.MailHandler()
    result = handler._handle(envelope, message)
    assert status.E512 == result
|
2022-04-06 12:51:04 +02:00
|
|
|
|
|
|
|
|
|
|
|
def generate_dmarc_result() -> List:
    """Return the DMARC result labels used to parametrize test_dmarc_reply_quarantine."""
    dmarc_results = (
        "DMARC_POLICY_QUARANTINE",
        "DMARC_POLICY_REJECT",
        "DMARC_POLICY_SOFTFAIL",
    )
    # Hand back a fresh list on every call, as the original literal did.
    return list(dmarc_results)
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize("dmarc_result", generate_dmarc_result())
def test_dmarc_reply_quarantine(flask_client, dmarc_result):
    """For each DMARC result, the handler returns E215 and records one reply-phase alert.

    The message is loaded from dmarc_reply_check.eml with the alias address, the
    contact's reply address and the parametrized ``dmarc_result`` substituted in.
    """
    user = create_new_user()
    alias = Alias.create_new_random(user)
    Session.commit()

    # random.random() lies in [0.0, 1.0), so int(random.random()) is always 0.
    # Draw a real random integer so the website email and name actually vary.
    contact = Contact.create(
        user_id=alias.user_id,
        alias_id=alias.id,
        website_email="random-{}@nowhere.net".format(random.randint(0, 10**9)),
        name="Name {}".format(random.randint(0, 10**9)),
        reply_email="random-{}@{}".format(random.random(), EMAIL_DOMAIN),
    )
    Session.commit()

    msg = load_eml_file(
        "dmarc_reply_check.eml",
        {
            "alias_email": alias.email,
            "contact_email": contact.reply_email,
            "dmarc_result": dmarc_result,
        },
    )
    envelope = Envelope()
    envelope.mail_from = msg["from"]
    envelope.rcpt_tos = [msg["to"]]

    result = email_handler.handle(envelope, msg)
    assert result == status.E215

    # Exactly one alert of the DMARC reply-phase type is stored for the user.
    alerts = SentAlert.filter_by(
        user_id=user.id, alert_type=ALERT_DMARC_FAILED_REPLY_PHASE
    ).all()
    assert len(alerts) == 1
|
2022-04-15 10:16:03 +02:00
|
|
|
|
|
|
|
|
|
|
|
def test_add_alias_to_header_if_needed():
    """On a message with no To header, the alias email becomes the To header."""
    message = EmailMessage()
    owner = create_new_user()
    alias = Alias.filter_by(user_id=owner.id).first()

    # Precondition: the fresh message carries no To header.
    assert message[headers.TO] is None

    email_handler.add_alias_to_header_if_needed(message, alias)

    assert message[headers.TO] == alias.email
|
2022-04-19 18:45:59 +02:00
|
|
|
|
|
|
|
|
|
|
|
def test_append_alias_to_header_if_needed_existing_to():
    """When To already holds another address, the alias is appended after ", "."""
    existing_recipient = "noone@nowhere.no"
    message = EmailMessage()
    message[headers.TO] = existing_recipient

    owner = create_new_user()
    alias = Alias.filter_by(user_id=owner.id).first()
    email_handler.add_alias_to_header_if_needed(message, alias)

    assert message[headers.TO] == f"{existing_recipient}, {alias.email}"
|
|
|
|
|
|
|
|
|
|
|
|
def test_avoid_add_to_header_already_present():
    """When To already equals the alias email, the header is left unchanged."""
    message = EmailMessage()
    owner = create_new_user()
    alias = Alias.filter_by(user_id=owner.id).first()

    # Pre-populate To with the alias itself.
    message[headers.TO] = alias.email
    email_handler.add_alias_to_header_if_needed(message, alias)

    assert message[headers.TO] == alias.email
|
|
|
|
|
|
|
|
|
|
|
|
def test_avoid_add_to_header_already_present_in_cc():
    """When the alias is already in Cc, no To header is added and Cc is unchanged."""
    msg = EmailMessage()
    user = create_new_user()
    # Use the alias of the user created here, as the sibling header tests do.
    # Alias.first() would pick whichever alias the query returns first, tying
    # this test to rows left behind by other tests.
    alias = Alias.filter_by(user_id=user.id).first()

    msg[headers.CC] = alias.email
    email_handler.add_alias_to_header_if_needed(msg, alias)

    assert msg[headers.TO] is None
    assert msg[headers.CC] == alias.email
|
2022-04-21 08:59:46 +02:00
|
|
|
|
|
|
|
|
|
|
|
def test_email_sent_to_noreply(flask_client):
    """An empty message addressed to config.NOREPLY is handled with status E200."""
    message = EmailMessage()

    envelope = Envelope()
    envelope.mail_from = "from@domain.test"
    envelope.rcpt_tos = [config.NOREPLY]

    outcome = email_handler.handle(envelope, message)
    assert outcome == status.E200
|
|
|
|
|
|
|
|
|
|
|
|
def test_email_sent_to_noreplies(flask_client):
    """With config.NOREPLIES overridden, only the listed address gets E200.

    Mail to the overriding address returns E200, while mail to config.NOREPLY
    returns E515 during the override.
    """
    msg = EmailMessage()
    envelope = Envelope()
    envelope.mail_from = "from@domain.test"

    # config is module-level state shared by the whole test session. Keep the
    # original value so the override cannot leak into tests that run later,
    # even if an assertion below fails.
    original_noreplies = config.NOREPLIES
    config.NOREPLIES = ["other-no-reply@sl.test"]
    try:
        envelope.rcpt_tos = ["other-no-reply@sl.test"]
        result = email_handler.handle(envelope, msg)
        assert result == status.E200

        # NOREPLY isn't used anymore
        envelope.rcpt_tos = [config.NOREPLY]
        result = email_handler.handle(envelope, msg)
        assert result == status.E515
    finally:
        config.NOREPLIES = original_noreplies
|
2022-06-29 19:48:22 +02:00
|
|
|
|
|
|
|
|
|
|
|
def test_references_header(flask_client):
    """Handling reference_encoded.eml sent to a fresh alias returns status E200."""
    owner = create_new_user()
    alias = Alias.create_new_random(owner)
    message = load_eml_file("reference_encoded.eml", {"alias_email": alias.email})

    envelope = Envelope()
    envelope.mail_from = "somewhere@rainbow.com"
    envelope.rcpt_tos = [alias.email]

    outcome = email_handler.handle(envelope, message)
    assert outcome == status.E200
|
2022-10-10 10:00:19 +02:00
|
|
|
|
|
|
|
|
|
|
|
@mail_sender.store_emails_test_decorator
def test_replace_contacts_and_user_in_reply_phase(flask_client):
    """With replace_reverse_alias set, the sent payload shows both contacts' website emails."""
    user = create_new_user()
    user.replace_reverse_alias = True
    alias = Alias.create_new_random(user)
    Session.flush()

    first_contact = Contact.create(
        user_id=user.id,
        alias_id=alias.id,
        website_email=random_email(),
        reply_email=f"{random.random()}@{EMAIL_DOMAIN}",
        commit=True,
    )
    first_real_address = first_contact.website_email

    second_contact = Contact.create(
        user_id=user.id,
        alias_id=alias.id,
        website_email=random_email(),
        reply_email=f"{random.random()}@{EMAIL_DOMAIN}",
        commit=True,
    )
    second_real_address = second_contact.website_email

    # The template is filled with the contacts' reply addresses.
    substitutions = {
        "contact_reply_email": first_contact.reply_email,
        "other_contact_reply_email": second_contact.reply_email,
    }
    message = load_eml_file("replacement_on_reply_phase.eml", substitutions)

    envelope = Envelope()
    envelope.mail_from = alias.mailbox.email
    envelope.rcpt_tos = [first_contact.reply_email]

    outcome = email_handler.handle(envelope, message)
    assert outcome == status.E200

    stored = mail_sender.get_stored_emails()
    assert len(stored) == 1

    # Inspect the first part of the single stored message.
    first_part = stored[0].msg.get_payload()[0]
    payload = first_part.get_payload()
    assert "Contact is {}".format(first_real_address) in payload
    assert "Other contact is {}".format(second_real_address) in payload
|
2023-01-25 13:17:20 +01:00
|
|
|
|
|
|
|
|
|
|
|
@mail_sender.store_emails_test_decorator
def test_send_email_from_non_canonical_address_on_reply(flask_client):
    """A reply sent from the non-canonical form of the user's address is accepted.

    The user is registered under canonicalize_email(address) while the envelope
    sender is the raw address; one email is sent and one EmailLog is recorded.
    """
    sender_address = f"{random_string(10)}.suf@gmail.com"
    user = create_new_user(email=canonicalize_email(sender_address))
    alias = Alias.create_new_random(user)
    Session.commit()

    contact = Contact.create(
        user_id=user.id,
        alias_id=alias.id,
        website_email=random_email(),
        reply_email=f"{random_string(10)}@{EMAIL_DOMAIN}",
        commit=True,
    )

    envelope = Envelope()
    envelope.mail_from = sender_address
    envelope.rcpt_tos = [contact.reply_email]

    message = EmailMessage()
    message[headers.TO] = contact.reply_email
    message[headers.SUBJECT] = random_string()

    outcome = email_handler.handle(envelope, message)
    assert outcome == status.E200

    stored = mail_sender.get_stored_emails()
    assert len(stored) == 1

    # The single log entry points at the alias and the user's default mailbox.
    email_logs = EmailLog.filter_by(user_id=user.id).all()
    assert len(email_logs) == 1
    only_log = email_logs[0]
    assert only_log.alias_id == alias.id
    assert only_log.mailbox_id == user.default_mailbox_id
|
|
|
|
|
|
|
|
|
|
|
|
@mail_sender.store_emails_test_decorator
def test_send_email_from_non_canonical_matches_already_existing_user(flask_client):
    """A reply is logged for the user with the exact sender address.

    Two users exist: one registered under canonicalize_email(address) and one
    under the raw address. The log must belong to the raw-address user.
    """
    sender_address = f"{random_string(10)}.suf@gmail.com"
    # First a user registered under the canonical form, then the one under test.
    create_new_user(email=canonicalize_email(sender_address))
    user = create_new_user(email=sender_address)
    alias = Alias.create_new_random(user)
    Session.commit()

    contact = Contact.create(
        user_id=user.id,
        alias_id=alias.id,
        website_email=random_email(),
        reply_email=f"{random_string(10)}@{EMAIL_DOMAIN}",
        commit=True,
    )

    envelope = Envelope()
    envelope.mail_from = sender_address
    envelope.rcpt_tos = [contact.reply_email]

    message = EmailMessage()
    message[headers.TO] = contact.reply_email
    message[headers.SUBJECT] = random_string()

    outcome = email_handler.handle(envelope, message)
    assert outcome == status.E200

    stored = mail_sender.get_stored_emails()
    assert len(stored) == 1

    # The single log entry points at the alias and the user's default mailbox.
    email_logs = EmailLog.filter_by(user_id=user.id).all()
    assert len(email_logs) == 1
    only_log = email_logs[0]
    assert only_log.alias_id == alias.id
    assert only_log.mailbox_id == user.default_mailbox_id
|
2023-03-13 13:01:00 +01:00
|
|
|
|
|
|
|
|
|
|
|
@mail_sender.store_emails_test_decorator
def test_break_loop_alias_as_mailbox(flask_client):
    """When the default mailbox address equals the alias address, handling returns E525."""
    owner = create_new_user()
    alias = Alias.create_new_random(owner)
    # Point the user's default mailbox at the alias's own address.
    owner.default_mailbox.email = alias.email
    Session.commit()

    envelope = Envelope()
    envelope.mail_from = random_email()
    envelope.rcpt_tos = [alias.email]

    message = EmailMessage()
    message[headers.TO] = alias.email
    message[headers.SUBJECT] = random_string()

    outcome = email_handler.handle(envelope, message)
    assert outcome == status.E525
|