Refactor alias suffix (#1194)
* Extract suffix generation and validation to a module * Updated tests * Make custom alias use signed suffixes * Added the signature check to the module * Fix invalid route * Move more suffix related stuff * Fix tests Co-authored-by: Adrià Casajús <adria.casajus@proton.ch>
This commit is contained in:
parent
bd044304f0
commit
25fde11a86
|
@ -0,0 +1,162 @@
|
|||
from __future__ import annotations
|
||||
import json
|
||||
from dataclasses import asdict, dataclass
|
||||
from typing import Optional
|
||||
|
||||
import itsdangerous
|
||||
from app import config
|
||||
from app.log import LOG
|
||||
from app.models import User
|
||||
|
||||
|
||||
signer = itsdangerous.TimestampSigner(config.CUSTOM_ALIAS_SECRET)
|
||||
|
||||
|
||||
@dataclass
|
||||
class AliasSuffix:
|
||||
# whether this is a custom domain
|
||||
is_custom: bool
|
||||
# Suffix
|
||||
suffix: str
|
||||
# Suffix signature
|
||||
signed_suffix: str
|
||||
# whether this is a premium SL domain. Not apply to custom domain
|
||||
is_premium: bool
|
||||
# can be either Custom or SL domain
|
||||
domain: str
|
||||
# if custom domain, whether the custom domain has MX verified, i.e. can receive emails
|
||||
mx_verified: bool = True
|
||||
|
||||
def serialize(self):
|
||||
return json.dumps(asdict(self))
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, data: str) -> AliasSuffix:
|
||||
return AliasSuffix(**json.loads(data))
|
||||
|
||||
|
||||
def check_suffix_signature(signed_suffix: str) -> Optional[str]:
|
||||
# hypothesis: user will click on the button in the 600 secs
|
||||
try:
|
||||
return signer.unsign(signed_suffix, max_age=600).decode()
|
||||
except itsdangerous.BadSignature:
|
||||
return None
|
||||
|
||||
|
||||
def verify_prefix_suffix(user: User, alias_prefix, alias_suffix) -> bool:
|
||||
"""verify if user could create an alias with the given prefix and suffix"""
|
||||
if not alias_prefix or not alias_suffix: # should be caught on frontend
|
||||
return False
|
||||
|
||||
user_custom_domains = [cd.domain for cd in user.verified_custom_domains()]
|
||||
|
||||
# make sure alias_suffix is either .random_word@simplelogin.co or @my-domain.com
|
||||
alias_suffix = alias_suffix.strip()
|
||||
# alias_domain_prefix is either a .random_word or ""
|
||||
alias_domain_prefix, alias_domain = alias_suffix.split("@", 1)
|
||||
|
||||
# alias_domain must be either one of user custom domains or built-in domains
|
||||
if alias_domain not in user.available_alias_domains():
|
||||
LOG.e("wrong alias suffix %s, user %s", alias_suffix, user)
|
||||
return False
|
||||
|
||||
# SimpleLogin domain case:
|
||||
# 1) alias_suffix must start with "." and
|
||||
# 2) alias_domain_prefix must come from the word list
|
||||
if (
|
||||
alias_domain in user.available_sl_domains()
|
||||
and alias_domain not in user_custom_domains
|
||||
# when DISABLE_ALIAS_SUFFIX is true, alias_domain_prefix is empty
|
||||
and not config.DISABLE_ALIAS_SUFFIX
|
||||
):
|
||||
|
||||
if not alias_domain_prefix.startswith("."):
|
||||
LOG.e("User %s submits a wrong alias suffix %s", user, alias_suffix)
|
||||
return False
|
||||
|
||||
else:
|
||||
if alias_domain not in user_custom_domains:
|
||||
if not config.DISABLE_ALIAS_SUFFIX:
|
||||
LOG.e("wrong alias suffix %s, user %s", alias_suffix, user)
|
||||
return False
|
||||
|
||||
if alias_domain not in user.available_sl_domains():
|
||||
LOG.e("wrong alias suffix %s, user %s", alias_suffix, user)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def get_alias_suffixes(user: User) -> [AliasSuffix]:
|
||||
"""
|
||||
Similar to as get_available_suffixes() but also return custom domain that doesn't have MX set up.
|
||||
"""
|
||||
user_custom_domains = user.verified_custom_domains()
|
||||
|
||||
alias_suffixes: [AliasSuffix] = []
|
||||
|
||||
# put custom domain first
|
||||
# for each user domain, generate both the domain and a random suffix version
|
||||
for custom_domain in user_custom_domains:
|
||||
if custom_domain.random_prefix_generation:
|
||||
suffix = "." + user.get_random_alias_suffix() + "@" + custom_domain.domain
|
||||
alias_suffix = AliasSuffix(
|
||||
is_custom=True,
|
||||
suffix=suffix,
|
||||
signed_suffix=signer.sign(suffix).decode(),
|
||||
is_premium=False,
|
||||
domain=custom_domain.domain,
|
||||
mx_verified=custom_domain.verified,
|
||||
)
|
||||
if user.default_alias_custom_domain_id == custom_domain.id:
|
||||
alias_suffixes.insert(0, alias_suffix)
|
||||
else:
|
||||
alias_suffixes.append(alias_suffix)
|
||||
|
||||
suffix = "@" + custom_domain.domain
|
||||
alias_suffix = AliasSuffix(
|
||||
is_custom=True,
|
||||
suffix=suffix,
|
||||
signed_suffix=signer.sign(suffix).decode(),
|
||||
is_premium=False,
|
||||
domain=custom_domain.domain,
|
||||
mx_verified=custom_domain.verified,
|
||||
)
|
||||
|
||||
# put the default domain to top
|
||||
# only if random_prefix_generation isn't enabled
|
||||
if (
|
||||
user.default_alias_custom_domain_id == custom_domain.id
|
||||
and not custom_domain.random_prefix_generation
|
||||
):
|
||||
alias_suffixes.insert(0, alias_suffix)
|
||||
else:
|
||||
alias_suffixes.append(alias_suffix)
|
||||
|
||||
# then SimpleLogin domain
|
||||
for sl_domain in user.get_sl_domains():
|
||||
suffix = (
|
||||
(
|
||||
""
|
||||
if config.DISABLE_ALIAS_SUFFIX
|
||||
else "." + user.get_random_alias_suffix()
|
||||
)
|
||||
+ "@"
|
||||
+ sl_domain.domain
|
||||
)
|
||||
alias_suffix = AliasSuffix(
|
||||
is_custom=False,
|
||||
suffix=suffix,
|
||||
signed_suffix=signer.sign(suffix).decode(),
|
||||
is_premium=sl_domain.premium_only,
|
||||
domain=sl_domain.domain,
|
||||
mx_verified=True,
|
||||
)
|
||||
|
||||
# put the default domain to top
|
||||
if user.default_alias_public_domain_id == sl_domain.id:
|
||||
alias_suffixes.insert(0, alias_suffix)
|
||||
else:
|
||||
alias_suffixes.append(alias_suffix)
|
||||
|
||||
return alias_suffixes
|
|
@ -2,10 +2,8 @@ import tldextract
|
|||
from flask import jsonify, request, g
|
||||
from sqlalchemy import desc
|
||||
|
||||
from app.alias_suffix import get_alias_suffixes
|
||||
from app.api.base import api_bp, require_api_auth
|
||||
from app.dashboard.views.custom_alias import (
|
||||
get_available_suffixes,
|
||||
)
|
||||
from app.db import Session
|
||||
from app.log import LOG
|
||||
from app.models import AliasUsedOn, Alias, User
|
||||
|
@ -68,7 +66,7 @@ def options_v4():
|
|||
prefix_suggestion = convert_to_id(prefix_suggestion)
|
||||
ret["prefix_suggestion"] = prefix_suggestion
|
||||
|
||||
suffixes = get_available_suffixes(user)
|
||||
suffixes = get_alias_suffixes(user)
|
||||
|
||||
# custom domain should be put first
|
||||
ret["suffixes"] = list([suffix.suffix, suffix.signed_suffix] for suffix in suffixes)
|
||||
|
@ -139,7 +137,7 @@ def options_v5():
|
|||
prefix_suggestion = convert_to_id(prefix_suggestion)
|
||||
ret["prefix_suggestion"] = prefix_suggestion
|
||||
|
||||
suffixes = get_available_suffixes(user)
|
||||
suffixes = get_alias_suffixes(user)
|
||||
|
||||
# custom domain should be put first
|
||||
ret["suffixes"] = [
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
from flask import g
|
||||
from flask import jsonify, request
|
||||
from itsdangerous import SignatureExpired
|
||||
|
||||
from app.alias_suffix import check_suffix_signature, verify_prefix_suffix
|
||||
from app.alias_utils import check_alias_prefix
|
||||
from app.api.base import api_bp, require_api_auth
|
||||
from app.api.serializer import (
|
||||
|
@ -9,7 +9,6 @@ from app.api.serializer import (
|
|||
get_alias_info_v2,
|
||||
)
|
||||
from app.config import MAX_NB_EMAIL_FREE_PLAN, ALIAS_LIMIT
|
||||
from app.dashboard.views.custom_alias import verify_prefix_suffix, signer
|
||||
from app.db import Session
|
||||
from app.extensions import limiter
|
||||
from app.log import LOG
|
||||
|
@ -65,12 +64,11 @@ def new_custom_alias_v2():
|
|||
note = data.get("note")
|
||||
alias_prefix = convert_to_id(alias_prefix)
|
||||
|
||||
# hypothesis: user will click on the button in the 600 secs
|
||||
try:
|
||||
alias_suffix = signer.unsign(signed_suffix, max_age=600).decode()
|
||||
except SignatureExpired:
|
||||
LOG.w("Alias creation time expired for %s", user)
|
||||
return jsonify(error="Alias creation time is expired, please retry"), 412
|
||||
alias_suffix = check_suffix_signature(signed_suffix)
|
||||
if not alias_suffix:
|
||||
LOG.w("Alias creation time expired for %s", user)
|
||||
return jsonify(error="Alias creation time is expired, please retry"), 412
|
||||
except Exception:
|
||||
LOG.w("Alias suffix is tampered, user %s", user)
|
||||
return jsonify(error="Tampered suffix"), 400
|
||||
|
@ -181,10 +179,10 @@ def new_custom_alias_v3():
|
|||
|
||||
# hypothesis: user will click on the button in the 600 secs
|
||||
try:
|
||||
alias_suffix = signer.unsign(signed_suffix, max_age=600).decode()
|
||||
except SignatureExpired:
|
||||
LOG.w("Alias creation time expired for %s", user)
|
||||
return jsonify(error="Alias creation time is expired, please retry"), 412
|
||||
alias_suffix = check_suffix_signature(signed_suffix)
|
||||
if not alias_suffix:
|
||||
LOG.w("Alias creation time expired for %s", user)
|
||||
return jsonify(error="Alias creation time is expired, please retry"), 412
|
||||
except Exception:
|
||||
LOG.w("Alias suffix is tampered, user %s", user)
|
||||
return jsonify(error="Tampered suffix"), 400
|
||||
|
|
|
@ -2,13 +2,13 @@ import tldextract
|
|||
from flask import g
|
||||
from flask import jsonify, request
|
||||
|
||||
from app.alias_suffix import get_alias_suffixes
|
||||
from app.api.base import api_bp, require_api_auth
|
||||
from app.api.serializer import (
|
||||
get_alias_info_v2,
|
||||
serialize_alias_info_v2,
|
||||
)
|
||||
from app.config import MAX_NB_EMAIL_FREE_PLAN, ALIAS_LIMIT
|
||||
from app.dashboard.views.custom_alias import get_available_suffixes
|
||||
from app.db import Session
|
||||
from app.errors import AliasInTrashError
|
||||
from app.extensions import limiter
|
||||
|
@ -57,7 +57,7 @@ def new_random_alias():
|
|||
prefix_suggestion = ext.domain
|
||||
prefix_suggestion = convert_to_id(prefix_suggestion)
|
||||
|
||||
suffixes = get_available_suffixes(user)
|
||||
suffixes = get_alias_suffixes(user)
|
||||
# use the first suffix
|
||||
suggested_alias = prefix_suggestion + suffixes[0].suffix
|
||||
|
||||
|
@ -105,8 +105,9 @@ def new_random_alias():
|
|||
Session.commit()
|
||||
|
||||
if hostname and not AliasUsedOn.get_by(alias_id=alias.id, hostname=hostname):
|
||||
AliasUsedOn.create(alias_id=alias.id, hostname=hostname, user_id=alias.user_id)
|
||||
Session.commit()
|
||||
AliasUsedOn.create(
|
||||
alias_id=alias.id, hostname=hostname, user_id=alias.user_id, commit=True
|
||||
)
|
||||
|
||||
return (
|
||||
jsonify(alias=alias.email, **serialize_alias_info_v2(get_alias_info_v2(alias))),
|
||||
|
|
|
@ -1,16 +1,15 @@
|
|||
import json
|
||||
from dataclasses import dataclass, asdict
|
||||
|
||||
from email_validator import validate_email, EmailNotValidError
|
||||
from flask import render_template, redirect, url_for, flash, request
|
||||
from flask_login import login_required, current_user
|
||||
from itsdangerous import TimestampSigner, SignatureExpired
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
from app.alias_suffix import (
|
||||
get_alias_suffixes,
|
||||
check_suffix_signature,
|
||||
verify_prefix_suffix,
|
||||
)
|
||||
from app.alias_utils import check_alias_prefix
|
||||
from app.config import (
|
||||
DISABLE_ALIAS_SUFFIX,
|
||||
CUSTOM_ALIAS_SECRET,
|
||||
ALIAS_LIMIT,
|
||||
)
|
||||
from app.dashboard.base import dashboard_bp
|
||||
|
@ -19,176 +18,12 @@ from app.extensions import limiter
|
|||
from app.log import LOG
|
||||
from app.models import (
|
||||
Alias,
|
||||
CustomDomain,
|
||||
DeletedAlias,
|
||||
Mailbox,
|
||||
User,
|
||||
AliasMailbox,
|
||||
DomainDeletedAlias,
|
||||
)
|
||||
|
||||
signer = TimestampSigner(CUSTOM_ALIAS_SECRET)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SuffixInfo:
|
||||
"""
|
||||
Alias suffix info
|
||||
WARNING: should use AliasSuffix instead
|
||||
"""
|
||||
|
||||
# whether this is a custom domain
|
||||
is_custom: bool
|
||||
suffix: str
|
||||
signed_suffix: str
|
||||
|
||||
# whether this is a premium SL domain. Not apply to custom domain
|
||||
is_premium: bool
|
||||
|
||||
|
||||
def get_available_suffixes(user: User) -> [SuffixInfo]:
|
||||
"""
|
||||
WARNING: should use get_alias_suffixes() instead
|
||||
"""
|
||||
user_custom_domains = user.verified_custom_domains()
|
||||
|
||||
suffixes: [SuffixInfo] = []
|
||||
|
||||
# put custom domain first
|
||||
# for each user domain, generate both the domain and a random suffix version
|
||||
for custom_domain in user_custom_domains:
|
||||
if custom_domain.random_prefix_generation:
|
||||
suffix = "." + user.get_random_alias_suffix() + "@" + custom_domain.domain
|
||||
suffix_info = SuffixInfo(True, suffix, signer.sign(suffix).decode(), False)
|
||||
if user.default_alias_custom_domain_id == custom_domain.id:
|
||||
suffixes.insert(0, suffix_info)
|
||||
else:
|
||||
suffixes.append(suffix_info)
|
||||
|
||||
suffix = "@" + custom_domain.domain
|
||||
suffix_info = SuffixInfo(True, suffix, signer.sign(suffix).decode(), False)
|
||||
|
||||
# put the default domain to top
|
||||
# only if random_prefix_generation isn't enabled
|
||||
if (
|
||||
user.default_alias_custom_domain_id == custom_domain.id
|
||||
and not custom_domain.random_prefix_generation
|
||||
):
|
||||
suffixes.insert(0, suffix_info)
|
||||
else:
|
||||
suffixes.append(suffix_info)
|
||||
|
||||
# then SimpleLogin domain
|
||||
for sl_domain in user.get_sl_domains():
|
||||
suffix = (
|
||||
("" if DISABLE_ALIAS_SUFFIX else "." + user.get_random_alias_suffix())
|
||||
+ "@"
|
||||
+ sl_domain.domain
|
||||
)
|
||||
suffix_info = SuffixInfo(
|
||||
False, suffix, signer.sign(suffix).decode(), sl_domain.premium_only
|
||||
)
|
||||
# put the default domain to top
|
||||
if user.default_alias_public_domain_id == sl_domain.id:
|
||||
suffixes.insert(0, suffix_info)
|
||||
else:
|
||||
suffixes.append(suffix_info)
|
||||
|
||||
return suffixes
|
||||
|
||||
|
||||
@dataclass
|
||||
class AliasSuffix:
|
||||
# whether this is a custom domain
|
||||
is_custom: bool
|
||||
suffix: str
|
||||
|
||||
# whether this is a premium SL domain. Not apply to custom domain
|
||||
is_premium: bool
|
||||
|
||||
# can be either Custom or SL domain
|
||||
domain: str
|
||||
|
||||
# if custom domain, whether the custom domain has MX verified, i.e. can receive emails
|
||||
mx_verified: bool = True
|
||||
|
||||
def serialize(self):
|
||||
return json.dumps(asdict(self))
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, data: str) -> "AliasSuffix":
|
||||
return AliasSuffix(**json.loads(data))
|
||||
|
||||
|
||||
def get_alias_suffixes(user: User) -> [AliasSuffix]:
|
||||
"""
|
||||
Similar to as get_available_suffixes() but also return custom domain that doesn't have MX set up.
|
||||
"""
|
||||
user_custom_domains = CustomDomain.filter_by(
|
||||
user_id=user.id, ownership_verified=True
|
||||
).all()
|
||||
|
||||
alias_suffixes: [AliasSuffix] = []
|
||||
|
||||
# put custom domain first
|
||||
# for each user domain, generate both the domain and a random suffix version
|
||||
for custom_domain in user_custom_domains:
|
||||
if custom_domain.random_prefix_generation:
|
||||
suffix = "." + user.get_random_alias_suffix() + "@" + custom_domain.domain
|
||||
alias_suffix = AliasSuffix(
|
||||
is_custom=True,
|
||||
suffix=suffix,
|
||||
is_premium=False,
|
||||
domain=custom_domain.domain,
|
||||
mx_verified=custom_domain.verified,
|
||||
)
|
||||
if user.default_alias_custom_domain_id == custom_domain.id:
|
||||
alias_suffixes.insert(0, alias_suffix)
|
||||
else:
|
||||
alias_suffixes.append(alias_suffix)
|
||||
|
||||
suffix = "@" + custom_domain.domain
|
||||
alias_suffix = AliasSuffix(
|
||||
is_custom=True,
|
||||
suffix=suffix,
|
||||
is_premium=False,
|
||||
domain=custom_domain.domain,
|
||||
mx_verified=custom_domain.verified,
|
||||
)
|
||||
|
||||
# put the default domain to top
|
||||
# only if random_prefix_generation isn't enabled
|
||||
if (
|
||||
user.default_alias_custom_domain_id == custom_domain.id
|
||||
and not custom_domain.random_prefix_generation
|
||||
):
|
||||
alias_suffixes.insert(0, alias_suffix)
|
||||
else:
|
||||
alias_suffixes.append(alias_suffix)
|
||||
|
||||
# then SimpleLogin domain
|
||||
for sl_domain in user.get_sl_domains():
|
||||
suffix = (
|
||||
("" if DISABLE_ALIAS_SUFFIX else "." + user.get_random_alias_suffix())
|
||||
+ "@"
|
||||
+ sl_domain.domain
|
||||
)
|
||||
alias_suffix = AliasSuffix(
|
||||
is_custom=False,
|
||||
suffix=suffix,
|
||||
is_premium=sl_domain.premium_only,
|
||||
domain=sl_domain.domain,
|
||||
mx_verified=True,
|
||||
)
|
||||
|
||||
# put the default domain to top
|
||||
if user.default_alias_public_domain_id == sl_domain.id:
|
||||
alias_suffixes.insert(0, alias_suffix)
|
||||
else:
|
||||
alias_suffixes.append(alias_suffix)
|
||||
|
||||
return alias_suffixes
|
||||
|
||||
|
||||
@dashboard_bp.route("/custom_alias", methods=["GET", "POST"])
|
||||
@limiter.limit(ALIAS_LIMIT, methods=["POST"])
|
||||
|
@ -211,11 +46,6 @@ def custom_alias():
|
|||
at_least_a_premium_domain = True
|
||||
break
|
||||
|
||||
alias_suffixes_with_signature = [
|
||||
(alias_suffix, signer.sign(alias_suffix.serialize()).decode())
|
||||
for alias_suffix in alias_suffixes
|
||||
]
|
||||
|
||||
mailboxes = current_user.mailboxes()
|
||||
|
||||
if request.method == "POST":
|
||||
|
@ -249,25 +79,19 @@ def custom_alias():
|
|||
flash("At least one mailbox must be selected", "error")
|
||||
return redirect(request.url)
|
||||
|
||||
# hypothesis: user will click on the button in the 600 secs
|
||||
try:
|
||||
signed_alias_suffix_decoded = signer.unsign(
|
||||
signed_alias_suffix, max_age=600
|
||||
).decode()
|
||||
alias_suffix: AliasSuffix = AliasSuffix.deserialize(
|
||||
signed_alias_suffix_decoded
|
||||
)
|
||||
except SignatureExpired:
|
||||
LOG.w("Alias creation time expired for %s", current_user)
|
||||
flash("Alias creation time is expired, please retry", "warning")
|
||||
return redirect(request.url)
|
||||
suffix = check_suffix_signature(signed_alias_suffix)
|
||||
if not suffix:
|
||||
LOG.w("Alias creation time expired for %s", current_user)
|
||||
flash("Alias creation time is expired, please retry", "warning")
|
||||
return redirect(request.url)
|
||||
except Exception:
|
||||
LOG.w("Alias suffix is tampered, user %s", current_user)
|
||||
flash("Unknown error, refresh the page", "error")
|
||||
return redirect(request.url)
|
||||
|
||||
if verify_prefix_suffix(current_user, alias_prefix, alias_suffix.suffix):
|
||||
full_alias = alias_prefix + alias_suffix.suffix
|
||||
if verify_prefix_suffix(current_user, alias_prefix, suffix):
|
||||
full_alias = alias_prefix + suffix
|
||||
|
||||
if ".." in full_alias:
|
||||
flash("Your alias can't contain 2 consecutive dots (..)", "error")
|
||||
|
@ -342,51 +166,7 @@ def custom_alias():
|
|||
return render_template(
|
||||
"dashboard/custom_alias.html",
|
||||
user_custom_domains=user_custom_domains,
|
||||
alias_suffixes_with_signature=alias_suffixes_with_signature,
|
||||
alias_suffixes=alias_suffixes,
|
||||
at_least_a_premium_domain=at_least_a_premium_domain,
|
||||
mailboxes=mailboxes,
|
||||
)
|
||||
|
||||
|
||||
def verify_prefix_suffix(user: User, alias_prefix, alias_suffix) -> bool:
|
||||
"""verify if user could create an alias with the given prefix and suffix"""
|
||||
if not alias_prefix or not alias_suffix: # should be caught on frontend
|
||||
return False
|
||||
|
||||
user_custom_domains = [cd.domain for cd in user.verified_custom_domains()]
|
||||
|
||||
# make sure alias_suffix is either .random_word@simplelogin.co or @my-domain.com
|
||||
alias_suffix = alias_suffix.strip()
|
||||
# alias_domain_prefix is either a .random_word or ""
|
||||
alias_domain_prefix, alias_domain = alias_suffix.split("@", 1)
|
||||
|
||||
# alias_domain must be either one of user custom domains or built-in domains
|
||||
if alias_domain not in user.available_alias_domains():
|
||||
LOG.e("wrong alias suffix %s, user %s", alias_suffix, user)
|
||||
return False
|
||||
|
||||
# SimpleLogin domain case:
|
||||
# 1) alias_suffix must start with "." and
|
||||
# 2) alias_domain_prefix must come from the word list
|
||||
if (
|
||||
alias_domain in user.available_sl_domains()
|
||||
and alias_domain not in user_custom_domains
|
||||
# when DISABLE_ALIAS_SUFFIX is true, alias_domain_prefix is empty
|
||||
and not DISABLE_ALIAS_SUFFIX
|
||||
):
|
||||
|
||||
if not alias_domain_prefix.startswith("."):
|
||||
LOG.e("User %s submits a wrong alias suffix %s", user, alias_suffix)
|
||||
return False
|
||||
|
||||
else:
|
||||
if alias_domain not in user_custom_domains:
|
||||
if not DISABLE_ALIAS_SUFFIX:
|
||||
LOG.e("wrong alias suffix %s, user %s", alias_suffix, user)
|
||||
return False
|
||||
|
||||
if alias_domain not in user.available_sl_domains():
|
||||
LOG.e("wrong alias suffix %s, user %s", alias_suffix, user)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
|
|
@ -3,11 +3,10 @@ from urllib.parse import urlparse
|
|||
|
||||
from flask import request, render_template, redirect, flash, url_for
|
||||
from flask_login import current_user
|
||||
from itsdangerous import SignatureExpired
|
||||
|
||||
from app.alias_suffix import get_alias_suffixes, check_suffix_signature
|
||||
from app.alias_utils import check_alias_prefix
|
||||
from app.config import EMAIL_DOMAIN
|
||||
from app.dashboard.views.custom_alias import signer, get_available_suffixes
|
||||
from app.db import Session
|
||||
from app.jose_utils import make_id_token
|
||||
from app.log import LOG
|
||||
|
@ -124,7 +123,7 @@ def authorize():
|
|||
user_custom_domains = [
|
||||
cd.domain for cd in current_user.verified_custom_domains()
|
||||
]
|
||||
suffixes = get_available_suffixes(current_user)
|
||||
suffixes = get_alias_suffixes(current_user)
|
||||
|
||||
return render_template(
|
||||
"oauth/authorize.html",
|
||||
|
@ -182,11 +181,11 @@ def authorize():
|
|||
|
||||
# hypothesis: user will click on the button in the 600 secs
|
||||
try:
|
||||
alias_suffix = signer.unsign(signed_suffix, max_age=600).decode()
|
||||
except SignatureExpired:
|
||||
LOG.w("Alias creation time expired for %s", current_user)
|
||||
flash("Alias creation time is expired, please retry", "warning")
|
||||
return redirect(request.url)
|
||||
alias_suffix = check_suffix_signature(signed_suffix)
|
||||
if not alias_suffix:
|
||||
LOG.w("Alias creation time expired for %s", current_user)
|
||||
flash("Alias creation time is expired, please retry", "warning")
|
||||
return redirect(request.url)
|
||||
except Exception:
|
||||
LOG.w("Alias suffix is tampered, user %s", current_user)
|
||||
flash("Unknown error, refresh the page", "error")
|
||||
|
@ -196,7 +195,7 @@ def authorize():
|
|||
cd.domain for cd in current_user.verified_custom_domains()
|
||||
]
|
||||
|
||||
from app.dashboard.views.custom_alias import verify_prefix_suffix
|
||||
from app.alias_suffix import verify_prefix_suffix
|
||||
|
||||
if verify_prefix_suffix(current_user, alias_prefix, alias_suffix):
|
||||
full_alias = alias_prefix + alias_suffix
|
||||
|
|
|
@ -40,10 +40,9 @@
|
|||
</div>
|
||||
<div class="col-sm-6 p-1">
|
||||
<select class="form-control" name="signed-alias-suffix">
|
||||
{% for suffix_info in alias_suffixes_with_signature %}
|
||||
{% for alias_suffix in alias_suffixes %}
|
||||
|
||||
{% set alias_suffix = suffix_info[0] %}
|
||||
<option value="{{ suffix_info[1] }}" {% if alias_suffix.is_premium %}
|
||||
<option value="{{ alias_suffix.signed_suffix }}" {% if alias_suffix.is_premium %}
|
||||
title="Only available to Premium accounts" {% elif not alias_suffix.is_custom and at_least_a_premium_domain %} title="Available to all accounts" {% endif %}>
|
||||
{% if alias_suffix.is_custom %}
|
||||
{% if alias_suffix.mx_verified %}
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
from flask import g
|
||||
|
||||
from app.alias_suffix import signer
|
||||
from app.alias_utils import delete_alias
|
||||
from app.config import EMAIL_DOMAIN, MAX_NB_EMAIL_FREE_PLAN
|
||||
from app.dashboard.views.custom_alias import signer
|
||||
from app.db import Session
|
||||
from app.models import Alias, CustomDomain, Mailbox, AliasUsedOn
|
||||
from app.utils import random_word
|
||||
|
|
|
@ -33,7 +33,7 @@ def test_with_hostname(flask_client):
|
|||
assert "enabled" in res
|
||||
assert "note" in res
|
||||
|
||||
alias_used_on: AliasUsedOn = AliasUsedOn.first()
|
||||
alias_used_on: AliasUsedOn = AliasUsedOn.order_by(AliasUsedOn.id.desc()).first()
|
||||
assert alias_used_on.hostname == "www.test.com"
|
||||
assert alias_used_on.alias_id == res["id"]
|
||||
|
||||
|
|
|
@ -2,20 +2,19 @@ from random import random
|
|||
|
||||
from flask import url_for, g
|
||||
|
||||
from app.alias_utils import delete_alias
|
||||
from app.config import EMAIL_DOMAIN
|
||||
from app.dashboard.views.custom_alias import (
|
||||
from app.alias_suffix import (
|
||||
get_alias_suffixes,
|
||||
AliasSuffix,
|
||||
signer,
|
||||
verify_prefix_suffix,
|
||||
get_available_suffixes,
|
||||
AliasSuffix,
|
||||
)
|
||||
from app.alias_utils import delete_alias
|
||||
from app.config import EMAIL_DOMAIN
|
||||
from app.db import Session
|
||||
from app.models import (
|
||||
Mailbox,
|
||||
CustomDomain,
|
||||
Alias,
|
||||
User,
|
||||
DomainDeletedAlias,
|
||||
DeletedAlias,
|
||||
SLDomain,
|
||||
|
@ -27,20 +26,21 @@ from tests.utils import login, random_domain, create_new_user
|
|||
def test_add_alias_success(flask_client):
|
||||
user = login(flask_client)
|
||||
|
||||
suffix = f".{int(random() * 100000)}@{EMAIL_DOMAIN}"
|
||||
alias_suffix = AliasSuffix(
|
||||
is_custom=False,
|
||||
suffix=f".{int(random()*100000)}@{EMAIL_DOMAIN}",
|
||||
suffix=suffix,
|
||||
signed_suffix=signer.sign(suffix).decode(),
|
||||
is_premium=False,
|
||||
domain=EMAIL_DOMAIN,
|
||||
)
|
||||
signed_alias_suffix = signer.sign(alias_suffix.serialize()).decode()
|
||||
|
||||
# create with a single mailbox
|
||||
r = flask_client.post(
|
||||
url_for("dashboard.custom_alias"),
|
||||
data={
|
||||
"prefix": "prefix",
|
||||
"signed-alias-suffix": signed_alias_suffix,
|
||||
"signed-alias-suffix": alias_suffix.signed_suffix,
|
||||
"mailboxes": [user.default_mailbox_id],
|
||||
},
|
||||
follow_redirects=True,
|
||||
|
@ -56,13 +56,14 @@ def test_add_alias_multiple_mailboxes(flask_client):
|
|||
user = login(flask_client)
|
||||
Session.commit()
|
||||
|
||||
suffix = f".{int(random() * 100000)}@{EMAIL_DOMAIN}"
|
||||
alias_suffix = AliasSuffix(
|
||||
is_custom=False,
|
||||
suffix=f".{int(random()*100000)}@{EMAIL_DOMAIN}",
|
||||
suffix=suffix,
|
||||
signed_suffix=signer.sign(suffix).decode(),
|
||||
is_premium=False,
|
||||
domain=EMAIL_DOMAIN,
|
||||
)
|
||||
signed_alias_suffix = signer.sign(alias_suffix.serialize()).decode()
|
||||
|
||||
# create with a multiple mailboxes
|
||||
mb1 = Mailbox.create(user_id=user.id, email="m1@example.com", verified=True)
|
||||
|
@ -72,7 +73,7 @@ def test_add_alias_multiple_mailboxes(flask_client):
|
|||
url_for("dashboard.custom_alias"),
|
||||
data={
|
||||
"prefix": "prefix",
|
||||
"signed-alias-suffix": signed_alias_suffix,
|
||||
"signed-alias-suffix": alias_suffix.signed_suffix,
|
||||
"mailboxes": [user.default_mailbox_id, mb1.id],
|
||||
},
|
||||
follow_redirects=True,
|
||||
|
@ -118,10 +119,10 @@ def test_available_suffixes(flask_client):
|
|||
|
||||
CustomDomain.create(user_id=user.id, domain="test.com", ownership_verified=True)
|
||||
|
||||
assert len(get_available_suffixes(user)) > 0
|
||||
assert len(get_alias_suffixes(user)) > 0
|
||||
|
||||
# first suffix is custom domain
|
||||
first_suffix = get_available_suffixes(user)[0]
|
||||
first_suffix = get_alias_suffixes(user)[0]
|
||||
assert first_suffix.is_custom
|
||||
assert first_suffix.suffix == "@test.com"
|
||||
assert first_suffix.signed_suffix.startswith("@test.com")
|
||||
|
@ -138,12 +139,12 @@ def test_available_suffixes_default_domain(flask_client):
|
|||
user.default_alias_public_domain_id = sl_domain.id
|
||||
|
||||
# first suffix is SL Domain
|
||||
first_suffix = get_available_suffixes(user)[0]
|
||||
first_suffix = get_alias_suffixes(user)[0]
|
||||
assert first_suffix.suffix.endswith(f"@{sl_domain.domain}")
|
||||
|
||||
user.default_alias_public_domain_id = None
|
||||
# first suffix is custom domain
|
||||
first_suffix = get_available_suffixes(user)[0]
|
||||
first_suffix = get_alias_suffixes(user)[0]
|
||||
assert first_suffix.suffix == "@test.com"
|
||||
|
||||
|
||||
|
@ -160,26 +161,26 @@ def test_available_suffixes_random_prefix_generation(flask_client):
|
|||
user.default_alias_custom_domain_id = cd2.id
|
||||
|
||||
# first suffix is test2.com
|
||||
first_suffix = get_available_suffixes(user)[0]
|
||||
first_suffix = get_alias_suffixes(user)[0]
|
||||
assert first_suffix.suffix == "@test2.com"
|
||||
|
||||
cd2.random_prefix_generation = True
|
||||
# e.g. .meo@test2.com
|
||||
first_suffix = get_available_suffixes(user)[0]
|
||||
first_suffix = get_alias_suffixes(user)[0]
|
||||
assert first_suffix.suffix.endswith("@test2.com")
|
||||
assert first_suffix.suffix.startswith(".")
|
||||
|
||||
|
||||
def test_available_suffixes_hidden_domain(flask_client):
|
||||
user = login(flask_client)
|
||||
nb_suffix = len(get_available_suffixes(user))
|
||||
nb_suffix = len(get_alias_suffixes(user))
|
||||
|
||||
sl_domain = SLDomain.create(domain=random_domain(), commit=True)
|
||||
assert len(get_available_suffixes(user)) == nb_suffix + 1
|
||||
assert len(get_alias_suffixes(user)) == nb_suffix + 1
|
||||
|
||||
sl_domain.hidden = True
|
||||
Session.commit()
|
||||
assert len(get_available_suffixes(user)) == nb_suffix
|
||||
assert len(get_alias_suffixes(user)) == nb_suffix
|
||||
|
||||
|
||||
def test_available_suffixes_domain_order(flask_client):
|
||||
|
@ -188,13 +189,13 @@ def test_available_suffixes_domain_order(flask_client):
|
|||
domain = random_domain()
|
||||
# will be the last domain as other domains have order=0
|
||||
sl_domain = SLDomain.create(domain=domain, order=1, commit=True)
|
||||
last_suffix_info = get_available_suffixes(user)[-1]
|
||||
last_suffix_info = get_alias_suffixes(user)[-1]
|
||||
assert last_suffix_info.suffix.endswith(domain)
|
||||
|
||||
# now will be the first domain
|
||||
sl_domain.order = -1
|
||||
Session.commit()
|
||||
first_suffix_info = get_available_suffixes(user)[0]
|
||||
first_suffix_info = get_alias_suffixes(user)[0]
|
||||
assert first_suffix_info.suffix.endswith(domain)
|
||||
|
||||
|
||||
|
@ -202,21 +203,18 @@ def test_add_already_existed_alias(flask_client):
|
|||
user = login(flask_client)
|
||||
Session.commit()
|
||||
|
||||
another_user = User.create(
|
||||
email="a2@b.c",
|
||||
password="password",
|
||||
name="Test User",
|
||||
activated=True,
|
||||
commit=True,
|
||||
)
|
||||
another_user = create_new_user()
|
||||
|
||||
word = random_word()
|
||||
suffix = f".{word}@{EMAIL_DOMAIN}"
|
||||
|
||||
alias_suffix = AliasSuffix(
|
||||
is_custom=False, suffix=suffix, is_premium=False, domain=EMAIL_DOMAIN
|
||||
is_custom=False,
|
||||
suffix=suffix,
|
||||
signed_suffix=signer.sign(suffix).decode(),
|
||||
is_premium=False,
|
||||
domain=EMAIL_DOMAIN,
|
||||
)
|
||||
signed_alias_suffix = signer.sign(alias_suffix.serialize()).decode()
|
||||
|
||||
# alias already exist
|
||||
Alias.create(
|
||||
|
@ -231,7 +229,7 @@ def test_add_already_existed_alias(flask_client):
|
|||
url_for("dashboard.custom_alias"),
|
||||
data={
|
||||
"prefix": "prefix",
|
||||
"signed-alias-suffix": signed_alias_suffix,
|
||||
"signed-alias-suffix": alias_suffix.signed_suffix,
|
||||
"mailboxes": [user.default_mailbox_id],
|
||||
},
|
||||
follow_redirects=True,
|
||||
|
@ -249,9 +247,12 @@ def test_add_alias_in_global_trash(flask_client):
|
|||
word = random_word()
|
||||
suffix = f".{word}@{EMAIL_DOMAIN}"
|
||||
alias_suffix = AliasSuffix(
|
||||
is_custom=False, suffix=suffix, is_premium=False, domain=EMAIL_DOMAIN
|
||||
is_custom=False,
|
||||
suffix=suffix,
|
||||
signed_suffix=signer.sign(suffix).decode(),
|
||||
is_premium=False,
|
||||
domain=EMAIL_DOMAIN,
|
||||
)
|
||||
signed_alias_suffix = signer.sign(alias_suffix.serialize()).decode()
|
||||
|
||||
# delete an alias: alias should go the DeletedAlias
|
||||
alias = Alias.create(
|
||||
|
@ -270,7 +271,7 @@ def test_add_alias_in_global_trash(flask_client):
|
|||
url_for("dashboard.custom_alias"),
|
||||
data={
|
||||
"prefix": "prefix",
|
||||
"signed-alias-suffix": signed_alias_suffix,
|
||||
"signed-alias-suffix": alias_suffix.signed_suffix,
|
||||
"mailboxes": [user.default_mailbox_id],
|
||||
},
|
||||
follow_redirects=True,
|
||||
|
@ -304,15 +305,18 @@ def test_add_alias_in_custom_domain_trash(flask_client):
|
|||
suffix = f"@{domain}"
|
||||
|
||||
alias_suffix = AliasSuffix(
|
||||
is_custom=False, suffix=suffix, is_premium=False, domain=EMAIL_DOMAIN
|
||||
is_custom=False,
|
||||
suffix=suffix,
|
||||
signed_suffix=signer.sign(suffix).decode(),
|
||||
is_premium=False,
|
||||
domain=EMAIL_DOMAIN,
|
||||
)
|
||||
signed_alias_suffix = signer.sign(alias_suffix.serialize()).decode()
|
||||
|
||||
r = flask_client.post(
|
||||
url_for("dashboard.custom_alias"),
|
||||
data={
|
||||
"prefix": "prefix",
|
||||
"signed-alias-suffix": signed_alias_suffix,
|
||||
"signed-alias-suffix": alias_suffix.signed_suffix,
|
||||
"mailboxes": [user.default_mailbox_id],
|
||||
},
|
||||
follow_redirects=True,
|
||||
|
|
Loading…
Reference in New Issue