mirror of
https://github.com/simple-login/app.git
synced 2024-11-16 17:08:30 +01:00
5ee5e386e5
* Allow to create users from partner * Fix tests * Update tests/test_account_linking.py Co-authored-by: Adrià Casajús <acasajus@users.noreply.github.com> * Fix lint Co-authored-by: Adrià Casajús <acasajus@users.noreply.github.com>
123 lines
3.9 KiB
Python
123 lines
3.9 KiB
Python
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from flask import url_for
|
|
from typing import Optional
|
|
|
|
from app.db import Session
|
|
from app.errors import LinkException, ProtonPartnerNotSetUp
|
|
from app.models import User, Partner
|
|
from app.proton.proton_client import ProtonClient, ProtonUser
|
|
from app.account_linking import (
|
|
process_login_case,
|
|
process_link_case,
|
|
PartnerLinkRequest,
|
|
)
|
|
|
|
PROTON_PARTNER_NAME = "Proton"
|
|
_PROTON_PARTNER: Optional[Partner] = None
|
|
|
|
|
|
def get_proton_partner() -> Partner:
|
|
global _PROTON_PARTNER
|
|
if _PROTON_PARTNER is None:
|
|
partner = Partner.get_by(name=PROTON_PARTNER_NAME)
|
|
if partner is None:
|
|
raise ProtonPartnerNotSetUp
|
|
Session.expunge(partner)
|
|
_PROTON_PARTNER = partner
|
|
return _PROTON_PARTNER
|
|
|
|
|
|
class Action(Enum):
|
|
Login = 1
|
|
Link = 2
|
|
|
|
|
|
@dataclass
|
|
class ProtonCallbackResult:
|
|
redirect_to_login: bool
|
|
flash_message: Optional[str]
|
|
flash_category: Optional[str]
|
|
redirect: Optional[str]
|
|
user: Optional[User]
|
|
|
|
|
|
def generate_account_not_allowed_to_log_in() -> ProtonCallbackResult:
|
|
return ProtonCallbackResult(
|
|
redirect_to_login=True,
|
|
flash_message="This account is not allowed to log in with Proton. Please convert your account to a full Proton account",
|
|
flash_category="error",
|
|
redirect=None,
|
|
user=None,
|
|
)
|
|
|
|
|
|
class ProtonCallbackHandler:
|
|
def __init__(self, proton_client: ProtonClient):
|
|
self.proton_client = proton_client
|
|
|
|
def handle_login(self, partner: Partner) -> ProtonCallbackResult:
|
|
try:
|
|
user = self.__get_partner_user()
|
|
if user is None:
|
|
return generate_account_not_allowed_to_log_in()
|
|
res = process_login_case(user, partner)
|
|
return ProtonCallbackResult(
|
|
redirect_to_login=False,
|
|
flash_message=None,
|
|
flash_category=None,
|
|
redirect=None,
|
|
user=res.user,
|
|
)
|
|
except LinkException as e:
|
|
return ProtonCallbackResult(
|
|
redirect_to_login=True,
|
|
flash_message=e.message,
|
|
flash_category="error",
|
|
redirect=None,
|
|
user=None,
|
|
)
|
|
|
|
def handle_link(
|
|
self, current_user: Optional[User], partner: Partner
|
|
) -> ProtonCallbackResult:
|
|
if current_user is None:
|
|
raise Exception("Cannot link account with current_user being None")
|
|
try:
|
|
user = self.__get_partner_user()
|
|
if user is None:
|
|
return generate_account_not_allowed_to_log_in()
|
|
res = process_link_case(user, current_user, partner)
|
|
return ProtonCallbackResult(
|
|
redirect_to_login=False,
|
|
flash_message="Account successfully linked",
|
|
flash_category="success",
|
|
redirect=url_for("dashboard.setting"),
|
|
user=res.user,
|
|
)
|
|
except LinkException as e:
|
|
return ProtonCallbackResult(
|
|
redirect_to_login=False,
|
|
flash_message=e.message,
|
|
flash_category="error",
|
|
redirect=None,
|
|
user=None,
|
|
)
|
|
|
|
def __get_partner_user(self) -> Optional[PartnerLinkRequest]:
|
|
proton_user = self.__get_proton_user()
|
|
if proton_user is None:
|
|
return None
|
|
return PartnerLinkRequest(
|
|
email=proton_user.email,
|
|
external_user_id=proton_user.id,
|
|
name=proton_user.name,
|
|
plan=proton_user.plan,
|
|
from_partner=False, # The user has started this flow, so we don't mark it as created by a partner
|
|
)
|
|
|
|
def __get_proton_user(self) -> Optional[ProtonUser]:
|
|
user = self.proton_client.get_user()
|
|
if user is None:
|
|
return None
|
|
return ProtonUser(email=user.email, plan=user.plan, name=user.name, id=user.id)
|