From 06201a517c45cf0a51e1162bd0be9968697bf362 Mon Sep 17 00:00:00 2001 From: Carlos Quintana <74399022+cquintana92@users.noreply.github.com> Date: Fri, 18 Oct 2024 09:45:18 +0200 Subject: [PATCH] fix: mailbox email change verification crash (#2272) --- app/dashboard/views/mailbox_detail.py | 44 +++++----------- app/mailbox_utils.py | 52 +++++++++++++++++++ tests/test_mailbox_utils.py | 75 ++++++++++++++++++++++++++- 3 files changed, 140 insertions(+), 31 deletions(-) diff --git a/app/dashboard/views/mailbox_detail.py b/app/dashboard/views/mailbox_detail.py index 357038d4..21ef19c7 100644 --- a/app/dashboard/views/mailbox_detail.py +++ b/app/dashboard/views/mailbox_detail.py @@ -1,5 +1,4 @@ from smtplib import SMTPRecipientsRefused -from typing import Optional from email_validator import validate_email, EmailNotValidError from flask import render_template, request, redirect, url_for, flash @@ -17,7 +16,7 @@ from app.db import Session from app.email_utils import email_can_be_used_as_mailbox from app.email_utils import mailbox_already_used, render, send_email from app.extensions import limiter -from app.log import LOG +from app.mailbox_utils import perform_mailbox_email_change, MailboxEmailChangeError from app.models import Alias, AuthorizedAddress from app.models import Mailbox from app.pgp_utils import PGPException, load_public_key_and_check @@ -318,7 +317,7 @@ def cancel_mailbox_change_route(mailbox_id): @dashboard_bp.route("/mailbox/confirm_change") -def mailbox_confirm_change_route(): +def mailbox_confirm_email_change_route(): s = TimestampSigner(MAILBOX_SECRET) signed_mailbox_id = request.args.get("mailbox_id") @@ -327,35 +326,20 @@ def mailbox_confirm_change_route(): except Exception: flash("Invalid link", "error") return redirect(url_for("dashboard.index")) - else: - mailbox: Optional[Mailbox] = Mailbox.get(mailbox_id) - # new_email can be None if user cancels change in the meantime - if mailbox and mailbox.new_email: - user = mailbox.user - if Mailbox.get_by(email=mailbox.new_email, user_id=user.id): - flash(f"{mailbox.new_email} is already used", "error") - return redirect( - url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox.id) - ) + res = perform_mailbox_email_change(mailbox_id) - emit_user_audit_log( - user=current_user, - action=UserAuditLogAction.UpdateMailbox, - message=f"Change mailbox email for mailbox {mailbox_id} (old={mailbox.email} | new={mailbox.new_email})", - ) - mailbox.email = mailbox.new_email - mailbox.new_email = None - - # mark mailbox as verified if the change request is sent from an unverified mailbox - mailbox.verified = True - Session.commit() - - LOG.d("Mailbox change %s is verified", mailbox) - flash(f"The {mailbox.email} is updated", "success") + flash(res.message, res.message_category) + if res.error: + if res.error == MailboxEmailChangeError.EmailAlreadyUsed: return redirect( - url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox.id) + url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id) ) - else: - flash("Invalid link", "error") + elif res.error == MailboxEmailChangeError.InvalidId: return redirect(url_for("dashboard.index")) + else: + raise Exception("Unhandled MailboxEmailChangeError") + else: + return redirect( + url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox_id) + ) diff --git a/app/mailbox_utils.py b/app/mailbox_utils.py index c3c05e56..42182671 100644 --- a/app/mailbox_utils.py +++ b/app/mailbox_utils.py @@ -1,6 +1,7 @@ import dataclasses import secrets import random +from enum import Enum from typing import Optional import arrow @@ -273,3 +274,54 @@ def send_verification_email( mailbox_email=mailbox.email, ), ) + + +class MailboxEmailChangeError(Enum): + InvalidId = 1 + EmailAlreadyUsed = 2 + + +@dataclasses.dataclass +class MailboxEmailChangeResult: + error: Optional[MailboxEmailChangeError] + message: str + message_category: str + + +def perform_mailbox_email_change(mailbox_id: int) -> MailboxEmailChangeResult: + mailbox: Optional[Mailbox] = Mailbox.get(mailbox_id) + + # new_email can be None if user cancels change in the meantime + if mailbox and mailbox.new_email: + user = mailbox.user + if Mailbox.get_by(email=mailbox.new_email, user_id=user.id): + return MailboxEmailChangeResult( + error=MailboxEmailChangeError.EmailAlreadyUsed, + message=f"{mailbox.new_email} is already used", + message_category="error", + ) + + emit_user_audit_log( + user=user, + action=UserAuditLogAction.UpdateMailbox, + message=f"Change mailbox email for mailbox {mailbox_id} (old={mailbox.email} | new={mailbox.new_email})", + ) + mailbox.email = mailbox.new_email + mailbox.new_email = None + + # mark mailbox as verified if the change request is sent from an unverified mailbox + mailbox.verified = True + Session.commit() + + LOG.d("Mailbox change %s is verified", mailbox) + return MailboxEmailChangeResult( + error=None, + message=f"The {mailbox.email} is updated", + message_category="success", + ) + else: + return MailboxEmailChangeResult( + error=MailboxEmailChangeError.InvalidId, + message="Invalid link", + message_category="error", + ) diff --git a/tests/test_mailbox_utils.py b/tests/test_mailbox_utils.py index b217d7a4..0db184f3 100644 --- a/tests/test_mailbox_utils.py +++ b/tests/test_mailbox_utils.py @@ -6,7 +6,9 @@ import pytest from app import mailbox_utils, config from app.db import Session from app.mail_sender import mail_sender -from app.models import Mailbox, MailboxActivation, User, Job +from app.mailbox_utils import MailboxEmailChangeError +from app.models import Mailbox, MailboxActivation, User, Job, UserAuditLog +from app.user_audit_log_utils import UserAuditLogAction from tests.utils import create_new_user, random_email @@ -328,3 +330,74 @@ def test_verify_ok(): assert activation is None mailbox = Mailbox.get(id=output.mailbox.id) assert mailbox.verified + + +# perform_mailbox_email_change +def test_perform_mailbox_email_change_invalid_id(): + res = mailbox_utils.perform_mailbox_email_change(99999) + assert res.error == MailboxEmailChangeError.InvalidId + assert res.message_category == "error" + + +def test_perform_mailbox_email_change_valid_id_not_new_email(): + user = create_new_user() + mb = Mailbox.create( + user_id=user.id, + email=random_email(), + new_email=None, + verified=True, + commit=True, + ) + res = mailbox_utils.perform_mailbox_email_change(mb.id) + assert res.error == MailboxEmailChangeError.InvalidId + assert res.message_category == "error" + audit_log_entries = UserAuditLog.filter_by(user_id=user.id).count() + assert audit_log_entries == 0 + + +def test_perform_mailbox_email_change_valid_id_email_already_used(): + user = create_new_user() + new_email = random_email() + # Create mailbox with that email + Mailbox.create( + user_id=user.id, + email=new_email, + verified=True, + ) + mb_to_change = Mailbox.create( + user_id=user.id, + email=random_email(), + new_email=new_email, + verified=True, + commit=True, + ) + res = mailbox_utils.perform_mailbox_email_change(mb_to_change.id) + assert res.error == MailboxEmailChangeError.EmailAlreadyUsed + assert res.message_category == "error" + audit_log_entries = UserAuditLog.filter_by(user_id=user.id).count() + assert audit_log_entries == 0 + + +def test_perform_mailbox_email_change_success(): + user = create_new_user() + new_email = random_email() + mb = Mailbox.create( + user_id=user.id, + email=random_email(), + new_email=new_email, + verified=True, + commit=True, + ) + res = mailbox_utils.perform_mailbox_email_change(mb.id) + assert res.error is None + assert res.message_category == "success" + + db_mailbox = Mailbox.get_by(id=mb.id) + assert db_mailbox is not None + assert db_mailbox.verified is True + assert db_mailbox.email == new_email + assert db_mailbox.new_email is None + + audit_log_entries = UserAuditLog.filter_by(user_id=user.id).all() + assert len(audit_log_entries) == 1 + assert audit_log_entries[0].action == UserAuditLogAction.UpdateMailbox.value