Allow to set sudo mode for api requests (#1108)

* Allow to set sudo mode for api requests

* Rebase migration on top of master

* PR comments

* Added missing migration

* Removed unused import

* Apply suggestions from code review

Co-authored-by: Adrià Casajús <adria.casajus@proton.ch>
This commit is contained in:
Adrià Casajús 2022-06-23 14:26:36 +02:00 committed by GitHub
parent 9cc9d38dce
commit de31e6d072
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 154 additions and 20 deletions

View File

@ -13,4 +13,5 @@ from .views import (
setting,
export,
phone,
sudo,
)

View File

@ -1,4 +1,5 @@
from functools import wraps
from typing import Tuple, Optional
import arrow
from flask import Blueprint, request, jsonify, g
@ -9,30 +10,58 @@ from app.models import ApiKey
api_bp = Blueprint(name="api", import_name=__name__, url_prefix="/api")
SUDO_MODE_MINUTES_VALID = 5
def authorize_request() -> Optional[Tuple[str, int]]:
api_code = request.headers.get("Authentication")
api_key = ApiKey.get_by(code=api_code)
if not api_key:
if current_user.is_authenticated:
g.user = current_user
else:
return jsonify(error="Wrong api key"), 401
else:
# Update api key stats
api_key.last_used = arrow.now()
api_key.times += 1
Session.commit()
g.user = api_key.user
if g.user.disabled:
return jsonify(error="Disabled account"), 403
g.api_key = api_key
return None
def check_sudo_mode_is_active(api_key: ApiKey) -> bool:
return api_key.sudo_mode_at and g.api_key.sudo_mode_at >= arrow.now().shift(
minutes=-SUDO_MODE_MINUTES_VALID
)
def require_api_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
api_code = request.headers.get("Authentication")
api_key = ApiKey.get_by(code=api_code)
if not api_key:
# if user is authenticated, the request is authorized
if current_user.is_authenticated:
g.user = current_user
else:
return jsonify(error="Wrong api key"), 401
else:
# Update api key stats
api_key.last_used = arrow.now()
api_key.times += 1
Session.commit()
g.user = api_key.user
if g.user.disabled:
return jsonify(error="Disabled account"), 403
error_return = authorize_request()
if error_return:
return error_return
return f(*args, **kwargs)
return decorated
def require_api_sudo(f):
@wraps(f)
def decorated(*args, **kwargs):
error_return = authorize_request()
if error_return:
return error_return
if not check_sudo_mode_is_active(g.api_key):
return jsonify(error="Need sudo"), 440
return f(*args, **kwargs)
return decorated

27
app/api/views/sudo.py Normal file
View File

@ -0,0 +1,27 @@
from flask import jsonify, g, request
from sqlalchemy_utils.types.arrow import arrow
from app.api.base import api_bp, require_api_auth
from app.db import Session
@api_bp.route("/sudo", methods=["PATCH"])
@require_api_auth
def enter_sudo():
"""
Enter sudo mode
Input
- password: user password to validate request to enter sudo mode
"""
user = g.user
data = request.get_json() or {}
if "password" not in data:
return jsonify(error="Invalid request"), 403
if not user.check_password(data["password"]):
return jsonify(error="Invalid request"), 403
g.api_key.sudo_mode_at = arrow.now()
Session.commit()
return jsonify(ok=True)

View File

@ -2061,6 +2061,7 @@ class ApiKey(Base, ModelMixin):
name = sa.Column(sa.String(128), nullable=True)
last_used = sa.Column(ArrowType, default=None)
times = sa.Column(sa.Integer, default=0, nullable=False)
sudo_mode_at = sa.Column(ArrowType, default=None)
user = orm.relationship(User)

View File

@ -10,6 +10,7 @@
- [POST /api/auth/reactivate](##post-apiauthreactivate): Request a new activation code.
- [POST /api/auth/forgot_password](#post-apiauthforgot_password): Request reset password link.
- [GET /api/user_info](#get-apiuser_info): Get user's information.
- [PATCH /api/sudo](#patch-apiuser_sudo): Enable sudo mode.
- [PATCH /api/user_info](#patch-apiuser_info): Update user's information.
- [POST /api/api_key](#post-apiapi_key): Create a new API key.
- [GET /api/logout](#get-apilogout): Log out.
@ -222,6 +223,18 @@ Input:
Output: same as GET /api/user_info
#### PATCH /api/sudo
Enable sudo mode
Input:
- password: User password to validate the user presence and enter sudo mode
Output:
- ok: True if sudo mode has been enabled
#### POST /api/api_key
Create a new API Key

View File

@ -0,0 +1,29 @@
"""Add sudo expiration for ApiKeys
Revision ID: d1fb679f7eec
Revises: 673a074e4215
Create Date: 2022-06-22 18:24:38.983498
"""
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'd1fb679f7eec'
down_revision = '673a074e4215'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('api_key', sa.Column('sudo_mode_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('api_key', 'sudo_mode_at')
# ### end Alembic commands ###

34
tests/api/test_sudo.py Normal file
View File

@ -0,0 +1,34 @@
from random import random
from flask import url_for
from app.api.base import check_sudo_mode_is_active
from app.db import Session
from app.models import ApiKey
from tests.api.utils import get_new_user_and_api_key
def test_enter_sudo_mode(flask_client):
user, api_key = get_new_user_and_api_key()
password = f"passwd-{random()}"
user.set_password(password)
Session.commit()
r = flask_client.patch(
url_for("api.enter_sudo"),
headers={"Authentication": api_key.code},
json={"password": "invalid"},
)
assert r.status_code == 403
assert not check_sudo_mode_is_active(ApiKey.get(id=api_key.id))
r = flask_client.patch(
url_for("api.enter_sudo"),
headers={"Authentication": api_key.code},
json={"password": password},
)
assert r.status_code == 200
assert r.json == {"ok": True}
assert check_sudo_mode_is_active(ApiKey.get(id=api_key.id))