feat: audit log and admin panel improvements (#2274)

* feat: add deleted alias audit logs

* feat: emit event when user is created or password reset

* feat: improve admin panel
This commit is contained in:
Carlos Quintana 2024-10-18 15:32:26 +02:00 committed by GitHub
parent 2d67bf3689
commit 67b3820f61
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 105 additions and 23 deletions

View file

@ -115,7 +115,7 @@ class SLAdminIndexView(AdminIndexView):
if not current_user.is_authenticated or not current_user.is_admin:
return redirect(url_for("auth.login", next=request.url))
return redirect("/admin/user")
return redirect("/admin/email_search")
class UserAdmin(SLModelView):
@ -743,13 +743,17 @@ class EmailSearchResult:
mailbox: List[Mailbox] = []
mailbox_count: int = 0
deleted_alias: Optional[DeletedAlias] = None
deleted_custom_alias: Optional[DomainDeletedAlias] = None
deleted_alias_audit_log: Optional[List[AliasAuditLog]] = None
domain_deleted_alias: Optional[DomainDeletedAlias] = None
domain_deleted_alias_audit_log: Optional[List[AliasAuditLog]] = None
user: Optional[User] = None
user_audit_log: Optional[List[UserAuditLog]] = None
query: str
@staticmethod
def from_email(email: str) -> EmailSearchResult:
output = EmailSearchResult()
output.query = email
alias = Alias.get_by(email=email)
if alias:
output.alias = alias
@ -768,6 +772,15 @@ class EmailSearchResult:
.all()
)
output.no_match = False
user_audit_log = (
UserAuditLog.filter_by(user_email=email)
.order_by(UserAuditLog.created_at.desc())
.all()
)
if user_audit_log:
output.user_audit_log = user_audit_log
output.no_match = False
mailboxes = (
Mailbox.filter_by(email=email).order_by(Mailbox.id.desc()).limit(10).all()
)
@ -778,10 +791,20 @@ class EmailSearchResult:
deleted_alias = DeletedAlias.get_by(email=email)
if deleted_alias:
output.deleted_alias = deleted_alias
output.deleted_alias_audit_log = (
AliasAuditLog.filter_by(alias_email=deleted_alias.email)
.order_by(AliasAuditLog.created_at.desc())
.all()
)
output.no_match = False
domain_deleted_alias = DomainDeletedAlias.get_by(email=email)
if domain_deleted_alias:
output.domain_deleted_alias = domain_deleted_alias
output.domain_deleted_alias_audit_log = (
AliasAuditLog.filter_by(alias_email=domain_deleted_alias.email)
.order_by(AliasAuditLog.created_at.desc())
.all()
)
output.no_match = False
return output

View file

@ -23,6 +23,7 @@ from app.events.auth_event import LoginEvent, RegisterEvent
from app.extensions import limiter
from app.log import LOG
from app.models import User, ApiKey, SocialAuth, AccountActivation
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
from app.utils import sanitize_email, canonicalize_email
@ -187,6 +188,11 @@ def auth_activate():
LOG.d("activate user %s", user)
user.activated = True
emit_user_audit_log(
user=user,
action=UserAuditLogAction.ActivateUser,
message=f"User has been activated: {user.email}",
)
AccountActivation.delete(account_activation.id)
Session.commit()

View file

@ -7,6 +7,7 @@ from app.db import Session
from app.extensions import limiter
from app.log import LOG
from app.models import ActivationCode
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
from app.utils import sanitize_next_url
@ -47,6 +48,11 @@ def activate():
user = activation_code.user
user.activated = True
emit_user_audit_log(
user=user,
action=UserAuditLogAction.ActivateUser,
message=f"User has been activated: {user.email}",
)
login_user(user)
# activation code is to be used only once

View file

@ -9,6 +9,7 @@ from app.auth.views.login_utils import after_login
from app.db import Session
from app.extensions import limiter
from app.models import ResetPasswordCode
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
class ResetPasswordForm(FlaskForm):
@ -59,6 +60,11 @@ def reset_password():
# this can be served to activate user too
user.activated = True
emit_user_audit_log(
user=user,
action=UserAuditLogAction.ResetPassword,
message="User has reset their password",
)
# remove all reset password codes
ResetPasswordCode.filter_by(user_id=user.id).delete()

View file

@ -616,6 +616,15 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
if "alternative_id" not in kwargs:
user.alternative_id = str(uuid.uuid4())
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
trail = ". Created from partner" if from_partner else ""
emit_user_audit_log(
user=user,
action=UserAuditLogAction.CreateUser,
message=f"Created user {email}{trail}",
)
# If the user is created from partner, do not notify
# nor give a trial
if from_partner:

View file

@ -4,6 +4,10 @@ from app.models import User, UserAuditLog
class UserAuditLogAction(Enum):
CreateUser = "create_user"
ActivateUser = "activate_user"
ResetPassword = "reset_password"
Upgrade = "upgrade"
SubscriptionExtended = "subscription_extended"
SubscriptionCancelled = "subscription_cancelled"

View file

@ -442,10 +442,10 @@ def init_admin(app):
admin = Admin(name="SimpleLogin", template_mode="bootstrap4")
admin.init_app(app, index_view=SLAdminIndexView())
admin.add_view(EmailSearchAdmin(name="Email Search", endpoint="email_search"))
admin.add_view(UserAdmin(User, Session))
admin.add_view(AliasAdmin(Alias, Session))
admin.add_view(MailboxAdmin(Mailbox, Session))
admin.add_view(EmailSearchAdmin(name="Email Search", endpoint="email_search"))
admin.add_view(CouponAdmin(Coupon, Session))
admin.add_view(ManualSubscriptionAdmin(ManualSubscription, Session))
admin.add_view(CustomDomainAdmin(CustomDomain, Session))

View file

@ -239,6 +239,11 @@
{{ show_user(data.user) }}
{{ list_mailboxes("Mailboxes for user", helper.mailbox_count(data.user) , helper.mailbox_list(data.user) ) }}
{{ list_alias(helper.alias_count(data.user) ,helper.alias_list(data.user)) }}
</div>
{% endif %}
{% if data.user_audit_log %}
<div class="border border-dark border-2 mt-1 mb-2 p-3">
<h3 class="mb-3">Audit log entries for user {{ data.query }}</h3>
{{ list_user_audit_log(data.user_audit_log) }}
</div>
{% endif %}
@ -260,6 +265,7 @@
<div class="border border-dark mt-1 mb-2 p-3">
<h3 class="mb-3">Found DeletedAlias {{ data.deleted_alias.email }}</h3>
{{ show_deleted_alias(data.deleted_alias) }}
{{ list_alias_audit_log(data.deleted_alias_audit_log) }}
</div>
{% endif %}
{% if data.domain_deleted_alias %}
@ -267,6 +273,7 @@
<div class="border border-dark mt-1 mb-2 p-3">
<h3 class="mb-3">Found DomainDeletedAlias {{ data.domain_deleted_alias.email }}</h3>
{{ show_domain_deleted_alias(data.domain_deleted_alias) }}
{{ list_alias_audit_log(data.domain_deleted_alias_audit_log) }}
</div>
{% endif %}
{% endblock %}

View file

@ -94,10 +94,12 @@ def test_login_case_from_partner():
)
assert res.user.activated is True
audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(user_id=res.user.id).all()
audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=res.user.id,
action=UserAuditLogAction.LinkAccount.value,
).all()
assert len(audit_logs) == 1
assert audit_logs[0].user_id == res.user.id
assert audit_logs[0].action == UserAuditLogAction.LinkAccount.value
def test_login_case_from_partner_with_uppercase_email():
@ -133,7 +135,10 @@ def test_login_case_from_web():
assert 0 == (res.user.flags & User.FLAG_CREATED_FROM_PARTNER)
assert res.user.activated is True
audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(user_id=res.user.id).all()
audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=res.user.id,
action=UserAuditLogAction.LinkAccount.value,
).all()
assert len(audit_logs) == 1
assert audit_logs[0].user_id == res.user.id
assert audit_logs[0].action == UserAuditLogAction.LinkAccount.value
@ -218,7 +223,10 @@ def test_link_account_with_proton_account_same_address(flask_client):
)
assert partner_user.partner_id == get_proton_partner().id
assert partner_user.external_user_id == partner_user_id
audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(user_id=res.user.id).all()
audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=res.user.id,
action=UserAuditLogAction.LinkAccount.value,
).all()
assert len(audit_logs) == 1
assert audit_logs[0].user_id == res.user.id
assert audit_logs[0].action == UserAuditLogAction.LinkAccount.value
@ -246,7 +254,10 @@ def test_link_account_with_proton_account_different_address(flask_client):
assert partner_user.partner_id == get_proton_partner().id
assert partner_user.external_user_id == partner_user_id
audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(user_id=res.user.id).all()
audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=res.user.id,
action=UserAuditLogAction.LinkAccount.value,
).all()
assert len(audit_logs) == 1
assert audit_logs[0].user_id == res.user.id
assert audit_logs[0].action == UserAuditLogAction.LinkAccount.value
@ -304,19 +315,19 @@ def test_link_account_with_proton_account_same_address_but_linked_to_other_user(
# Ensure audit logs for sl_user_1 show the link action
sl_user_1_audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=sl_user_1.id
user_id=sl_user_1.id,
action=UserAuditLogAction.LinkAccount.value,
).all()
assert len(sl_user_1_audit_logs) == 1
assert sl_user_1_audit_logs[0].user_id == sl_user_1.id
assert sl_user_1_audit_logs[0].action == UserAuditLogAction.LinkAccount.value
# Ensure audit logs for sl_user_2 show the unlink action
sl_user_2_audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=sl_user_2.id
user_id=sl_user_2.id,
action=UserAuditLogAction.UnlinkAccount.value,
).all()
assert len(sl_user_2_audit_logs) == 1
assert sl_user_2_audit_logs[0].user_id == sl_user_2.id
assert sl_user_2_audit_logs[0].action == UserAuditLogAction.UnlinkAccount.value
def test_link_account_with_proton_account_different_address_and_linked_to_other_user(
@ -356,19 +367,19 @@ def test_link_account_with_proton_account_different_address_and_linked_to_other_
# Ensure audit logs for sl_user_1 show the link action
sl_user_1_audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=sl_user_1.id
user_id=sl_user_1.id,
action=UserAuditLogAction.LinkAccount.value,
).all()
assert len(sl_user_1_audit_logs) == 1
assert sl_user_1_audit_logs[0].user_id == sl_user_1.id
assert sl_user_1_audit_logs[0].action == UserAuditLogAction.LinkAccount.value
# Ensure audit logs for sl_user_2 show the unlink action
sl_user_2_audit_logs: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=sl_user_2.id
user_id=sl_user_2.id,
action=UserAuditLogAction.UnlinkAccount.value,
).all()
assert len(sl_user_2_audit_logs) == 1
assert sl_user_2_audit_logs[0].user_id == sl_user_2.id
assert sl_user_2_audit_logs[0].action == UserAuditLogAction.UnlinkAccount.value
def test_cannot_create_instance_of_base_strategy():

View file

@ -351,7 +351,9 @@ def test_perform_mailbox_email_change_valid_id_not_new_email():
res = mailbox_utils.perform_mailbox_email_change(mb.id)
assert res.error == MailboxEmailChangeError.InvalidId
assert res.message_category == "error"
audit_log_entries = UserAuditLog.filter_by(user_id=user.id).count()
audit_log_entries = UserAuditLog.filter_by(
user_id=user.id, action=UserAuditLogAction.UpdateMailbox.value
).count()
assert audit_log_entries == 0
@ -374,7 +376,9 @@ def test_perform_mailbox_email_change_valid_id_email_already_used():
res = mailbox_utils.perform_mailbox_email_change(mb_to_change.id)
assert res.error == MailboxEmailChangeError.EmailAlreadyUsed
assert res.message_category == "error"
audit_log_entries = UserAuditLog.filter_by(user_id=user.id).count()
audit_log_entries = UserAuditLog.filter_by(
user_id=user.id, action=UserAuditLogAction.UpdateMailbox.value
).count()
assert audit_log_entries == 0
@ -398,6 +402,7 @@ def test_perform_mailbox_email_change_success():
assert db_mailbox.email == new_email
assert db_mailbox.new_email is None
audit_log_entries = UserAuditLog.filter_by(user_id=user.id).all()
assert len(audit_log_entries) == 1
assert audit_log_entries[0].action == UserAuditLogAction.UpdateMailbox.value
audit_log_entries = UserAuditLog.filter_by(
user_id=user.id, action=UserAuditLogAction.UpdateMailbox.value
).count()
assert audit_log_entries == 1

View file

@ -27,7 +27,9 @@ def test_emit_alias_audit_log_for_random_data():
commit=True,
)
logs_for_user: List[UserAuditLog] = UserAuditLog.filter_by(user_id=user.id).all()
logs_for_user: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=user.id, action=action.value
).all()
assert len(logs_for_user) == 1
assert logs_for_user[0].user_id == user.id
assert logs_for_user[0].user_email == user.email
@ -41,7 +43,10 @@ def test_emit_audit_log_on_mailbox_creation():
user=user, email=random_email(), verified=True
)
logs_for_user: List[UserAuditLog] = UserAuditLog.filter_by(user_id=user.id).all()
logs_for_user: List[UserAuditLog] = UserAuditLog.filter_by(
user_id=user.id,
action=UserAuditLogAction.CreateMailbox.value,
).all()
assert len(logs_for_user) == 1
assert logs_for_user[0].user_id == user.id
assert logs_for_user[0].user_email == user.email