Merge pull request #899 from simple-login/add-alias-to-to-header

add alias to To: header if it isn't included in To and Cc header
This commit is contained in:
Son Nguyen Kim 2022-04-20 09:10:11 +02:00 committed by GitHub
commit c14e01839e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 158 additions and 83 deletions

View File

@ -388,6 +388,31 @@ def replace_header_when_forward(msg: Message, alias: Alias, header: str):
delete_header(msg, header)
def add_alias_to_header_if_needed(msg, alias):
"""
During the forward phase, add alias to To: header if it isn't included in To and Cc header
It can happen that the alias isn't included in To: and CC: header, for example if this is a BCC email
:return:
"""
to_header = str(msg[headers.TO]) if msg[headers.TO] else None
cc_header = str(msg[headers.CC]) if msg[headers.CC] else None
# nothing to do
if to_header and alias.email in to_header:
return
# nothing to do
if cc_header and alias.email in cc_header:
return
LOG.d(f"add {alias} to To: header {to_header}")
if to_header:
add_or_replace_header(msg, headers.TO, f"{to_header},{alias.email}")
else:
add_or_replace_header(msg, headers.TO, alias.email)
def replace_header_when_reply(msg: Message, alias: Alias, header: str):
"""
Replace CC or To Reply emails by original emails
@ -845,14 +870,17 @@ def forward_email_to_mailbox(
# replace CC & To emails by reverse-alias for all emails that are not alias
try:
replace_header_when_forward(msg, alias, "Cc")
replace_header_when_forward(msg, alias, "To")
replace_header_when_forward(msg, alias, headers.CC)
replace_header_when_forward(msg, alias, headers.TO)
except CannotCreateContactForReverseAlias:
LOG.d("CannotCreateContactForReverseAlias error, delete %s", email_log)
EmailLog.delete(email_log.id)
Session.commit()
raise
# add alias to To: header if it isn't included in To and Cc header
add_alias_to_header_if_needed(msg, alias)
# add List-Unsubscribe header
if user.one_click_unsubscribe_block_sender:
unsubscribe_link, via_email = alias.unsubscribe_link(contact)

View File

@ -393,9 +393,9 @@ def test_update_disable_pgp(flask_client):
def test_update_pinned(flask_client):
login(flask_client)
user = login(flask_client)
alias = Alias.first()
alias = Alias.filter_by(user_id=user.id).first()
assert not alias.pinned
r = flask_client.patch(
@ -490,8 +490,8 @@ def test_create_contact_route(flask_client):
def test_create_contact_route_empty_contact_address(flask_client):
login(flask_client)
alias = Alias.first()
user = login(flask_client)
alias = Alias.filter_by(user_id=user.id).first()
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
@ -503,8 +503,8 @@ def test_create_contact_route_empty_contact_address(flask_client):
def test_create_contact_route_invalid_contact_email(flask_client):
login(flask_client)
alias = Alias.first()
user = login(flask_client)
alias = Alias.filter_by(user_id=user.id).first()
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),

View File

@ -6,7 +6,7 @@ from app.dashboard.views.custom_alias import signer
from app.db import Session
from app.models import Alias, CustomDomain, Mailbox, AliasUsedOn
from app.utils import random_word
from tests.utils import login
from tests.utils import login, random_domain
def test_v2(flask_client):
@ -126,11 +126,12 @@ def test_custom_domain_alias(flask_client):
user = login(flask_client)
# create a custom domain
domain = random_domain()
CustomDomain.create(
user_id=user.id, domain="ab.cd", ownership_verified=True, commit=True
user_id=user.id, domain=domain, ownership_verified=True, commit=True
)
signed_suffix = signer.sign("@ab.cd").decode()
signed_suffix = signer.sign(f"@{domain}").decode()
r = flask_client.post(
"/api/v3/alias/custom/new",
@ -142,7 +143,7 @@ def test_custom_domain_alias(flask_client):
)
assert r.status_code == 201
assert r.json["alias"] == "prefix@ab.cd"
assert r.json["alias"] == f"prefix@{domain}"
def test_wrongly_formatted_payload(flask_client):
@ -212,11 +213,12 @@ def test_cannot_create_alias_in_trash(flask_client):
user = login(flask_client)
# create a custom domain
domain = random_domain()
CustomDomain.create(
user_id=user.id, domain="ab.cd", ownership_verified=True, commit=True
user_id=user.id, domain=domain, ownership_verified=True, commit=True
)
signed_suffix = signer.sign("@ab.cd").decode()
signed_suffix = signer.sign(f"@{domain}").decode()
r = flask_client.post(
"/api/v3/alias/custom/new",
@ -228,10 +230,10 @@ def test_cannot_create_alias_in_trash(flask_client):
)
assert r.status_code == 201
assert r.json["alias"] == f"prefix@ab.cd"
assert r.json["alias"] == f"prefix@{domain}"
# delete alias: it's going to be moved to ab.cd trash
alias = Alias.get_by(email="prefix@ab.cd")
# delete alias: it's going to be moved to domain trash
alias = Alias.get_by(email=f"prefix@{domain}")
assert alias.custom_domain_id
delete_alias(alias, user)
@ -251,11 +253,12 @@ def test_too_many_requests(flask_client):
user = login(flask_client)
# create a custom domain
CustomDomain.create(user_id=user.id, domain="ab.cd", verified=True, commit=True)
domain = random_domain()
CustomDomain.create(user_id=user.id, domain=domain, verified=True, commit=True)
# can't create more than 5 aliases in 1 minute
for i in range(7):
signed_suffix = signer.sign("@ab.cd").decode()
signed_suffix = signer.sign(f"@{domain}").decode()
r = flask_client.post(
"/api/v3/alias/custom/new",

View File

@ -5,7 +5,7 @@ from flask import url_for, g
from app.config import EMAIL_DOMAIN, MAX_NB_EMAIL_FREE_PLAN
from app.db import Session
from app.models import Alias, CustomDomain, AliasUsedOn
from tests.utils import login
from tests.utils import login, random_domain
def test_with_hostname(flask_client):
@ -40,8 +40,9 @@ def test_with_hostname(flask_client):
def test_with_custom_domain(flask_client):
user = login(flask_client)
domain = random_domain()
CustomDomain.create(
user_id=user.id, domain="ab.cd", ownership_verified=True, commit=True
user_id=user.id, domain=domain, ownership_verified=True, commit=True
)
r = flask_client.post(
@ -49,8 +50,8 @@ def test_with_custom_domain(flask_client):
)
assert r.status_code == 201
assert r.json["alias"] == "test@ab.cd"
assert Alias.count() == 2
assert r.json["alias"] == f"test@{domain}"
assert Alias.filter_by(user_id=user.id).count() == 2
# call the endpoint again, should return the same alias
r = flask_client.post(
@ -58,9 +59,9 @@ def test_with_custom_domain(flask_client):
)
assert r.status_code == 201
assert r.json["alias"] == "test@ab.cd"
assert r.json["alias"] == f"test@{domain}"
# no new alias is created
assert Alias.count() == 2
assert Alias.filter_by(user_id=user.id).count() == 2
def test_without_hostname(flask_client):

View File

@ -13,7 +13,7 @@ def test_get_alias_infos_with_pagination_v3(flask_client):
assert len(alias_infos) == 1
alias_info = alias_infos[0]
alias = Alias.first()
alias = Alias.filter_by(user_id=user.id).first()
assert alias_info.alias == alias
assert alias_info.mailbox == user.default_mailbox
assert alias_info.mailboxes == [user.default_mailbox]
@ -28,7 +28,7 @@ def test_get_alias_infos_with_pagination_v3_query_alias_email(flask_client):
"""test the query on the alias email"""
user = create_new_user()
alias = Alias.first()
alias = Alias.filter_by(user_id=user.id).first()
alias_infos = get_alias_infos_with_pagination_v3(user, query=alias.email)
assert len(alias_infos) == 1
@ -40,7 +40,7 @@ def test_get_alias_infos_with_pagination_v3_query_alias_email(flask_client):
def test_get_alias_infos_with_pagination_v3_query_alias_mailbox(flask_client):
"""test the query on the alias mailbox email"""
user = create_new_user()
alias = Alias.first()
alias = Alias.filter_by(user_id=user.id).first()
alias_infos = get_alias_infos_with_pagination_v3(user, mailbox_id=alias.mailbox_id)
assert len(alias_infos) == 1
@ -48,7 +48,7 @@ def test_get_alias_infos_with_pagination_v3_query_alias_mailbox(flask_client):
def test_get_alias_infos_with_pagination_v3_query_alias_mailboxes(flask_client):
"""test the query on the alias additional mailboxes"""
user = create_new_user()
alias = Alias.first()
alias = Alias.filter_by(user_id=user.id).first()
mb = Mailbox.create(user_id=user.id, email="mb@gmail.com")
alias._mailboxes.append(mb)
Session.commit()
@ -64,7 +64,7 @@ def test_get_alias_infos_with_pagination_v3_query_alias_note(flask_client):
"""test the query on the alias note"""
user = create_new_user()
alias = Alias.first()
alias = Alias.filter_by(user_id=user.id).first()
alias.note = "test note"
Session.commit()
@ -76,7 +76,7 @@ def test_get_alias_infos_with_pagination_v3_query_alias_name(flask_client):
"""test the query on the alias name"""
user = create_new_user()
alias = Alias.first()
alias = Alias.filter_by(user_id=user.id).first()
alias.name = "Test Name"
Session.commit()
@ -134,7 +134,7 @@ def test_get_alias_infos_pinned_alias(flask_client):
for _ in range(2 * PAGE_LIMIT):
Alias.create_new_random(user)
first_alias = Alias.order_by(Alias.id).first()
first_alias = Alias.filter_by(user_id=user.id).order_by(Alias.id).first()
# should return PAGE_LIMIT alias
alias_infos = get_alias_infos_with_pagination_v3(user)

View File

@ -4,7 +4,7 @@ from app.models import (
SenderFormatEnum,
AliasSuffixEnum,
)
from tests.utils import login
from tests.utils import login, random_domain
def test_get_setting(flask_client):
@ -76,7 +76,8 @@ def test_update_settings_sender_format(flask_client):
def test_get_setting_domains(flask_client):
user = login(flask_client)
CustomDomain.create(user_id=user.id, domain="ab.cd", verified=True, commit=True)
domain = random_domain()
CustomDomain.create(user_id=user.id, domain=domain, verified=True, commit=True)
r = flask_client.get("/api/setting/domains")
assert r.status_code == 200
@ -84,7 +85,8 @@ def test_get_setting_domains(flask_client):
def test_get_setting_domains_v2(flask_client):
user = login(flask_client)
CustomDomain.create(user_id=user.id, domain="ab.cd", verified=True, commit=True)
domain = random_domain()
CustomDomain.create(user_id=user.id, domain=domain, verified=True, commit=True)
r = flask_client.get("/api/v2/setting/domains")
assert r.status_code == 200

View File

@ -20,16 +20,3 @@ def test_unactivated_user_login(flask_client):
b"Please check your inbox for the activation email. You can also have this email re-sent"
in r.data
)
def test_activated_user_login(flask_client):
user = create_new_user()
r = flask_client.post(
url_for("auth.login"),
data={"email": user.email, "password": "password"},
follow_redirects=True,
)
assert r.status_code == 200
assert b"/auth/logout" in r.data

View File

@ -8,10 +8,10 @@ from tests.utils import login
def test_add_contact_success(flask_client):
login(flask_client)
alias = Alias.first()
user = login(flask_client)
alias = Alias.filter(Alias.user_id == user.id).first()
assert Contact.count() == 0
assert Contact.filter_by(user_id=user.id).count() == 0
# <<< Create a new contact >>>
flask_client.post(
@ -23,8 +23,8 @@ def test_add_contact_success(flask_client):
follow_redirects=True,
)
# a new contact is added
assert Contact.count() == 1
contact = Contact.first()
assert Contact.filter_by(user_id=user.id).count() == 1
contact = Contact.filter_by(user_id=user.id).first()
assert contact.website_email == "abcd@gmail.com"
# <<< Create a new contact using a full email format >>>
@ -37,8 +37,10 @@ def test_add_contact_success(flask_client):
follow_redirects=True,
)
# a new contact is added
assert Contact.count() == 2
contact = Contact.filter(Contact.id != contact.id).first()
assert Contact.filter_by(user_id=user.id).count() == 2
contact = (
Contact.filter_by(user_id=user.id).filter(Contact.id != contact.id).first()
)
assert contact.website_email == "another@gmail.com"
assert contact.name == "First Last"
@ -53,5 +55,5 @@ def test_add_contact_success(flask_client):
)
# no new contact is added
assert Contact.count() == 2
assert Contact.filter_by(user_id=user.id).count() == 2
assert "Invalid email format. Email must be either email@example.com" in str(r.data)

View File

@ -19,7 +19,7 @@ from app.models import (
SLDomain,
)
from app.utils import random_word
from tests.utils import login
from tests.utils import login, random_domain
def test_add_alias_success(flask_client):
@ -258,14 +258,15 @@ def test_add_alias_in_global_trash(flask_client):
def test_add_alias_in_custom_domain_trash(flask_client):
user = login(flask_client)
domain = random_domain()
custom_domain = CustomDomain.create(
user_id=user.id, domain="ab.cd", ownership_verified=True, commit=True
user_id=user.id, domain=domain, ownership_verified=True, commit=True
)
# delete a custom-domain alias: alias should go the DomainDeletedAlias
alias = Alias.create(
user_id=user.id,
email="prefix@ab.cd",
email=f"prefix@{domain}",
custom_domain_id=custom_domain.id,
mailbox_id=user.default_mailbox_id,
commit=True,
@ -276,7 +277,7 @@ def test_add_alias_in_custom_domain_trash(flask_client):
assert DomainDeletedAlias.count() == 1
# create the same alias, should return error
suffix = "@ab.cd"
suffix = f"@{domain}"
alias_suffix = AliasSuffix(
is_custom=False, suffix=suffix, is_premium=False, domain=EMAIL_DOMAIN
@ -302,11 +303,12 @@ def test_too_many_requests(flask_client):
user = login(flask_client)
# create a custom domain
CustomDomain.create(user_id=user.id, domain="ab.cd", verified=True, commit=True)
domain = random_domain()
CustomDomain.create(user_id=user.id, domain=domain, verified=True, commit=True)
# can't create more than 5 aliases in 1 minute
for i in range(7):
signed_suffix = signer.sign("@ab.cd").decode()
signed_suffix = signer.sign(f"@{domain}").decode()
r = flask_client.post(
url_for("dashboard.custom_alias"),

View File

@ -3,7 +3,7 @@ from flask import url_for
from app.db import Session
from app.email_utils import get_email_domain_part
from app.models import Mailbox
from tests.utils import login
from tests.utils import login, random_domain
def test_add_domain_success(flask_client):
@ -11,14 +11,15 @@ def test_add_domain_success(flask_client):
user.lifetime = True
Session.commit()
domain = random_domain()
r = flask_client.post(
url_for("dashboard.custom_domain"),
data={"form-name": "create", "domain": "ab.cd"},
data={"form-name": "create", "domain": domain},
follow_redirects=True,
)
assert r.status_code == 200
assert b"New domain ab.cd is created" in r.data
assert f"New domain {domain} is created".encode() in r.data
def test_add_domain_same_as_user_email(flask_client):

View File

@ -7,8 +7,8 @@ from tests.utils import login
def test_create_random_alias_success(flask_client):
login(flask_client)
assert Alias.count() == 1
user = login(flask_client)
assert Alias.filter(Alias.user_id == user.id).count() == 1
r = flask_client.post(
url_for("dashboard.index"),
@ -16,7 +16,7 @@ def test_create_random_alias_success(flask_client):
follow_redirects=True,
)
assert r.status_code == 200
assert Alias.count() == 2
assert Alias.filter(Alias.user_id == user.id).count() == 2
def test_too_many_requests(flask_client):

View File

@ -96,7 +96,7 @@ def test_create_subdomain_out_of_quota(flask_client):
commit=True,
)
assert CustomDomain.count() == MAX_NB_SUBDOMAIN
assert CustomDomain.filter_by(user_id=user.id).count() == MAX_NB_SUBDOMAIN
flask_client.post(
url_for("dashboard.subdomain_route"),
@ -105,11 +105,11 @@ def test_create_subdomain_out_of_quota(flask_client):
)
# no new subdomain is created
assert CustomDomain.count() == MAX_NB_SUBDOMAIN
assert CustomDomain.filter_by(user_id=user.id).count() == MAX_NB_SUBDOMAIN
def test_create_subdomain_invalid(flask_client):
login(flask_client)
user = login(flask_client)
sl_domain = setup_sl_domain()
# subdomain can't end with dash (-)
@ -118,7 +118,7 @@ def test_create_subdomain_invalid(flask_client):
data={"form-name": "create", "subdomain": "test-", "domain": sl_domain.domain},
follow_redirects=True,
)
assert CustomDomain.count() == 0
assert CustomDomain.filter_by(user_id=user.id).count() == 0
# subdomain can't contain underscore (_)
flask_client.post(
@ -130,7 +130,7 @@ def test_create_subdomain_invalid(flask_client):
},
follow_redirects=True,
)
assert CustomDomain.count() == 0
assert CustomDomain.filter_by(user_id=user.id).count() == 0
# subdomain must have at least 3 characters
flask_client.post(
@ -138,4 +138,4 @@ def test_create_subdomain_invalid(flask_client):
data={"form-name": "create", "subdomain": "te", "domain": sl_domain.domain},
follow_redirects=True,
)
assert CustomDomain.count() == 0
assert CustomDomain.filter_by(user_id=user.id).count() == 0

View File

@ -1,3 +1,4 @@
from app.db import Session
from app.models import (
Alias,
Contact,
@ -6,8 +7,9 @@ from tests.utils import login
def test_disable_alias(flask_client):
login(flask_client)
alias = Alias.first()
user = login(flask_client)
alias = Alias.create_new_random(user)
Session.commit()
assert alias.enabled
flask_client.post(f"/dashboard/unsubscribe/{alias.id}")

View File

@ -192,3 +192,44 @@ def test_dmarc_reply_quarantine(flask_client, dmarc_result):
user_id=user.id, alert_type=ALERT_DMARC_FAILED_REPLY_PHASE
).all()
assert len(alerts) == 1
def test_add_alias_to_header_if_needed():
msg = EmailMessage()
user = create_new_user()
alias = Alias.filter_by(user_id=user.id).first()
assert msg[headers.TO] is None
email_handler.add_alias_to_header_if_needed(msg, alias)
assert msg[headers.TO] == alias.email
def test_append_alias_to_header_if_needed_existing_to():
msg = EmailMessage()
original_to = "noone@nowhere.no"
msg[headers.TO] = original_to
user = create_new_user()
alias = Alias.filter_by(user_id=user.id).first()
email_handler.add_alias_to_header_if_needed(msg, alias)
assert msg[headers.TO] == f"{original_to}, {alias.email}"
def test_avoid_add_to_header_already_present():
msg = EmailMessage()
user = create_new_user()
alias = Alias.filter_by(user_id=user.id).first()
msg[headers.TO] = alias.email
email_handler.add_alias_to_header_if_needed(msg, alias)
assert msg[headers.TO] == alias.email
def test_avoid_add_to_header_already_present_in_cc():
msg = EmailMessage()
create_new_user()
alias = Alias.first()
msg[headers.CC] = alias.email
email_handler.add_alias_to_header_if_needed(msg, alias)
assert msg[headers.TO] is None
assert msg[headers.CC] == alias.email

View File

@ -50,7 +50,7 @@ from app.models import (
)
# flake8: noqa: E101, W191
from tests.utils import login, load_eml_file, create_new_user
from tests.utils import login, load_eml_file, create_new_user, random_domain
def test_get_email_domain_part():
@ -76,9 +76,10 @@ def test_can_be_used_as_personal_email(flask_client):
assert not email_can_be_used_as_mailbox("hey@d1.test")
# custom domain
domain = random_domain()
user = create_new_user()
CustomDomain.create(user_id=user.id, domain="ab.cd", verified=True, commit=True)
assert not email_can_be_used_as_mailbox("hey@ab.cd")
CustomDomain.create(user_id=user.id, domain=domain, verified=True, commit=True)
assert not email_can_be_used_as_mailbox(f"hey@{domain}")
# disposable domain
assert not email_can_be_used_as_mailbox("abcd@10minutesmail.fr")
@ -764,11 +765,12 @@ def test_get_mailbox_bounce_info():
def test_is_invalid_mailbox_domain(flask_client):
InvalidMailboxDomain.create(domain="ab.cd", commit=True)
domain = random_domain()
InvalidMailboxDomain.create(domain=domain, commit=True)
assert is_invalid_mailbox_domain("ab.cd")
assert is_invalid_mailbox_domain("sub.ab.cd")
assert is_invalid_mailbox_domain("sub1.sub2.ab.cd")
assert is_invalid_mailbox_domain(domain)
assert is_invalid_mailbox_domain(f"sub.{domain}")
assert is_invalid_mailbox_domain(f"sub1.sub2.{domain}")
assert not is_invalid_mailbox_domain("xy.zt")

View File

@ -46,6 +46,10 @@ def login(flask_client) -> User:
return user
def random_domain() -> str:
return random_token() + ".test"
def random_token(length: int = 10) -> str:
return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))