mirror of
https://github.com/simple-login/app.git
synced 2024-11-18 01:40:38 +01:00
71 lines
2.4 KiB
Python
71 lines
2.4 KiB
Python
import arrow
|
|
from flask import request, render_template, redirect, url_for, flash, session, g
|
|
from flask_login import login_user
|
|
from flask_wtf import FlaskForm
|
|
from wtforms import StringField, validators
|
|
|
|
from app.auth.base import auth_bp
|
|
from app.config import MFA_USER_ID
|
|
from app.extensions import db, limiter
|
|
from app.log import LOG
|
|
from app.models import User, RecoveryCode
|
|
|
|
|
|
class RecoveryForm(FlaskForm):
    """Form with a single required text field, ``code``.

    Instantiated by ``recovery_route`` to read the recovery code the user
    submits.
    """

    # ``DataRequired`` makes an empty submission fail form validation.
    code = StringField(
        label="Code",
        validators=[validators.DataRequired()],
    )
|
|
|
|
|
|
def _is_safe_next_url(target):
    """Return True when ``target`` is a same-site absolute path.

    Only values such as ``/dashboard/?x=1`` are accepted. Anything a browser
    could resolve to another origin is rejected:

    * empty / ``None`` values, or values not starting with ``/`` (this
      covers ``https://...``, ``javascript:...`` and bare host names);
    * ``//host`` and ``/\\host`` (browsers treat a backslash like a slash);
    * values containing ASCII control characters, which browsers strip and
      which could therefore turn ``/\\t/host`` into ``//host``.
    """
    if not target or not target.startswith("/"):
        return False
    if any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in target):
        return False
    # Tuple membership on purpose: for target == "/" the slice is "" and
    # must be accepted.
    return target[1:2] not in ("/", "\\")


@auth_bp.route("/recovery", methods=["GET", "POST"])
@limiter.limit(
    "10/minute", deduct_when=lambda r: getattr(g, "deduct_limit", False)
)
def recovery_route():
    """Log a user in with a one-time recovery code instead of an MFA token.

    The user id is read from the session key ``MFA_USER_ID``, which is
    expected to have been set by the login page. On a valid, unused code the
    user is logged in, the code is marked as used and the user is redirected
    to ``next`` (when it is a safe same-site path) or to the dashboard.

    Rate limiting: the ``10/minute`` limit uses ``deduct_when``, so a request
    only counts against it when this view sets ``g.deduct_limit`` -- i.e. on
    an incorrect or already-used code.

    Returns:
        A redirect response, or the rendered ``auth/recovery.html`` template.
    """
    # Passed from the login page.
    user_id = session.get(MFA_USER_ID)

    # User accessed this page directly without going through the login page.
    if not user_id:
        flash("Unknown error, redirect back to main page", "warning")
        return redirect(url_for("auth.login"))

    user = User.get(user_id)

    # The session may reference a user that no longer exists (assumes
    # User.get returns None in that case). Treat it like a missing session
    # instead of crashing on the attribute access below.
    if user is None:
        session.pop(MFA_USER_ID, None)
        flash("Unknown error, redirect back to main page", "warning")
        return redirect(url_for("auth.login"))

    if not user.two_factor_authentication_enabled():
        flash("Only user with MFA enabled should go to this page", "warning")
        return redirect(url_for("auth.login"))

    recovery_form = RecoveryForm()
    next_url = request.args.get("next")

    if recovery_form.validate_on_submit():
        code = recovery_form.code.data
        recovery_code = RecoveryCode.get_by(user_id=user.id, code=code)

        if not recovery_code:
            # Trigger rate limiter
            g.deduct_limit = True
            flash("Incorrect code", "error")
        elif recovery_code.used:
            # Trigger rate limiter
            g.deduct_limit = True
            flash("Code already used", "error")
        else:
            del session[MFA_USER_ID]

            login_user(user)
            flash(f"Welcome back {user.name}!", "success")

            recovery_code.used = True
            recovery_code.used_at = arrow.now()
            db.session.commit()

            # User came to the login page from another page. ``next`` is
            # attacker-controllable (query string), so only follow it when it
            # stays on this site; otherwise fall back to the dashboard.
            if next_url and _is_safe_next_url(next_url):
                LOG.debug("redirect user to %s", next_url)
                return redirect(next_url)

            if next_url:
                LOG.debug("ignore unsafe next url %s", next_url)

            LOG.debug("redirect user to dashboard")
            return redirect(url_for("dashboard.index"))

    return render_template("auth/recovery.html", recovery_form=recovery_form)
|