Added spoofed email test

This commit is contained in:
Adrià Casajús 2022-03-18 15:44:07 +01:00
parent c9cbaeb460
commit 44dd06fabf
No known key found for this signature in database
GPG Key ID: F0033226A5AFC9B9
6 changed files with 108 additions and 18 deletions

View File

@ -13,6 +13,7 @@ MIME_VERSION = "Mime-Version"
REPLY_TO = "Reply-To"
RECEIVED = "Received"
RSPAM_QUEUE_ID = "X-Rspamd-Queue-Id"
SPAMD_RESULT = "X-Spamd-Result"
CC = "Cc"
DKIM_SIGNATURE = "DKIM-Signature"
X_SPAM_STATUS = "X-Spam-Status"

View File

@ -536,6 +536,55 @@ def handle_email_sent_to_ourself(alias, from_addr: str, msg: Message, user):
)
def apply_dmarc_policy(alias: Alias, contact: Contact, msg: Message) -> Optional[str]:
spam_result = msg.get_all(headers.SPAMD_RESULT)
if not spam_result:
return False
spam_entries = [entry.strip() for entry in spam_result[-1].split("\n")]
for iPos in range(len(spam_entries)):
sep = spam_entries[iPos].find("(")
if sep > -1:
spam_entries[iPos] = spam_entries[iPos][:sep]
if "DMARC_POLICY_REJECT" in spam_entries:
return status.E519
if (
"DMARC_POLICY_SOFTFAIL" in spam_entries
or "DMARC_POLICY_QUARANTINE" in spam_entries
):
add_or_replace_header(msg, headers.SL_DIRECTION, "Forward")
msg[headers.SL_ENVELOPE_TO] = alias.email
add_or_replace_header(msg, "From", contact.new_addr())
# replace CC & To emails by reverse-alias for all emails that are not alias
try:
replace_header_when_forward(msg, alias, "Cc")
replace_header_when_forward(msg, alias, "To")
except CannotCreateContactForReverseAlias:
Session.commit()
raise
random_name = str(uuid.uuid4())
s3_report_path = f"refused-emails/full-{random_name}.eml"
s3.upload_email_from_bytesio(
s3_report_path, BytesIO(to_bytes(msg)), f"full-{random_name}"
)
refused_email = RefusedEmail.create(
full_report_path=s3_report_path, user_id=alias.user_id, flush=True
)
EmailLog.create(
user_id=alias.user_id,
mailbox_id=alias.mailbox_id,
contact_id=contact.id,
alias_id=alias.id,
message_id=str(msg[headers.MESSAGE_ID]),
refused_email_id=refused_email.id,
is_spam=True,
blocked=True,
commit=True,
)
return status.E519
return None
def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str]]:
"""return an array of SMTP status (is_success, smtp_status)
is_success indicates whether an email has been delivered and
@ -616,6 +665,11 @@ def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str
# do not return 5** to allow user to receive emails later when alias is enabled or contact is unblocked
return [(True, res_status)]
# Check if we need to reject or quarantine based on dmarc
dmarc_delivery_status = apply_dmarc_policy(alias, contact, msg)
if dmarc_delivery_status is not None:
return [(False, dmarc_delivery_status)]
ret = []
mailboxes = alias.mailboxes

View File

@ -2,7 +2,7 @@ from app.api.serializer import get_alias_infos_with_pagination_v3
from app.config import PAGE_LIMIT
from app.db import Session
from app.models import User, Alias, Mailbox, Contact
from tests.utils import create_user
from tests.utils import create_random_user
def test_get_alias_infos_with_pagination_v3(flask_client):
@ -147,7 +147,7 @@ def test_get_alias_infos_with_pagination_v3_no_duplicate_when_empty_contact(
"""
Make sure an alias is returned once when it has 2 contacts that have no email log activity
"""
user = create_user(flask_client)
user = create_random_user()
alias = Alias.first()
Contact.create(

View File

@ -8,7 +8,7 @@ Received: from relay.somewhere.net (relay.somewhere.net [34.59.200.130])
by mx1.sldev.ovh (Postfix) with ESMTPS id 6D8C13F069
for <wehrman_mannequin@sldev.ovh>; Thu, 17 Mar 2022 16:50:20 +0000 (UTC)
Date: Thu, 17 Mar 2022 16:50:18 +0000
To: wehrman_mannequin@sldev.ovh
To: {{ alias_email }}
From: spoofedemailsource@gmail.com
Subject: test Thu, 17 Mar 2022 16:50:18 +0000
Message-Id: <20220317165018.000191@somewhere-5488dd4b6b-7crp6>

View File

@ -2,14 +2,17 @@ import email
import os.path
from email.message import EmailMessage
from app.email import headers
from app.models import User, Alias, AuthorizedAddress, IgnoredEmail
from aiosmtpd.smtp import Envelope
import email_handler
from app.email import headers, status
from app.models import User, Alias, AuthorizedAddress, IgnoredEmail, EmailLog
from email_handler import (
get_mailbox_from_mail_from,
should_ignore,
is_automatic_out_of_office,
)
from tests.utils import load_eml_file
from tests.utils import load_eml_file, create_random_user, create_random_alias
def test_get_mailbox_from_mail_from(flask_client):
@ -66,9 +69,21 @@ def test_is_automatic_out_of_office():
assert is_automatic_out_of_office(msg)
def test_process_spoofed():
msg = load_eml_file("gmail_spoof.eml")
breakpoint()
a = msg["a"]
b = 1
c = 2
def test_process_spoofed(flask_client):
user = create_random_user()
alias = create_random_alias(user)
msg = load_eml_file("gmail_spoof.eml", {"alias_email": alias.email})
envelope = Envelope()
envelope.mail_from = msg["from"]
envelope.rcpt_tos = [msg["to"]]
result = email_handler.handle(envelope, msg)
assert result == status.E519
email_logs = (
EmailLog.filter_by(user_id=user.id, alias_id=alias.id)
.order_by(EmailLog.id.desc())
.all()
)
assert len(email_logs) == 1
email_log = email_logs[0]
assert email_log.blocked
assert email_log.refused_email_id

View File

@ -1,11 +1,14 @@
import email
import json
import os
import random
import string
from email.message import EmailMessage
import jinja2
from flask import url_for
from app.models import User
from app.models import User, Alias
def login(flask_client) -> User:
@ -30,10 +33,14 @@ def login(flask_client) -> User:
return user
def create_user(flask_client) -> User:
# create user, user is activated
def random_token(length: int = 10) -> str:
return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))
def create_random_user() -> User:
email = "{}@{}.com".format(random_token(), random_token())
return User.create(
email="a@b.c",
email=email,
password="password",
name="Test User",
activated=True,
@ -41,14 +48,27 @@ def create_user(flask_client) -> User:
)
def create_random_alias(user: User) -> Alias:
alias_email = "{}@{}.com".format(random_token(), random_token())
alias = Alias.create(
user_id=user.id,
email=alias_email,
mailbox_id=user.default_mailbox_id,
commit=True,
)
return alias
def pretty(d):
"""pretty print as json"""
print(json.dumps(d, indent=2))
def load_eml_file(filename: str) -> EmailMessage:
def load_eml_file(filename: str, template_values={}) -> EmailMessage:
emails_dir = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "example_emls"
)
fullpath = os.path.join(emails_dir, filename)
return email.message_from_file(open(fullpath))
template = jinja2.Template(open(fullpath).read())
rendered = template.render(**template_values)
return email.message_from_string(rendered)