Sign the whole Alias Suffix Info instead of just the suffix

This commit is contained in:
Son Nguyen Kim 2021-07-19 20:14:59 +02:00
parent 3141bf1367
commit d181cd49dd
4 changed files with 162 additions and 38 deletions

View File

@ -41,22 +41,23 @@
<div class="col-sm-6 p-1"> <div class="col-sm-6 p-1">
<select class="form-control" name="suffix"> <select class="form-control" name="signed-alias-suffix">
{% for suffix_info in suffixes %} {% for suffix_info in alias_suffixes_with_signature %}
<option value="{{ suffix_info.signed_suffix }}" {% set alias_suffix = suffix_info[0] %}
{% if suffix_info.is_premium %} <option value="{{ suffix_info[1] }}"
{% if alias_suffix.is_premium %}
title="Only available to Premium accounts" title="Only available to Premium accounts"
{% elif not suffix_info.is_custom and at_least_a_premium_domain %} {% elif not alias_suffix.is_custom and at_least_a_premium_domain %}
title="Available to all accounts" title="Available to all accounts"
{% endif %} {% endif %}
> >
{% if suffix_info.is_custom %} {% if alias_suffix.is_custom %}
{{ suffix_info.suffix }} (your domain) {{ alias_suffix.suffix }} (your domain)
{% else %} {% else %}
{% if suffix_info.is_premium %} {% if alias_suffix.is_premium %}
{{ suffix_info.suffix }} (Premium domain) {{ alias_suffix.suffix }} (Premium domain)
{% else %} {% else %}
{{ suffix_info.suffix }} (Public domain) {{ alias_suffix.suffix }} (Public domain)
{% endif %} {% endif %}
{% endif %} {% endif %}
</option> </option>

View File

@ -1,4 +1,5 @@
from dataclasses import dataclass import json
from dataclasses import dataclass, asdict
from flask import render_template, redirect, url_for, flash, request from flask import render_template, redirect, url_for, flash, request
from flask_login import login_required, current_user from flask_login import login_required, current_user
@ -29,7 +30,10 @@ signer = TimestampSigner(CUSTOM_ALIAS_SECRET)
@dataclass @dataclass
class SuffixInfo: class SuffixInfo:
"""Alias suffix info""" """
Alias suffix info
WARNING: should use AliasSuffix instead
"""
# whether this is a custom domain # whether this is a custom domain
is_custom: bool is_custom: bool
@ -44,6 +48,8 @@ def get_available_suffixes(user: User) -> [SuffixInfo]:
""" """
Similar to as available_suffixes() but also return whether the suffix comes from a premium domain Similar to as available_suffixes() but also return whether the suffix comes from a premium domain
Note that is-premium-domain is only relevant for SL domain Note that is-premium-domain is only relevant for SL domain
WARNING: should use get_alias_suffixes() instead
""" """
user_custom_domains = user.verified_custom_domains() user_custom_domains = user.verified_custom_domains()
@ -92,6 +98,89 @@ def get_available_suffixes(user: User) -> [SuffixInfo]:
return suffixes return suffixes
@dataclass
class AliasSuffix:
# whether this is a custom domain
is_custom: bool
suffix: str
# whether this is a premium SL domain. Not apply to custom domain
is_premium: bool
# can be either Custom or SL domain
domain: str
def serialize(self):
return json.dumps(asdict(self))
@classmethod
def deserialize(cls, data: str) -> "AliasSuffix":
return AliasSuffix(**json.loads(data))
def get_alias_suffixes(user: User) -> [AliasSuffix]:
"""
Similar to as available_suffixes() but also return whether the suffix comes from a premium domain
Note that is-premium-domain is only relevant for SL domain
"""
user_custom_domains = user.verified_custom_domains()
alias_suffixes: [AliasSuffix] = []
# put custom domain first
# for each user domain, generate both the domain and a random suffix version
for custom_domain in user_custom_domains:
if custom_domain.random_prefix_generation:
suffix = "." + user.get_random_alias_suffix() + "@" + custom_domain.domain
alias_suffix = AliasSuffix(
is_custom=True,
suffix=suffix,
is_premium=False,
domain=custom_domain.domain,
)
if user.default_alias_custom_domain_id == custom_domain.id:
alias_suffixes.insert(0, alias_suffix)
else:
alias_suffixes.append(alias_suffix)
suffix = "@" + custom_domain.domain
alias_suffix = AliasSuffix(
is_custom=True, suffix=suffix, is_premium=False, domain=custom_domain.domain
)
# put the default domain to top
# only if random_prefix_generation isn't enabled
if (
user.default_alias_custom_domain_id == custom_domain.id
and not custom_domain.random_prefix_generation
):
alias_suffixes.insert(0, alias_suffix)
else:
alias_suffixes.append(alias_suffix)
# then SimpleLogin domain
for sl_domain in user.get_sl_domains():
suffix = (
("" if DISABLE_ALIAS_SUFFIX else "." + user.get_random_alias_suffix())
+ "@"
+ sl_domain.domain
)
alias_suffix = AliasSuffix(
is_custom=False,
suffix=suffix,
is_premium=sl_domain.premium_only,
domain=sl_domain.domain,
)
# put the default domain to top
if user.default_alias_public_domain_id == sl_domain.id:
alias_suffixes.insert(0, alias_suffix)
else:
alias_suffixes.append(alias_suffix)
return alias_suffixes
@dashboard_bp.route("/custom_alias", methods=["GET", "POST"]) @dashboard_bp.route("/custom_alias", methods=["GET", "POST"])
@limiter.limit(ALIAS_LIMIT, methods=["POST"]) @limiter.limit(ALIAS_LIMIT, methods=["POST"])
@login_required @login_required
@ -106,18 +195,23 @@ def custom_alias():
return redirect(url_for("dashboard.index")) return redirect(url_for("dashboard.index"))
user_custom_domains = [cd.domain for cd in current_user.verified_custom_domains()] user_custom_domains = [cd.domain for cd in current_user.verified_custom_domains()]
suffixes = get_available_suffixes(current_user) alias_suffixes = get_alias_suffixes(current_user)
at_least_a_premium_domain = False at_least_a_premium_domain = False
for suffix in suffixes: for alias_suffix in alias_suffixes:
if not suffix.is_custom and suffix.is_premium: if not alias_suffix.is_custom and alias_suffix.is_premium:
at_least_a_premium_domain = True at_least_a_premium_domain = True
break break
alias_suffixes_with_signature = [
(alias_suffix, signer.sign(alias_suffix.serialize()).decode())
for alias_suffix in alias_suffixes
]
mailboxes = current_user.mailboxes() mailboxes = current_user.mailboxes()
if request.method == "POST": if request.method == "POST":
alias_prefix = request.form.get("prefix").strip().lower().replace(" ", "") alias_prefix = request.form.get("prefix").strip().lower().replace(" ", "")
signed_suffix = request.form.get("suffix") signed_alias_suffix = request.form.get("signed-alias-suffix")
mailbox_ids = request.form.getlist("mailboxes") mailbox_ids = request.form.getlist("mailboxes")
alias_note = request.form.get("note") alias_note = request.form.get("note")
@ -148,7 +242,12 @@ def custom_alias():
# hypothesis: user will click on the button in the 600 secs # hypothesis: user will click on the button in the 600 secs
try: try:
alias_suffix = signer.unsign(signed_suffix, max_age=600).decode() signed_alias_suffix_decoded = signer.unsign(
signed_alias_suffix, max_age=600
).decode()
alias_suffix: AliasSuffix = AliasSuffix.deserialize(
signed_alias_suffix_decoded
)
except SignatureExpired: except SignatureExpired:
LOG.warning("Alias creation time expired for %s", current_user) LOG.warning("Alias creation time expired for %s", current_user)
flash("Alias creation time is expired, please retry", "warning") flash("Alias creation time is expired, please retry", "warning")
@ -158,8 +257,8 @@ def custom_alias():
flash("Unknown error, refresh the page", "error") flash("Unknown error, refresh the page", "error")
return redirect(url_for("dashboard.custom_alias")) return redirect(url_for("dashboard.custom_alias"))
if verify_prefix_suffix(current_user, alias_prefix, alias_suffix): if verify_prefix_suffix(current_user, alias_prefix, alias_suffix.suffix):
full_alias = alias_prefix + alias_suffix full_alias = alias_prefix + alias_suffix.suffix
general_error_msg = f"{full_alias} cannot be used" general_error_msg = f"{full_alias} cannot be used"
@ -193,8 +292,8 @@ def custom_alias():
else: else:
custom_domain_id = None custom_domain_id = None
# get the custom_domain_id if alias is created with a custom domain # get the custom_domain_id if alias is created with a custom domain
if alias_suffix.startswith("@"): if alias_suffix.is_custom:
alias_domain = alias_suffix[1:] alias_domain = alias_suffix.domain
domain = CustomDomain.get_by(domain=alias_domain) domain = CustomDomain.get_by(domain=alias_domain)
if domain: if domain:
@ -232,7 +331,7 @@ def custom_alias():
return render_template( return render_template(
"dashboard/custom_alias.html", "dashboard/custom_alias.html",
user_custom_domains=user_custom_domains, user_custom_domains=user_custom_domains,
suffixes=suffixes, alias_suffixes_with_signature=alias_suffixes_with_signature,
at_least_a_premium_domain=at_least_a_premium_domain, at_least_a_premium_domain=at_least_a_premium_domain,
mailboxes=mailboxes, mailboxes=mailboxes,
) )

View File

@ -101,6 +101,7 @@ from app.models import (
ManualSubscription, ManualSubscription,
Payout, Payout,
Coupon, Coupon,
SLDomain,
) )
from app.monitor.base import monitor_bp from app.monitor.base import monitor_bp
from app.oauth.base import oauth_bp from app.oauth.base import oauth_bp
@ -418,6 +419,8 @@ def fake_data():
commit=True, commit=True,
) )
SLDomain.create(domain="premium.com", premium_only=True, commit=True)
@login_manager.user_loader @login_manager.user_loader
def load_user(user_id): def load_user(user_id):

View File

@ -6,6 +6,7 @@ from app.dashboard.views.custom_alias import (
signer, signer,
verify_prefix_suffix, verify_prefix_suffix,
get_available_suffixes, get_available_suffixes,
AliasSuffix,
) )
from app.extensions import db from app.extensions import db
from app.models import ( from app.models import (
@ -24,22 +25,26 @@ from tests.utils import login
def test_add_alias_success(flask_client): def test_add_alias_success(flask_client):
user = login(flask_client) user = login(flask_client)
word = random_word() alias_suffix = AliasSuffix(
suffix = f".{word}@{EMAIL_DOMAIN}" is_custom=False,
suffix = signer.sign(suffix).decode() suffix=f".12345@{EMAIL_DOMAIN}",
is_premium=False,
domain=EMAIL_DOMAIN,
)
signed_alias_suffix = signer.sign(alias_suffix.serialize()).decode()
# create with a single mailbox # create with a single mailbox
r = flask_client.post( r = flask_client.post(
url_for("dashboard.custom_alias"), url_for("dashboard.custom_alias"),
data={ data={
"prefix": "prefix", "prefix": "prefix",
"suffix": suffix, "signed-alias-suffix": signed_alias_suffix,
"mailboxes": [user.default_mailbox_id], "mailboxes": [user.default_mailbox_id],
}, },
follow_redirects=True, follow_redirects=True,
) )
assert r.status_code == 200 assert r.status_code == 200
assert f"Alias prefix.{word}@{EMAIL_DOMAIN} has been created" in str(r.data) assert f"Alias prefix.12345@{EMAIL_DOMAIN} has been created" in str(r.data)
alias = Alias.query.order_by(Alias.created_at.desc()).first() alias = Alias.query.order_by(Alias.created_at.desc()).first()
assert not alias._mailboxes assert not alias._mailboxes
@ -49,9 +54,13 @@ def test_add_alias_multiple_mailboxes(flask_client):
user = login(flask_client) user = login(flask_client)
db.session.commit() db.session.commit()
word = random_word() alias_suffix = AliasSuffix(
suffix = f".{word}@{EMAIL_DOMAIN}" is_custom=False,
suffix = signer.sign(suffix).decode() suffix=f".12345@{EMAIL_DOMAIN}",
is_premium=False,
domain=EMAIL_DOMAIN,
)
signed_alias_suffix = signer.sign(alias_suffix.serialize()).decode()
# create with a multiple mailboxes # create with a multiple mailboxes
mb1 = Mailbox.create(user_id=user.id, email="m1@example.com", verified=True) mb1 = Mailbox.create(user_id=user.id, email="m1@example.com", verified=True)
@ -61,13 +70,13 @@ def test_add_alias_multiple_mailboxes(flask_client):
url_for("dashboard.custom_alias"), url_for("dashboard.custom_alias"),
data={ data={
"prefix": "prefix", "prefix": "prefix",
"suffix": suffix, "signed-alias-suffix": signed_alias_suffix,
"mailboxes": [user.default_mailbox_id, mb1.id], "mailboxes": [user.default_mailbox_id, mb1.id],
}, },
follow_redirects=True, follow_redirects=True,
) )
assert r.status_code == 200 assert r.status_code == 200
assert f"Alias prefix.{word}@{EMAIL_DOMAIN} has been created" in str(r.data) assert f"Alias prefix.12345@{EMAIL_DOMAIN} has been created" in str(r.data)
alias = Alias.query.order_by(Alias.created_at.desc()).first() alias = Alias.query.order_by(Alias.created_at.desc()).first()
assert alias._mailboxes assert alias._mailboxes
@ -169,7 +178,11 @@ def test_add_already_existed_alias(flask_client):
word = random_word() word = random_word()
suffix = f".{word}@{EMAIL_DOMAIN}" suffix = f".{word}@{EMAIL_DOMAIN}"
signed_suffix = signer.sign(suffix).decode()
alias_suffix = AliasSuffix(
is_custom=False, suffix=suffix, is_premium=False, domain=EMAIL_DOMAIN
)
signed_alias_suffix = signer.sign(alias_suffix.serialize()).decode()
# alias already exist # alias already exist
Alias.create( Alias.create(
@ -184,7 +197,7 @@ def test_add_already_existed_alias(flask_client):
url_for("dashboard.custom_alias"), url_for("dashboard.custom_alias"),
data={ data={
"prefix": "prefix", "prefix": "prefix",
"suffix": signed_suffix, "signed-alias-suffix": signed_alias_suffix,
"mailboxes": [user.default_mailbox_id], "mailboxes": [user.default_mailbox_id],
}, },
follow_redirects=True, follow_redirects=True,
@ -207,7 +220,10 @@ def test_add_alias_in_global_trash(flask_client):
word = random_word() word = random_word()
suffix = f".{word}@{EMAIL_DOMAIN}" suffix = f".{word}@{EMAIL_DOMAIN}"
signed_suffix = signer.sign(suffix).decode() alias_suffix = AliasSuffix(
is_custom=False, suffix=suffix, is_premium=False, domain=EMAIL_DOMAIN
)
signed_alias_suffix = signer.sign(alias_suffix.serialize()).decode()
# delete an alias: alias should go the DeletedAlias # delete an alias: alias should go the DeletedAlias
alias = Alias.create( alias = Alias.create(
@ -226,7 +242,7 @@ def test_add_alias_in_global_trash(flask_client):
url_for("dashboard.custom_alias"), url_for("dashboard.custom_alias"),
data={ data={
"prefix": "prefix", "prefix": "prefix",
"suffix": signed_suffix, "signed-alias-suffix": signed_alias_suffix,
"mailboxes": [user.default_mailbox_id], "mailboxes": [user.default_mailbox_id],
}, },
follow_redirects=True, follow_redirects=True,
@ -257,12 +273,17 @@ def test_add_alias_in_custom_domain_trash(flask_client):
# create the same alias, should return error # create the same alias, should return error
suffix = "@ab.cd" suffix = "@ab.cd"
signed_suffix = signer.sign(suffix).decode()
alias_suffix = AliasSuffix(
is_custom=False, suffix=suffix, is_premium=False, domain=EMAIL_DOMAIN
)
signed_alias_suffix = signer.sign(alias_suffix.serialize()).decode()
r = flask_client.post( r = flask_client.post(
url_for("dashboard.custom_alias"), url_for("dashboard.custom_alias"),
data={ data={
"prefix": "prefix", "prefix": "prefix",
"suffix": signed_suffix, "signed-alias-suffix": signed_alias_suffix,
"mailboxes": [user.default_mailbox_id], "mailboxes": [user.default_mailbox_id],
}, },
follow_redirects=True, follow_redirects=True,