mirror of
https://github.com/simple-login/app.git
synced 2024-11-10 21:27:10 +01:00
Enforce user match on mailbox verification and improve logging (#2172)
This commit is contained in:
parent
d11c2686b9
commit
6faec9ba4d
@ -114,13 +114,14 @@ def mailbox_route():
|
|||||||
|
|
||||||
|
|
||||||
@dashboard_bp.route("/mailbox_verify")
|
@dashboard_bp.route("/mailbox_verify")
|
||||||
|
@login_required
|
||||||
def mailbox_verify():
|
def mailbox_verify():
|
||||||
mailbox_id = request.args.get("mailbox_id")
|
mailbox_id = request.args.get("mailbox_id")
|
||||||
code = request.args.get("code")
|
code = request.args.get("code")
|
||||||
if not code:
|
if not code:
|
||||||
# Old way
|
# Old way
|
||||||
return verify_with_signed_secret(mailbox_id)
|
return verify_with_signed_secret(mailbox_id)
|
||||||
mailbox = mailbox_utils.verify_mailbox_code(mailbox_id, code)
|
mailbox = mailbox_utils.verify_mailbox_code(current_user, mailbox_id, code)
|
||||||
LOG.d("Mailbox %s is verified", mailbox)
|
LOG.d("Mailbox %s is verified", mailbox)
|
||||||
return render_template("dashboard/mailbox_validation.html", mailbox=mailbox)
|
return render_template("dashboard/mailbox_validation.html", mailbox=mailbox)
|
||||||
|
|
||||||
|
@ -22,6 +22,9 @@ class MailboxError(Exception):
|
|||||||
self.msg = msg
|
self.msg = msg
|
||||||
|
|
||||||
|
|
||||||
|
MAX_ACTIVATION_TRIES = 3
|
||||||
|
|
||||||
|
|
||||||
def create_mailbox(
|
def create_mailbox(
|
||||||
user: User,
|
user: User,
|
||||||
email: str,
|
email: str,
|
||||||
@ -29,15 +32,20 @@ def create_mailbox(
|
|||||||
send_verification_link: bool = True,
|
send_verification_link: bool = True,
|
||||||
) -> Mailbox:
|
) -> Mailbox:
|
||||||
if not user.is_premium():
|
if not user.is_premium():
|
||||||
|
LOG.i(f"User {user} has created mailbox with {email} but is not premium")
|
||||||
raise MailboxError("Only premium plan can add additional mailbox")
|
raise MailboxError("Only premium plan can add additional mailbox")
|
||||||
if not is_valid_email(email):
|
if not is_valid_email(email):
|
||||||
|
LOG.i(f"User {user} has created mailbox with {email} but is not valid email")
|
||||||
raise MailboxError("Invalid email")
|
raise MailboxError("Invalid email")
|
||||||
elif mailbox_already_used(email, user):
|
elif mailbox_already_used(email, user):
|
||||||
|
LOG.i(f"User {user} has created mailbox with {email} but email is already used")
|
||||||
raise MailboxError("Email already used")
|
raise MailboxError("Email already used")
|
||||||
elif not email_can_be_used_as_mailbox(email):
|
elif not email_can_be_used_as_mailbox(email):
|
||||||
|
LOG.i(f"User {user} has created mailbox with {email} but email is invalid")
|
||||||
raise MailboxError("Invalid email")
|
raise MailboxError("Invalid email")
|
||||||
new_mailbox = Mailbox.create(email=email, user_id=user.id, commit=True)
|
new_mailbox = Mailbox.create(email=email, user_id=user.id, commit=True)
|
||||||
|
|
||||||
|
LOG.i(f"User {user} has created mailbox with {email}")
|
||||||
send_verification_email(
|
send_verification_email(
|
||||||
user,
|
user,
|
||||||
new_mailbox,
|
new_mailbox,
|
||||||
@ -53,28 +61,39 @@ def delete_mailbox(
|
|||||||
mailbox = Mailbox.get(mailbox_id)
|
mailbox = Mailbox.get(mailbox_id)
|
||||||
|
|
||||||
if not mailbox or mailbox.user_id != user.id:
|
if not mailbox or mailbox.user_id != user.id:
|
||||||
|
LOG.i(
|
||||||
|
f"User {user} has tried to delete another user's mailbox with {mailbox_id}"
|
||||||
|
)
|
||||||
raise MailboxError("Invalid mailbox")
|
raise MailboxError("Invalid mailbox")
|
||||||
|
|
||||||
if mailbox.id == user.default_mailbox_id:
|
if mailbox.id == user.default_mailbox_id:
|
||||||
|
LOG.i(f"User {user} has tried to delete the default mailbox")
|
||||||
raise MailboxError("Cannot delete your default mailbox")
|
raise MailboxError("Cannot delete your default mailbox")
|
||||||
|
|
||||||
if transfer_mailbox_id and transfer_mailbox_id > 0:
|
if transfer_mailbox_id and transfer_mailbox_id > 0:
|
||||||
transfer_mailbox = Mailbox.get(transfer_mailbox_id)
|
transfer_mailbox = Mailbox.get(transfer_mailbox_id)
|
||||||
|
|
||||||
if not transfer_mailbox or transfer_mailbox.user_id != user.id:
|
if not transfer_mailbox or transfer_mailbox.user_id != user.id:
|
||||||
|
LOG.i(
|
||||||
|
f"User {user} has tried to transfer to a mailbox owned by another user"
|
||||||
|
)
|
||||||
raise MailboxError("You must transfer the aliases to a mailbox you own")
|
raise MailboxError("You must transfer the aliases to a mailbox you own")
|
||||||
|
|
||||||
if transfer_mailbox.id == mailbox.id:
|
if transfer_mailbox.id == mailbox.id:
|
||||||
|
LOG.i(
|
||||||
|
f"User {user} has tried to transfer to the same mailbox he is deleting"
|
||||||
|
)
|
||||||
raise MailboxError(
|
raise MailboxError(
|
||||||
"You can not transfer the aliases to the mailbox you want to delete"
|
"You can not transfer the aliases to the mailbox you want to delete"
|
||||||
)
|
)
|
||||||
|
|
||||||
if not transfer_mailbox.verified:
|
if not transfer_mailbox.verified:
|
||||||
|
LOG.i(f"User {user} has tried to transfer to a non verified mailbox")
|
||||||
MailboxError("Your new mailbox is not verified")
|
MailboxError("Your new mailbox is not verified")
|
||||||
|
|
||||||
# Schedule delete account job
|
# Schedule delete account job
|
||||||
LOG.w(
|
LOG.i(
|
||||||
f"schedule delete mailbox job for {mailbox.id} with transfer to mailbox {transfer_mailbox_id}"
|
f"User {user} has scheduled delete mailbox job for {mailbox.id} with transfer to mailbox {transfer_mailbox_id}"
|
||||||
)
|
)
|
||||||
Job.create(
|
Job.create(
|
||||||
name=JOB_DELETE_MAILBOX,
|
name=JOB_DELETE_MAILBOX,
|
||||||
@ -101,6 +120,7 @@ def set_default_mailbox(user: User, mailbox_id: int) -> Mailbox:
|
|||||||
|
|
||||||
if mailbox.id == user.default_mailbox_id:
|
if mailbox.id == user.default_mailbox_id:
|
||||||
return mailbox
|
return mailbox
|
||||||
|
LOG.i(f"User {user} has set mailbox {mailbox} as his default one")
|
||||||
|
|
||||||
user.default_mailbox_id = mailbox.id
|
user.default_mailbox_id = mailbox.id
|
||||||
Session.commit()
|
Session.commit()
|
||||||
@ -114,26 +134,53 @@ def clear_activation_codes_for_mailbox(mailbox: Mailbox):
|
|||||||
Session.commit()
|
Session.commit()
|
||||||
|
|
||||||
|
|
||||||
def verify_mailbox_code(mailbox_id: int, code: str) -> Mailbox:
|
def verify_mailbox_code(user: User, mailbox_id: int, code: str) -> Mailbox:
|
||||||
mailbox = Mailbox.get(mailbox_id)
|
mailbox = Mailbox.get(mailbox_id)
|
||||||
if not mailbox:
|
if not mailbox:
|
||||||
|
LOG.i(
|
||||||
|
f"User {user} failed to verify mailbox {mailbox_id} because it does not exist"
|
||||||
|
)
|
||||||
raise MailboxError("Invalid mailbox")
|
raise MailboxError("Invalid mailbox")
|
||||||
if mailbox.verified:
|
if mailbox.verified:
|
||||||
|
LOG.i(
|
||||||
|
f"User {user} failed to verify mailbox {mailbox_id} because it's already verified"
|
||||||
|
)
|
||||||
clear_activation_codes_for_mailbox(mailbox)
|
clear_activation_codes_for_mailbox(mailbox)
|
||||||
return mailbox
|
return mailbox
|
||||||
activation = MailboxActivation.get_by(mailbox_id=mailbox_id).first()
|
if mailbox.user_id != user.id:
|
||||||
|
LOG.i(
|
||||||
|
f"User {user} failed to verify mailbox {mailbox_id} because it's owned by another user"
|
||||||
|
)
|
||||||
|
raise MailboxError("Invalid mailbox")
|
||||||
|
|
||||||
|
activation = (
|
||||||
|
MailboxActivation.filter(MailboxActivation.mailbox_id == mailbox_id)
|
||||||
|
.order_by(MailboxActivation.created_at.desc())
|
||||||
|
.first()
|
||||||
|
)
|
||||||
if not activation:
|
if not activation:
|
||||||
|
LOG.i(
|
||||||
|
f"User {user} failed to verify mailbox {mailbox_id} because there is no activation"
|
||||||
|
)
|
||||||
raise MailboxError("Invalid code")
|
raise MailboxError("Invalid code")
|
||||||
if activation.tries > 3:
|
if activation.tries >= MAX_ACTIVATION_TRIES:
|
||||||
|
LOG.i(f"User {user} failed to verify mailbox {mailbox_id} more than 3 times")
|
||||||
clear_activation_codes_for_mailbox(mailbox)
|
clear_activation_codes_for_mailbox(mailbox)
|
||||||
raise MailboxError("Invalid activation code. Please request another code.")
|
raise MailboxError("Invalid activation code. Please request another code.")
|
||||||
if activation.created_at < arrow.now().shift(minutes=-15):
|
if activation.created_at < arrow.now().shift(minutes=-15):
|
||||||
|
LOG.i(
|
||||||
|
f"User {user} failed to verify mailbox {mailbox_id} because code is too old"
|
||||||
|
)
|
||||||
clear_activation_codes_for_mailbox(mailbox)
|
clear_activation_codes_for_mailbox(mailbox)
|
||||||
raise MailboxError("Invalid activation code. Please request another code.")
|
raise MailboxError("Invalid activation code. Please request another code.")
|
||||||
if code != activation.code:
|
if code != activation.code:
|
||||||
|
LOG.i(
|
||||||
|
f"User {user} failed to verify mailbox {mailbox_id} because code does not match"
|
||||||
|
)
|
||||||
activation.tries = activation.tries + 1
|
activation.tries = activation.tries + 1
|
||||||
Session.commit()
|
Session.commit()
|
||||||
raise MailboxError("Invalid activation code")
|
raise MailboxError("Invalid activation code")
|
||||||
|
LOG.i(f"User {user} has verified mailbox {mailbox_id}")
|
||||||
mailbox.verified = True
|
mailbox.verified = True
|
||||||
clear_activation_codes_for_mailbox(mailbox)
|
clear_activation_codes_for_mailbox(mailbox)
|
||||||
return mailbox
|
return mailbox
|
||||||
@ -153,6 +200,9 @@ def send_verification_email(
|
|||||||
tries=0,
|
tries=0,
|
||||||
)
|
)
|
||||||
Session.commit()
|
Session.commit()
|
||||||
|
LOG.i(
|
||||||
|
f"Sending mailbox verification email to {mailbox.email} with digit={use_digit_code} link={send_link}"
|
||||||
|
)
|
||||||
|
|
||||||
if send_link:
|
if send_link:
|
||||||
verification_url = (
|
verification_url = (
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
|
import arrow
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from app import mailbox_utils, config
|
from app import mailbox_utils, config
|
||||||
@ -211,3 +212,68 @@ def test_cannot_default_other_user_mailbox():
|
|||||||
)
|
)
|
||||||
with pytest.raises(mailbox_utils.MailboxError):
|
with pytest.raises(mailbox_utils.MailboxError):
|
||||||
mailbox_utils.set_default_mailbox(user, mailbox.id)
|
mailbox_utils.set_default_mailbox(user, mailbox.id)
|
||||||
|
|
||||||
|
|
||||||
|
def test_verify_non_existing_mailbox():
|
||||||
|
with pytest.raises(mailbox_utils.MailboxError):
|
||||||
|
mailbox_utils.verify_mailbox_code(user, 999999999, "9999999")
|
||||||
|
|
||||||
|
|
||||||
|
def test_verify_already_verified_mailbox():
|
||||||
|
mailbox = Mailbox.create(
|
||||||
|
user_id=user.id, email=random_email(), verified=True, commit=True
|
||||||
|
)
|
||||||
|
mbox = mailbox_utils.verify_mailbox_code(user, mailbox.id, "9999999")
|
||||||
|
assert mbox.id == mailbox.id
|
||||||
|
|
||||||
|
|
||||||
|
def test_verify_other_users_mailbox():
|
||||||
|
other = create_new_user()
|
||||||
|
mailbox = Mailbox.create(
|
||||||
|
user_id=other.id, email=random_email(), verified=False, commit=True
|
||||||
|
)
|
||||||
|
with pytest.raises(mailbox_utils.MailboxError):
|
||||||
|
mailbox_utils.verify_mailbox_code(user, mailbox.id, "9999999")
|
||||||
|
|
||||||
|
|
||||||
|
@mail_sender.store_emails_test_decorator
|
||||||
|
def test_verify_fail():
|
||||||
|
mailbox = mailbox_utils.create_mailbox(user, random_email())
|
||||||
|
for i in range(mailbox_utils.MAX_ACTIVATION_TRIES - 1):
|
||||||
|
try:
|
||||||
|
mailbox_utils.verify_mailbox_code(user, mailbox.id, "9999999")
|
||||||
|
assert False, f"test {i}"
|
||||||
|
except mailbox_utils.MailboxError:
|
||||||
|
activation = MailboxActivation.get_by(mailbox_id=mailbox.id)
|
||||||
|
assert activation.tries == i + 1
|
||||||
|
|
||||||
|
|
||||||
|
@mail_sender.store_emails_test_decorator
|
||||||
|
def test_verify_too_may():
|
||||||
|
mailbox = mailbox_utils.create_mailbox(user, random_email())
|
||||||
|
activation = MailboxActivation.get_by(mailbox_id=mailbox.id)
|
||||||
|
activation.tries = mailbox_utils.MAX_ACTIVATION_TRIES
|
||||||
|
Session.commit()
|
||||||
|
with pytest.raises(mailbox_utils.MailboxError):
|
||||||
|
mailbox_utils.verify_mailbox_code(user, mailbox.id, activation.code)
|
||||||
|
|
||||||
|
|
||||||
|
@mail_sender.store_emails_test_decorator
|
||||||
|
def test_verify_too_old_code():
|
||||||
|
mailbox = mailbox_utils.create_mailbox(user, random_email())
|
||||||
|
activation = MailboxActivation.get_by(mailbox_id=mailbox.id)
|
||||||
|
activation.created_at = arrow.now().shift(minutes=-30)
|
||||||
|
Session.commit()
|
||||||
|
with pytest.raises(mailbox_utils.MailboxError):
|
||||||
|
mailbox_utils.verify_mailbox_code(user, mailbox.id, activation.code)
|
||||||
|
|
||||||
|
|
||||||
|
@mail_sender.store_emails_test_decorator
|
||||||
|
def test_verify_ok():
|
||||||
|
mailbox = mailbox_utils.create_mailbox(user, random_email())
|
||||||
|
activation = MailboxActivation.get_by(mailbox_id=mailbox.id)
|
||||||
|
mailbox_utils.verify_mailbox_code(user, mailbox.id, activation.code)
|
||||||
|
activation = MailboxActivation.get_by(mailbox_id=mailbox.id)
|
||||||
|
assert activation is None
|
||||||
|
mailbox = Mailbox.get(id=mailbox.id)
|
||||||
|
assert mailbox.verified
|
||||||
|
Loading…
Reference in New Issue
Block a user