diff --git a/app/api/views/auth.py b/app/api/views/auth.py index 4da41c77..e8fb3616 100644 --- a/app/api/views/auth.py +++ b/app/api/views/auth.py @@ -23,7 +23,7 @@ from app.events.auth_event import LoginEvent, RegisterEvent from app.extensions import limiter from app.log import LOG from app.models import User, ApiKey, SocialAuth, AccountActivation -from app.utils import sanitize_email +from app.utils import sanitize_email, canonicalize_email @api_bp.route("/auth/login", methods=["POST"]) @@ -49,11 +49,13 @@ def auth_login(): if not data: return jsonify(error="request body cannot be empty"), 400 - email = sanitize_email(data.get("email")) password = data.get("password") device = data.get("device") - user = User.filter_by(email=email).first() + email = sanitize_email(data.get("email")) + canonical_email = canonicalize_email(data.get("email")) + + user = User.get_by(email=email) or User.get_by(email=canonical_email) if not user or not user.check_password(password): LoginEvent(LoginEvent.ActionType.failed, LoginEvent.Source.api).send() @@ -89,7 +91,8 @@ def auth_register(): if not data: return jsonify(error="request body cannot be empty"), 400 - email = sanitize_email(data.get("email")) + dirty_email = data.get("email") + email = canonicalize_email(dirty_email) password = data.get("password") if DISABLE_REGISTRATION: @@ -110,7 +113,7 @@ def auth_register(): return jsonify(error="password too long"), 400 LOG.d("create user %s", email) - user = User.create(email=email, name=email, password=password) + user = User.create(email=email, name=dirty_email, password=password) Session.flush() # create activation code @@ -148,9 +151,10 @@ def auth_activate(): return jsonify(error="request body cannot be empty"), 400 email = sanitize_email(data.get("email")) + canonical_email = canonicalize_email(data.get("email")) code = data.get("code") - user = User.get_by(email=email) + user = User.get_by(email=email) or User.get_by(email=canonical_email) # do not use a different message to avoid exposing existing email if not user or user.activated: @@ -196,7 +200,9 @@ def auth_reactivate(): return jsonify(error="request body cannot be empty"), 400 email = sanitize_email(data.get("email")) - user = User.get_by(email=email) + canonical_email = canonicalize_email(data.get("email")) + + user = User.get_by(email=email) or User.get_by(email=canonical_email) # do not use a different message to avoid exposing existing email if not user or user.activated: @@ -367,8 +373,9 @@ def forgot_password(): return jsonify(error="request body must contain email"), 400 email = sanitize_email(data.get("email")) + canonical_email = canonicalize_email(data.get("email")) - user = User.get_by(email=email) + user = User.get_by(email=email) or User.get_by(email=canonical_email) if user: send_reset_password_email(user) diff --git a/app/auth/views/forgot_password.py b/app/auth/views/forgot_password.py index 85ba5edb..684fd383 100644 --- a/app/auth/views/forgot_password.py +++ b/app/auth/views/forgot_password.py @@ -7,7 +7,7 @@ from app.dashboard.views.setting import send_reset_password_email from app.extensions import limiter from app.log import LOG from app.models import User -from app.utils import sanitize_email +from app.utils import sanitize_email, canonicalize_email class ForgotPasswordForm(FlaskForm): @@ -25,12 +25,14 @@ def forgot_password(): # Trigger rate limiter g.deduct_limit = True - email = sanitize_email(form.email.data) flash( "If your email is correct, you are going to receive an email to reset your password", "success", ) - user = User.get_by(email=email) + + email = sanitize_email(form.email.data) + canonical_email = canonicalize_email(email) + user = User.get_by(email=email) or User.get_by(email=canonical_email) if user: LOG.d("Send forgot password email to %s", user) diff --git a/app/auth/views/login.py b/app/auth/views/login.py index c74bf76a..55cb0c6c 100644 --- a/app/auth/views/login.py +++ b/app/auth/views/login.py @@ -10,7 +10,7 @@ from app.events.auth_event import LoginEvent from app.extensions import limiter from app.log import LOG from app.models import User -from app.utils import sanitize_email, sanitize_next_url +from app.utils import sanitize_email, sanitize_next_url, canonicalize_email class LoginForm(FlaskForm): @@ -38,7 +38,9 @@ def login(): show_resend_activation = False if form.validate_on_submit(): - user = User.filter_by(email=sanitize_email(form.email.data)).first() + email = sanitize_email(form.email.data) + canonical_email = canonicalize_email(email) + user = User.get_by(email=email) or User.get_by(email=canonical_email) if not user or not user.check_password(form.password.data): # Trigger rate limiter diff --git a/app/auth/views/register.py b/app/auth/views/register.py index f186000d..138a9719 100644 --- a/app/auth/views/register.py +++ b/app/auth/views/register.py @@ -17,7 +17,7 @@ from app.email_utils import ( from app.events.auth_event import RegisterEvent from app.log import LOG from app.models import User, ActivationCode, DailyMetric -from app.utils import random_string, encode_url, sanitize_email +from app.utils import random_string, encode_url, sanitize_email, canonicalize_email class RegisterForm(FlaskForm): @@ -70,12 +70,15 @@ def register(): HCAPTCHA_SITEKEY=HCAPTCHA_SITEKEY, ) - email = sanitize_email(form.email.data) + email = canonicalize_email(form.email.data) if not email_can_be_used_as_mailbox(email): flash("You cannot use this email address as your personal inbox.", "error") RegisterEvent(RegisterEvent.ActionType.email_in_use).send() else: - if personal_email_already_used(email): + sanitized_email = sanitize_email(form.email.data) + if personal_email_already_used(email) or personal_email_already_used( + sanitized_email + ): flash(f"Email {email} already used", "error") RegisterEvent(RegisterEvent.ActionType.email_in_use).send() else: diff --git a/app/auth/views/resend_activation.py b/app/auth/views/resend_activation.py index 517006af..36ca20fb 100644 --- a/app/auth/views/resend_activation.py +++ b/app/auth/views/resend_activation.py @@ -7,7 +7,7 @@ from app.auth.views.register import send_activation_email from app.extensions import limiter from app.log import LOG from app.models import User -from app.utils import sanitize_email +from app.utils import sanitize_email, canonicalize_email class ResendActivationForm(FlaskForm): @@ -20,7 +20,9 @@ def resend_activation(): form = ResendActivationForm(request.form) if form.validate_on_submit(): - user = User.filter_by(email=sanitize_email(form.email.data)).first() + email = sanitize_email(form.email.data) + canonical_email = canonicalize_email(email) + user = User.get_by(email=email) or User.get_by(email=canonical_email) if not user: flash("There is no such email", "warning") diff --git a/app/config.py b/app/config.py index d50b5016..775ca505 100644 --- a/app/config.py +++ b/app/config.py @@ -523,4 +523,7 @@ if ENABLE_ALL_REVERSE_ALIAS_REPLACEMENT: os.environ["MAX_NB_REVERSE_ALIAS_REPLACEMENT"] ) +# Only used for tests +SKIP_MX_LOOKUP_ON_CHECK = False + DISABLE_RATE_LIMIT = "DISABLE_RATE_LIMIT" in os.environ diff --git a/app/dashboard/views/setting.py b/app/dashboard/views/setting.py index 6ac196ea..7ad4899f 100644 --- a/app/dashboard/views/setting.py +++ b/app/dashboard/views/setting.py @@ -54,7 +54,11 @@ from app.models import ( UnsubscribeBehaviourEnum, ) from app.proton.utils import get_proton_partner, perform_proton_account_unlink -from app.utils import random_string, sanitize_email, CSRFValidationForm +from app.utils import ( + random_string, + CSRFValidationForm, + canonicalize_email, +) class SettingForm(FlaskForm): @@ -122,11 +126,8 @@ def setting(): if change_email_form.validate(): # whether user can proceed with the email update new_email_valid = True - if ( - sanitize_email(change_email_form.email.data) != current_user.email - and not pending_email - ): - new_email = sanitize_email(change_email_form.email.data) + new_email = canonicalize_email(change_email_form.email.data) + if new_email != current_user.email and not pending_email: # check if this email is not already used if personal_email_already_used(new_email) or Alias.get_by( diff --git a/app/email_utils.py b/app/email_utils.py index 916aecda..9fed04c6 100644 --- a/app/email_utils.py +++ b/app/email_utils.py @@ -34,30 +34,7 @@ from flanker.addresslib.address import EmailAddress from jinja2 import Environment, FileSystemLoader from sqlalchemy import func -from app.config import ( - ROOT_DIR, - POSTFIX_SERVER, - DKIM_SELECTOR, - DKIM_PRIVATE_KEY, - ALIAS_DOMAINS, - POSTFIX_SUBMISSION_TLS, - MAX_NB_EMAIL_FREE_PLAN, - MAX_ALERT_24H, - POSTFIX_PORT, - URL, - LANDING_PAGE_URL, - EMAIL_DOMAIN, - ALERT_DIRECTORY_DISABLED_ALIAS_CREATION, - ALERT_SPF, - ALERT_INVALID_TOTP_LOGIN, - TEMP_DIR, - ALIAS_AUTOMATIC_DISABLE, - RSPAMD_SIGN_DKIM, - NOREPLY, - VERP_PREFIX, - VERP_MESSAGE_LIFETIME, - VERP_EMAIL_SECRET, -) +from app import config from app.db import Session from app.dns_utils import get_mx_domains from app.email import headers @@ -91,15 +68,15 @@ VERP_HMAC_ALGO = "sha3-224" def render(template_name, **kwargs) -> str: - templates_dir = os.path.join(ROOT_DIR, "templates", "emails") + templates_dir = os.path.join(config.ROOT_DIR, "templates", "emails") env = Environment(loader=FileSystemLoader(templates_dir)) template = env.get_template(template_name) return template.render( - MAX_NB_EMAIL_FREE_PLAN=MAX_NB_EMAIL_FREE_PLAN, - URL=URL, - LANDING_PAGE_URL=LANDING_PAGE_URL, + MAX_NB_EMAIL_FREE_PLAN=config.MAX_NB_EMAIL_FREE_PLAN, + URL=config.URL, + LANDING_PAGE_URL=config.LANDING_PAGE_URL, YEAR=arrow.now().year, **kwargs, ) @@ -187,7 +164,7 @@ def send_change_email(new_email, current_email, link): def send_invalid_totp_login_email(user, totp_type): send_email_with_rate_control( user, - ALERT_INVALID_TOTP_LOGIN, + config.ALERT_INVALID_TOTP_LOGIN, user.email, "Unsuccessful attempt to login to your SimpleLogin account", render( @@ -245,7 +222,7 @@ def send_cannot_create_directory_alias_disabled(user, alias_address, directory_n """ send_email_with_rate_control( user, - ALERT_DIRECTORY_DISABLED_ALIAS_CREATION, + config.ALERT_DIRECTORY_DISABLED_ALIAS_CREATION, user.email, f"Alias {alias_address} cannot be created", render( @@ -297,8 +274,8 @@ def send_email( LOG.d("send email to %s, subject '%s'", to_email, subject) - from_name = from_name or NOREPLY - from_addr = from_addr or NOREPLY + from_name = from_name or config.NOREPLY + from_addr = from_addr or config.NOREPLY from_domain = get_email_domain_part(from_addr) if html: @@ -314,7 +291,7 @@ def send_email( msg[headers.FROM] = f'"{from_name}" <{from_addr}>' msg[headers.TO] = to_email - msg_id_header = make_msgid(domain=EMAIL_DOMAIN) + msg_id_header = make_msgid(domain=config.EMAIL_DOMAIN) msg[headers.MESSAGE_ID] = msg_id_header date_header = formatdate() @@ -353,7 +330,7 @@ def send_email_with_rate_control( subject, plaintext, html=None, - max_nb_alert=MAX_ALERT_24H, + max_nb_alert=config.MAX_ALERT_24H, nb_day=1, ignore_smtp_error=False, retries=0, @@ -450,7 +427,7 @@ def get_email_domain_part(address): def add_dkim_signature(msg: Message, email_domain: str): - if RSPAMD_SIGN_DKIM: + if config.RSPAMD_SIGN_DKIM: LOG.d("DKIM signature will be added by rspamd") msg[headers.SL_WANT_SIGNING] = "yes" return @@ -465,9 +442,9 @@ def add_dkim_signature(msg: Message, email_domain: str): continue # To investigate why some emails can't be DKIM signed. todo: remove - if TEMP_DIR: + if config.TEMP_DIR: file_name = str(uuid.uuid4()) + ".eml" - with open(os.path.join(TEMP_DIR, file_name), "wb") as f: + with open(os.path.join(config.TEMP_DIR, file_name), "wb") as f: f.write(msg.as_bytes()) LOG.w("email saved to %s", file_name) @@ -482,12 +459,12 @@ def add_dkim_signature_with_header( # Specify headers in "byte" form # Generate message signature - if DKIM_PRIVATE_KEY: + if config.DKIM_PRIVATE_KEY: sig = dkim.sign( message_to_bytes(msg), - DKIM_SELECTOR, + config.DKIM_SELECTOR, email_domain.encode(), - DKIM_PRIVATE_KEY.encode(), + config.DKIM_PRIVATE_KEY.encode(), include_headers=dkim_headers, ) sig = sig.decode() @@ -539,7 +516,7 @@ def delete_all_headers_except(msg: Message, headers: [str]): def can_create_directory_for_address(email_address: str) -> bool: """return True if an email ends with one of the alias domains provided by SimpleLogin""" # not allow creating directory with premium domain - for domain in ALIAS_DOMAINS: + for domain in config.ALIAS_DOMAINS: if email_address.endswith("@" + domain): return True @@ -596,7 +573,7 @@ def email_can_be_used_as_mailbox(email_address: str) -> bool: mx_domains = get_mx_domain_list(domain) # if no MX record, email is not valid - if not mx_domains: + if not config.SKIP_MX_LOOKUP_ON_CHECK and not mx_domains: LOG.d("No MX record for domain %s", domain) return False @@ -1097,14 +1074,14 @@ def generate_reply_email(contact_email: str, user: User) -> str: random_length = random.randint(5, 10) reply_email = ( # do not use the ra+ anymore - # f"ra+{contact_email}+{random_string(random_length)}@{EMAIL_DOMAIN}" - f"{contact_email}_{random_string(random_length)}@{EMAIL_DOMAIN}" + # f"ra+{contact_email}+{random_string(random_length)}@{config.EMAIL_DOMAIN}" + f"{contact_email}_{random_string(random_length)}@{config.EMAIL_DOMAIN}" ) else: random_length = random.randint(20, 50) # do not use the ra+ anymore - # reply_email = f"ra+{random_string(random_length)}@{EMAIL_DOMAIN}" - reply_email = f"{random_string(random_length)}@{EMAIL_DOMAIN}" + # reply_email = f"ra+{random_string(random_length)}@{config.EMAIL_DOMAIN}" + reply_email = f"{random_string(random_length)}@{config.EMAIL_DOMAIN}" if not Contact.get_by(reply_email=reply_email): return reply_email @@ -1117,7 +1094,7 @@ def is_reverse_alias(address: str) -> bool: if Contact.get_by(reply_email=address): return True - return address.endswith(f"@{EMAIL_DOMAIN}") and ( + return address.endswith(f"@{config.EMAIL_DOMAIN}") and ( address.startswith("reply+") or address.startswith("ra+") ) @@ -1151,7 +1128,7 @@ def should_disable(alias: Alias) -> (bool, str): LOG.w("%s cannot be disabled", alias) return False, "" - if not ALIAS_AUTOMATIC_DISABLE: + if not config.ALIAS_AUTOMATIC_DISABLE: return False, "" yesterday = arrow.now().shift(days=-1) @@ -1266,14 +1243,14 @@ def spf_pass( subject = get_header_unicode(msg[headers.SUBJECT]) send_email_with_rate_control( user, - ALERT_SPF, + config.ALERT_SPF, mailbox.email, f"SimpleLogin Alert: attempt to send emails from your alias {alias.email} from unknown IP Address", render( "transactional/spf-fail.txt", alias=alias.email, ip=ip, - mailbox_url=URL + f"/dashboard/mailbox/{mailbox.id}#spf", + mailbox_url=config.URL + f"/dashboard/mailbox/{mailbox.id}#spf", to_email=contact_email, subject=subject, time=arrow.now(), @@ -1281,7 +1258,7 @@ def spf_pass( render( "transactional/spf-fail.html", ip=ip, - mailbox_url=URL + f"/dashboard/mailbox/{mailbox.id}#spf", + mailbox_url=config.URL + f"/dashboard/mailbox/{mailbox.id}#spf", to_email=contact_email, subject=subject, time=arrow.now(), @@ -1304,11 +1281,11 @@ def spf_pass( @cached(cache=TTLCache(maxsize=2, ttl=20)) def get_smtp_server(): LOG.d("get a smtp server") - if POSTFIX_SUBMISSION_TLS: - smtp = SMTP(POSTFIX_SERVER, 587) + if config.POSTFIX_SUBMISSION_TLS: + smtp = SMTP(config.POSTFIX_SERVER, 587) smtp.starttls() else: - smtp = SMTP(POSTFIX_SERVER, POSTFIX_PORT) + smtp = SMTP(config.POSTFIX_SERVER, config.POSTFIX_PORT) return smtp @@ -1380,12 +1357,12 @@ def save_email_for_debugging(msg: Message, file_name_prefix=None) -> str: """Save email for debugging to temporary location Return the file path """ - if TEMP_DIR: + if config.TEMP_DIR: file_name = str(uuid.uuid4()) + ".eml" if file_name_prefix: file_name = "{}-{}".format(file_name_prefix, file_name) - with open(os.path.join(TEMP_DIR, file_name), "wb") as f: + with open(os.path.join(config.TEMP_DIR, file_name), "wb") as f: f.write(msg.as_bytes()) LOG.d("email saved to %s", file_name) @@ -1398,12 +1375,12 @@ def save_envelope_for_debugging(envelope: Envelope, file_name_prefix=None) -> st """Save envelope for debugging to temporary location Return the file path """ - if TEMP_DIR: + if config.TEMP_DIR: file_name = str(uuid.uuid4()) + ".eml" if file_name_prefix: file_name = "{}-{}".format(file_name_prefix, file_name) - with open(os.path.join(TEMP_DIR, file_name), "wb") as f: + with open(os.path.join(config.TEMP_DIR, file_name), "wb") as f: f.write(envelope.original_content) LOG.d("envelope saved to %s", file_name) @@ -1429,12 +1406,15 @@ def generate_verp_email( # Signing without itsdangereous because it uses base64 that includes +/= symbols and lower and upper case letters. # We need to encode in base32 payload_hmac = hmac.new( - VERP_EMAIL_SECRET.encode("utf-8"), json_payload, VERP_HMAC_ALGO + config.VERP_EMAIL_SECRET.encode("utf-8"), json_payload, VERP_HMAC_ALGO ).digest()[:8] encoded_payload = base64.b32encode(json_payload).rstrip(b"=").decode("utf-8") encoded_signature = base64.b32encode(payload_hmac).rstrip(b"=").decode("utf-8") return "{}.{}.{}@{}".format( - VERP_PREFIX, encoded_payload, encoded_signature, sender_domain or EMAIL_DOMAIN + config.VERP_PREFIX, + encoded_payload, + encoded_signature, + sender_domain or config.EMAIL_DOMAIN, ).lower() @@ -1447,7 +1427,7 @@ def get_verp_info_from_email(email: str) -> Optional[Tuple[VerpType, int]]: return None username = email[:idx] fields = username.split(".") - if len(fields) != 3 or fields[0] != VERP_PREFIX: + if len(fields) != 3 or fields[0] != config.VERP_PREFIX: return None try: padding = (8 - (len(fields[1]) % 8)) % 8 @@ -1459,7 +1439,7 @@ def get_verp_info_from_email(email: str) -> Optional[Tuple[VerpType, int]]: except binascii.Error: return None expected_signature = hmac.new( - VERP_EMAIL_SECRET.encode("utf-8"), payload, VERP_HMAC_ALGO + config.VERP_EMAIL_SECRET.encode("utf-8"), payload, VERP_HMAC_ALGO ).digest()[:8] if expected_signature != signature: return None @@ -1467,7 +1447,7 @@ def get_verp_info_from_email(email: str) -> Optional[Tuple[VerpType, int]]: # verp type, object_id, time if len(data) != 3: return None - if data[2] > (time.time() + VERP_MESSAGE_LIFETIME - VERP_TIME_START) / 60: + if data[2] > (time.time() + config.VERP_MESSAGE_LIFETIME - VERP_TIME_START) / 60: return None return VerpType(data[0]), data[1] diff --git a/app/import_utils.py b/app/import_utils.py index f1ac8bc5..0f5421ac 100644 --- a/app/import_utils.py +++ b/app/import_utils.py @@ -15,7 +15,7 @@ from app.models import ( Mailbox, User, ) -from app.utils import sanitize_email +from app.utils import sanitize_email, canonicalize_email from .log import LOG @@ -69,7 +69,7 @@ def import_from_csv(batch_import: BatchImport, user: User, lines): if "mailboxes" in row: for mailbox_email in row["mailboxes"].split(): - mailbox_email = sanitize_email(mailbox_email) + mailbox_email = canonicalize_email(mailbox_email) mailbox = Mailbox.get_by(email=mailbox_email) if not mailbox or not mailbox.verified or mailbox.user_id != user.id: diff --git a/app/utils.py b/app/utils.py index 0dd2cdbe..d25a403a 100644 --- a/app/utils.py +++ b/app/utils.py @@ -69,6 +69,25 @@ def encode_url(url): return urllib.parse.quote(url, safe="") +def canonicalize_email(email_address: str) -> str: + email_address = sanitize_email(email_address) + parts = email_address.split("@") + if len(parts) != 2: + return "" + domain = parts[1] + if domain not in ("gmail.com", "protonmail.com", "proton.me", "pm.me"): + return email_address + first = parts[0] + try: + plus_idx = first.index("+") + first = first[:plus_idx] + except ValueError: + # No + in the email + pass + first = first.replace(".", "") + return f"{first}@{parts[1]}".lower().strip() + + def sanitize_email(email_address: str, not_lower=False) -> str: if email_address: email_address = email_address.strip().replace(" ", "").replace("\n", " ") diff --git a/pyproject.toml b/pyproject.toml index 53be5cb9..442f888c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,5 @@ [tool.black] -target-version = ['py37'] +target-version = ['py310'] exclude = ''' ( /( diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py index 61428c11..ff9b2dbc 100644 --- a/tests/api/test_auth.py +++ b/tests/api/test_auth.py @@ -2,18 +2,29 @@ import pytest import unicodedata from flask import url_for +from app import config from app.db import Session from app.models import User, AccountActivation +from tests.utils import random_email PASSWORD_1 = "Aurélie" PASSWORD_2 = unicodedata.normalize("NFKD", PASSWORD_1) assert PASSWORD_1 != PASSWORD_2 +def setup_module(): + config.SKIP_MX_LOOKUP_ON_CHECK = True + + +def teardown_module(): + config.SKIP_MX_LOOKUP_ON_CHECK = False + + @pytest.mark.parametrize("mfa", (True, False), ids=("MFA", "no MFA")) def test_auth_login_success(flask_client, mfa: bool): + email = random_email() User.create( - email="abcd@gmail.com", + email=email, password=PASSWORD_1, name="Test User", activated=True, @@ -24,7 +35,7 @@ def test_auth_login_success(flask_client, mfa: bool): r = flask_client.post( "/api/auth/login", json={ - "email": "abcd@gmail.com", + "email": email, "password": PASSWORD_2, "device": "Test Device", }, @@ -45,15 +56,14 @@ def test_auth_login_success(flask_client, mfa: bool): def test_auth_login_device_exist(flask_client): - User.create( - email="abcd@gmail.com", password="password", name="Test User", activated=True - ) + email = random_email() + User.create(email=email, password="password", name="Test User", activated=True) Session.commit() r = flask_client.post( url_for("api.auth_login"), json={ - "email": "abcd@gmail.com", + "email": email, "password": "password", "device": "Test Device", }, @@ -69,7 +79,7 @@ def test_auth_login_device_exist(flask_client): r = flask_client.post( url_for("api.auth_login"), json={ - "email": "abcd@gmail.com", + "email": email, "password": "password", "device": "Test Device", }, @@ -78,11 +88,12 @@ def test_auth_login_device_exist(flask_client): def test_auth_register_success(flask_client): + email = random_email() assert AccountActivation.first() is None r = flask_client.post( url_for("api.auth_register"), - json={"email": "abcd@gmail.com", "password": "password"}, + json={"email": email, "password": "password"}, ) assert r.status_code == 200 @@ -96,9 +107,10 @@ def test_auth_register_success(flask_client): def test_auth_register_too_short_password(flask_client): + email = random_email() r = flask_client.post( url_for("api.auth_register"), - json={"email": "abcd@gmail.com", "password": "short"}, + json={"email": email, "password": "short"}, ) assert r.status_code == 400 @@ -106,9 +118,10 @@ def test_auth_register_too_short_password(flask_client): def test_auth_register_too_long_password(flask_client): + email = random_email() r = flask_client.post( url_for("api.auth_register"), - json={"email": "abcd@gmail.com", "password": "0123456789" * 11}, + json={"email": email, "password": "0123456789" * 11}, ) assert r.status_code == 400 @@ -116,9 +129,10 @@ def test_auth_register_too_long_password(flask_client): def test_auth_activate_success(flask_client): + email = random_email() r = flask_client.post( url_for("api.auth_register"), - json={"email": "abcd@gmail.com", "password": "password"}, + json={"email": email, "password": "password"}, ) assert r.status_code == 200 @@ -131,7 +145,7 @@ def test_auth_activate_success(flask_client): r = flask_client.post( url_for("api.auth_activate"), - json={"email": "abcd@gmail.com", "code": act_code.code}, + json={"email": email, "code": act_code.code}, ) assert r.status_code == 200 @@ -144,21 +158,21 @@ def test_auth_activate_wrong_email(flask_client): def test_auth_activate_user_already_activated(flask_client): - User.create( - email="abcd@gmail.com", password="password", name="Test User", activated=True - ) + email = random_email() + User.create(email=email, password="password", name="Test User", activated=True) Session.commit() r = flask_client.post( - url_for("api.auth_activate"), json={"email": "abcd@gmail.com", "code": "123456"} + url_for("api.auth_activate"), json={"email": email, "code": "123456"} ) assert r.status_code == 400 def test_auth_activate_wrong_code(flask_client): + email = random_email() r = flask_client.post( url_for("api.auth_register"), - json={"email": "abcd@gmail.com", "password": "password"}, + json={"email": email, "password": "password"}, ) assert r.status_code == 200 @@ -175,7 +189,7 @@ def test_auth_activate_wrong_code(flask_client): r = flask_client.post( url_for("api.auth_activate"), - json={"email": "abcd@gmail.com", "code": wrong_code}, + json={"email": email, "code": wrong_code}, ) assert r.status_code == 400 @@ -185,9 +199,10 @@ def test_auth_activate_wrong_code(flask_client): def test_auth_activate_too_many_wrong_code(flask_client): + email = random_email() r = flask_client.post( url_for("api.auth_register"), - json={"email": "abcd@gmail.com", "password": "password"}, + json={"email": email, "password": "password"}, ) assert r.status_code == 200 @@ -205,14 +220,14 @@ def test_auth_activate_too_many_wrong_code(flask_client): for _ in range(2): r = flask_client.post( url_for("api.auth_activate"), - json={"email": "abcd@gmail.com", "code": wrong_code}, + json={"email": email, "code": wrong_code}, ) assert r.status_code == 400 # the activation code is deleted r = flask_client.post( url_for("api.auth_activate"), - json={"email": "abcd@gmail.com", "code": wrong_code}, + json={"email": email, "code": wrong_code}, ) assert r.status_code == 410 @@ -222,12 +237,11 @@ def test_auth_activate_too_many_wrong_code(flask_client): def test_auth_reactivate_success(flask_client): - User.create(email="abcd@gmail.com", password="password", name="Test User") + email = random_email() + User.create(email=email, password="password", name="Test User") Session.commit() - r = flask_client.post( - url_for("api.auth_reactivate"), json={"email": "abcd@gmail.com"} - ) + r = flask_client.post(url_for("api.auth_reactivate"), json={"email": email}) assert r.status_code == 200 # make sure an activation code is created @@ -238,14 +252,13 @@ def test_auth_reactivate_success(flask_client): def test_auth_login_forgot_password(flask_client): - User.create( - email="abcd@gmail.com", password="password", name="Test User", activated=True - ) + email = random_email() + User.create(email=email, password="password", name="Test User", activated=True) Session.commit() r = flask_client.post( url_for("api.forgot_password"), - json={"email": "abcd@gmail.com"}, + json={"email": email}, ) assert r.status_code == 200 @@ -253,7 +266,7 @@ def test_auth_login_forgot_password(flask_client): # No such email, still return 200 r = flask_client.post( url_for("api.forgot_password"), - json={"email": "not-exist@b.c"}, + json={"email": random_email()}, ) assert r.status_code == 200 diff --git a/tests/auth/test_login.py b/tests/auth/test_login.py index f4815e9a..fa00df82 100644 --- a/tests/auth/test_login.py +++ b/tests/auth/test_login.py @@ -1,6 +1,7 @@ from flask import url_for from app.db import Session +from app.utils import canonicalize_email, random_string from tests.utils import create_new_user @@ -20,3 +21,62 @@ def test_unactivated_user_login(flask_client): b"Please check your inbox for the activation email. You can also have this email re-sent" in r.data ) + + +def test_non_canonical_login(flask_client): + email = f"pre.{random_string(10)}@gmail.com" + name = f"NAME-{random_string(10)}" + user = create_new_user(email, name) + Session.commit() + + r = flask_client.post( + url_for("auth.login"), + data={"email": user.email, "password": "password"}, + follow_redirects=True, + ) + + assert r.status_code == 200 + assert name.encode("utf-8") in r.data + + canonical_email = canonicalize_email(email) + assert canonical_email != email + + flask_client.get(url_for("auth.logout")) + + r = flask_client.post( + url_for("auth.login"), + data={"email": canonical_email, "password": "password"}, + follow_redirects=True, + ) + + assert r.status_code == 200 + assert name.encode("utf-8") not in r.data + + +def test_canonical_login_with_non_canonical_email(flask_client): + suffix = f"{random_string(10)}@gmail.com" + canonical_email = f"pre{suffix}" + non_canonical_email = f"pre.{suffix}" + name = f"NAME-{random_string(10)}" + create_new_user(canonical_email, name) + Session.commit() + + r = flask_client.post( + url_for("auth.login"), + data={"email": non_canonical_email, "password": "password"}, + follow_redirects=True, + ) + + assert r.status_code == 200 + assert name.encode("utf-8") in r.data + + flask_client.get(url_for("auth.logout")) + + r = flask_client.post( + url_for("auth.login"), + data={"email": canonical_email, "password": "password"}, + follow_redirects=True, + ) + + assert r.status_code == 200 + assert name.encode("utf-8") in r.data diff --git a/tests/auth/test_register.py b/tests/auth/test_register.py index fafa4679..98f05b06 100644 --- a/tests/auth/test_register.py +++ b/tests/auth/test_register.py @@ -1,13 +1,25 @@ from flask import url_for +from app import config from app.db import Session -from app.models import DailyMetric +from app.models import DailyMetric, User +from app.utils import canonicalize_email +from tests.utils import create_new_user, random_email + + +def setup_module(): + config.SKIP_MX_LOOKUP_ON_CHECK = True + + +def teardown_module(): + config.SKIP_MX_LOOKUP_ON_CHECK = False def test_register_success(flask_client): + email = random_email() r = flask_client.post( url_for("auth.register"), - data={"email": "abcd@gmail.com", "password": "password"}, + data={"email": email, "password": "password"}, follow_redirects=True, ) @@ -23,7 +35,7 @@ def test_register_increment_nb_new_web_non_proton_user(flask_client): r = flask_client.post( url_for("auth.register"), - data={"email": "abcd@gmail.com", "password": "password"}, + data={"email": random_email(), "password": "password"}, follow_redirects=True, ) @@ -34,7 +46,6 @@ def test_register_increment_nb_new_web_non_proton_user(flask_client): def test_register_disabled(flask_client): """User cannot create new account when DISABLE_REGISTRATION.""" - from app import config config.DISABLE_REGISTRATION = True @@ -44,4 +55,34 @@ def test_register_disabled(flask_client): follow_redirects=True, ) + config.DISABLE_REGISTRATION = False assert b"Registration is closed" in r.data + + +def test_register_non_canonical_if_canonical_exists_is_not_allowed(flask_client): + """User cannot create new account if the canonical name clashes""" + email = f"noncan.{random_email()}" + canonical_email = canonicalize_email(email) + create_new_user(email=canonical_email) + + r = flask_client.post( + url_for("auth.register"), + data={"email": email, "password": "password"}, + follow_redirects=True, + ) + + assert f"Email {canonical_email} already used".encode("utf-8") in r.data + + +def test_register_non_canonical_is_canonicalized(flask_client): + """User cannot create new account if the canonical name clashes""" + email = f"noncan.{random_email()}" + + r = flask_client.post( + url_for("auth.register"), + data={"email": email, "password": "password"}, + follow_redirects=True, + ) + + assert b"An email to validate your email is on its way" in r.data + assert User.get_by(email=canonicalize_email(email)) is not None diff --git a/tests/dashboard/test_setting.py b/tests/dashboard/test_setting.py new file mode 100644 index 00000000..6f9274b3 --- /dev/null +++ b/tests/dashboard/test_setting.py @@ -0,0 +1,28 @@ +from flask import url_for + +from app import config +from app.models import EmailChange +from app.utils import canonicalize_email +from tests.utils import login, random_email, create_new_user + + +def test_setup_done(flask_client): + config.SKIP_MX_LOOKUP_ON_CHECK = True + user = create_new_user() + login(flask_client, user) + noncanonical_email = f"nonca.{random_email()}" + + r = flask_client.post( + url_for("dashboard.setting"), + data={ + "form-name": "update-email", + "email": noncanonical_email, + }, + follow_redirects=True, + ) + + assert r.status_code == 200 + email_change = EmailChange.get_by(user_id=user.id) + assert email_change is not None + assert email_change.new_email == canonicalize_email(noncanonical_email) + config.SKIP_MX_LOOKUP_ON_CHECK = False diff --git a/tests/test.env b/tests/test.env index 128461a1..86d383ae 100644 --- a/tests/test.env +++ b/tests/test.env @@ -67,4 +67,5 @@ RECOVERY_CODE_HMAC_SECRET=1234567890123456789 ENABLE_ALL_REVERSE_ALIAS_REPLACEMENT=true MAX_NB_REVERSE_ALIAS_REPLACEMENT=200 -MEM_STORE_URI=redis://localhost \ No newline at end of file +MEM_STORE_URI=redis://localhost + diff --git a/tests/test_utils.py b/tests/test_utils.py index 28b3d4b3..223828d6 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -4,7 +4,7 @@ from urllib.parse import parse_qs import pytest from app.config import ALLOWED_REDIRECT_DOMAINS -from app.utils import random_string, random_words, sanitize_next_url +from app.utils import random_string, random_words, sanitize_next_url, canonicalize_email def test_random_words(): @@ -59,3 +59,16 @@ def test_parse_querystring(): assert len(res) == len(expected) for k, v in expected.items(): assert res[k] == v + + +def canonicalize_email_cases(): + for domain in ("gmail.com", "protonmail.com", "proton.me", "pm.me"): + yield (f"a@{domain}", f"a@{domain}") + yield (f"a.b@{domain}", f"ab@{domain}") + yield (f"a.b+c@{domain}", f"ab@{domain}") + yield (f"a.b+c@other.com", f"a.b+c@other.com") + + +@pytest.mark.parametrize("dirty,clean", canonicalize_email_cases()) +def test_canonicalize_email(dirty: str, clean: str): + assert canonicalize_email(dirty) == clean diff --git a/tests/utils.py b/tests/utils.py index 9b2e3044..21efc2ca 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -13,12 +13,16 @@ from app.models import User from app.utils import random_string -def create_new_user() -> User: +def create_new_user(email: Optional[str] = None, name: Optional[str] = None) -> User: + if not email: + email = f"user_{random_token(10)}@mailbox.test" + if not name: + name = f"Test User" # new user has a different email address user = User.create( - email=f"user{random.random()}@mailbox.test", + email=email, password="password", - name="Test User", + name=name, activated=True, flush=True, ) @@ -26,8 +30,9 @@ def create_new_user() -> User: return user -def login(flask_client) -> User: - user = create_new_user() +def login(flask_client, user: Optional[User] = None) -> User: + if not user: + user = create_new_user() r = flask_client.post( url_for("auth.login"),