import arrow from flask import request, session, redirect, url_for, flash from flask_login import login_user from requests_oauthlib import OAuth2Session from app import s3 from app.auth.base import auth_bp from app.config import URL, GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET from app.email_utils import notify_admin from app.extensions import db from app.log import LOG from app.models import User, File from app.utils import random_string authorization_base_url = "https://accounts.google.com/o/oauth2/v2/auth" token_url = "https://www.googleapis.com/oauth2/v4/token" scope = [ "https://www.googleapis.com/auth/userinfo.email", "https://www.googleapis.com/auth/userinfo.profile", "openid", ] # need to set explicitly redirect_uri instead of leaving the lib to pre-fill redirect_uri # when served behind nginx, the redirect_uri is localhost... and not the real url redirect_uri = URL + "/auth/google/callback" @auth_bp.route("/google/login") def google_login(): # to avoid flask-login displaying the login error message session.pop("_flashes", None) google = OAuth2Session(GOOGLE_CLIENT_ID, scope=scope, redirect_uri=redirect_uri) authorization_url, state = google.authorization_url(authorization_base_url) # State is used to prevent CSRF, keep this for later. session["oauth_state"] = state return redirect(authorization_url) @auth_bp.route("/google/callback") def google_callback(): google = OAuth2Session( GOOGLE_CLIENT_ID, state=session["oauth_state"], scope=scope, redirect_uri=redirect_uri, ) token = google.fetch_token( token_url, client_secret=GOOGLE_CLIENT_SECRET, authorization_response=request.url, ) # Fetch a protected resource, i.e. user profile # { # "email": "abcd@gmail.com", # "family_name": "First name", # "given_name": "Last name", # "id": "1234", # "locale": "en", # "name": "First Last", # "picture": "http://profile.jpg", # "verified_email": true # } google_user_data = google.get( "https://www.googleapis.com/oauth2/v1/userinfo" ).json() email = google_user_data["email"] user = User.get_by(email=email) picture_url = google_user_data.get("picture") if user: if picture_url and not user.profile_picture_id: LOG.d("set user profile picture to %s", picture_url) file = create_file_from_url(picture_url) user.profile_picture_id = file.id db.session.commit() login_user(user) # create user else: LOG.d("create google user with %s", google_user_data) user = User.create(email=email, name=google_user_data["name"]) # set a random password user.set_password(random_string(20)) user.activated = True if picture_url: LOG.d("set user profile picture to %s", picture_url) file = create_file_from_url(picture_url) user.profile_picture_id = file.id db.session.commit() login_user(user) flash(f"Welcome to SimpleLogin {user.name}!", "success") notify_admin( f"new user signs up {user.email}", f"{user.name} signs up at {arrow.now()}" ) # The activation link contains the original page, for ex authorize page if "next" in request.args: next_url = request.args.get("next") LOG.debug("redirect user to %s", next_url) return redirect(next_url) else: LOG.debug("redirect user to dashboard") return redirect(url_for("dashboard.index")) def create_file_from_url(url) -> File: file_path = random_string(30) file = File.create(path=file_path) s3.upload_from_url(url, file_path) db.session.flush() LOG.d("upload file %s to s3", file) return file