mirror of
https://github.com/simple-login/app.git
synced 2024-11-13 07:31:12 +01:00
Fix: allow receive email from non-canonical sources (#1545)
Co-authored-by: Adrià Casajús <adria.casajus@proton.ch>
This commit is contained in:
parent
b487b01442
commit
0a197313ea
2 changed files with 78 additions and 13 deletions
|
@ -168,7 +168,7 @@ from app.pgp_utils import (
|
|||
sign_data,
|
||||
load_public_key_and_check,
|
||||
)
|
||||
from app.utils import sanitize_email
|
||||
from app.utils import sanitize_email, canonicalize_email
|
||||
from init_app import load_pgp_public_keys
|
||||
from server import create_light_app
|
||||
|
||||
|
@ -1384,21 +1384,26 @@ def get_mailbox_from_mail_from(mail_from: str, alias) -> Optional[Mailbox]:
|
|||
"""return the corresponding mailbox given the mail_from and alias
|
||||
Usually the mail_from=mailbox.email but it can also be one of the authorized address
|
||||
"""
|
||||
for mailbox in alias.mailboxes:
|
||||
if mailbox.email == mail_from:
|
||||
return mailbox
|
||||
|
||||
for authorized_address in mailbox.authorized_addresses:
|
||||
if authorized_address.email == mail_from:
|
||||
LOG.d(
|
||||
"Found an authorized address for %s %s %s",
|
||||
alias,
|
||||
mailbox,
|
||||
authorized_address,
|
||||
)
|
||||
def __check(email_address: str, alias: Alias) -> Optional[Mailbox]:
|
||||
for mailbox in alias.mailboxes:
|
||||
if mailbox.email == email_address:
|
||||
return mailbox
|
||||
|
||||
return None
|
||||
for authorized_address in mailbox.authorized_addresses:
|
||||
if authorized_address.email == email_address:
|
||||
LOG.d(
|
||||
"Found an authorized address for %s %s %s",
|
||||
alias,
|
||||
mailbox,
|
||||
authorized_address,
|
||||
)
|
||||
return mailbox
|
||||
return None
|
||||
|
||||
# We need to first check for the uncanonicalized version because we still have users in the db with the
|
||||
# email non canonicalized. So if it matches the already existing one use that, otherwise check the canonical one
|
||||
return __check(mail_from, alias) or __check(canonicalize_email(mail_from), alias)
|
||||
|
||||
|
||||
def handle_unknown_mailbox(
|
||||
|
|
|
@ -22,6 +22,7 @@ from app.models import (
|
|||
Contact,
|
||||
SentAlert,
|
||||
)
|
||||
from app.utils import random_string, canonicalize_email
|
||||
from email_handler import (
|
||||
get_mailbox_from_mail_from,
|
||||
should_ignore,
|
||||
|
@ -308,3 +309,62 @@ def test_replace_contacts_and_user_in_reply_phase(flask_client):
|
|||
payload = sent_mails[0].msg.get_payload()[0].get_payload()
|
||||
assert payload.find("Contact is {}".format(contact_real_mail)) > -1
|
||||
assert payload.find("Other contact is {}".format(contact2_real_mail)) > -1
|
||||
|
||||
|
||||
@mail_sender.store_emails_test_decorator
|
||||
def test_send_email_from_non_canonical_address_on_reply(flask_client):
|
||||
email_address = f"{random_string(10)}.suf@gmail.com"
|
||||
user = create_new_user(email=canonicalize_email(email_address))
|
||||
alias = Alias.create_new_random(user)
|
||||
Session.commit()
|
||||
contact = Contact.create(
|
||||
user_id=user.id,
|
||||
alias_id=alias.id,
|
||||
website_email=random_email(),
|
||||
reply_email=f"{random_string(10)}@{EMAIL_DOMAIN}",
|
||||
commit=True,
|
||||
)
|
||||
envelope = Envelope()
|
||||
envelope.mail_from = email_address
|
||||
envelope.rcpt_tos = [contact.reply_email]
|
||||
msg = EmailMessage()
|
||||
msg[headers.TO] = contact.reply_email
|
||||
msg[headers.SUBJECT] = random_string()
|
||||
result = email_handler.handle(envelope, msg)
|
||||
assert result == status.E200
|
||||
sent_mails = mail_sender.get_stored_emails()
|
||||
assert len(sent_mails) == 1
|
||||
email_logs = EmailLog.filter_by(user_id=user.id).all()
|
||||
assert len(email_logs) == 1
|
||||
assert email_logs[0].alias_id == alias.id
|
||||
assert email_logs[0].mailbox_id == user.default_mailbox_id
|
||||
|
||||
|
||||
@mail_sender.store_emails_test_decorator
|
||||
def test_send_email_from_non_canonical_matches_already_existing_user(flask_client):
|
||||
email_address = f"{random_string(10)}.suf@gmail.com"
|
||||
create_new_user(email=canonicalize_email(email_address))
|
||||
user = create_new_user(email=email_address)
|
||||
alias = Alias.create_new_random(user)
|
||||
Session.commit()
|
||||
contact = Contact.create(
|
||||
user_id=user.id,
|
||||
alias_id=alias.id,
|
||||
website_email=random_email(),
|
||||
reply_email=f"{random_string(10)}@{EMAIL_DOMAIN}",
|
||||
commit=True,
|
||||
)
|
||||
envelope = Envelope()
|
||||
envelope.mail_from = email_address
|
||||
envelope.rcpt_tos = [contact.reply_email]
|
||||
msg = EmailMessage()
|
||||
msg[headers.TO] = contact.reply_email
|
||||
msg[headers.SUBJECT] = random_string()
|
||||
result = email_handler.handle(envelope, msg)
|
||||
assert result == status.E200
|
||||
sent_mails = mail_sender.get_stored_emails()
|
||||
assert len(sent_mails) == 1
|
||||
email_logs = EmailLog.filter_by(user_id=user.id).all()
|
||||
assert len(email_logs) == 1
|
||||
assert email_logs[0].alias_id == alias.id
|
||||
assert email_logs[0].mailbox_id == user.default_mailbox_id
|
||||
|
|
Loading…
Reference in a new issue