mirror of
https://github.com/simple-login/app.git
synced 2024-11-13 07:31:12 +01:00
131 lines
4 KiB
Python
131 lines
4 KiB
Python
from flask import request, session, redirect, flash, url_for
|
|
from requests_oauthlib import OAuth2Session
|
|
|
|
from app import config
|
|
from app.auth.base import auth_bp
|
|
from app.auth.views.login_utils import after_login
|
|
from app.config import (
|
|
URL,
|
|
OIDC_AUTHORIZATION_URL,
|
|
OIDC_USER_INFO_URL,
|
|
OIDC_TOKEN_URL,
|
|
OIDC_SCOPES,
|
|
OIDC_NAME_FIELD,
|
|
)
|
|
from app.db import Session
|
|
from app.email_utils import send_welcome_email
|
|
from app.log import LOG
|
|
from app.models import User, SocialAuth
|
|
from app.utils import encode_url, sanitize_email, sanitize_next_url
|
|
|
|
|
|
# need to set explicitly redirect_uri instead of leaving the lib to pre-fill redirect_uri
|
|
# when served behind nginx, the redirect_uri is localhost... and not the real url
|
|
_redirect_uri = URL + "/auth/oidc/callback"
|
|
|
|
SESSION_STATE_KEY = "oauth_state"
|
|
|
|
|
|
@auth_bp.route("/oidc/login")
|
|
def oidc_login():
|
|
if config.OIDC_CLIENT_ID is None or config.OIDC_CLIENT_SECRET is None:
|
|
return redirect(url_for("auth.login"))
|
|
|
|
next_url = sanitize_next_url(request.args.get("next"))
|
|
if next_url:
|
|
redirect_uri = _redirect_uri + "?next=" + encode_url(next_url)
|
|
else:
|
|
redirect_uri = _redirect_uri
|
|
|
|
oidc = OAuth2Session(
|
|
config.OIDC_CLIENT_ID, scope=[OIDC_SCOPES], redirect_uri=redirect_uri
|
|
)
|
|
authorization_url, state = oidc.authorization_url(OIDC_AUTHORIZATION_URL)
|
|
|
|
# State is used to prevent CSRF, keep this for later.
|
|
session[SESSION_STATE_KEY] = state
|
|
return redirect(authorization_url)
|
|
|
|
|
|
@auth_bp.route("/oidc/callback")
|
|
def oidc_callback():
|
|
if SESSION_STATE_KEY not in session:
|
|
flash("Invalid state, please retry", "error")
|
|
return redirect(url_for("auth.login"))
|
|
if config.OIDC_CLIENT_ID is None or config.OIDC_CLIENT_SECRET is None:
|
|
return redirect(url_for("auth.login"))
|
|
|
|
# user clicks on cancel
|
|
if "error" in request.args:
|
|
flash("Please use another sign in method then", "warning")
|
|
return redirect("/")
|
|
|
|
oidc = OAuth2Session(
|
|
config.OIDC_CLIENT_ID,
|
|
state=session[SESSION_STATE_KEY],
|
|
scope=[OIDC_SCOPES],
|
|
redirect_uri=_redirect_uri,
|
|
)
|
|
oidc.fetch_token(
|
|
OIDC_TOKEN_URL,
|
|
client_secret=config.OIDC_CLIENT_SECRET,
|
|
authorization_response=request.url,
|
|
)
|
|
|
|
oidc_user_data = oidc.get(OIDC_USER_INFO_URL)
|
|
if oidc_user_data.status_code != 200:
|
|
LOG.e(
|
|
f"cannot get oidc user data {oidc_user_data.status_code} {oidc_user_data.text}"
|
|
)
|
|
flash(
|
|
"Cannot get user data from OIDC, please use another way to login/sign up",
|
|
"error",
|
|
)
|
|
return redirect(url_for("auth.login"))
|
|
oidc_user_data = oidc_user_data.json()
|
|
|
|
email = oidc_user_data.get("email")
|
|
|
|
if not email:
|
|
LOG.e(f"cannot get email for OIDC user {oidc_user_data} {email}")
|
|
flash(
|
|
"Cannot get a valid email from OIDC, please another way to login/sign up",
|
|
"error",
|
|
)
|
|
return redirect(url_for("auth.login"))
|
|
|
|
email = sanitize_email(email)
|
|
user = User.get_by(email=email)
|
|
|
|
if not user and config.DISABLE_REGISTRATION:
|
|
flash(
|
|
"Sorry you cannot sign up via the OIDC provider. Please sign-up first with your email.",
|
|
"error",
|
|
)
|
|
return redirect(url_for("auth.register"))
|
|
elif not user:
|
|
user = create_user(email, oidc_user_data)
|
|
|
|
if not SocialAuth.get_by(user_id=user.id, social="oidc"):
|
|
SocialAuth.create(user_id=user.id, social="oidc")
|
|
Session.commit()
|
|
|
|
# The activation link contains the original page, for ex authorize page
|
|
next_url = sanitize_next_url(request.args.get("next")) if request.args else None
|
|
|
|
return after_login(user, next_url)
|
|
|
|
|
|
def create_user(email, oidc_user_data):
|
|
new_user = User.create(
|
|
email=email,
|
|
name=oidc_user_data.get(OIDC_NAME_FIELD),
|
|
password="",
|
|
activated=True,
|
|
)
|
|
LOG.i(f"Created new user for login request from OIDC. New user {new_user.id}")
|
|
Session.commit()
|
|
|
|
send_welcome_email(new_user)
|
|
|
|
return new_user
|