diff --git a/app/api/__init__.py b/app/api/__init__.py index 0f7932f0..0f736331 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -13,4 +13,5 @@ from .views import ( setting, export, phone, + sudo, ) diff --git a/app/api/base.py b/app/api/base.py index 70151d8e..7bf177c0 100644 --- a/app/api/base.py +++ b/app/api/base.py @@ -1,4 +1,5 @@ from functools import wraps +from typing import Tuple, Optional import arrow from flask import Blueprint, request, jsonify, g @@ -9,30 +10,58 @@ from app.models import ApiKey api_bp = Blueprint(name="api", import_name=__name__, url_prefix="/api") +SUDO_MODE_MINUTES_VALID = 5 + + +def authorize_request() -> Optional[Tuple[str, int]]: + api_code = request.headers.get("Authentication") + api_key = ApiKey.get_by(code=api_code) + + if not api_key: + if current_user.is_authenticated: + g.user = current_user + else: + return jsonify(error="Wrong api key"), 401 + else: + # Update api key stats + api_key.last_used = arrow.now() + api_key.times += 1 + Session.commit() + + g.user = api_key.user + + if g.user.disabled: + return jsonify(error="Disabled account"), 403 + + g.api_key = api_key + return None + + +def check_sudo_mode_is_active(api_key: ApiKey) -> bool: + return api_key.sudo_mode_at and g.api_key.sudo_mode_at >= arrow.now().shift( + minutes=-SUDO_MODE_MINUTES_VALID + ) + def require_api_auth(f): @wraps(f) def decorated(*args, **kwargs): - api_code = request.headers.get("Authentication") - api_key = ApiKey.get_by(code=api_code) - - if not api_key: - # if user is authenticated, the request is authorized - if current_user.is_authenticated: - g.user = current_user - else: - return jsonify(error="Wrong api key"), 401 - else: - # Update api key stats - api_key.last_used = arrow.now() - api_key.times += 1 - Session.commit() - - g.user = api_key.user - - if g.user.disabled: - return jsonify(error="Disabled account"), 403 - + error_return = authorize_request() + if error_return: + return error_return + return f(*args, **kwargs) + + return decorated + + +def require_api_sudo(f): + @wraps(f) + def decorated(*args, **kwargs): + error_return = authorize_request() + if error_return: + return error_return + if not check_sudo_mode_is_active(g.api_key): + return jsonify(error="Need sudo"), 440 return f(*args, **kwargs) return decorated diff --git a/app/api/views/sudo.py b/app/api/views/sudo.py new file mode 100644 index 00000000..04806f82 --- /dev/null +++ b/app/api/views/sudo.py @@ -0,0 +1,27 @@ +from flask import jsonify, g, request +from sqlalchemy_utils.types.arrow import arrow + +from app.api.base import api_bp, require_api_auth +from app.db import Session + + +@api_bp.route("/sudo", methods=["PATCH"]) +@require_api_auth +def enter_sudo(): + """ + Enter sudo mode + + Input + - password: user password to validate request to enter sudo mode + """ + user = g.user + data = request.get_json() or {} + if "password" not in data: + return jsonify(error="Invalid request"), 403 + if not user.check_password(data["password"]): + return jsonify(error="Invalid request"), 403 + + g.api_key.sudo_mode_at = arrow.now() + Session.commit() + + return jsonify(ok=True) diff --git a/app/models.py b/app/models.py index fcf1d5bd..6156756b 100644 --- a/app/models.py +++ b/app/models.py @@ -2061,6 +2061,7 @@ class ApiKey(Base, ModelMixin): name = sa.Column(sa.String(128), nullable=True) last_used = sa.Column(ArrowType, default=None) times = sa.Column(sa.Integer, default=0, nullable=False) + sudo_mode_at = sa.Column(ArrowType, default=None) user = orm.relationship(User) diff --git a/docs/api.md b/docs/api.md index e1b0f2d3..79a5fc33 100644 --- a/docs/api.md +++ b/docs/api.md @@ -10,6 +10,7 @@ - [POST /api/auth/reactivate](##post-apiauthreactivate): Request a new activation code. - [POST /api/auth/forgot_password](#post-apiauthforgot_password): Request reset password link. - [GET /api/user_info](#get-apiuser_info): Get user's information. +- [PATCH /api/sudo](#patch-apiuser_sudo): Enable sudo mode. - [PATCH /api/user_info](#patch-apiuser_info): Update user's information. - [POST /api/api_key](#post-apiapi_key): Create a new API key. - [GET /api/logout](#get-apilogout): Log out. @@ -222,6 +223,18 @@ Input: Output: same as GET /api/user_info +#### PATCH /api/sudo + +Enable sudo mode + +Input: + +- password: User password to validate the user presence and enter sudo mode + +Output: + +- ok: True if sudo mode has been enabled + #### POST /api/api_key Create a new API Key diff --git a/migrations/versions/2022_062218_d1fb679f7eec_add_sudo_expiration_for_apikeys.py b/migrations/versions/2022_062218_d1fb679f7eec_add_sudo_expiration_for_apikeys.py new file mode 100644 index 00000000..6e1cda33 --- /dev/null +++ b/migrations/versions/2022_062218_d1fb679f7eec_add_sudo_expiration_for_apikeys.py @@ -0,0 +1,29 @@ +"""Add sudo expiration for ApiKeys + +Revision ID: d1fb679f7eec +Revises: 673a074e4215 +Create Date: 2022-06-22 18:24:38.983498 + +""" +import sqlalchemy_utils +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'd1fb679f7eec' +down_revision = '673a074e4215' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('api_key', sa.Column('sudo_mode_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('api_key', 'sudo_mode_at') + # ### end Alembic commands ### diff --git a/tests/api/test_sudo.py b/tests/api/test_sudo.py new file mode 100644 index 00000000..9437e885 --- /dev/null +++ b/tests/api/test_sudo.py @@ -0,0 +1,34 @@ +from random import random + +from flask import url_for + +from app.api.base import check_sudo_mode_is_active +from app.db import Session +from app.models import ApiKey +from tests.api.utils import get_new_user_and_api_key + + +def test_enter_sudo_mode(flask_client): + user, api_key = get_new_user_and_api_key() + password = f"passwd-{random()}" + user.set_password(password) + Session.commit() + + r = flask_client.patch( + url_for("api.enter_sudo"), + headers={"Authentication": api_key.code}, + json={"password": "invalid"}, + ) + + assert r.status_code == 403 + assert not check_sudo_mode_is_active(ApiKey.get(id=api_key.id)) + + r = flask_client.patch( + url_for("api.enter_sudo"), + headers={"Authentication": api_key.code}, + json={"password": password}, + ) + + assert r.status_code == 200 + assert r.json == {"ok": True} + assert check_sudo_mode_is_active(ApiKey.get(id=api_key.id))