app-MAIL-temp/app/api/views/auth.py

367 lines
11 KiB
Python
Raw Normal View History

2020-06-24 10:30:01 +02:00
import random
2020-02-28 11:29:33 +01:00
import facebook
import google.oauth2.credentials
import googleapiclient.discovery
2020-05-24 16:14:48 +02:00
from flask import jsonify, request, g
2020-07-04 10:39:38 +02:00
from flask_login import login_user
2020-01-20 14:36:39 +01:00
from itsdangerous import Signer
2020-02-27 16:57:24 +01:00
from app import email_utils
2020-02-28 11:29:33 +01:00
from app.api.base import api_bp
2020-03-05 11:00:58 +01:00
from app.config import FLASK_SECRET, DISABLE_REGISTRATION
2020-03-18 18:43:04 +01:00
from app.dashboard.views.setting import send_reset_password_email
from app.email_utils import (
email_can_be_used_as_mailbox,
personal_email_already_used,
send_email,
render,
)
from app.utils import sanitize_email
2020-05-24 16:14:48 +02:00
from app.extensions import db, limiter
2020-01-20 14:36:39 +01:00
from app.log import LOG
from app.models import User, ApiKey, SocialAuth, AccountActivation
2020-01-20 14:36:39 +01:00
@api_bp.route("/auth/login", methods=["POST"])
2020-05-24 16:14:48 +02:00
@limiter.limit(
"10/minute", deduct_when=lambda r: hasattr(g, "deduct_limit") and g.deduct_limit
)
2020-01-20 14:36:39 +01:00
def auth_login():
"""
Authenticate user
Input:
email
password
device: to create an ApiKey associated with this device
Output:
200 and user info containing:
{
name: "John Wick",
mfa_enabled: true,
mfa_key: "a long string",
api_key: "a long string"
}
"""
data = request.get_json()
if not data:
return jsonify(error="request body cannot be empty"), 400
email = sanitize_email(data.get("email"))
2020-01-20 14:36:39 +01:00
password = data.get("password")
device = data.get("device")
user = User.filter_by(email=email).first()
if not user or not user.check_password(password):
2020-05-24 16:14:48 +02:00
# Trigger rate limiter
g.deduct_limit = True
2020-01-20 14:36:39 +01:00
return jsonify(error="Email or password incorrect"), 400
2020-10-04 12:48:49 +02:00
elif user.disabled:
return jsonify(error="Account disabled"), 400
2020-01-20 14:36:39 +01:00
elif not user.activated:
return jsonify(error="Account not activated"), 400
2020-05-07 21:54:36 +02:00
elif user.fido_enabled():
# allow user who has TOTP enabled to continue using the mobile app
if not user.enable_otp:
return jsonify(error="Currently we don't support FIDO on mobile yet"), 403
2020-01-20 14:36:39 +01:00
2020-02-27 16:26:29 +01:00
return jsonify(**auth_payload(user, device)), 200
@api_bp.route("/auth/register", methods=["POST"])
def auth_register():
"""
User signs up - will need to activate their account with an activation code.
Input:
email
password
Output:
200: user needs to confirm their account
"""
data = request.get_json()
if not data:
return jsonify(error="request body cannot be empty"), 400
email = sanitize_email(data.get("email"))
password = data.get("password")
if DISABLE_REGISTRATION:
return jsonify(error="registration is closed"), 400
if not email_can_be_used_as_mailbox(email) or personal_email_already_used(email):
return jsonify(error=f"cannot use {email} as personal inbox"), 400
if not password or len(password) < 8:
return jsonify(error="password too short"), 400
LOG.debug("create user %s", email)
user = User.create(email=email, name="", password=password)
db.session.flush()
# create activation code
code = "".join([str(random.randint(0, 9)) for _ in range(6)])
AccountActivation.create(user_id=user.id, code=code)
db.session.commit()
send_email(
email,
2020-12-06 22:11:22 +01:00
"Just one more step to join SimpleLogin",
render("transactional/code-activation.txt", code=code),
render("transactional/code-activation.html", code=code),
)
return jsonify(msg="User needs to confirm their account"), 200
@api_bp.route("/auth/activate", methods=["POST"])
2020-05-24 16:14:48 +02:00
@limiter.limit(
"10/minute", deduct_when=lambda r: hasattr(g, "deduct_limit") and g.deduct_limit
)
def auth_activate():
"""
User enters the activation code to confirm their account.
Input:
email
code
Output:
200: user account is now activated, user can login now
400: wrong email, code
410: wrong code too many times
"""
data = request.get_json()
if not data:
return jsonify(error="request body cannot be empty"), 400
email = sanitize_email(data.get("email"))
code = data.get("code")
user = User.get_by(email=email)
# do not use a different message to avoid exposing existing email
if not user or user.activated:
2020-05-24 16:14:48 +02:00
# Trigger rate limiter
g.deduct_limit = True
return jsonify(error="Wrong email or code"), 400
account_activation = AccountActivation.get_by(user_id=user.id)
if not account_activation:
2020-05-24 16:14:48 +02:00
# Trigger rate limiter
g.deduct_limit = True
return jsonify(error="Wrong email or code"), 400
if account_activation.code != code:
# decrement nb tries
account_activation.tries -= 1
db.session.commit()
2020-05-24 16:14:48 +02:00
# Trigger rate limiter
g.deduct_limit = True
if account_activation.tries == 0:
AccountActivation.delete(account_activation.id)
db.session.commit()
return jsonify(error="Too many wrong tries"), 410
return jsonify(error="Wrong email or code"), 400
LOG.debug("activate user %s", user)
user.activated = True
AccountActivation.delete(account_activation.id)
db.session.commit()
return jsonify(msg="Account is activated, user can login now"), 200
@api_bp.route("/auth/reactivate", methods=["POST"])
def auth_reactivate():
"""
User asks for another activation code
Input:
email
Output:
200: user is going to receive an email for activate their account
"""
data = request.get_json()
if not data:
return jsonify(error="request body cannot be empty"), 400
email = sanitize_email(data.get("email"))
user = User.get_by(email=email)
# do not use a different message to avoid exposing existing email
if not user or user.activated:
return jsonify(error="Something went wrong"), 400
account_activation = AccountActivation.get_by(user_id=user.id)
if account_activation:
AccountActivation.delete(account_activation.id)
db.session.commit()
# create activation code
code = "".join([str(random.randint(0, 9)) for _ in range(6)])
AccountActivation.create(user_id=user.id, code=code)
db.session.commit()
send_email(
email,
2020-12-06 22:11:22 +01:00
"Just one more step to join SimpleLogin",
render("transactional/code-activation.txt", code=code),
render("transactional/code-activation.html", code=code),
)
return jsonify(msg="User needs to confirm their account"), 200
2020-02-27 16:57:24 +01:00
@api_bp.route("/auth/facebook", methods=["POST"])
def auth_facebook():
"""
Authenticate user with Facebook
Input:
facebook_token: facebook access token
device: to create an ApiKey associated with this device
Output:
200 and user info containing:
{
name: "John Wick",
mfa_enabled: true,
mfa_key: "a long string",
api_key: "a long string"
}
"""
data = request.get_json()
if not data:
return jsonify(error="request body cannot be empty"), 400
facebook_token = data.get("facebook_token")
device = data.get("device")
graph = facebook.GraphAPI(access_token=facebook_token)
user_info = graph.get_object("me", fields="email,name")
email = sanitize_email(user_info.get("email"))
2020-02-27 16:57:24 +01:00
user = User.get_by(email=email)
if not user:
if DISABLE_REGISTRATION:
return jsonify(error="registration is closed"), 400
if not email_can_be_used_as_mailbox(email) or personal_email_already_used(
2020-05-15 23:18:42 +02:00
email
):
2020-02-27 16:57:24 +01:00
return jsonify(error=f"cannot use {email} as personal inbox"), 400
LOG.d("create facebook user with %s", user_info)
user = User.create(email=email, name=user_info["name"], activated=True)
2020-02-27 16:57:24 +01:00
db.session.commit()
email_utils.send_welcome_email(user)
if not SocialAuth.get_by(user_id=user.id, social="facebook"):
SocialAuth.create(user_id=user.id, social="facebook")
db.session.commit()
return jsonify(**auth_payload(user, device)), 200
2020-02-28 11:29:33 +01:00
@api_bp.route("/auth/google", methods=["POST"])
def auth_google():
"""
Authenticate user with Facebook
Input:
google_token: Google access token
device: to create an ApiKey associated with this device
Output:
200 and user info containing:
{
name: "John Wick",
mfa_enabled: true,
mfa_key: "a long string",
api_key: "a long string"
}
"""
data = request.get_json()
if not data:
return jsonify(error="request body cannot be empty"), 400
google_token = data.get("google_token")
device = data.get("device")
cred = google.oauth2.credentials.Credentials(token=google_token)
build = googleapiclient.discovery.build("oauth2", "v2", credentials=cred)
user_info = build.userinfo().get().execute()
email = sanitize_email(user_info.get("email"))
2020-02-28 11:29:33 +01:00
user = User.get_by(email=email)
if not user:
if DISABLE_REGISTRATION:
return jsonify(error="registration is closed"), 400
if not email_can_be_used_as_mailbox(email) or personal_email_already_used(
2020-05-15 23:18:42 +02:00
email
):
2020-02-28 11:29:33 +01:00
return jsonify(error=f"cannot use {email} as personal inbox"), 400
LOG.d("create Google user with %s", user_info)
user = User.create(email=email, name="", activated=True)
2020-02-28 11:29:33 +01:00
db.session.commit()
email_utils.send_welcome_email(user)
if not SocialAuth.get_by(user_id=user.id, social="google"):
SocialAuth.create(user_id=user.id, social="google")
db.session.commit()
return jsonify(**auth_payload(user, device)), 200
2020-02-27 16:26:29 +01:00
def auth_payload(user, device) -> dict:
ret = {"name": user.name or "", "email": user.email, "mfa_enabled": user.enable_otp}
2020-01-20 14:36:39 +01:00
# do not give api_key, user can only obtain api_key after OTP verification
if user.enable_otp:
s = Signer(FLASK_SECRET)
ret["mfa_key"] = s.sign(str(user.id))
ret["api_key"] = None
2020-01-20 14:36:39 +01:00
else:
2020-02-05 12:05:26 +01:00
api_key = ApiKey.get_by(user_id=user.id, name=device)
if not api_key:
LOG.d("create new api key for %s and %s", user, device)
api_key = ApiKey.create(user.id, device)
db.session.commit()
ret["mfa_key"] = None
2020-01-20 14:36:39 +01:00
ret["api_key"] = api_key.code
2020-07-04 10:39:38 +02:00
# so user is automatically logged in on the web
login_user(user)
2020-02-27 16:26:29 +01:00
return ret
2020-03-18 18:43:04 +01:00
@api_bp.route("/auth/forgot_password", methods=["POST"])
def forgot_password():
"""
User forgot password
Input:
email
Output:
200 and a reset password email is sent to user
400 if email not exist
"""
data = request.get_json()
2020-03-18 21:55:50 +01:00
if not data or not data.get("email"):
return jsonify(error="request body must contain email"), 400
2020-03-18 18:43:04 +01:00
email = sanitize_email(data.get("email"))
2020-03-18 18:43:04 +01:00
user = User.get_by(email=email)
2020-03-18 21:55:50 +01:00
if user:
send_reset_password_email(user)
2020-03-18 18:43:04 +01:00
2020-03-18 21:55:50 +01:00
return jsonify(ok=True)