Merge pull request #148 from simple-login/anti-tamper

Anti alias suffix tamper
This commit is contained in:
Son Nguyen Kim 2020-05-02 16:32:43 +02:00 committed by GitHub
commit 7ceb9440de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 482 additions and 78 deletions

View File

@ -659,7 +659,7 @@ Output: if api key is correct, return a json with user name and whether user is
If api key is incorrect, return 401.
#### GET /api/v3/alias/options
#### GET /api/v4/alias/options
User alias info and suggestion. Used by the first extension screen when user opens the extension.
@ -669,7 +669,7 @@ Input:
Output: a json with the following field:
- can_create: boolean. Whether user can create new alias
- suffixes: list of string. List of alias `suffix` that user can use. If user doesn't have custom domain, this list has a single element which is the alias default domain (simplelogin.co).
- suffixes: list of `[suffix, signed-suffix]`. List of alias `suffix` that user can use. The `signed-suffix` is necessary to avoid request tampering.
- prefix_suggestion: string. Suggestion for the `alias prefix`. Usually this is the website name extracted from `hostname`. If no `hostname`, then the `prefix_suggestion` is empty.
- recommendation: optional field, dictionary. If an alias is already used for this website, the recommendation will be returned. There are 2 subfields in `recommendation`: `alias` which is the recommended alias and `hostname` is the website on which this alias is used before.
@ -677,20 +677,21 @@ For ex:
```json
{
"can_create": true,
"prefix_suggestion": "test",
"recommendation": {
"alias": "e1.cat@sl.local",
"hostname": "www.test.com"
},
"prefix_suggestion": "",
"suffixes": [
"@very-long-domain.com.net.org",
"@ab.cd",
".cat@sl.local"
[
"@ab.cd",
"@ab.cd.Xq2BOA.zBebBB-QYikFkbPZ9CPKGpJ2-PU"
],
[
".yeah@local1.localhost",
".yeah@local1.localhost.Xq2BOA.dM9gyHyHcSXuJ8ps4i3wpJZ_Frw"
]
]
}
```
#### POST /api/alias/custom/new
#### POST /api/v2/alias/custom/new
Create a new custom alias.
@ -699,7 +700,7 @@ Input:
- (Optional but recommended) `hostname` passed in query string
- Request Message Body in json (`Content-Type` is `application/json`)
- alias_prefix: string. The first part of the alias that user can choose.
- alias_suffix: should be one of the suffixes returned in the `GET /api/v2/alias/options` endpoint.
- signed_suffix: should be one of the suffixes returned in the `GET /api/v4/alias/options` endpoint.
- (Optional) note: alias note
Output:
@ -1168,7 +1169,9 @@ Generate the migration script and make sure to review it before committing it. S
flask db migrate
```
In local the database creation in Sqlite doesn't use migration and uses directly `db.create_all()` (cf `fake_data()` method). This is because Sqlite doesn't handle well the migration. As sqlite is only used during development, the database is deleted and re-populated at each run.
In local the database creation in Sqlite doesn't use migration and uses directly `db.create_all()` (cf `fake_data()` method).
This is because Sqlite doesn't handle well the migration. As sqlite is only used during development, the database is deleted
and re-populated at each run.
### Code structure

View File

@ -4,6 +4,7 @@ from sqlalchemy import desc
from app.api.base import api_bp, require_api_auth
from app.config import ALIAS_DOMAINS, DISABLE_ALIAS_SUFFIX
from app.dashboard.views.custom_alias import available_suffixes
from app.extensions import db
from app.log import LOG
from app.models import AliasUsedOn, Alias, User
@ -26,7 +27,7 @@ def options():
existing: array of existing aliases
"""
LOG.warning("/v2/alias/options should be used instead")
LOG.warning("/alias/options is obsolete")
user = g.user
hostname = request.args.get("hostname")
@ -106,6 +107,8 @@ def options_v2():
"""
LOG.warning("/v2/alias/options is obsolete")
user = g.user
hostname = request.args.get("hostname")
@ -185,6 +188,7 @@ def options_v3():
"""
LOG.warning("/v3/alias/options is obsolete")
user = g.user
hostname = request.args.get("hostname")
@ -239,3 +243,71 @@ def options_v3():
ret["suffixes"] = list(reversed(ret["suffixes"]))
return jsonify(ret)
@api_bp.route("/v4/alias/options")
@cross_origin()
@require_api_auth
def options_v4():
"""
Return what options user has when creating new alias.
Same as v3 but return time-based signed-suffix in addition to suffix. To be used with /v2/alias/custom/new
Input:
a valid api-key in "Authentication" header and
optional "hostname" in args
Output: cf README
can_create: bool
suffixes: [[suffix, signed_suffix]]
prefix_suggestion: str
recommendation: Optional dict
alias: str
hostname: str
"""
user = g.user
hostname = request.args.get("hostname")
ret = {
"can_create": user.can_create_new_alias(),
"suffixes": [],
"prefix_suggestion": "",
}
# recommendation alias if exist
if hostname:
# put the latest used alias first
q = (
db.session.query(AliasUsedOn, Alias, User)
.filter(
AliasUsedOn.alias_id == Alias.id,
Alias.user_id == user.id,
AliasUsedOn.hostname == hostname,
)
.order_by(desc(AliasUsedOn.created_at))
)
r = q.first()
if r:
_, alias, _ = r
LOG.d("found alias %s %s %s", alias, hostname, user)
ret["recommendation"] = {"alias": alias.email, "hostname": hostname}
# custom alias suggestion and suffix
if hostname:
# keep only the domain name of hostname, ignore TLD and subdomain
# for ex www.groupon.com -> groupon
domain_name = hostname
if "." in hostname:
parts = hostname.split(".")
domain_name = parts[-2]
domain_name = convert_to_id(domain_name)
ret["prefix_suggestion"] = domain_name
# List of (is_custom_domain, alias-suffix, time-signed alias-suffix)
suffixes = available_suffixes(user)
# custom domain should be put first
ret["suffixes"] = list([suffix[1], suffix[2]] for suffix in suffixes)
return jsonify(ret)

View File

@ -1,11 +1,12 @@
from flask import g
from flask import jsonify, request
from flask_cors import cross_origin
from itsdangerous import SignatureExpired
from app.api.base import api_bp, require_api_auth
from app.api.serializer import serialize_alias_info, get_alias_info
from app.config import MAX_NB_EMAIL_FREE_PLAN, ALIAS_DOMAINS
from app.dashboard.views.custom_alias import verify_prefix_suffix
from app.dashboard.views.custom_alias import verify_prefix_suffix, signer
from app.extensions import db
from app.log import LOG
from app.models import Alias, AliasUsedOn, User, CustomDomain
@ -28,6 +29,7 @@ def new_custom_alias():
409 if the alias already exists
"""
LOG.warning("/alias/custom/new is obsolete")
user: User = g.user
if not user.can_create_new_alias():
LOG.d("user %s cannot create any custom alias", user)
@ -39,7 +41,6 @@ def new_custom_alias():
400,
)
user_custom_domains = [cd.domain for cd in user.verified_custom_domains()]
hostname = request.args.get("hostname")
data = request.get_json()
@ -51,7 +52,85 @@ def new_custom_alias():
note = data.get("note")
alias_prefix = convert_to_id(alias_prefix)
if not verify_prefix_suffix(user, alias_prefix, alias_suffix, user_custom_domains):
if not verify_prefix_suffix(user, alias_prefix, alias_suffix):
return jsonify(error="wrong alias prefix or suffix"), 400
full_alias = alias_prefix + alias_suffix
if Alias.get_by(email=full_alias):
LOG.d("full alias already used %s", full_alias)
return jsonify(error=f"alias {full_alias} already exists"), 409
alias = Alias.create(
user_id=user.id, email=full_alias, mailbox_id=user.default_mailbox_id, note=note
)
if alias_suffix.startswith("@"):
alias_domain = alias_suffix[1:]
if alias_domain not in ALIAS_DOMAINS:
domain = CustomDomain.get_by(domain=alias_domain)
LOG.d("set alias %s to domain %s", full_alias, domain)
alias.custom_domain_id = domain.id
db.session.commit()
if hostname:
AliasUsedOn.create(alias_id=alias.id, hostname=hostname, user_id=alias.user_id)
db.session.commit()
return jsonify(alias=full_alias, **serialize_alias_info(get_alias_info(alias))), 201
@api_bp.route("/v2/alias/custom/new", methods=["POST"])
@cross_origin()
@require_api_auth
def new_custom_alias_v2():
"""
Create a new custom alias
Same as v1 but signed_suffix is actually the suffix with signature, e.g.
.random_word@SL.co.Xq19rQ.s99uWQ7jD1s5JZDZqczYI5TbNNU
Input:
alias_prefix, for ex "www_groupon_com"
signed_suffix, either .random_letters@simplelogin.co or @my-domain.com
optional "hostname" in args
optional "note"
Output:
201 if success
409 if the alias already exists
"""
user: User = g.user
if not user.can_create_new_alias():
LOG.d("user %s cannot create any custom alias", user)
return (
jsonify(
error="You have reached the limitation of a free account with the maximum of "
f"{MAX_NB_EMAIL_FREE_PLAN} aliases, please upgrade your plan to create more aliases"
),
400,
)
hostname = request.args.get("hostname")
data = request.get_json()
if not data:
return jsonify(error="request body cannot be empty"), 400
alias_prefix = data.get("alias_prefix", "").strip()
signed_suffix = data.get("signed_suffix", "").strip()
note = data.get("note")
alias_prefix = convert_to_id(alias_prefix)
# hypothesis: user will click on the button in the 300 secs
try:
alias_suffix = signer.unsign(signed_suffix, max_age=300).decode()
except SignatureExpired:
LOG.error("Alias creation time expired")
return jsonify(error="alias creation is expired, please try again"), 400
except Exception:
LOG.error("Alias suffix is tampered, user %s", user)
return jsonify(error="Tampered suffix"), 400
if not verify_prefix_suffix(user, alias_prefix, alias_suffix):
return jsonify(error="wrong alias prefix or suffix"), 400
full_alias = alias_prefix + alias_suffix

View File

@ -123,6 +123,7 @@ DB_URI = os.environ["DB_URI"]
# Flask secret
FLASK_SECRET = os.environ["FLASK_SECRET"]
MAILBOX_SECRET = FLASK_SECRET + "mailbox"
CUSTOM_ALIAS_SECRET = FLASK_SECRET + "custom_alias"
# AWS
AWS_REGION = "eu-west-3"

View File

@ -42,7 +42,7 @@
<div class="col-sm-6 p-1">
<select class="form-control" name="suffix">
{% for suffix in suffixes %}
<option value="{{ suffix[1] }}">
<option value="{{ suffix[2] }}">
{% if suffix[0] %}
{{ suffix[1] }} (your domain)
{% else %}

View File

@ -122,7 +122,7 @@
<div class="mb-3">Choose how to create your email alias by default</div>
<form method="post" class="form-inline">
<input type="hidden" name="form-name" value="change-alias-generator">
<select class="mr-sm-2" name="alias-generator-scheme">
<select class="form-control mr-sm-2" name="alias-generator-scheme">
<option value="{{ AliasGeneratorEnum.word.value }}"
{% if current_user.alias_generator == AliasGeneratorEnum.word.value %} selected {% endif %} >Based on
Random {{ AliasGeneratorEnum.word.name.capitalize() }}</option>

View File

@ -1,14 +1,41 @@
from flask import render_template, redirect, url_for, flash, request
from flask_login import login_required, current_user
from itsdangerous import TimestampSigner, SignatureExpired
from app.config import DISABLE_ALIAS_SUFFIX, ALIAS_DOMAINS
from app.config import (
DISABLE_ALIAS_SUFFIX,
ALIAS_DOMAINS,
CUSTOM_ALIAS_SECRET,
)
from app.dashboard.base import dashboard_bp
from app.email_utils import email_belongs_to_alias_domains, get_email_domain_part
from app.extensions import db
from app.log import LOG
from app.models import Alias, CustomDomain, DeletedAlias, Mailbox
from app.models import Alias, CustomDomain, DeletedAlias, Mailbox, User
from app.utils import convert_to_id, random_word, word_exist
signer = TimestampSigner(CUSTOM_ALIAS_SECRET)
def available_suffixes(user: User) -> [bool, str, str]:
"""Return (is_custom_domain, alias-suffix, time-signed alias-suffix)"""
user_custom_domains = [cd.domain for cd in user.verified_custom_domains()]
# List of (is_custom_domain, alias-suffix, time-signed alias-suffix)
suffixes = []
# put custom domain first
for alias_domain in user_custom_domains:
suffix = "@" + alias_domain
suffixes.append((True, suffix, signer.sign(suffix).decode()))
# then default domain
for domain in ALIAS_DOMAINS:
suffix = ("" if DISABLE_ALIAS_SUFFIX else "." + random_word()) + "@" + domain
suffixes.append((False, suffix, signer.sign(suffix).decode()))
return suffixes
@dashboard_bp.route("/custom_alias", methods=["GET", "POST"])
@login_required
@ -24,27 +51,14 @@ def custom_alias():
return redirect(url_for("dashboard.index"))
user_custom_domains = [cd.domain for cd in current_user.verified_custom_domains()]
# List of (is_custom_domain, alias-suffix)
suffixes = []
# put custom domain first
for alias_domain in user_custom_domains:
suffixes.append((True, "@" + alias_domain))
# then default domain
for domain in ALIAS_DOMAINS:
suffixes.append(
(
False,
("" if DISABLE_ALIAS_SUFFIX else "." + random_word()) + "@" + domain,
)
)
# List of (is_custom_domain, alias-suffix, time-signed alias-suffix)
suffixes = available_suffixes(current_user)
mailboxes = [mb.email for mb in current_user.mailboxes()]
if request.method == "POST":
alias_prefix = request.form.get("prefix")
alias_suffix = request.form.get("suffix")
signed_suffix = request.form.get("suffix")
mailbox_email = request.form.get("mailbox")
alias_note = request.form.get("note")
@ -55,9 +69,19 @@ def custom_alias():
flash("Something went wrong, please retry", "warning")
return redirect(url_for("dashboard.custom_alias"))
if verify_prefix_suffix(
current_user, alias_prefix, alias_suffix, user_custom_domains
):
# hypothesis: user will click on the button in the 300 secs
try:
alias_suffix = signer.unsign(signed_suffix, max_age=300).decode()
except SignatureExpired:
LOG.error("Alias creation time expired")
flash("Alias creation time is expired, please retry", "warning")
return redirect(url_for("dashboard.custom_alias"))
except Exception:
LOG.error("Alias suffix is tampered, user %s", current_user)
flash("Unknown error, refresh the page", "error")
return redirect(url_for("dashboard.custom_alias"))
if verify_prefix_suffix(current_user, alias_prefix, alias_suffix):
full_alias = alias_prefix + alias_suffix
if Alias.get_by(email=full_alias) or DeletedAlias.get_by(email=full_alias):
@ -91,14 +115,20 @@ def custom_alias():
else:
flash("something went wrong", "warning")
return render_template("dashboard/custom_alias.html", **locals())
return render_template(
"dashboard/custom_alias.html",
user_custom_domains=user_custom_domains,
suffixes=suffixes,
mailboxes=mailboxes,
)
def verify_prefix_suffix(user, alias_prefix, alias_suffix, user_custom_domains) -> bool:
def verify_prefix_suffix(user, alias_prefix, alias_suffix) -> bool:
"""verify if user could create an alias with the given prefix and suffix"""
if not alias_prefix or not alias_suffix: # should be caught on frontend
return False
user_custom_domains = [cd.domain for cd in user.verified_custom_domains()]
alias_prefix = alias_prefix.strip()
alias_prefix = convert_to_id(alias_prefix)

View File

@ -314,7 +314,9 @@ class User(db.Model, ModelMixin, UserMixin):
"""return suggested email and other email choices """
website_name = convert_to_id(website_name)
all_aliases = [ge.email for ge in Alias.filter_by(user_id=self.id)]
all_aliases = [
ge.email for ge in Alias.filter_by(user_id=self.id, enabled=True)
]
if self.can_create_new_alias():
suggested_alias = Alias.create_new(self, prefix=website_name).email
else:

View File

@ -82,7 +82,7 @@
<label style="padding-top: .5rem">Email</label>
</div>
<div class="col-md-9">
<select class="" name="suggested-email">
<select class="form-control" name="suggested-email">
<option selected value="{{ suggested_email }}">{{ suggested_email }}</option>
<option value="{{ current_user.email }}">{{ current_user.email }} (Personal Email)</option>
{% for email in other_emails %}
@ -108,7 +108,7 @@
style="padding-left: 5px">
<select class="form-control" name="suffix">
{% for suffix in suffixes %}
<option value="{{ suffix[1] }}">
<option value="{{ suffix[2] }}">
{% if suffix[0] %}
{{ suffix[1] }} (your domain)
{% else %}
@ -132,7 +132,7 @@
<label style="padding-top: .5rem">Name</label>
</div>
<div class="col-md-9">
<select class="" name="suggested-name">
<select class="form-control" name="suggested-name">
<option selected value="{{ suggested_name }}">{{ suggested_name }}</option>
{% for name in other_names %}
<option value="{{ name }}">{{ name }}</option>

View File

@ -3,10 +3,12 @@ from urllib.parse import urlparse
from flask import request, render_template, redirect, flash
from flask_login import current_user
from itsdangerous import SignatureExpired
from app.config import EMAIL_DOMAIN, ALIAS_DOMAINS, DISABLE_ALIAS_SUFFIX
from app.email_utils import get_email_domain_part
from app.extensions import db
from app.dashboard.views.custom_alias import available_suffixes, signer
from app.jose_utils import make_id_token
from app.log import LOG
from app.models import (
@ -109,23 +111,8 @@ def authorize():
user_custom_domains = [
cd.domain for cd in current_user.verified_custom_domains()
]
# List of (is_custom_domain, alias-suffix)
suffixes = []
# put custom domain first
for alias_domain in user_custom_domains:
suffixes.append((True, "@" + alias_domain))
# then default domain
for domain in ALIAS_DOMAINS:
suffixes.append(
(
False,
("" if DISABLE_ALIAS_SUFFIX else "." + random_word())
+ "@"
+ domain,
)
)
# List of (is_custom_domain, alias-suffix, time-signed alias-suffix)
suffixes = available_suffixes(current_user)
return render_template(
"oauth/authorize.html",
@ -155,7 +142,7 @@ def authorize():
LOG.d("user %s has already allowed client %s", current_user, client)
else:
alias_prefix = request.form.get("prefix")
alias_suffix = request.form.get("suffix")
signed_suffix = request.form.get("suffix")
alias = None
@ -165,15 +152,25 @@ def authorize():
if not current_user.can_create_new_alias():
raise Exception(f"User {current_user} cannot create custom email")
# hypothesis: user will click on the button in the 300 secs
try:
alias_suffix = signer.unsign(signed_suffix, max_age=300).decode()
except SignatureExpired:
LOG.error("Alias creation time expired")
flash("Alias creation time is expired, please retry", "warning")
return redirect(request.url)
except Exception:
LOG.error("Alias suffix is tampered, user %s", current_user)
flash("Unknown error, refresh the page", "error")
return redirect(request.url)
user_custom_domains = [
cd.domain for cd in current_user.verified_custom_domains()
]
from app.dashboard.views.custom_alias import verify_prefix_suffix
if verify_prefix_suffix(
current_user, alias_prefix, alias_suffix, user_custom_domains
):
if verify_prefix_suffix(current_user, alias_prefix, alias_suffix):
full_alias = alias_prefix + alias_suffix
if Alias.get_by(email=full_alias) or DeletedAlias.get_by(

89
oauth_tester.py Normal file
View File

@ -0,0 +1,89 @@
"""
This is an example on how to integrate SimpleLogin
with Requests-OAuthlib, a popular library to work with OAuth in Python.
The step-to-step guide can be found on https://docs.simplelogin.io
This example is based on
https://requests-oauthlib.readthedocs.io/en/latest/examples/real_world_example.html
"""
import os
from flask import Flask, request, redirect, session, url_for
from flask.json import jsonify
from requests_oauthlib import OAuth2Session
app = Flask(__name__)
# this demo uses flask.session that requires the `secret_key` to be set
app.secret_key = "very secret"
# "prettify" the returned json in /profile
app.config["JSONIFY_PRETTYPRINT_REGULAR"] = True
# This client credential is obtained upon registration of a new SimpleLogin App on
# https://app.simplelogin.io/developer/new_client
# Please make sure to export these credentials to env variables:
# export CLIENT_ID={your_client_id}
# export CLIENT_SECRET={your_client_secret}
client_id = os.environ.get("CLIENT_ID") or "client-id"
client_secret = os.environ.get("CLIENT_SECRET") or "client-secret"
# SimpleLogin urls
authorization_base_url = "http://localhost:7777/oauth2/authorize"
token_url = "http://localhost:7777/oauth2/token"
userinfo_url = "http://localhost:7777/oauth2/userinfo"
@app.route("/")
def demo():
"""Step 1: User Authorization.
Redirect the user/resource owner to the OAuth provider (i.e. SimpleLogin)
using an URL with a few key OAuth parameters.
"""
simplelogin = OAuth2Session(
client_id, redirect_uri="http://127.0.0.1:5000/callback"
)
authorization_url, state = simplelogin.authorization_url(authorization_base_url)
# State is used to prevent CSRF, keep this for later.
session["oauth_state"] = state
return redirect(authorization_url)
# Step 2: User authorization, this happens on the provider.
@app.route("/callback", methods=["GET"])
def callback():
""" Step 3: Retrieving an access token.
The user has been redirected back from the provider to your registered
callback URL. With this redirection comes an authorization code included
in the redirect URL. We will use that to obtain an access token.
"""
simplelogin = OAuth2Session(client_id, state=session["oauth_state"])
token = simplelogin.fetch_token(
token_url, client_secret=client_secret, authorization_response=request.url
)
# At this point you can fetch protected resources but lets save
# the token and show how this is done from a persisted token
# in /profile.
session["oauth_token"] = token
return redirect(url_for(".profile"))
@app.route("/profile", methods=["GET"])
def profile():
"""Fetching a protected resource using an OAuth 2 token.
"""
simplelogin = OAuth2Session(client_id, token=session["oauth_token"])
return jsonify(simplelogin.get(userinfo_url).json())
# This allows us to use a plain HTTP callback
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
if __name__ == "__main__":
app.secret_key = os.urandom(24)
app.run(debug=True)

View File

@ -143,3 +143,51 @@ def test_different_scenarios_v3(flask_client):
)
assert r.json["recommendation"]["alias"] == alias.email
assert r.json["recommendation"]["hostname"] == "www.test.com"
def test_different_scenarios_v4(flask_client):
user = User.create(
email="a@b.c", password="password", name="Test User", activated=True
)
db.session.commit()
# create api_key
api_key = ApiKey.create(user.id, "for test")
db.session.commit()
# <<< without hostname >>>
r = flask_client.get(
url_for("api.options_v4"), headers={"Authentication": api_key.code}
)
assert r.status_code == 200
assert r.json["can_create"]
assert r.json["suffixes"]
assert r.json["prefix_suggestion"] == "" # no hostname => no suggestion
for (suffix, signed_suffix) in r.json["suffixes"]:
assert signed_suffix.startswith(suffix)
# <<< with hostname >>>
r = flask_client.get(
url_for("api.options_v4", hostname="www.test.com"),
headers={"Authentication": api_key.code},
)
assert r.json["prefix_suggestion"] == "test"
# <<< with recommendation >>>
alias = Alias.create_new(user, prefix="test")
db.session.commit()
AliasUsedOn.create(
alias_id=alias.id, hostname="www.test.com", user_id=alias.user_id
)
db.session.commit()
r = flask_client.get(
url_for("api.options_v4", hostname="www.test.com"),
headers={"Authentication": api_key.code},
)
assert r.json["recommendation"]["alias"] == alias.email
assert r.json["recommendation"]["hostname"] == "www.test.com"

View File

@ -1,6 +1,7 @@
from flask import url_for
from app.config import EMAIL_DOMAIN, MAX_NB_EMAIL_FREE_PLAN
from app.dashboard.views.custom_alias import signer
from app.extensions import db
from app.models import User, ApiKey, Alias
from app.utils import random_word
@ -98,3 +99,43 @@ def test_out_of_quota(flask_client):
assert r.json == {
"error": "You have reached the limitation of a free account with the maximum of 3 aliases, please upgrade your plan to create more aliases"
}
def test_success_v2(flask_client):
user = User.create(
email="a@b.c", password="password", name="Test User", activated=True
)
db.session.commit()
# create api_key
api_key = ApiKey.create(user.id, "for test")
db.session.commit()
# create new alias with note
word = random_word()
suffix = f".{word}@{EMAIL_DOMAIN}"
suffix = signer.sign(suffix).decode()
r = flask_client.post(
url_for("api.new_custom_alias_v2", hostname="www.test.com"),
headers={"Authentication": api_key.code},
json={"alias_prefix": "prefix", "signed_suffix": suffix, "note": "test note",},
)
assert r.status_code == 201
assert r.json["alias"] == f"prefix.{word}@{EMAIL_DOMAIN}"
# assert returned field
res = r.json
assert "id" in res
assert "email" in res
assert "creation_date" in res
assert "creation_timestamp" in res
assert "nb_forward" in res
assert "nb_block" in res
assert "nb_reply" in res
assert "enabled" in res
assert "note" in res
new_ge = Alias.get_by(email=r.json["alias"])
assert new_ge.note == "test note"

View File

@ -1,8 +1,13 @@
from flask import url_for
from app.config import EMAIL_DOMAIN
from app.dashboard.views.custom_alias import (
signer,
verify_prefix_suffix,
available_suffixes,
)
from app.extensions import db
from app.models import Mailbox
from app.models import Mailbox, CustomDomain
from app.utils import random_word
from tests.utils import login
@ -12,14 +17,12 @@ def test_add_alias_success(flask_client):
db.session.commit()
word = random_word()
suffix = f".{word}@{EMAIL_DOMAIN}"
suffix = signer.sign(suffix).decode()
r = flask_client.post(
url_for("dashboard.custom_alias"),
data={
"prefix": "prefix",
"suffix": f".{word}@{EMAIL_DOMAIN}",
"mailbox": user.email,
},
data={"prefix": "prefix", "suffix": suffix, "mailbox": user.email,},
follow_redirects=True,
)
@ -40,3 +43,32 @@ def test_not_show_unverified_mailbox(flask_client):
assert "m1@example.com" in str(r.data)
assert "m2@example.com" not in str(r.data)
def test_verify_prefix_suffix(flask_client):
user = login(flask_client)
db.session.commit()
CustomDomain.create(user_id=user.id, domain="test.com", verified=True)
assert verify_prefix_suffix(user, "prefix", "@test.com")
assert not verify_prefix_suffix(user, "prefix", "@abcd.com")
word = random_word()
suffix = f".{word}@{EMAIL_DOMAIN}"
assert verify_prefix_suffix(user, "prefix", suffix)
def test_available_suffixes(flask_client):
user = login(flask_client)
db.session.commit()
CustomDomain.create(user_id=user.id, domain="test.com", verified=True)
assert len(available_suffixes(user)) > 0
# first suffix is custom domain
first_suffix = available_suffixes(user)[0]
assert first_suffix[0]
assert first_suffix[1] == "@test.com"
assert first_suffix[2].startswith("@test.com")

View File

@ -1,15 +1,25 @@
from app.extensions import db
from app.jose_utils import make_id_token, verify_id_token
from app.models import ClientUser
from server import fake_data
from app.models import ClientUser, User, Client
def test_encode_decode(flask_app):
with flask_app.app_context():
fake_data()
ClientUser.create(client_id=-1, user_id=-1)
user = User.create(
email="a@b.c", password="password", name="Test User", activated=True
)
db.session.commit()
jwt_token = make_id_token(ClientUser.get(1))
client1 = Client.create_new(name="Demo", user_id=user.id)
client1.oauth_client_id = "client-id"
client1.oauth_client_secret = "client-secret"
client1.published = True
db.session.commit()
client_user = ClientUser.create(client_id=client1.id, user_id=user.id)
db.session.commit()
jwt_token = make_id_token(client_user)
assert type(jwt_token) is str
assert verify_id_token(jwt_token)