mirror of
https://github.com/simple-login/app.git
synced 2024-11-10 21:27:10 +01:00
686 lines
20 KiB
Python
686 lines
20 KiB
Python
import base64
|
|
import json
|
|
from urllib.parse import urlparse, parse_qs
|
|
|
|
from flask import url_for
|
|
|
|
from app.extensions import db
|
|
from app.jose_utils import verify_id_token, decode_id_token
|
|
from app.models import Client, User
|
|
from app.oauth.views.authorize import (
|
|
get_host_name_and_scheme,
|
|
generate_access_token,
|
|
construct_url,
|
|
)
|
|
from tests.utils import login
|
|
|
|
|
|
def test_get_host_name_and_scheme():
    """Check the (host name, scheme) pair returned for two sample URLs."""
    # The expected host carries no port, and the query string plays no part.
    local_result = get_host_name_and_scheme("http://localhost:8000?a=b")
    assert local_result == ("localhost", "http")

    # Path and fragment are absent from the expected result as well.
    article_url = (
        "https://www.bubblecode.net/en/2016/01/22/understanding-oauth2/#Implicit_Grant"
    )
    assert get_host_name_and_scheme(article_url) == ("www.bubblecode.net", "https")
|
|
|
|
|
|
def test_generate_access_token(flask_client):
    """A freshly generated access token is expected to have length 40."""
    token = generate_access_token()
    assert len(token) == 40
|
|
|
|
|
|
def test_construct_url():
    """The space in a query value is expected to come out as ``%20``."""
    query_args = {"x": "1 2"}
    built_url = construct_url("http://ab.cd", query_args)
    assert built_url == "http://ab.cd?x=1%202"
|
|
|
|
|
|
def test_authorize_page_non_login_user(flask_client):
    """The authorize page asks a non-authenticated visitor to sign in."""
    # No login() call in this test: the request below is sent without signing in.
    owner = User.create("test@test.com", "test user")
    db.session.commit()

    oauth_client = Client.create_new("test client", owner.id)
    db.session.commit()

    authorize_url = url_for(
        "oauth.authorize",
        client_id=oauth_client.oauth_client_id,
        state="teststate",
        redirect_uri="http://localhost",
        response_type="code",
    )
    response = flask_client.get(authorize_url)
    page = response.get_data(as_text=True)

    assert response.status_code == 200
    assert "In order to accept the request, you need to sign in" in page
|
|
|
|
|
|
def test_authorize_page_login_user_non_supported_flow(flask_client):
    """The authorize page answers 400 when the requested flow is not supported."""
    user = login(flask_client)
    oauth_client = Client.create_new("test client", user.id)
    db.session.commit()

    unsupported_requests = (
        # 1) no flow at all: the response_type parameter is simply left out
        {},
        # 2) a flow combination that SL does not support
        {"response_type": "code token id_token"},
    )

    for extra_args in unsupported_requests:
        authorize_url = url_for(
            "oauth.authorize",
            client_id=oauth_client.oauth_client_id,
            state="teststate",
            redirect_uri="http://localhost",
            **extra_args,
        )
        response = flask_client.get(authorize_url)
        page = response.get_data(as_text=True)

        assert response.status_code == 400
        assert "SimpleLogin only support the following OIDC flows" in page
|
|
|
|
|
|
def test_authorize_page_login_user(flask_client):
    """The authorization page is displayed to an authenticated user."""
    user = login(flask_client)
    oauth_client = Client.create_new("test client", user.id)
    db.session.commit()

    authorize_url = url_for(
        "oauth.authorize",
        client_id=oauth_client.oauth_client_id,
        state="teststate",
        redirect_uri="http://localhost",
        response_type="code",
    )
    response = flask_client.get(authorize_url)
    page = response.get_data(as_text=True)

    assert response.status_code == 200
    assert "You can customize the info sent to this app" in page
    # Presumably a@b.c is the address of the user created by login() -- confirm
    # against tests.utils.
    assert "a@b.c (Personal Email)" in page
|
|
|
|
|
|
def test_authorize_code_flow_no_openid_scope(flask_client):
    """Code Flow without the *openid* scope (response_type=code, no scope).

    The authorize endpoint is expected to redirect back to the client with
    ``state`` and ``code`` in the query string. The code is then exchanged at
    the token endpoint, whose JSON answer is checked field by field.
    """
    user = login(flask_client)
    oauth_client = Client.create_new("test client", user.id)
    db.session.commit()

    # The user presses "allow" on the authorization page.
    authorize_url = url_for(
        "oauth.authorize",
        client_id=oauth_client.oauth_client_id,
        state="teststate",
        redirect_uri="http://localhost",
        response_type="code",
    )
    consent_form = {
        "button": "allow",
        "suggested-email": "x@y.z",
        "suggested-name": "AB CD",
    }
    # Redirections are deliberately not followed (no follow_redirects=True) so
    # that the redirect URL itself can be asserted below.
    response = flask_client.post(authorize_url, data=consent_form)

    assert response.status_code == 302  # user gets redirected back to client page

    # Expected shape of the location: http://localhost?state=teststate&code=knuyjepwvg
    redirect = urlparse(response.location)
    assert redirect.netloc == "localhost"
    assert not redirect.fragment

    # Parsed query looks like {'state': ['teststate'], 'code': ['knuyjepwvg']}
    params = parse_qs(redirect.query)
    assert len(params) == 2
    assert params["state"] == ["teststate"]
    assert len(params["code"]) == 1

    # Exchange the code for an access token, sending the client id and secret
    # as HTTP Basic credentials.
    raw_credentials = (
        f"{oauth_client.oauth_client_id}:{oauth_client.oauth_client_secret}"
    )
    encoded_credentials = base64.b64encode(raw_credentials.encode()).decode("utf-8")

    token_response = flask_client.post(
        url_for("oauth.token"),
        headers={"Authorization": f"Basic {encoded_credentials}"},
        data={"grant_type": "authorization_code", "code": params["code"][0]},
    )

    # The JSON body carries access_token, expires_in, scope, token_type and a
    # nested "user" object; each of them is checked below.
    assert token_response.status_code == 200
    body = token_response.json
    assert body["access_token"]
    assert body["expires_in"] == 3600
    assert not body["scope"]
    assert body["token_type"] == "Bearer"

    expected_user = {
        "avatar_url": None,
        "client": "test client",
        "email": "x@y.z",
        "email_verified": True,
        "id": 1,
        "name": "AB CD",
        "sub": "1",
    }
    assert body["user"] == expected_user
|
|
|
|
|
|
def test_authorize_code_flow_with_openid_scope(flask_client):
    """Code Flow with the *openid* scope (response_type=code, scope=openid).

    The authorize endpoint still hands back a *code*. The token endpoint is
    expected to return an id_token in addition to the access_token.
    """
    user = login(flask_client)
    oauth_client = Client.create_new("test client", user.id)
    db.session.commit()

    # The user presses "allow" on the authorization page.
    authorize_url = url_for(
        "oauth.authorize",
        client_id=oauth_client.oauth_client_id,
        state="teststate",
        redirect_uri="http://localhost",
        response_type="code",
        scope="openid",  # openid is in scope
    )
    consent_form = {
        "button": "allow",
        "suggested-email": "x@y.z",
        "suggested-name": "AB CD",
    }
    # Redirections are deliberately not followed (no follow_redirects=True) so
    # that the redirect URL itself can be asserted below.
    response = flask_client.post(authorize_url, data=consent_form)

    assert response.status_code == 302  # user gets redirected back to client page

    redirect = urlparse(response.location)
    assert redirect.netloc == "localhost"
    assert not redirect.fragment

    # Parsed query looks like
    # {'state': ['teststate'], 'code': ['knuyjepwvg'], 'scope': ["openid"]}
    params = parse_qs(redirect.query)
    assert len(params) == 3
    assert params["state"] == ["teststate"]
    assert len(params["code"]) == 1

    # Exchange the code for an access token, sending the client id and secret
    # as HTTP Basic credentials.
    raw_credentials = (
        f"{oauth_client.oauth_client_id}:{oauth_client.oauth_client_secret}"
    )
    encoded_credentials = base64.b64encode(raw_credentials.encode()).decode("utf-8")

    token_response = flask_client.post(
        url_for("oauth.token"),
        headers={"Authorization": f"Basic {encoded_credentials}"},
        data={"grant_type": "authorization_code", "code": params["code"][0]},
    )

    # The JSON body carries access_token, expires_in, scope, token_type, a
    # nested "user" object and, for this flow, an id_token.
    assert token_response.status_code == 200
    body = token_response.json
    assert body["access_token"]
    assert body["expires_in"] == 3600
    assert body["scope"] == "openid"
    assert body["token_type"] == "Bearer"

    expected_user = {
        "avatar_url": None,
        "client": "test client",
        "email": "x@y.z",
        "email_verified": True,
        "id": 1,
        "name": "AB CD",
        "sub": "1",
    }
    assert body["user"] == expected_user

    # id_token must be returned ...
    assert body["id_token"]

    # ... and it must pass verify_id_token (valid, correctly signed JWT)
    assert verify_id_token(body["id_token"])
|
|
|
|
|
|
def test_authorize_token_flow(flask_client):
    """Token Flow (response_type=token).

    The authorize endpoint is expected to redirect back to the client with
    ``state`` and ``access_token`` placed in the URL fragment.
    """
    user = login(flask_client)
    oauth_client = Client.create_new("test client", user.id)
    db.session.commit()

    # The user presses "allow" on the authorization page.
    authorize_url = url_for(
        "oauth.authorize",
        client_id=oauth_client.oauth_client_id,
        state="teststate",
        redirect_uri="http://localhost",
        response_type="token",  # token flow
    )
    consent_form = {
        "button": "allow",
        "suggested-email": "x@y.z",
        "suggested-name": "AB CD",
    }
    # Redirections are deliberately not followed (no follow_redirects=True) so
    # that the redirect URL itself can be asserted below.
    response = flask_client.post(authorize_url, data=consent_form)

    assert response.status_code == 302  # user gets redirected back to client page

    redirect = urlparse(response.location)
    assert redirect.netloc == "localhost"

    # In the token flow the values travel in the fragment, not in the query.
    assert redirect.fragment
    assert not redirect.query

    # Parsed fragment looks like
    # {'state': ['teststate'], 'access_token': ['knuyjepwvg']}
    fragment_params = parse_qs(redirect.fragment)
    assert len(fragment_params) == 2
    assert fragment_params["state"] == ["teststate"]

    # access_token must be returned
    assert len(fragment_params["access_token"]) == 1
|
|
|
|
|
|
def test_authorize_id_token_flow(flask_client):
    """ID-Token Flow (response_type=id_token).

    The authorize endpoint is expected to redirect back to the client with
    ``state`` and a verifiable ``id_token`` in the query string.
    """
    user = login(flask_client)
    oauth_client = Client.create_new("test client", user.id)
    db.session.commit()

    # The user presses "allow" on the authorization page.
    authorize_url = url_for(
        "oauth.authorize",
        client_id=oauth_client.oauth_client_id,
        state="teststate",
        redirect_uri="http://localhost",
        response_type="id_token",  # id_token flow
    )
    consent_form = {
        "button": "allow",
        "suggested-email": "x@y.z",
        "suggested-name": "AB CD",
    }
    # Redirections are deliberately not followed (no follow_redirects=True) so
    # that the redirect URL itself can be asserted below.
    response = flask_client.post(authorize_url, data=consent_form)

    assert response.status_code == 302  # user gets redirected back to client page

    # This test expects the values in the query string and an empty fragment.
    redirect = urlparse(response.location)
    assert redirect.netloc == "localhost"
    assert not redirect.fragment
    assert redirect.query

    # Parsed query looks like {'state': ['teststate'], 'id_token': ['knuyjepwvg']}
    params = parse_qs(redirect.query)
    assert len(params) == 2
    assert params["state"] == ["teststate"]

    # id_token must be returned
    assert len(params["id_token"]) == 1

    # id_token must pass verify_id_token (valid, correctly signed JWT)
    assert verify_id_token(params["id_token"][0])
|
|
|
|
|
|
def test_authorize_token_id_token_flow(flask_client):
    """ID-Token Token Flow (response_type="id_token token").

    The authorize endpoint is expected to return both an id_token and an
    access_token in the URL fragment. Once decoded, the id_token payload must
    contain *at_hash* along with the other claims listed below.
    """
    user = login(flask_client)
    oauth_client = Client.create_new("test client", user.id)
    db.session.commit()

    # The user presses "allow" on the authorization page.
    authorize_url = url_for(
        "oauth.authorize",
        client_id=oauth_client.oauth_client_id,
        state="teststate",
        redirect_uri="http://localhost",
        response_type="id_token token",  # id_token,token flow
    )
    consent_form = {
        "button": "allow",
        "suggested-email": "x@y.z",
        "suggested-name": "AB CD",
    }
    # Redirections are deliberately not followed (no follow_redirects=True) so
    # that the redirect URL itself can be asserted below.
    response = flask_client.post(authorize_url, data=consent_form)

    assert response.status_code == 302  # user gets redirected back to client page

    # Values are expected in the fragment; the query string must be empty.
    redirect = urlparse(response.location)
    assert redirect.netloc == "localhost"
    assert redirect.fragment
    assert not redirect.query

    # Parsed fragment holds three keys: state, id_token and access_token.
    fragment_params = parse_qs(redirect.fragment)
    assert len(fragment_params) == 3
    assert fragment_params["state"] == ["teststate"]

    # both id_token and access_token must be returned
    assert len(fragment_params["id_token"]) == 1
    assert len(fragment_params["access_token"]) == 1

    # id_token must pass verify_id_token (valid, correctly signed JWT)
    id_token = fragment_params["id_token"][0]
    assert verify_id_token(id_token)

    # Decode the token and load its claims, e.g.
    # {'at_hash': 'jLDmoGpuOIHwxeyFEe9SKw', 'aud': 'testclient-sywcpwsyua',
    #  'auth_time': 1565450736, 'avatar_url': None, 'client': 'test client',
    #  'email': 'x@y.z', 'email_verified': True, 'exp': 1565454336,
    #  'iat': 1565450736, 'id': 1, 'iss': 'http://localhost',
    #  'name': 'AB CD', 'sub': '1'}
    decoded = decode_id_token(id_token)
    payload = json.loads(decoded.claims)

    required_claims = (
        "at_hash",  # MUST be present when the flow is id_token,token
        "aud",
        "auth_time",
        "avatar_url",
        "client",
        "email",
        "email_verified",
        "exp",
        "iat",
        "id",
        "iss",
        "name",
        "sub",
    )
    for claim in required_claims:
        assert claim in payload
|
|
|
|
|
|
def test_authorize_code_id_token_flow(flask_client):
    """ID-Token Code Flow (response_type="id_token code").

    The authorize endpoint is expected to return an id_token and a code in the
    query string, and the id_token payload must contain *c_hash*.

    The token endpoint must then return an access_token and an id_token.
    """
    user = login(flask_client)
    oauth_client = Client.create_new("test client", user.id)
    db.session.commit()

    # The user presses "allow" on the authorization page.
    authorize_url = url_for(
        "oauth.authorize",
        client_id=oauth_client.oauth_client_id,
        state="teststate",
        redirect_uri="http://localhost",
        response_type="id_token code",  # id_token,code flow
    )
    consent_form = {
        "button": "allow",
        "suggested-email": "x@y.z",
        "suggested-name": "AB CD",
    }
    # Redirections are deliberately not followed (no follow_redirects=True) so
    # that the redirect URL itself can be asserted below.
    response = flask_client.post(authorize_url, data=consent_form)

    assert response.status_code == 302  # user gets redirected back to client page

    # Values are expected in the query string; the fragment must be empty.
    redirect = urlparse(response.location)
    assert redirect.netloc == "localhost"
    assert not redirect.fragment
    assert redirect.query

    # Parsed query looks like
    # {'state': ['teststate'], 'id_token': ['knuyjepwvg'], 'code': ['longstring']}
    params = parse_qs(redirect.query)
    assert len(params) == 3
    assert params["state"] == ["teststate"]
    assert len(params["id_token"]) == 1
    assert len(params["code"]) == 1

    # id_token must pass verify_id_token (valid, correctly signed JWT)
    id_token = params["id_token"][0]
    assert verify_id_token(id_token)

    # Decode the token and load its claims to make sure all expected fields
    # are there.
    decoded = decode_id_token(id_token)
    payload = json.loads(decoded.claims)

    required_claims = (
        "c_hash",  # checked for the id_token,code flow
        "aud",
        "auth_time",
        "avatar_url",
        "client",
        "email",
        "email_verified",
        "exp",
        "iat",
        "id",
        "iss",
        "name",
        "sub",
    )
    for claim in required_claims:
        assert claim in payload

    # <<< Exchange the code to get access_token >>>
    raw_credentials = (
        f"{oauth_client.oauth_client_id}:{oauth_client.oauth_client_secret}"
    )
    encoded_credentials = base64.b64encode(raw_credentials.encode()).decode("utf-8")

    token_response = flask_client.post(
        url_for("oauth.token"),
        headers={"Authorization": f"Basic {encoded_credentials}"},
        data={"grant_type": "authorization_code", "code": params["code"][0]},
    )

    # The JSON body carries access_token, id_token, expires_in, scope,
    # token_type and a nested "user" object; each of them is checked below.
    assert token_response.status_code == 200
    body = token_response.json
    assert body["access_token"]
    assert body["expires_in"] == 3600
    assert not body["scope"]
    assert body["token_type"] == "Bearer"

    expected_user = {
        "avatar_url": None,
        "client": "test client",
        "email": "x@y.z",
        "email_verified": True,
        "id": 1,
        "name": "AB CD",
        "sub": "1",
    }
    assert body["user"] == expected_user

    # id_token must be returned ...
    assert body["id_token"]

    # ... and it must pass verify_id_token (valid, correctly signed JWT)
    assert verify_id_token(body["id_token"])
|
|
|
|
|
|
def test_authorize_page_invalid_client_id(flask_client):
    """An unknown client_id redirects to redirect_uri with error=invalid_client_id."""
    user = login(flask_client)
    # A client is created and committed, but the request below deliberately
    # does not use its id.
    client = Client.create_new("test client", user.id)
    db.session.commit()

    authorize_url = url_for(
        "oauth.authorize",
        client_id="invalid_client_id",
        state="teststate",
        redirect_uri="http://localhost",
        response_type="code",
    )
    response = flask_client.get(authorize_url)

    assert response.status_code == 302
    expected_location = (
        "http://localhost?error=invalid_client_id&client_id=invalid_client_id"
    )
    assert response.location == expected_location
|
|
|
|
|
|
def test_authorize_page_http_not_allowed(flask_client):
    """A plain-http redirect_uri (http://mywebsite.com) yields error=http_not_allowed."""
    user = login(flask_client)
    oauth_client = Client.create_new("test client", user.id)
    db.session.commit()

    authorize_url = url_for(
        "oauth.authorize",
        client_id=oauth_client.oauth_client_id,
        state="teststate",
        redirect_uri="http://mywebsite.com",
        response_type="code",
    )
    response = flask_client.get(authorize_url)

    # The user is sent back to the redirect_uri with the error in the query.
    assert response.status_code == 302
    assert response.location == "http://mywebsite.com?error=http_not_allowed"
|
|
|
|
|
|
def test_authorize_page_unknown_redirect_uri(flask_client):
    """The redirect_uri https://unknown.com yields error=unknown_redirect_uri."""
    user = login(flask_client)
    oauth_client = Client.create_new("test client", user.id)
    db.session.commit()

    authorize_url = url_for(
        "oauth.authorize",
        client_id=oauth_client.oauth_client_id,
        state="teststate",
        redirect_uri="https://unknown.com",
        response_type="code",
    )
    response = flask_client.get(authorize_url)

    # The user is sent back to the redirect_uri with the error in the query.
    assert response.status_code == 302
    assert response.location == "https://unknown.com?error=unknown_redirect_uri"
|