From 25fde11a861c693a7e1ef35de446196df2544dbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Casaj=C3=BAs?= Date: Wed, 27 Jul 2022 17:40:22 +0200 Subject: [PATCH] Refactor alias suffix (#1194) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Extract suffix generation and validation to a module * Updated tests * Make custom alias use signed suffixes * Added the signature check to the module * Fix invalid route * Move more suffix related stuff * Fix tests Co-authored-by: Adrià Casajús --- app/alias_suffix.py | 162 +++++++++++++++++ app/api/views/alias_options.py | 8 +- app/api/views/new_custom_alias.py | 20 +-- app/api/views/new_random_alias.py | 9 +- app/dashboard/views/custom_alias.py | 246 ++------------------------ app/oauth/views/authorize.py | 17 +- templates/dashboard/custom_alias.html | 5 +- tests/api/test_new_custom_alias.py | 2 +- tests/api/test_new_random_alias.py | 2 +- tests/dashboard/test_custom_alias.py | 82 +++++---- 10 files changed, 247 insertions(+), 306 deletions(-) create mode 100644 app/alias_suffix.py diff --git a/app/alias_suffix.py b/app/alias_suffix.py new file mode 100644 index 00000000..56f4809f --- /dev/null +++ b/app/alias_suffix.py @@ -0,0 +1,162 @@ +from __future__ import annotations +import json +from dataclasses import asdict, dataclass +from typing import Optional + +import itsdangerous +from app import config +from app.log import LOG +from app.models import User + + +signer = itsdangerous.TimestampSigner(config.CUSTOM_ALIAS_SECRET) + + +@dataclass +class AliasSuffix: + # whether this is a custom domain + is_custom: bool + # Suffix + suffix: str + # Suffix signature + signed_suffix: str + # whether this is a premium SL domain. Not apply to custom domain + is_premium: bool + # can be either Custom or SL domain + domain: str + # if custom domain, whether the custom domain has MX verified, i.e. can receive emails + mx_verified: bool = True + + def serialize(self): + return json.dumps(asdict(self)) + + @classmethod + def deserialize(cls, data: str) -> AliasSuffix: + return AliasSuffix(**json.loads(data)) + + +def check_suffix_signature(signed_suffix: str) -> Optional[str]: + # hypothesis: user will click on the button in the 600 secs + try: + return signer.unsign(signed_suffix, max_age=600).decode() + except itsdangerous.BadSignature: + return None + + +def verify_prefix_suffix(user: User, alias_prefix, alias_suffix) -> bool: + """verify if user could create an alias with the given prefix and suffix""" + if not alias_prefix or not alias_suffix: # should be caught on frontend + return False + + user_custom_domains = [cd.domain for cd in user.verified_custom_domains()] + + # make sure alias_suffix is either .random_word@simplelogin.co or @my-domain.com + alias_suffix = alias_suffix.strip() + # alias_domain_prefix is either a .random_word or "" + alias_domain_prefix, alias_domain = alias_suffix.split("@", 1) + + # alias_domain must be either one of user custom domains or built-in domains + if alias_domain not in user.available_alias_domains(): + LOG.e("wrong alias suffix %s, user %s", alias_suffix, user) + return False + + # SimpleLogin domain case: + # 1) alias_suffix must start with "." and + # 2) alias_domain_prefix must come from the word list + if ( + alias_domain in user.available_sl_domains() + and alias_domain not in user_custom_domains + # when DISABLE_ALIAS_SUFFIX is true, alias_domain_prefix is empty + and not config.DISABLE_ALIAS_SUFFIX + ): + + if not alias_domain_prefix.startswith("."): + LOG.e("User %s submits a wrong alias suffix %s", user, alias_suffix) + return False + + else: + if alias_domain not in user_custom_domains: + if not config.DISABLE_ALIAS_SUFFIX: + LOG.e("wrong alias suffix %s, user %s", alias_suffix, user) + return False + + if alias_domain not in user.available_sl_domains(): + LOG.e("wrong alias suffix %s, user %s", alias_suffix, user) + return False + + return True + + +def get_alias_suffixes(user: User) -> [AliasSuffix]: + """ + Similar to as get_available_suffixes() but also return custom domain that doesn't have MX set up. + """ + user_custom_domains = user.verified_custom_domains() + + alias_suffixes: [AliasSuffix] = [] + + # put custom domain first + # for each user domain, generate both the domain and a random suffix version + for custom_domain in user_custom_domains: + if custom_domain.random_prefix_generation: + suffix = "." + user.get_random_alias_suffix() + "@" + custom_domain.domain + alias_suffix = AliasSuffix( + is_custom=True, + suffix=suffix, + signed_suffix=signer.sign(suffix).decode(), + is_premium=False, + domain=custom_domain.domain, + mx_verified=custom_domain.verified, + ) + if user.default_alias_custom_domain_id == custom_domain.id: + alias_suffixes.insert(0, alias_suffix) + else: + alias_suffixes.append(alias_suffix) + + suffix = "@" + custom_domain.domain + alias_suffix = AliasSuffix( + is_custom=True, + suffix=suffix, + signed_suffix=signer.sign(suffix).decode(), + is_premium=False, + domain=custom_domain.domain, + mx_verified=custom_domain.verified, + ) + + # put the default domain to top + # only if random_prefix_generation isn't enabled + if ( + user.default_alias_custom_domain_id == custom_domain.id + and not custom_domain.random_prefix_generation + ): + alias_suffixes.insert(0, alias_suffix) + else: + alias_suffixes.append(alias_suffix) + + # then SimpleLogin domain + for sl_domain in user.get_sl_domains(): + suffix = ( + ( + "" + if config.DISABLE_ALIAS_SUFFIX + else "." + user.get_random_alias_suffix() + ) + + "@" + + sl_domain.domain + ) + alias_suffix = AliasSuffix( + is_custom=False, + suffix=suffix, + signed_suffix=signer.sign(suffix).decode(), + is_premium=sl_domain.premium_only, + domain=sl_domain.domain, + mx_verified=True, + ) + + # put the default domain to top + if user.default_alias_public_domain_id == sl_domain.id: + alias_suffixes.insert(0, alias_suffix) + else: + alias_suffixes.append(alias_suffix) + + return alias_suffixes diff --git a/app/api/views/alias_options.py b/app/api/views/alias_options.py index 9f998258..7e53ece2 100644 --- a/app/api/views/alias_options.py +++ b/app/api/views/alias_options.py @@ -2,10 +2,8 @@ import tldextract from flask import jsonify, request, g from sqlalchemy import desc +from app.alias_suffix import get_alias_suffixes from app.api.base import api_bp, require_api_auth -from app.dashboard.views.custom_alias import ( - get_available_suffixes, -) from app.db import Session from app.log import LOG from app.models import AliasUsedOn, Alias, User @@ -68,7 +66,7 @@ def options_v4(): prefix_suggestion = convert_to_id(prefix_suggestion) ret["prefix_suggestion"] = prefix_suggestion - suffixes = get_available_suffixes(user) + suffixes = get_alias_suffixes(user) # custom domain should be put first ret["suffixes"] = list([suffix.suffix, suffix.signed_suffix] for suffix in suffixes) @@ -139,7 +137,7 @@ def options_v5(): prefix_suggestion = convert_to_id(prefix_suggestion) ret["prefix_suggestion"] = prefix_suggestion - suffixes = get_available_suffixes(user) + suffixes = get_alias_suffixes(user) # custom domain should be put first ret["suffixes"] = [ diff --git a/app/api/views/new_custom_alias.py b/app/api/views/new_custom_alias.py index e5ad2059..b39c02cd 100644 --- a/app/api/views/new_custom_alias.py +++ b/app/api/views/new_custom_alias.py @@ -1,7 +1,7 @@ from flask import g from flask import jsonify, request -from itsdangerous import SignatureExpired +from app.alias_suffix import check_suffix_signature, verify_prefix_suffix from app.alias_utils import check_alias_prefix from app.api.base import api_bp, require_api_auth from app.api.serializer import ( @@ -9,7 +9,6 @@ from app.api.serializer import ( get_alias_info_v2, ) from app.config import MAX_NB_EMAIL_FREE_PLAN, ALIAS_LIMIT -from app.dashboard.views.custom_alias import verify_prefix_suffix, signer from app.db import Session from app.extensions import limiter from app.log import LOG @@ -65,12 +64,11 @@ def new_custom_alias_v2(): note = data.get("note") alias_prefix = convert_to_id(alias_prefix) - # hypothesis: user will click on the button in the 600 secs try: - alias_suffix = signer.unsign(signed_suffix, max_age=600).decode() - except SignatureExpired: - LOG.w("Alias creation time expired for %s", user) - return jsonify(error="Alias creation time is expired, please retry"), 412 + alias_suffix = check_suffix_signature(signed_suffix) + if not alias_suffix: + LOG.w("Alias creation time expired for %s", user) + return jsonify(error="Alias creation time is expired, please retry"), 412 except Exception: LOG.w("Alias suffix is tampered, user %s", user) return jsonify(error="Tampered suffix"), 400 @@ -181,10 +179,10 @@ def new_custom_alias_v3(): # hypothesis: user will click on the button in the 600 secs try: - alias_suffix = signer.unsign(signed_suffix, max_age=600).decode() - except SignatureExpired: - LOG.w("Alias creation time expired for %s", user) - return jsonify(error="Alias creation time is expired, please retry"), 412 + alias_suffix = check_suffix_signature(signed_suffix) + if not alias_suffix: + LOG.w("Alias creation time expired for %s", user) + return jsonify(error="Alias creation time is expired, please retry"), 412 except Exception: LOG.w("Alias suffix is tampered, user %s", user) return jsonify(error="Tampered suffix"), 400 diff --git a/app/api/views/new_random_alias.py b/app/api/views/new_random_alias.py index a2af4b6c..ef5f4485 100644 --- a/app/api/views/new_random_alias.py +++ b/app/api/views/new_random_alias.py @@ -2,13 +2,13 @@ import tldextract from flask import g from flask import jsonify, request +from app.alias_suffix import get_alias_suffixes from app.api.base import api_bp, require_api_auth from app.api.serializer import ( get_alias_info_v2, serialize_alias_info_v2, ) from app.config import MAX_NB_EMAIL_FREE_PLAN, ALIAS_LIMIT -from app.dashboard.views.custom_alias import get_available_suffixes from app.db import Session from app.errors import AliasInTrashError from app.extensions import limiter @@ -57,7 +57,7 @@ def new_random_alias(): prefix_suggestion = ext.domain prefix_suggestion = convert_to_id(prefix_suggestion) - suffixes = get_available_suffixes(user) + suffixes = get_alias_suffixes(user) # use the first suffix suggested_alias = prefix_suggestion + suffixes[0].suffix @@ -105,8 +105,9 @@ def new_random_alias(): Session.commit() if hostname and not AliasUsedOn.get_by(alias_id=alias.id, hostname=hostname): - AliasUsedOn.create(alias_id=alias.id, hostname=hostname, user_id=alias.user_id) - Session.commit() + AliasUsedOn.create( + alias_id=alias.id, hostname=hostname, user_id=alias.user_id, commit=True + ) return ( jsonify(alias=alias.email, **serialize_alias_info_v2(get_alias_info_v2(alias))), diff --git a/app/dashboard/views/custom_alias.py b/app/dashboard/views/custom_alias.py index 30b20f02..6269609a 100644 --- a/app/dashboard/views/custom_alias.py +++ b/app/dashboard/views/custom_alias.py @@ -1,16 +1,15 @@ -import json -from dataclasses import dataclass, asdict - from email_validator import validate_email, EmailNotValidError from flask import render_template, redirect, url_for, flash, request from flask_login import login_required, current_user -from itsdangerous import TimestampSigner, SignatureExpired from sqlalchemy.exc import IntegrityError +from app.alias_suffix import ( + get_alias_suffixes, + check_suffix_signature, + verify_prefix_suffix, +) from app.alias_utils import check_alias_prefix from app.config import ( - DISABLE_ALIAS_SUFFIX, - CUSTOM_ALIAS_SECRET, ALIAS_LIMIT, ) from app.dashboard.base import dashboard_bp @@ -19,176 +18,12 @@ from app.extensions import limiter from app.log import LOG from app.models import ( Alias, - CustomDomain, DeletedAlias, Mailbox, - User, AliasMailbox, DomainDeletedAlias, ) -signer = TimestampSigner(CUSTOM_ALIAS_SECRET) - - -@dataclass -class SuffixInfo: - """ - Alias suffix info - WARNING: should use AliasSuffix instead - """ - - # whether this is a custom domain - is_custom: bool - suffix: str - signed_suffix: str - - # whether this is a premium SL domain. Not apply to custom domain - is_premium: bool - - -def get_available_suffixes(user: User) -> [SuffixInfo]: - """ - WARNING: should use get_alias_suffixes() instead - """ - user_custom_domains = user.verified_custom_domains() - - suffixes: [SuffixInfo] = [] - - # put custom domain first - # for each user domain, generate both the domain and a random suffix version - for custom_domain in user_custom_domains: - if custom_domain.random_prefix_generation: - suffix = "." + user.get_random_alias_suffix() + "@" + custom_domain.domain - suffix_info = SuffixInfo(True, suffix, signer.sign(suffix).decode(), False) - if user.default_alias_custom_domain_id == custom_domain.id: - suffixes.insert(0, suffix_info) - else: - suffixes.append(suffix_info) - - suffix = "@" + custom_domain.domain - suffix_info = SuffixInfo(True, suffix, signer.sign(suffix).decode(), False) - - # put the default domain to top - # only if random_prefix_generation isn't enabled - if ( - user.default_alias_custom_domain_id == custom_domain.id - and not custom_domain.random_prefix_generation - ): - suffixes.insert(0, suffix_info) - else: - suffixes.append(suffix_info) - - # then SimpleLogin domain - for sl_domain in user.get_sl_domains(): - suffix = ( - ("" if DISABLE_ALIAS_SUFFIX else "." + user.get_random_alias_suffix()) - + "@" - + sl_domain.domain - ) - suffix_info = SuffixInfo( - False, suffix, signer.sign(suffix).decode(), sl_domain.premium_only - ) - # put the default domain to top - if user.default_alias_public_domain_id == sl_domain.id: - suffixes.insert(0, suffix_info) - else: - suffixes.append(suffix_info) - - return suffixes - - -@dataclass -class AliasSuffix: - # whether this is a custom domain - is_custom: bool - suffix: str - - # whether this is a premium SL domain. Not apply to custom domain - is_premium: bool - - # can be either Custom or SL domain - domain: str - - # if custom domain, whether the custom domain has MX verified, i.e. can receive emails - mx_verified: bool = True - - def serialize(self): - return json.dumps(asdict(self)) - - @classmethod - def deserialize(cls, data: str) -> "AliasSuffix": - return AliasSuffix(**json.loads(data)) - - -def get_alias_suffixes(user: User) -> [AliasSuffix]: - """ - Similar to as get_available_suffixes() but also return custom domain that doesn't have MX set up. - """ - user_custom_domains = CustomDomain.filter_by( - user_id=user.id, ownership_verified=True - ).all() - - alias_suffixes: [AliasSuffix] = [] - - # put custom domain first - # for each user domain, generate both the domain and a random suffix version - for custom_domain in user_custom_domains: - if custom_domain.random_prefix_generation: - suffix = "." + user.get_random_alias_suffix() + "@" + custom_domain.domain - alias_suffix = AliasSuffix( - is_custom=True, - suffix=suffix, - is_premium=False, - domain=custom_domain.domain, - mx_verified=custom_domain.verified, - ) - if user.default_alias_custom_domain_id == custom_domain.id: - alias_suffixes.insert(0, alias_suffix) - else: - alias_suffixes.append(alias_suffix) - - suffix = "@" + custom_domain.domain - alias_suffix = AliasSuffix( - is_custom=True, - suffix=suffix, - is_premium=False, - domain=custom_domain.domain, - mx_verified=custom_domain.verified, - ) - - # put the default domain to top - # only if random_prefix_generation isn't enabled - if ( - user.default_alias_custom_domain_id == custom_domain.id - and not custom_domain.random_prefix_generation - ): - alias_suffixes.insert(0, alias_suffix) - else: - alias_suffixes.append(alias_suffix) - - # then SimpleLogin domain - for sl_domain in user.get_sl_domains(): - suffix = ( - ("" if DISABLE_ALIAS_SUFFIX else "." + user.get_random_alias_suffix()) - + "@" - + sl_domain.domain - ) - alias_suffix = AliasSuffix( - is_custom=False, - suffix=suffix, - is_premium=sl_domain.premium_only, - domain=sl_domain.domain, - mx_verified=True, - ) - - # put the default domain to top - if user.default_alias_public_domain_id == sl_domain.id: - alias_suffixes.insert(0, alias_suffix) - else: - alias_suffixes.append(alias_suffix) - - return alias_suffixes - @dashboard_bp.route("/custom_alias", methods=["GET", "POST"]) @limiter.limit(ALIAS_LIMIT, methods=["POST"]) @@ -211,11 +46,6 @@ def custom_alias(): at_least_a_premium_domain = True break - alias_suffixes_with_signature = [ - (alias_suffix, signer.sign(alias_suffix.serialize()).decode()) - for alias_suffix in alias_suffixes - ] - mailboxes = current_user.mailboxes() if request.method == "POST": @@ -249,25 +79,19 @@ def custom_alias(): flash("At least one mailbox must be selected", "error") return redirect(request.url) - # hypothesis: user will click on the button in the 600 secs try: - signed_alias_suffix_decoded = signer.unsign( - signed_alias_suffix, max_age=600 - ).decode() - alias_suffix: AliasSuffix = AliasSuffix.deserialize( - signed_alias_suffix_decoded - ) - except SignatureExpired: - LOG.w("Alias creation time expired for %s", current_user) - flash("Alias creation time is expired, please retry", "warning") - return redirect(request.url) + suffix = check_suffix_signature(signed_alias_suffix) + if not suffix: + LOG.w("Alias creation time expired for %s", current_user) + flash("Alias creation time is expired, please retry", "warning") + return redirect(request.url) except Exception: LOG.w("Alias suffix is tampered, user %s", current_user) flash("Unknown error, refresh the page", "error") return redirect(request.url) - if verify_prefix_suffix(current_user, alias_prefix, alias_suffix.suffix): - full_alias = alias_prefix + alias_suffix.suffix + if verify_prefix_suffix(current_user, alias_prefix, suffix): + full_alias = alias_prefix + suffix if ".." in full_alias: flash("Your alias can't contain 2 consecutive dots (..)", "error") @@ -342,51 +166,7 @@ def custom_alias(): return render_template( "dashboard/custom_alias.html", user_custom_domains=user_custom_domains, - alias_suffixes_with_signature=alias_suffixes_with_signature, + alias_suffixes=alias_suffixes, at_least_a_premium_domain=at_least_a_premium_domain, mailboxes=mailboxes, ) - - -def verify_prefix_suffix(user: User, alias_prefix, alias_suffix) -> bool: - """verify if user could create an alias with the given prefix and suffix""" - if not alias_prefix or not alias_suffix: # should be caught on frontend - return False - - user_custom_domains = [cd.domain for cd in user.verified_custom_domains()] - - # make sure alias_suffix is either .random_word@simplelogin.co or @my-domain.com - alias_suffix = alias_suffix.strip() - # alias_domain_prefix is either a .random_word or "" - alias_domain_prefix, alias_domain = alias_suffix.split("@", 1) - - # alias_domain must be either one of user custom domains or built-in domains - if alias_domain not in user.available_alias_domains(): - LOG.e("wrong alias suffix %s, user %s", alias_suffix, user) - return False - - # SimpleLogin domain case: - # 1) alias_suffix must start with "." and - # 2) alias_domain_prefix must come from the word list - if ( - alias_domain in user.available_sl_domains() - and alias_domain not in user_custom_domains - # when DISABLE_ALIAS_SUFFIX is true, alias_domain_prefix is empty - and not DISABLE_ALIAS_SUFFIX - ): - - if not alias_domain_prefix.startswith("."): - LOG.e("User %s submits a wrong alias suffix %s", user, alias_suffix) - return False - - else: - if alias_domain not in user_custom_domains: - if not DISABLE_ALIAS_SUFFIX: - LOG.e("wrong alias suffix %s, user %s", alias_suffix, user) - return False - - if alias_domain not in user.available_sl_domains(): - LOG.e("wrong alias suffix %s, user %s", alias_suffix, user) - return False - - return True diff --git a/app/oauth/views/authorize.py b/app/oauth/views/authorize.py index 1841b7ee..47afc1a3 100644 --- a/app/oauth/views/authorize.py +++ b/app/oauth/views/authorize.py @@ -3,11 +3,10 @@ from urllib.parse import urlparse from flask import request, render_template, redirect, flash, url_for from flask_login import current_user -from itsdangerous import SignatureExpired +from app.alias_suffix import get_alias_suffixes, check_suffix_signature from app.alias_utils import check_alias_prefix from app.config import EMAIL_DOMAIN -from app.dashboard.views.custom_alias import signer, get_available_suffixes from app.db import Session from app.jose_utils import make_id_token from app.log import LOG @@ -124,7 +123,7 @@ def authorize(): user_custom_domains = [ cd.domain for cd in current_user.verified_custom_domains() ] - suffixes = get_available_suffixes(current_user) + suffixes = get_alias_suffixes(current_user) return render_template( "oauth/authorize.html", @@ -182,11 +181,11 @@ def authorize(): # hypothesis: user will click on the button in the 600 secs try: - alias_suffix = signer.unsign(signed_suffix, max_age=600).decode() - except SignatureExpired: - LOG.w("Alias creation time expired for %s", current_user) - flash("Alias creation time is expired, please retry", "warning") - return redirect(request.url) + alias_suffix = check_suffix_signature(signed_suffix) + if not alias_suffix: + LOG.w("Alias creation time expired for %s", current_user) + flash("Alias creation time is expired, please retry", "warning") + return redirect(request.url) except Exception: LOG.w("Alias suffix is tampered, user %s", current_user) flash("Unknown error, refresh the page", "error") @@ -196,7 +195,7 @@ def authorize(): cd.domain for cd in current_user.verified_custom_domains() ] - from app.dashboard.views.custom_alias import verify_prefix_suffix + from app.alias_suffix import verify_prefix_suffix if verify_prefix_suffix(current_user, alias_prefix, alias_suffix): full_alias = alias_prefix + alias_suffix diff --git a/templates/dashboard/custom_alias.html b/templates/dashboard/custom_alias.html index c24c6789..88586585 100644 --- a/templates/dashboard/custom_alias.html +++ b/templates/dashboard/custom_alias.html @@ -40,10 +40,9 @@