app-MAIL-temp/app/auth/views/oidc.py

136 lines
4.2 KiB
Python
Raw Normal View History

2024-03-13 14:30:00 +01:00
from flask import request, session, redirect, flash, url_for
from requests_oauthlib import OAuth2Session
2024-05-02 16:17:10 +02:00
import requests
2024-03-13 14:30:00 +01:00
from app import config
from app.auth.base import auth_bp
from app.auth.views.login_utils import after_login
from app.config import (
URL,
OIDC_SCOPES,
OIDC_NAME_FIELD,
)
from app.db import Session
from app.email_utils import send_welcome_email
from app.log import LOG
from app.models import User, SocialAuth
2024-05-02 16:17:10 +02:00
from app.utils import sanitize_email, sanitize_next_url
2024-03-13 14:30:00 +01:00
# Set redirect_uri explicitly instead of letting the library pre-fill it:
# when served behind nginx, the pre-filled redirect_uri is localhost and not the real URL.
2024-05-02 16:17:10 +02:00
# Callback URL passed as redirect_uri to every OAuth2Session in this module,
# built from the configured base URL.
redirect_uri = URL + "/auth/oidc/callback"
2024-03-13 14:30:00 +01:00
# Session key under which the OAuth2 "state" value is kept between the login
# redirect and the callback.
SESSION_STATE_KEY = "oauth_state"
2024-05-02 16:17:10 +02:00
# Session key under which the sanitized "next" URL is kept while the user is
# away at the OIDC provider.
SESSION_NEXT_KEY = "oauth_redirect_next"
2024-03-13 14:30:00 +01:00
@auth_bp.route("/oidc/login")
def oidc_login():
    """Start the OIDC login flow by redirecting to the provider.

    Redirects to the regular login page when the OIDC client id or secret is
    not configured, or when the provider's well-known configuration cannot be
    loaded. Otherwise stores the OAuth2 state and the sanitized ``next`` URL
    in the session and redirects to the provider's authorization URL.
    """
    if config.OIDC_CLIENT_ID is None or config.OIDC_CLIENT_SECRET is None:
        return redirect(url_for("auth.login"))

    next_url = sanitize_next_url(request.args.get("next"))

    try:
        # Always bound this call: without a timeout, requests waits
        # indefinitely on a stalled provider and blocks the worker.
        auth_url = requests.get(config.OIDC_WELL_KNOWN_URL, timeout=10).json()[
            "authorization_endpoint"
        ]
    except (requests.RequestException, ValueError, KeyError) as e:
        # Network failure, non-JSON body, or a document without the endpoint.
        LOG.e(f"cannot load OIDC configuration: {e!r}")
        flash(
            "Cannot reach the OIDC provider, please use another way to login/sign up",
            "error",
        )
        return redirect(url_for("auth.login"))

    oidc = OAuth2Session(
        config.OIDC_CLIENT_ID, scope=[OIDC_SCOPES], redirect_uri=redirect_uri
    )
    authorization_url, state = oidc.authorization_url(auth_url)

    # State is used to prevent CSRF, keep this for later.
    session[SESSION_STATE_KEY] = state
    session[SESSION_NEXT_KEY] = next_url
    return redirect(authorization_url)
@auth_bp.route("/oidc/callback")
def oidc_callback():
    """Handle the redirect back from the OIDC provider and log the user in.

    Flow, in order:
    - redirect to the login page if no OAuth2 state is stored in the session
      or if the OIDC client id/secret is not configured;
    - redirect to "/" if the provider returned an ``error`` query argument;
    - load the provider configuration, exchange the authorization response
      for a token and fetch the user info;
    - look the user up by sanitized email, creating the account unless
      registration is disabled;
    - make sure an "oidc" SocialAuth row exists, then hand over to
      ``after_login`` with the ``next`` URL saved at login time.
    """
    if SESSION_STATE_KEY not in session:
        flash("Invalid state, please retry", "error")
        return redirect(url_for("auth.login"))

    if config.OIDC_CLIENT_ID is None or config.OIDC_CLIENT_SECRET is None:
        return redirect(url_for("auth.login"))

    # user clicks on cancel
    if "error" in request.args:
        flash("Please use another sign in method then", "warning")
        return redirect("/")

    try:
        # Always bound this call: without a timeout, requests waits
        # indefinitely on a stalled provider and blocks the worker.
        oidc_configuration = requests.get(
            config.OIDC_WELL_KNOWN_URL, timeout=10
        ).json()
        user_info_url = oidc_configuration["userinfo_endpoint"]
        token_url = oidc_configuration["token_endpoint"]
    except (requests.RequestException, ValueError, KeyError) as e:
        # Network failure, non-JSON body, or a document without the endpoints.
        LOG.e(f"cannot load OIDC configuration: {e!r}")
        flash(
            "Cannot reach the OIDC provider, please use another way to login/sign up",
            "error",
        )
        return redirect(url_for("auth.login"))

    oidc = OAuth2Session(
        config.OIDC_CLIENT_ID,
        state=session[SESSION_STATE_KEY],
        scope=[OIDC_SCOPES],
        redirect_uri=redirect_uri,
    )
    oidc.fetch_token(
        token_url,
        client_secret=config.OIDC_CLIENT_SECRET,
        authorization_response=request.url,
    )

    oidc_user_data = oidc.get(user_info_url)
    if oidc_user_data.status_code != 200:
        LOG.e(
            f"cannot get oidc user data {oidc_user_data.status_code} {oidc_user_data.text}"
        )
        flash(
            "Cannot get user data from OIDC, please use another way to login/sign up",
            "error",
        )
        return redirect(url_for("auth.login"))

    oidc_user_data = oidc_user_data.json()
    email = oidc_user_data.get("email")
    if not email:
        LOG.e(f"cannot get email for OIDC user {oidc_user_data} {email}")
        flash(
            "Cannot get a valid email from OIDC, please another way to login/sign up",
            "error",
        )
        return redirect(url_for("auth.login"))

    email = sanitize_email(email)
    user = User.get_by(email=email)

    if not user:
        if config.DISABLE_REGISTRATION:
            flash(
                "Sorry you cannot sign up via the OIDC provider. Please sign-up first with your email.",
                "error",
            )
            return redirect(url_for("auth.register"))
        user = create_user(email, oidc_user_data)

    if not SocialAuth.get_by(user_id=user.id, social="oidc"):
        SocialAuth.create(user_id=user.id, social="oidc")
        Session.commit()

    # The activation link contains the original page, for ex authorize page.
    # Use .get(): the state key can be present in the session without the
    # next key, and that must not fail the login with a KeyError.
    next_url = session.get(SESSION_NEXT_KEY)
    session[SESSION_NEXT_KEY] = None
    return after_login(user, next_url)
def create_user(email, oidc_user_data):
    """Create and return a new, already-activated user from OIDC user info.

    The display name is read from ``oidc_user_data`` under the configured
    ``OIDC_NAME_FIELD`` key and the password is left empty. The new user is
    committed and a welcome email is sent before returning.

    Args:
        email: email address for the new account.
        oidc_user_data: mapping of user info returned by the OIDC provider.

    Returns:
        The newly created user.
    """
    display_name = oidc_user_data.get(OIDC_NAME_FIELD)
    new_user = User.create(
        email=email, name=display_name, password="", activated=True
    )
    LOG.i(f"Created new user for login request from OIDC. New user {new_user.id}")

    Session.commit()
    send_welcome_email(new_user)
    return new_user