Add an audit log for the admin panel

This commit is contained in:
Adrià Casajús 2022-03-10 16:13:31 +01:00
parent 7da06ba424
commit 1d15af53b7
No known key found for this signature in database
GPG Key ID: F0033226A5AFC9B9
3 changed files with 165 additions and 3 deletions

View File

@ -1,12 +1,22 @@
import arrow
import sqlalchemy
from app import models
from flask import redirect, url_for, request, flash
from flask_admin import expose, AdminIndexView
from flask_admin.actions import action
from flask_admin.contrib import sqla
from flask_login import current_user, login_user
from app.log import LOG
from app.db import Session
from app.models import User, ManualSubscription, Fido, Subscription, AppleSubscription
from app.models import (
User,
ManualSubscription,
Fido,
Subscription,
AppleSubscription,
AdminAuditLog,
)
class SLModelView(sqla.ModelView):
@ -25,6 +35,37 @@ class SLModelView(sqla.ModelView):
# redirect to login page if user doesn't have access
return redirect(url_for("auth.login", next=request.url))
def on_model_change(self, form, model, is_created):
if is_created:
action = AdminAuditLog.ACTION_CREATE_OBJECT
else:
action = AdminAuditLog.ACTION_UPDATE_OBJECT
changes = {}
for attr in sqlalchemy.inspect(model).attrs:
if attr.history.has_changes() and attr.key not in (
"created_at",
"updated_at",
):
value = attr.value
if issubclass(type(value), models.Base):
value = value.id
changes[attr.key] = value
AdminAuditLog.create(
admin_user_id=current_user.id,
model=model.__class__.__name__,
model_id=model.id,
action=action,
data=changes,
)
def on_model_delete(self, model):
AdminAuditLog.create(
admin_user_id=current_user.id,
model=model.__class__.__name__,
model_id=model.id,
action=AdminAuditLog.ACTION_DELETE_OBJECT,
)
class SLAdminIndexView(AdminIndexView):
@expose("/")
@ -105,6 +146,7 @@ class UserAdmin(SLModelView):
user.trial_end = arrow.now().shift(weeks=1)
flash(f"Extend trial for {user} to {user.trial_end}", "success")
AdminAuditLog.extend_trial_1w(current_user.id, user.id, user.trial_end)
Session.commit()
@ -115,14 +157,19 @@ class UserAdmin(SLModelView):
)
def disable_otp_fido(self, ids):
for user in User.filter(User.id.in_(ids)):
user_had_otp = user.enable_otp
if user.enable_otp:
user.enable_otp = False
flash(f"Disable OTP for {user}", "info")
user_had_fido = user.fido_uuid is not None
if user.fido_uuid:
Fido.filter_by(uuid=user.fido_uuid).delete()
user.fido_uuid = None
flash(f"Disable FIDO for {user}", "info")
AdminAuditLog.disable_otp_fido(
current_user.id, user.id, user_had_otp, user_had_fido
)
Session.commit()
@ -137,6 +184,8 @@ class UserAdmin(SLModelView):
return
for user in User.filter(User.id.in_(ids)):
AdminAuditLog.logged_as_user(current_user.id, user.id)
Session.commit()
login_user(user)
flash(f"Login as user {user}", "success")
return redirect("/")
@ -164,6 +213,7 @@ def manual_upgrade(way: str, ids: [int], is_giveaway: bool):
)
continue
AdminAuditLog.create_manual_upgrade(current_user.id, way, user.id, is_giveaway)
manual_sub: ManualSubscription = ManualSubscription.get_by(user_id=user.id)
if manual_sub:
# renew existing subscription
@ -171,7 +221,6 @@ def manual_upgrade(way: str, ids: [int], is_giveaway: bool):
manual_sub.end_at = manual_sub.end_at.shift(years=1)
else:
manual_sub.end_at = arrow.now().shift(years=1, days=1)
Session.commit()
flash(f"Subscription extended to {manual_sub.end_at.humanize()}", "success")
continue
@ -180,10 +229,10 @@ def manual_upgrade(way: str, ids: [int], is_giveaway: bool):
end_at=arrow.now().shift(years=1, days=1),
comment=way,
is_giveaway=is_giveaway,
commit=True,
)
flash(f"New {way} manual subscription for {user} is created", "success")
Session.commit()
class EmailLogAdmin(SLModelView):

View File

@ -2873,4 +2873,77 @@ class PhoneMessage(Base, ModelMixin):
number = orm.relationship(PhoneNumber)
class AdminAuditLog(Base):
__tablename__ = "admin_audit_log"
id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
created_at = sa.Column(ArrowType, default=arrow.utcnow, nullable=False)
admin_user_id = sa.Column(sa.ForeignKey(User.id), nullable=False)
action = sa.Column(sa.Integer, nullable=False)
model = sa.Column(sa.Text, nullable=False)
model_id = sa.Column(sa.Integer, nullable=False)
data = sa.Column(sa.JSON, nullable=False)
ACTION_CREATE_OBJECT = 1
ACTION_UPDATE_OBJECT = 2
ACTION_DELETE_OBJECT = 3
ACTION_MANUAL_UPGRADE = 4
ACTION_EXTEND_TRIAL = 5
ACTION_DISABLE_2FA = 6
ACTION_LOGGED_AS_USER = 7
@classmethod
def create(cls, **kw):
r = cls(**kw)
Session.add(r)
return r
@classmethod
def create_manual_upgrade(
cls, admin_user_id: int, upgrade_type: str, user_id: int, giveaway: bool
):
cls.create(
admin_user_id=admin_user_id,
action=cls.ACTION_MANUAL_UPGRADE,
model="User",
model_id=user_id,
data={
"upgrade_type": upgrade_type,
"giveaway": giveaway,
},
)
@classmethod
def extend_trial_1w(cls, admin_user_id: int, user_id: int, trial_end: arrow.Arrow):
cls.create(
admin_user_id=admin_user_id,
action=cls.ACTION_EXTEND_TRIAL,
model="User",
model_id=user_id,
data={"trial_end": trial_end.format(arrow.FORMAT_RFC3339)},
)
@classmethod
def disable_otp_fido(
cls, admin_user_id: int, user_id: int, had_otp: bool, had_fido: bool
):
cls.create(
admin_user_id=admin_user_id,
action=cls.ACTION_DISABLE_2FA,
model="User",
model_id=user_id,
data={"had_otp": had_otp, "had_fido": had_fido},
)
@classmethod
def logged_as_user(cls, admin_user_id: int, user_id: int):
cls.create(
admin_user_id=admin_user_id,
action=cls.ACTION_LOGGED_AS_USER,
model="User",
model_id=user_id,
)
# endregion

View File

@ -0,0 +1,40 @@
"""Create admin audit log
Revision ID: b500363567e3
Revises: 9282e982bc05
Create Date: 2022-03-10 15:26:54.538717
"""
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = 'b500363567e3'
down_revision = '9282e982bc05'
branch_labels = None
depends_on = None
def upgrade():
op.create
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('admin_audit_log', 'data',
existing_type=postgresql.JSONB(astext_type=sa.Text()),
nullable=False)
op.alter_column('admin_audit_log', 'model_id',
existing_type=sa.INTEGER(),
nullable=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('admin_audit_log', 'model_id',
existing_type=sa.INTEGER(),
nullable=True)
op.alter_column('admin_audit_log', 'data',
existing_type=postgresql.JSONB(astext_type=sa.Text()),
nullable=True)
# ### end Alembic commands ###