Canonicalize emails from google and proton before registering users (#1493)

* Revert "Revert "Use canonical email when registering users (#1458)" (#1474)"

This reverts commit c8ab1c747e.

* Only canonicalize gmail and proton

Co-authored-by: Adrià Casajús <adria.casajus@proton.ch>
This commit is contained in:
Adrià Casajús 2022-12-14 11:50:36 +01:00 committed by GitHub
parent 9dcf063337
commit 5e48d86efa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 311 additions and 131 deletions

View File

@ -23,7 +23,7 @@ from app.events.auth_event import LoginEvent, RegisterEvent
from app.extensions import limiter
from app.log import LOG
from app.models import User, ApiKey, SocialAuth, AccountActivation
from app.utils import sanitize_email
from app.utils import sanitize_email, canonicalize_email
@api_bp.route("/auth/login", methods=["POST"])
@ -49,11 +49,13 @@ def auth_login():
if not data:
return jsonify(error="request body cannot be empty"), 400
email = sanitize_email(data.get("email"))
password = data.get("password")
device = data.get("device")
user = User.filter_by(email=email).first()
email = sanitize_email(data.get("email"))
canonical_email = canonicalize_email(data.get("email"))
user = User.get_by(email=email) or User.get_by(email=canonical_email)
if not user or not user.check_password(password):
LoginEvent(LoginEvent.ActionType.failed, LoginEvent.Source.api).send()
@ -89,7 +91,8 @@ def auth_register():
if not data:
return jsonify(error="request body cannot be empty"), 400
email = sanitize_email(data.get("email"))
dirty_email = data.get("email")
email = canonicalize_email(dirty_email)
password = data.get("password")
if DISABLE_REGISTRATION:
@ -110,7 +113,7 @@ def auth_register():
return jsonify(error="password too long"), 400
LOG.d("create user %s", email)
user = User.create(email=email, name=email, password=password)
user = User.create(email=email, name=dirty_email, password=password)
Session.flush()
# create activation code
@ -148,9 +151,10 @@ def auth_activate():
return jsonify(error="request body cannot be empty"), 400
email = sanitize_email(data.get("email"))
canonical_email = canonicalize_email(data.get("email"))
code = data.get("code")
user = User.get_by(email=email)
user = User.get_by(email=email) or User.get_by(email=canonical_email)
# do not use a different message to avoid exposing existing email
if not user or user.activated:
@ -196,7 +200,9 @@ def auth_reactivate():
return jsonify(error="request body cannot be empty"), 400
email = sanitize_email(data.get("email"))
user = User.get_by(email=email)
canonical_email = canonicalize_email(data.get("email"))
user = User.get_by(email=email) or User.get_by(email=canonical_email)
# do not use a different message to avoid exposing existing email
if not user or user.activated:
@ -367,8 +373,9 @@ def forgot_password():
return jsonify(error="request body must contain email"), 400
email = sanitize_email(data.get("email"))
canonical_email = canonicalize_email(data.get("email"))
user = User.get_by(email=email)
user = User.get_by(email=email) or User.get_by(email=canonical_email)
if user:
send_reset_password_email(user)

View File

@ -7,7 +7,7 @@ from app.dashboard.views.setting import send_reset_password_email
from app.extensions import limiter
from app.log import LOG
from app.models import User
from app.utils import sanitize_email
from app.utils import sanitize_email, canonicalize_email
class ForgotPasswordForm(FlaskForm):
@ -25,12 +25,14 @@ def forgot_password():
# Trigger rate limiter
g.deduct_limit = True
email = sanitize_email(form.email.data)
flash(
"If your email is correct, you are going to receive an email to reset your password",
"success",
)
user = User.get_by(email=email)
email = sanitize_email(form.email.data)
canonical_email = canonicalize_email(email)
user = User.get_by(email=email) or User.get_by(email=canonical_email)
if user:
LOG.d("Send forgot password email to %s", user)

View File

@ -10,7 +10,7 @@ from app.events.auth_event import LoginEvent
from app.extensions import limiter
from app.log import LOG
from app.models import User
from app.utils import sanitize_email, sanitize_next_url
from app.utils import sanitize_email, sanitize_next_url, canonicalize_email
class LoginForm(FlaskForm):
@ -38,7 +38,9 @@ def login():
show_resend_activation = False
if form.validate_on_submit():
user = User.filter_by(email=sanitize_email(form.email.data)).first()
email = sanitize_email(form.email.data)
canonical_email = canonicalize_email(email)
user = User.get_by(email=email) or User.get_by(email=canonical_email)
if not user or not user.check_password(form.password.data):
# Trigger rate limiter

View File

@ -17,7 +17,7 @@ from app.email_utils import (
from app.events.auth_event import RegisterEvent
from app.log import LOG
from app.models import User, ActivationCode, DailyMetric
from app.utils import random_string, encode_url, sanitize_email
from app.utils import random_string, encode_url, sanitize_email, canonicalize_email
class RegisterForm(FlaskForm):
@ -70,12 +70,15 @@ def register():
HCAPTCHA_SITEKEY=HCAPTCHA_SITEKEY,
)
email = sanitize_email(form.email.data)
email = canonicalize_email(form.email.data)
if not email_can_be_used_as_mailbox(email):
flash("You cannot use this email address as your personal inbox.", "error")
RegisterEvent(RegisterEvent.ActionType.email_in_use).send()
else:
if personal_email_already_used(email):
sanitized_email = sanitize_email(form.email.data)
if personal_email_already_used(email) or personal_email_already_used(
sanitized_email
):
flash(f"Email {email} already used", "error")
RegisterEvent(RegisterEvent.ActionType.email_in_use).send()
else:

View File

@ -7,7 +7,7 @@ from app.auth.views.register import send_activation_email
from app.extensions import limiter
from app.log import LOG
from app.models import User
from app.utils import sanitize_email
from app.utils import sanitize_email, canonicalize_email
class ResendActivationForm(FlaskForm):
@ -20,7 +20,9 @@ def resend_activation():
form = ResendActivationForm(request.form)
if form.validate_on_submit():
user = User.filter_by(email=sanitize_email(form.email.data)).first()
email = sanitize_email(form.email.data)
canonical_email = canonicalize_email(email)
user = User.get_by(email=email) or User.get_by(email=canonical_email)
if not user:
flash("There is no such email", "warning")

View File

@ -523,4 +523,7 @@ if ENABLE_ALL_REVERSE_ALIAS_REPLACEMENT:
os.environ["MAX_NB_REVERSE_ALIAS_REPLACEMENT"]
)
# Only used for tests
SKIP_MX_LOOKUP_ON_CHECK = False
DISABLE_RATE_LIMIT = "DISABLE_RATE_LIMIT" in os.environ

View File

@ -54,7 +54,11 @@ from app.models import (
UnsubscribeBehaviourEnum,
)
from app.proton.utils import get_proton_partner, perform_proton_account_unlink
from app.utils import random_string, sanitize_email, CSRFValidationForm
from app.utils import (
random_string,
CSRFValidationForm,
canonicalize_email,
)
class SettingForm(FlaskForm):
@ -122,11 +126,8 @@ def setting():
if change_email_form.validate():
# whether user can proceed with the email update
new_email_valid = True
if (
sanitize_email(change_email_form.email.data) != current_user.email
and not pending_email
):
new_email = sanitize_email(change_email_form.email.data)
new_email = canonicalize_email(change_email_form.email.data)
if new_email != current_user.email and not pending_email:
# check if this email is not already used
if personal_email_already_used(new_email) or Alias.get_by(

View File

@ -34,30 +34,7 @@ from flanker.addresslib.address import EmailAddress
from jinja2 import Environment, FileSystemLoader
from sqlalchemy import func
from app.config import (
ROOT_DIR,
POSTFIX_SERVER,
DKIM_SELECTOR,
DKIM_PRIVATE_KEY,
ALIAS_DOMAINS,
POSTFIX_SUBMISSION_TLS,
MAX_NB_EMAIL_FREE_PLAN,
MAX_ALERT_24H,
POSTFIX_PORT,
URL,
LANDING_PAGE_URL,
EMAIL_DOMAIN,
ALERT_DIRECTORY_DISABLED_ALIAS_CREATION,
ALERT_SPF,
ALERT_INVALID_TOTP_LOGIN,
TEMP_DIR,
ALIAS_AUTOMATIC_DISABLE,
RSPAMD_SIGN_DKIM,
NOREPLY,
VERP_PREFIX,
VERP_MESSAGE_LIFETIME,
VERP_EMAIL_SECRET,
)
from app import config
from app.db import Session
from app.dns_utils import get_mx_domains
from app.email import headers
@ -91,15 +68,15 @@ VERP_HMAC_ALGO = "sha3-224"
def render(template_name, **kwargs) -> str:
templates_dir = os.path.join(ROOT_DIR, "templates", "emails")
templates_dir = os.path.join(config.ROOT_DIR, "templates", "emails")
env = Environment(loader=FileSystemLoader(templates_dir))
template = env.get_template(template_name)
return template.render(
MAX_NB_EMAIL_FREE_PLAN=MAX_NB_EMAIL_FREE_PLAN,
URL=URL,
LANDING_PAGE_URL=LANDING_PAGE_URL,
MAX_NB_EMAIL_FREE_PLAN=config.MAX_NB_EMAIL_FREE_PLAN,
URL=config.URL,
LANDING_PAGE_URL=config.LANDING_PAGE_URL,
YEAR=arrow.now().year,
**kwargs,
)
@ -187,7 +164,7 @@ def send_change_email(new_email, current_email, link):
def send_invalid_totp_login_email(user, totp_type):
send_email_with_rate_control(
user,
ALERT_INVALID_TOTP_LOGIN,
config.ALERT_INVALID_TOTP_LOGIN,
user.email,
"Unsuccessful attempt to login to your SimpleLogin account",
render(
@ -245,7 +222,7 @@ def send_cannot_create_directory_alias_disabled(user, alias_address, directory_n
"""
send_email_with_rate_control(
user,
ALERT_DIRECTORY_DISABLED_ALIAS_CREATION,
config.ALERT_DIRECTORY_DISABLED_ALIAS_CREATION,
user.email,
f"Alias {alias_address} cannot be created",
render(
@ -297,8 +274,8 @@ def send_email(
LOG.d("send email to %s, subject '%s'", to_email, subject)
from_name = from_name or NOREPLY
from_addr = from_addr or NOREPLY
from_name = from_name or config.NOREPLY
from_addr = from_addr or config.NOREPLY
from_domain = get_email_domain_part(from_addr)
if html:
@ -314,7 +291,7 @@ def send_email(
msg[headers.FROM] = f'"{from_name}" <{from_addr}>'
msg[headers.TO] = to_email
msg_id_header = make_msgid(domain=EMAIL_DOMAIN)
msg_id_header = make_msgid(domain=config.EMAIL_DOMAIN)
msg[headers.MESSAGE_ID] = msg_id_header
date_header = formatdate()
@ -353,7 +330,7 @@ def send_email_with_rate_control(
subject,
plaintext,
html=None,
max_nb_alert=MAX_ALERT_24H,
max_nb_alert=config.MAX_ALERT_24H,
nb_day=1,
ignore_smtp_error=False,
retries=0,
@ -450,7 +427,7 @@ def get_email_domain_part(address):
def add_dkim_signature(msg: Message, email_domain: str):
if RSPAMD_SIGN_DKIM:
if config.RSPAMD_SIGN_DKIM:
LOG.d("DKIM signature will be added by rspamd")
msg[headers.SL_WANT_SIGNING] = "yes"
return
@ -465,9 +442,9 @@ def add_dkim_signature(msg: Message, email_domain: str):
continue
# To investigate why some emails can't be DKIM signed. todo: remove
if TEMP_DIR:
if config.TEMP_DIR:
file_name = str(uuid.uuid4()) + ".eml"
with open(os.path.join(TEMP_DIR, file_name), "wb") as f:
with open(os.path.join(config.TEMP_DIR, file_name), "wb") as f:
f.write(msg.as_bytes())
LOG.w("email saved to %s", file_name)
@ -482,12 +459,12 @@ def add_dkim_signature_with_header(
# Specify headers in "byte" form
# Generate message signature
if DKIM_PRIVATE_KEY:
if config.DKIM_PRIVATE_KEY:
sig = dkim.sign(
message_to_bytes(msg),
DKIM_SELECTOR,
config.DKIM_SELECTOR,
email_domain.encode(),
DKIM_PRIVATE_KEY.encode(),
config.DKIM_PRIVATE_KEY.encode(),
include_headers=dkim_headers,
)
sig = sig.decode()
@ -539,7 +516,7 @@ def delete_all_headers_except(msg: Message, headers: [str]):
def can_create_directory_for_address(email_address: str) -> bool:
"""return True if an email ends with one of the alias domains provided by SimpleLogin"""
# not allow creating directory with premium domain
for domain in ALIAS_DOMAINS:
for domain in config.ALIAS_DOMAINS:
if email_address.endswith("@" + domain):
return True
@ -596,7 +573,7 @@ def email_can_be_used_as_mailbox(email_address: str) -> bool:
mx_domains = get_mx_domain_list(domain)
# if no MX record, email is not valid
if not mx_domains:
if not config.SKIP_MX_LOOKUP_ON_CHECK and not mx_domains:
LOG.d("No MX record for domain %s", domain)
return False
@ -1097,14 +1074,14 @@ def generate_reply_email(contact_email: str, user: User) -> str:
random_length = random.randint(5, 10)
reply_email = (
# do not use the ra+ anymore
# f"ra+{contact_email}+{random_string(random_length)}@{EMAIL_DOMAIN}"
f"{contact_email}_{random_string(random_length)}@{EMAIL_DOMAIN}"
# f"ra+{contact_email}+{random_string(random_length)}@{config.EMAIL_DOMAIN}"
f"{contact_email}_{random_string(random_length)}@{config.EMAIL_DOMAIN}"
)
else:
random_length = random.randint(20, 50)
# do not use the ra+ anymore
# reply_email = f"ra+{random_string(random_length)}@{EMAIL_DOMAIN}"
reply_email = f"{random_string(random_length)}@{EMAIL_DOMAIN}"
# reply_email = f"ra+{random_string(random_length)}@{config.EMAIL_DOMAIN}"
reply_email = f"{random_string(random_length)}@{config.EMAIL_DOMAIN}"
if not Contact.get_by(reply_email=reply_email):
return reply_email
@ -1117,7 +1094,7 @@ def is_reverse_alias(address: str) -> bool:
if Contact.get_by(reply_email=address):
return True
return address.endswith(f"@{EMAIL_DOMAIN}") and (
return address.endswith(f"@{config.EMAIL_DOMAIN}") and (
address.startswith("reply+") or address.startswith("ra+")
)
@ -1151,7 +1128,7 @@ def should_disable(alias: Alias) -> (bool, str):
LOG.w("%s cannot be disabled", alias)
return False, ""
if not ALIAS_AUTOMATIC_DISABLE:
if not config.ALIAS_AUTOMATIC_DISABLE:
return False, ""
yesterday = arrow.now().shift(days=-1)
@ -1266,14 +1243,14 @@ def spf_pass(
subject = get_header_unicode(msg[headers.SUBJECT])
send_email_with_rate_control(
user,
ALERT_SPF,
config.ALERT_SPF,
mailbox.email,
f"SimpleLogin Alert: attempt to send emails from your alias {alias.email} from unknown IP Address",
render(
"transactional/spf-fail.txt",
alias=alias.email,
ip=ip,
mailbox_url=URL + f"/dashboard/mailbox/{mailbox.id}#spf",
mailbox_url=config.URL + f"/dashboard/mailbox/{mailbox.id}#spf",
to_email=contact_email,
subject=subject,
time=arrow.now(),
@ -1281,7 +1258,7 @@ def spf_pass(
render(
"transactional/spf-fail.html",
ip=ip,
mailbox_url=URL + f"/dashboard/mailbox/{mailbox.id}#spf",
mailbox_url=config.URL + f"/dashboard/mailbox/{mailbox.id}#spf",
to_email=contact_email,
subject=subject,
time=arrow.now(),
@ -1304,11 +1281,11 @@ def spf_pass(
@cached(cache=TTLCache(maxsize=2, ttl=20))
def get_smtp_server():
LOG.d("get a smtp server")
if POSTFIX_SUBMISSION_TLS:
smtp = SMTP(POSTFIX_SERVER, 587)
if config.POSTFIX_SUBMISSION_TLS:
smtp = SMTP(config.POSTFIX_SERVER, 587)
smtp.starttls()
else:
smtp = SMTP(POSTFIX_SERVER, POSTFIX_PORT)
smtp = SMTP(config.POSTFIX_SERVER, config.POSTFIX_PORT)
return smtp
@ -1380,12 +1357,12 @@ def save_email_for_debugging(msg: Message, file_name_prefix=None) -> str:
"""Save email for debugging to temporary location
Return the file path
"""
if TEMP_DIR:
if config.TEMP_DIR:
file_name = str(uuid.uuid4()) + ".eml"
if file_name_prefix:
file_name = "{}-{}".format(file_name_prefix, file_name)
with open(os.path.join(TEMP_DIR, file_name), "wb") as f:
with open(os.path.join(config.TEMP_DIR, file_name), "wb") as f:
f.write(msg.as_bytes())
LOG.d("email saved to %s", file_name)
@ -1398,12 +1375,12 @@ def save_envelope_for_debugging(envelope: Envelope, file_name_prefix=None) -> st
"""Save envelope for debugging to temporary location
Return the file path
"""
if TEMP_DIR:
if config.TEMP_DIR:
file_name = str(uuid.uuid4()) + ".eml"
if file_name_prefix:
file_name = "{}-{}".format(file_name_prefix, file_name)
with open(os.path.join(TEMP_DIR, file_name), "wb") as f:
with open(os.path.join(config.TEMP_DIR, file_name), "wb") as f:
f.write(envelope.original_content)
LOG.d("envelope saved to %s", file_name)
@ -1429,12 +1406,15 @@ def generate_verp_email(
# Signing without itsdangereous because it uses base64 that includes +/= symbols and lower and upper case letters.
# We need to encode in base32
payload_hmac = hmac.new(
VERP_EMAIL_SECRET.encode("utf-8"), json_payload, VERP_HMAC_ALGO
config.VERP_EMAIL_SECRET.encode("utf-8"), json_payload, VERP_HMAC_ALGO
).digest()[:8]
encoded_payload = base64.b32encode(json_payload).rstrip(b"=").decode("utf-8")
encoded_signature = base64.b32encode(payload_hmac).rstrip(b"=").decode("utf-8")
return "{}.{}.{}@{}".format(
VERP_PREFIX, encoded_payload, encoded_signature, sender_domain or EMAIL_DOMAIN
config.VERP_PREFIX,
encoded_payload,
encoded_signature,
sender_domain or config.EMAIL_DOMAIN,
).lower()
@ -1447,7 +1427,7 @@ def get_verp_info_from_email(email: str) -> Optional[Tuple[VerpType, int]]:
return None
username = email[:idx]
fields = username.split(".")
if len(fields) != 3 or fields[0] != VERP_PREFIX:
if len(fields) != 3 or fields[0] != config.VERP_PREFIX:
return None
try:
padding = (8 - (len(fields[1]) % 8)) % 8
@ -1459,7 +1439,7 @@ def get_verp_info_from_email(email: str) -> Optional[Tuple[VerpType, int]]:
except binascii.Error:
return None
expected_signature = hmac.new(
VERP_EMAIL_SECRET.encode("utf-8"), payload, VERP_HMAC_ALGO
config.VERP_EMAIL_SECRET.encode("utf-8"), payload, VERP_HMAC_ALGO
).digest()[:8]
if expected_signature != signature:
return None
@ -1467,7 +1447,7 @@ def get_verp_info_from_email(email: str) -> Optional[Tuple[VerpType, int]]:
# verp type, object_id, time
if len(data) != 3:
return None
if data[2] > (time.time() + VERP_MESSAGE_LIFETIME - VERP_TIME_START) / 60:
if data[2] > (time.time() + config.VERP_MESSAGE_LIFETIME - VERP_TIME_START) / 60:
return None
return VerpType(data[0]), data[1]

View File

@ -15,7 +15,7 @@ from app.models import (
Mailbox,
User,
)
from app.utils import sanitize_email
from app.utils import sanitize_email, canonicalize_email
from .log import LOG
@ -69,7 +69,7 @@ def import_from_csv(batch_import: BatchImport, user: User, lines):
if "mailboxes" in row:
for mailbox_email in row["mailboxes"].split():
mailbox_email = sanitize_email(mailbox_email)
mailbox_email = canonicalize_email(mailbox_email)
mailbox = Mailbox.get_by(email=mailbox_email)
if not mailbox or not mailbox.verified or mailbox.user_id != user.id:

View File

@ -69,6 +69,25 @@ def encode_url(url):
return urllib.parse.quote(url, safe="")
def canonicalize_email(email_address: str) -> str:
email_address = sanitize_email(email_address)
parts = email_address.split("@")
if len(parts) != 2:
return ""
domain = parts[1]
if domain not in ("gmail.com", "protonmail.com", "proton.me", "pm.me"):
return email_address
first = parts[0]
try:
plus_idx = first.index("+")
first = first[:plus_idx]
except ValueError:
# No + in the email
pass
first = first.replace(".", "")
return f"{first}@{parts[1]}".lower().strip()
def sanitize_email(email_address: str, not_lower=False) -> str:
if email_address:
email_address = email_address.strip().replace(" ", "").replace("\n", " ")

View File

@ -1,5 +1,5 @@
[tool.black]
target-version = ['py37']
target-version = ['py310']
exclude = '''
(
/(

View File

@ -2,18 +2,29 @@ import pytest
import unicodedata
from flask import url_for
from app import config
from app.db import Session
from app.models import User, AccountActivation
from tests.utils import random_email
PASSWORD_1 = "Aurélie"
PASSWORD_2 = unicodedata.normalize("NFKD", PASSWORD_1)
assert PASSWORD_1 != PASSWORD_2
def setup_module():
config.SKIP_MX_LOOKUP_ON_CHECK = True
def teardown_module():
config.SKIP_MX_LOOKUP_ON_CHECK = False
@pytest.mark.parametrize("mfa", (True, False), ids=("MFA", "no MFA"))
def test_auth_login_success(flask_client, mfa: bool):
email = random_email()
User.create(
email="abcd@gmail.com",
email=email,
password=PASSWORD_1,
name="Test User",
activated=True,
@ -24,7 +35,7 @@ def test_auth_login_success(flask_client, mfa: bool):
r = flask_client.post(
"/api/auth/login",
json={
"email": "abcd@gmail.com",
"email": email,
"password": PASSWORD_2,
"device": "Test Device",
},
@ -45,15 +56,14 @@ def test_auth_login_success(flask_client, mfa: bool):
def test_auth_login_device_exist(flask_client):
User.create(
email="abcd@gmail.com", password="password", name="Test User", activated=True
)
email = random_email()
User.create(email=email, password="password", name="Test User", activated=True)
Session.commit()
r = flask_client.post(
url_for("api.auth_login"),
json={
"email": "abcd@gmail.com",
"email": email,
"password": "password",
"device": "Test Device",
},
@ -69,7 +79,7 @@ def test_auth_login_device_exist(flask_client):
r = flask_client.post(
url_for("api.auth_login"),
json={
"email": "abcd@gmail.com",
"email": email,
"password": "password",
"device": "Test Device",
},
@ -78,11 +88,12 @@ def test_auth_login_device_exist(flask_client):
def test_auth_register_success(flask_client):
email = random_email()
assert AccountActivation.first() is None
r = flask_client.post(
url_for("api.auth_register"),
json={"email": "abcd@gmail.com", "password": "password"},
json={"email": email, "password": "password"},
)
assert r.status_code == 200
@ -96,9 +107,10 @@ def test_auth_register_success(flask_client):
def test_auth_register_too_short_password(flask_client):
email = random_email()
r = flask_client.post(
url_for("api.auth_register"),
json={"email": "abcd@gmail.com", "password": "short"},
json={"email": email, "password": "short"},
)
assert r.status_code == 400
@ -106,9 +118,10 @@ def test_auth_register_too_short_password(flask_client):
def test_auth_register_too_long_password(flask_client):
email = random_email()
r = flask_client.post(
url_for("api.auth_register"),
json={"email": "abcd@gmail.com", "password": "0123456789" * 11},
json={"email": email, "password": "0123456789" * 11},
)
assert r.status_code == 400
@ -116,9 +129,10 @@ def test_auth_register_too_long_password(flask_client):
def test_auth_activate_success(flask_client):
email = random_email()
r = flask_client.post(
url_for("api.auth_register"),
json={"email": "abcd@gmail.com", "password": "password"},
json={"email": email, "password": "password"},
)
assert r.status_code == 200
@ -131,7 +145,7 @@ def test_auth_activate_success(flask_client):
r = flask_client.post(
url_for("api.auth_activate"),
json={"email": "abcd@gmail.com", "code": act_code.code},
json={"email": email, "code": act_code.code},
)
assert r.status_code == 200
@ -144,21 +158,21 @@ def test_auth_activate_wrong_email(flask_client):
def test_auth_activate_user_already_activated(flask_client):
User.create(
email="abcd@gmail.com", password="password", name="Test User", activated=True
)
email = random_email()
User.create(email=email, password="password", name="Test User", activated=True)
Session.commit()
r = flask_client.post(
url_for("api.auth_activate"), json={"email": "abcd@gmail.com", "code": "123456"}
url_for("api.auth_activate"), json={"email": email, "code": "123456"}
)
assert r.status_code == 400
def test_auth_activate_wrong_code(flask_client):
email = random_email()
r = flask_client.post(
url_for("api.auth_register"),
json={"email": "abcd@gmail.com", "password": "password"},
json={"email": email, "password": "password"},
)
assert r.status_code == 200
@ -175,7 +189,7 @@ def test_auth_activate_wrong_code(flask_client):
r = flask_client.post(
url_for("api.auth_activate"),
json={"email": "abcd@gmail.com", "code": wrong_code},
json={"email": email, "code": wrong_code},
)
assert r.status_code == 400
@ -185,9 +199,10 @@ def test_auth_activate_wrong_code(flask_client):
def test_auth_activate_too_many_wrong_code(flask_client):
email = random_email()
r = flask_client.post(
url_for("api.auth_register"),
json={"email": "abcd@gmail.com", "password": "password"},
json={"email": email, "password": "password"},
)
assert r.status_code == 200
@ -205,14 +220,14 @@ def test_auth_activate_too_many_wrong_code(flask_client):
for _ in range(2):
r = flask_client.post(
url_for("api.auth_activate"),
json={"email": "abcd@gmail.com", "code": wrong_code},
json={"email": email, "code": wrong_code},
)
assert r.status_code == 400
# the activation code is deleted
r = flask_client.post(
url_for("api.auth_activate"),
json={"email": "abcd@gmail.com", "code": wrong_code},
json={"email": email, "code": wrong_code},
)
assert r.status_code == 410
@ -222,12 +237,11 @@ def test_auth_activate_too_many_wrong_code(flask_client):
def test_auth_reactivate_success(flask_client):
User.create(email="abcd@gmail.com", password="password", name="Test User")
email = random_email()
User.create(email=email, password="password", name="Test User")
Session.commit()
r = flask_client.post(
url_for("api.auth_reactivate"), json={"email": "abcd@gmail.com"}
)
r = flask_client.post(url_for("api.auth_reactivate"), json={"email": email})
assert r.status_code == 200
# make sure an activation code is created
@ -238,14 +252,13 @@ def test_auth_reactivate_success(flask_client):
def test_auth_login_forgot_password(flask_client):
User.create(
email="abcd@gmail.com", password="password", name="Test User", activated=True
)
email = random_email()
User.create(email=email, password="password", name="Test User", activated=True)
Session.commit()
r = flask_client.post(
url_for("api.forgot_password"),
json={"email": "abcd@gmail.com"},
json={"email": email},
)
assert r.status_code == 200
@ -253,7 +266,7 @@ def test_auth_login_forgot_password(flask_client):
# No such email, still return 200
r = flask_client.post(
url_for("api.forgot_password"),
json={"email": "not-exist@b.c"},
json={"email": random_email()},
)
assert r.status_code == 200

View File

@ -1,6 +1,7 @@
from flask import url_for
from app.db import Session
from app.utils import canonicalize_email, random_string
from tests.utils import create_new_user
@ -20,3 +21,62 @@ def test_unactivated_user_login(flask_client):
b"Please check your inbox for the activation email. You can also have this email re-sent"
in r.data
)
def test_non_canonical_login(flask_client):
email = f"pre.{random_string(10)}@gmail.com"
name = f"NAME-{random_string(10)}"
user = create_new_user(email, name)
Session.commit()
r = flask_client.post(
url_for("auth.login"),
data={"email": user.email, "password": "password"},
follow_redirects=True,
)
assert r.status_code == 200
assert name.encode("utf-8") in r.data
canonical_email = canonicalize_email(email)
assert canonical_email != email
flask_client.get(url_for("auth.logout"))
r = flask_client.post(
url_for("auth.login"),
data={"email": canonical_email, "password": "password"},
follow_redirects=True,
)
assert r.status_code == 200
assert name.encode("utf-8") not in r.data
def test_canonical_login_with_non_canonical_email(flask_client):
suffix = f"{random_string(10)}@gmail.com"
canonical_email = f"pre{suffix}"
non_canonical_email = f"pre.{suffix}"
name = f"NAME-{random_string(10)}"
create_new_user(canonical_email, name)
Session.commit()
r = flask_client.post(
url_for("auth.login"),
data={"email": non_canonical_email, "password": "password"},
follow_redirects=True,
)
assert r.status_code == 200
assert name.encode("utf-8") in r.data
flask_client.get(url_for("auth.logout"))
r = flask_client.post(
url_for("auth.login"),
data={"email": canonical_email, "password": "password"},
follow_redirects=True,
)
assert r.status_code == 200
assert name.encode("utf-8") in r.data

View File

@ -1,13 +1,25 @@
from flask import url_for
from app import config
from app.db import Session
from app.models import DailyMetric
from app.models import DailyMetric, User
from app.utils import canonicalize_email
from tests.utils import create_new_user, random_email
def setup_module():
config.SKIP_MX_LOOKUP_ON_CHECK = True
def teardown_module():
config.SKIP_MX_LOOKUP_ON_CHECK = False
def test_register_success(flask_client):
email = random_email()
r = flask_client.post(
url_for("auth.register"),
data={"email": "abcd@gmail.com", "password": "password"},
data={"email": email, "password": "password"},
follow_redirects=True,
)
@ -23,7 +35,7 @@ def test_register_increment_nb_new_web_non_proton_user(flask_client):
r = flask_client.post(
url_for("auth.register"),
data={"email": "abcd@gmail.com", "password": "password"},
data={"email": random_email(), "password": "password"},
follow_redirects=True,
)
@ -34,7 +46,6 @@ def test_register_increment_nb_new_web_non_proton_user(flask_client):
def test_register_disabled(flask_client):
"""User cannot create new account when DISABLE_REGISTRATION."""
from app import config
config.DISABLE_REGISTRATION = True
@ -44,4 +55,34 @@ def test_register_disabled(flask_client):
follow_redirects=True,
)
config.DISABLE_REGISTRATION = False
assert b"Registration is closed" in r.data
def test_register_non_canonical_if_canonical_exists_is_not_allowed(flask_client):
"""User cannot create new account if the canonical name clashes"""
email = f"noncan.{random_email()}"
canonical_email = canonicalize_email(email)
create_new_user(email=canonical_email)
r = flask_client.post(
url_for("auth.register"),
data={"email": email, "password": "password"},
follow_redirects=True,
)
assert f"Email {canonical_email} already used".encode("utf-8") in r.data
def test_register_non_canonical_is_canonicalized(flask_client):
"""User cannot create new account if the canonical name clashes"""
email = f"noncan.{random_email()}"
r = flask_client.post(
url_for("auth.register"),
data={"email": email, "password": "password"},
follow_redirects=True,
)
assert b"An email to validate your email is on its way" in r.data
assert User.get_by(email=canonicalize_email(email)) is not None

View File

@ -0,0 +1,28 @@
from flask import url_for
from app import config
from app.models import EmailChange
from app.utils import canonicalize_email
from tests.utils import login, random_email, create_new_user
def test_setup_done(flask_client):
config.SKIP_MX_LOOKUP_ON_CHECK = True
user = create_new_user()
login(flask_client, user)
noncanonical_email = f"nonca.{random_email()}"
r = flask_client.post(
url_for("dashboard.setting"),
data={
"form-name": "update-email",
"email": noncanonical_email,
},
follow_redirects=True,
)
assert r.status_code == 200
email_change = EmailChange.get_by(user_id=user.id)
assert email_change is not None
assert email_change.new_email == canonicalize_email(noncanonical_email)
config.SKIP_MX_LOOKUP_ON_CHECK = False

View File

@ -67,4 +67,5 @@ RECOVERY_CODE_HMAC_SECRET=1234567890123456789
ENABLE_ALL_REVERSE_ALIAS_REPLACEMENT=true
MAX_NB_REVERSE_ALIAS_REPLACEMENT=200
MEM_STORE_URI=redis://localhost
MEM_STORE_URI=redis://localhost

View File

@ -4,7 +4,7 @@ from urllib.parse import parse_qs
import pytest
from app.config import ALLOWED_REDIRECT_DOMAINS
from app.utils import random_string, random_words, sanitize_next_url
from app.utils import random_string, random_words, sanitize_next_url, canonicalize_email
def test_random_words():
@ -59,3 +59,16 @@ def test_parse_querystring():
assert len(res) == len(expected)
for k, v in expected.items():
assert res[k] == v
def canonicalize_email_cases():
for domain in ("gmail.com", "protonmail.com", "proton.me", "pm.me"):
yield (f"a@{domain}", f"a@{domain}")
yield (f"a.b@{domain}", f"ab@{domain}")
yield (f"a.b+c@{domain}", f"ab@{domain}")
yield (f"a.b+c@other.com", f"a.b+c@other.com")
@pytest.mark.parametrize("dirty,clean", canonicalize_email_cases())
def test_canonicalize_email(dirty: str, clean: str):
assert canonicalize_email(dirty) == clean

View File

@ -13,12 +13,16 @@ from app.models import User
from app.utils import random_string
def create_new_user() -> User:
def create_new_user(email: Optional[str] = None, name: Optional[str] = None) -> User:
if not email:
email = f"user_{random_token(10)}@mailbox.test"
if not name:
name = f"Test User"
# new user has a different email address
user = User.create(
email=f"user{random.random()}@mailbox.test",
email=email,
password="password",
name="Test User",
name=name,
activated=True,
flush=True,
)
@ -26,8 +30,9 @@ def create_new_user() -> User:
return user
def login(flask_client) -> User:
user = create_new_user()
def login(flask_client, user: Optional[User] = None) -> User:
if not user:
user = create_new_user()
r = flask_client.post(
url_for("auth.login"),