add support for response_mode(query and fragment), nonce

This commit is contained in:
Son NK 2019-08-11 12:03:42 +02:00
parent e563dff496
commit 78e274addf
5 changed files with 129 additions and 37 deletions

View File

@ -1,3 +1,7 @@
import base64
import hashlib
from typing import Optional
import arrow
from jwcrypto import jwk, jwt
@ -6,14 +10,19 @@ from app.log import LOG
from app.models import ClientUser
with open(OPENID_PRIVATE_KEY_PATH, "rb") as f:
key = jwk.JWK.from_pem(f.read())
_key = jwk.JWK.from_pem(f.read())
def get_jwk_key() -> dict:
return key._public_params()
return _key._public_params()
def make_id_token(client_user: ClientUser):
def make_id_token(
client_user: ClientUser,
nonce: Optional[str] = None,
access_token: Optional[str] = None,
code: Optional[str] = None,
):
"""Make id_token for OpenID Connect
According to RFC 7519, these claims are mandatory:
- iss
@ -31,20 +40,42 @@ def make_id_token(client_user: ClientUser):
"auth_time": arrow.now().timestamp,
}
if nonce:
claims["nonce"] = nonce
if access_token:
claims["at_hash"] = id_token_hash(access_token)
if code:
claims["c_hash"] = id_token_hash(code)
claims = {**claims, **client_user.get_user_info()}
jwt_token = jwt.JWT(
header={"alg": "RS256", "kid": key._public_params()["kid"]}, claims=claims
header={"alg": "RS256", "kid": _key._public_params()["kid"]}, claims=claims
)
jwt_token.make_signed_token(key)
jwt_token.make_signed_token(_key)
return jwt_token.serialize()
def verify_id_token(id_token) -> bool:
try:
jwt.JWT(key=key, jwt=id_token)
jwt.JWT(key=_key, jwt=id_token)
except Exception:
LOG.exception("id token not verified")
return False
else:
return True
def decode_id_token(id_token) -> jwt.JWT:
return jwt.JWT(key=_key, jwt=id_token)
def id_token_hash(value, hashfunc=hashlib.sha256):
"""
Inspired from oauthlib
"""
digest = hashfunc(value.encode()).digest()
left_most = len(digest) // 2
return base64.urlsafe_b64encode(digest[:left_most]).decode().rstrip("=")

View File

@ -18,7 +18,14 @@ from app.models import (
OauthToken,
)
from app.oauth.base import oauth_bp
from app.oauth_models import get_response_types, ResponseType, Scope
from app.oauth_models import (
get_response_types,
ResponseType,
Scope,
SUPPORTED_OPENID_FLOWS,
SUPPORTED_OPENID_FLOWS_STR,
response_types_to_str,
)
from app.utils import random_string, encode_url, convert_to_id
@ -35,6 +42,8 @@ def authorize():
state = request.args.get("state")
scope = request.args.get("scope")
redirect_uri = request.args.get("redirect_uri")
response_mode = request.args.get("response_mode")
nonce = request.args.get("nonce")
try:
response_types: [ResponseType] = get_response_types(request)
@ -45,6 +54,12 @@ def authorize():
400,
)
if set(response_types) not in SUPPORTED_OPENID_FLOWS:
return (
f"SimpleLogin only support the following OIDC flows: {SUPPORTED_OPENID_FLOWS_STR}",
400,
)
if not redirect_uri:
LOG.d("no redirect uri")
return "redirect_uri must be set", 400
@ -191,36 +206,58 @@ def authorize():
if scope:
redirect_args["scope"] = scope
for response_type in response_types:
if response_type == ResponseType.CODE:
# Create authorization code
auth_code = AuthorizationCode.create(
client_id=client.id,
user_id=current_user.id,
code=random_string(),
scope=scope,
redirect_uri=redirect_uri,
)
db.session.add(auth_code)
redirect_args["code"] = auth_code.code
elif response_type == ResponseType.TOKEN:
# create access-token
oauth_token = OauthToken.create(
client_id=client.id,
user_id=current_user.id,
scope=scope,
redirect_uri=redirect_uri,
access_token=generate_access_token(),
)
db.session.add(oauth_token)
redirect_args["access_token"] = oauth_token.access_token
elif response_type == ResponseType.ID_TOKEN:
redirect_args["id_token"] = make_id_token(client_user)
auth_code = None
if ResponseType.CODE in response_types:
# Create authorization code
auth_code = AuthorizationCode.create(
client_id=client.id,
user_id=current_user.id,
code=random_string(),
scope=scope,
redirect_uri=redirect_uri,
response_type=response_types_to_str(response_types),
)
db.session.add(auth_code)
redirect_args["code"] = auth_code.code
oauth_token = None
if ResponseType.TOKEN in response_types:
# create access-token
oauth_token = OauthToken.create(
client_id=client.id,
user_id=current_user.id,
scope=scope,
redirect_uri=redirect_uri,
access_token=generate_access_token(),
response_type=response_types_to_str(response_types),
)
db.session.add(oauth_token)
redirect_args["access_token"] = oauth_token.access_token
if ResponseType.ID_TOKEN in response_types:
redirect_args["id_token"] = make_id_token(
client_user,
nonce,
oauth_token.access_token if oauth_token else None,
auth_code.code if auth_code else None,
)
db.session.commit()
# should all params appended the url using fragment (#) or query
fragment = False
if response_mode and response_mode == "fragment":
fragment = True
# if response_types contain "token" => implicit flow => should use fragment
# except if client sets explicitly response_mode
if not response_mode:
if ResponseType.TOKEN in response_types:
fragment = True
# construct redirect_uri with redirect_args
return redirect(construct_url(redirect_uri, redirect_args))
return redirect(construct_url(redirect_uri, redirect_args, fragment))
def create_or_choose_gen_email(user) -> GenEmail:
@ -238,13 +275,16 @@ def create_or_choose_gen_email(user) -> GenEmail:
return gen_email
def construct_url(url, args: Dict[str, str]):
def construct_url(url, args: Dict[str, str], fragment: bool = False):
for i, (k, v) in enumerate(args.items()):
# make sure to escape v
v = encode_url(v)
if i == 0:
url += f"?{k}={v}"
if fragment:
url += f"#{k}={v}"
else:
url += f"?{k}={v}"
else:
url += f"&{k}={v}"

View File

@ -7,7 +7,7 @@ from app.log import LOG
from app.models import Client, AuthorizationCode, OauthToken, ClientUser
from app.oauth.base import oauth_bp
from app.oauth.views.authorize import generate_access_token
from app.oauth_models import Scope
from app.oauth_models import Scope, get_response_types_from_str, ResponseType
@oauth_bp.route("/token", methods=["POST"])
@ -62,6 +62,7 @@ def token():
scope=auth_code.scope,
redirect_uri=auth_code.redirect_uri,
access_token=generate_access_token(),
response_type=auth_code.response_type,
)
db.session.add(oauth_token)
@ -87,4 +88,10 @@ def token():
if oauth_token.scope and Scope.OPENID.value in oauth_token.scope:
res["id_token"] = make_id_token(client_user)
# Also return id_token if the initial flow is "code,id_token"
# cf https://medium.com/@darutk/diagrams-of-all-the-openid-connect-flows-6968e3990660
response_types = get_response_types_from_str(auth_code.response_type)
if ResponseType.ID_TOKEN in response_types:
res["id_token"] = make_id_token(client_user)
return jsonify(res)

View File

@ -17,6 +17,20 @@ class ResponseType(enum.Enum):
ID_TOKEN = "id_token"
# All the OIDC flows supported by SimpleLogin
# CF https://medium.com/@darutk/diagrams-of-all-the-openid-connect-flows-6968e3990660
SUPPORTED_OPENID_FLOWS = [
{ResponseType.CODE},
{ResponseType.TOKEN},
{ResponseType.ID_TOKEN},
{ResponseType.ID_TOKEN, ResponseType.TOKEN},
{ResponseType.ID_TOKEN, ResponseType.CODE},
]
# String form of SUPPORTED_OPENID_FLOWS
SUPPORTED_OPENID_FLOWS_STR = "code|token|id_token|id_token,token|id_token,code"
def get_scopes(request: flask.Request) -> Set[Scope]:
scope_strs = _split_arg(request.args.getlist("scope"))

View File

@ -615,4 +615,4 @@ def test_authorize_code_id_token_flow(flask_client):
assert r.json["id_token"]
# id_token must be a valid, correctly signed JWT
assert verify_id_token(r.json["id_token"])
assert verify_id_token(r.json["id_token"])