import enum import random import uuid from email.utils import formataddr from typing import List, Tuple, Optional import arrow from arrow import Arrow from flask import url_for from flask_login import UserMixin from sqlalchemy import text, desc, CheckConstraint from sqlalchemy.orm import deferred from sqlalchemy_utils import ArrowType from app import s3 from app.config import ( MAX_NB_EMAIL_FREE_PLAN, URL, AVATAR_URL_EXPIRATION, JOB_ONBOARDING_1, JOB_ONBOARDING_2, JOB_ONBOARDING_4, LANDING_PAGE_URL, FIRST_ALIAS_DOMAIN, DISABLE_ONBOARDING, UNSUBSCRIBER, ALIAS_RANDOM_SUFFIX_LENGTH, ) from app.errors import AliasInTrashError from app.extensions import db from app.log import LOG from app.oauth_models import Scope from app.pw_models import PasswordOracle from app.utils import ( convert_to_id, random_string, random_words, sanitize_email, random_word, ) class ModelMixin(object): id = db.Column(db.Integer, primary_key=True, autoincrement=True) created_at = db.Column(ArrowType, default=arrow.utcnow, nullable=False) updated_at = db.Column(ArrowType, default=None, onupdate=arrow.utcnow) _repr_hide = ["created_at", "updated_at"] @classmethod def query(cls): return db.session.query(cls) @classmethod def get(cls, id): return cls.query.get(id) @classmethod def get_by(cls, **kw): return cls.query.filter_by(**kw).first() @classmethod def filter_by(cls, **kw): return cls.query.filter_by(**kw) @classmethod def get_or_create(cls, **kw): r = cls.get_by(**kw) if not r: r = cls(**kw) db.session.add(r) return r @classmethod def create(cls, **kw): # whether should call db.session.commit commit = kw.pop("commit", False) r = cls(**kw) db.session.add(r) if commit: db.session.commit() return r def save(self): db.session.add(self) @classmethod def delete(cls, obj_id): cls.query.filter(cls.id == obj_id).delete() @classmethod def first(cls): return cls.query.first() def __repr__(self): values = ", ".join( "%s=%r" % (n, getattr(self, n)) for n in self.__table__.c.keys() if n not in self._repr_hide ) return "%s(%s)" % (self.__class__.__name__, values) class File(db.Model, ModelMixin): path = db.Column(db.String(128), unique=True, nullable=False) user_id = db.Column(db.ForeignKey("users.id", ondelete="cascade"), nullable=True) def get_url(self, expires_in=3600): return s3.get_url(self.path, expires_in) def __repr__(self): return f"" class EnumE(enum.Enum): @classmethod def has_value(cls, value: int) -> bool: return value in set(item.value for item in cls) @classmethod def get_name(cls, value: int) -> Optional[str]: for item in cls: if item.value == value: return item.name return None @classmethod def has_name(cls, name: str) -> bool: for item in cls: if item.name == name: return True return False @classmethod def get_value(cls, name: str) -> Optional[int]: for item in cls: if item.name == name: return item.value return None class PlanEnum(EnumE): monthly = 2 yearly = 3 # Specify the format for sender address class SenderFormatEnum(EnumE): AT = 0 # John Wick - john at wick.com VIA = 1 # john@wick.com via SimpleLogin A = 2 # John Wick - john(a)wick.com FULL = 3 # John Wick - john@wick.com class AliasGeneratorEnum(EnumE): word = 1 # aliases are generated based on random words uuid = 2 # aliases are generated based on uuid class AliasSuffixEnum(EnumE): word = 0 # Random word from dictionary file random_string = 1 # Completely random string class Hibp(db.Model, ModelMixin): __tablename__ = "hibp" name = db.Column(db.String(), nullable=False, unique=True, index=True) breached_aliases = db.relationship("Alias", secondary="alias_hibp") description = db.Column(db.Text) date = db.Column(ArrowType, nullable=True) def __repr__(self): return f"" class Fido(db.Model, ModelMixin): __tablename__ = "fido" credential_id = db.Column(db.String(), nullable=False, unique=True, index=True) uuid = db.Column( db.ForeignKey("users.fido_uuid", ondelete="cascade"), unique=False, nullable=False, ) public_key = db.Column(db.String(), nullable=False, unique=True) sign_count = db.Column(db.Integer(), nullable=False) name = db.Column(db.String(128), nullable=False, unique=False) class User(db.Model, ModelMixin, UserMixin, PasswordOracle): __tablename__ = "users" email = db.Column(db.String(256), unique=True, nullable=False) name = db.Column(db.String(128), nullable=True) is_admin = db.Column(db.Boolean, nullable=False, default=False) alias_generator = db.Column( db.Integer, nullable=False, default=AliasGeneratorEnum.word.value, server_default=str(AliasGeneratorEnum.word.value), ) notification = db.Column( db.Boolean, default=True, nullable=False, server_default="1" ) activated = db.Column(db.Boolean, default=False, nullable=False) # an account can be disabled if having harmful behavior disabled = db.Column(db.Boolean, default=False, nullable=False, server_default="0") profile_picture_id = db.Column(db.ForeignKey(File.id), nullable=True) otp_secret = db.Column(db.String(16), nullable=True) enable_otp = db.Column( db.Boolean, nullable=False, default=False, server_default="0" ) last_otp = db.Column(db.String(12), nullable=True, default=False) # Fields for WebAuthn fido_uuid = db.Column(db.String(), nullable=True, unique=True) # the default domain that's used when user creates a new random alias # default_alias_custom_domain_id XOR default_alias_public_domain_id default_alias_custom_domain_id = db.Column( db.ForeignKey("custom_domain.id", ondelete="SET NULL"), nullable=True, default=None, ) default_alias_public_domain_id = db.Column( db.ForeignKey("public_domain.id", ondelete="SET NULL"), nullable=True, default=None, ) # some users could have lifetime premium lifetime = db.Column(db.Boolean, default=False, nullable=False, server_default="0") paid_lifetime = db.Column( db.Boolean, default=False, nullable=False, server_default="0" ) # user can use all premium features until this date trial_end = db.Column( ArrowType, default=lambda: arrow.now().shift(days=7, hours=1), nullable=True ) # the mailbox used when create random alias # this field is nullable but in practice, it's always set # it cannot be set to non-nullable though # as this will create foreign key cycle between User and Mailbox default_mailbox_id = db.Column( db.ForeignKey("mailbox.id"), nullable=True, default=None ) profile_picture = db.relationship(File, foreign_keys=[profile_picture_id]) # Specify the format for sender address # John Wick - john at wick.com -> 0 # john@wick.com via SimpleLogin -> 1 # John Wick - john(a)wick.com -> 2 # John Wick - john@wick.com -> 3 sender_format = db.Column( db.Integer, default="0", nullable=False, server_default="0" ) # to know whether user has explicitly chosen a sender format as opposed to those who use the default ones. # users who haven't chosen a sender format and are using 1 or 3 format, their sender format will be set to 0 sender_format_updated_at = db.Column(ArrowType, default=None) replace_reverse_alias = db.Column( db.Boolean, default=False, nullable=False, server_default="0" ) referral_id = db.Column( db.ForeignKey("referral.id", ondelete="SET NULL"), nullable=True, default=None ) referral = db.relationship("Referral", foreign_keys=[referral_id]) # whether intro has been shown to user intro_shown = db.Column( db.Boolean, default=False, nullable=False, server_default="0" ) default_mailbox = db.relationship("Mailbox", foreign_keys=[default_mailbox_id]) # user can set a more strict max_spam score to block spams more aggressively max_spam_score = db.Column(db.Integer, nullable=True) # newsletter is sent to this address newsletter_alias_id = db.Column( db.ForeignKey("alias.id", ondelete="SET NULL"), nullable=True, default=None ) # whether to include the sender address in reverse-alias include_sender_in_reverse_alias = db.Column( db.Boolean, default=False, nullable=False, server_default="0" ) # whether to use random string or random word as suffix # Random word from dictionary file -> 0 # Completely random string -> 1 random_alias_suffix = db.Column( db.Integer, nullable=False, default=AliasSuffixEnum.random_string.value, server_default=str(AliasSuffixEnum.random_string.value), ) # always expand the alias info, i.e. without needing to press "More" expand_alias_info = db.Column( db.Boolean, default=False, nullable=False, server_default="0" ) @classmethod def create(cls, email, name="", password=None, **kwargs): user: User = super(User, cls).create(email=email, name=name, **kwargs) if password: user.set_password(password) db.session.flush() mb = Mailbox.create(user_id=user.id, email=user.email, verified=True) db.session.flush() user.default_mailbox_id = mb.id # create a first alias mail to show user how to use when they login alias = Alias.create_new( user, prefix="simplelogin-newsletter", mailbox_id=mb.id, note="This is your first alias. It's used to receive SimpleLogin communications " "like new features announcements, newsletters.", ) db.session.flush() user.newsletter_alias_id = alias.id db.session.flush() if DISABLE_ONBOARDING: LOG.d("Disable onboarding emails") return user # Schedule onboarding emails Job.create( name=JOB_ONBOARDING_1, payload={"user_id": user.id}, run_at=arrow.now().shift(days=1), ) Job.create( name=JOB_ONBOARDING_2, payload={"user_id": user.id}, run_at=arrow.now().shift(days=2), ) Job.create( name=JOB_ONBOARDING_4, payload={"user_id": user.id}, run_at=arrow.now().shift(days=3), ) db.session.flush() return user def _lifetime_or_active_subscription(self) -> bool: """True if user has lifetime licence or active subscription""" if self.lifetime: return True sub: Subscription = self.get_subscription() if sub: return True apple_sub: AppleSubscription = AppleSubscription.get_by(user_id=self.id) if apple_sub and apple_sub.is_valid(): return True manual_sub: ManualSubscription = ManualSubscription.get_by(user_id=self.id) if manual_sub and manual_sub.is_active(): return True coinbase_subscription: CoinbaseSubscription = CoinbaseSubscription.get_by( user_id=self.id ) if coinbase_subscription and coinbase_subscription.is_active(): return True return False def is_paid(self) -> bool: """same as _lifetime_or_active_subscription but not include free manual subscription""" sub: Subscription = self.get_subscription() if sub: return True apple_sub: AppleSubscription = AppleSubscription.get_by(user_id=self.id) if apple_sub and apple_sub.is_valid(): return True manual_sub: ManualSubscription = ManualSubscription.get_by(user_id=self.id) if manual_sub and not manual_sub.is_giveaway and manual_sub.is_active(): return True coinbase_subscription: CoinbaseSubscription = CoinbaseSubscription.get_by( user_id=self.id ) if coinbase_subscription and coinbase_subscription.is_active(): return True return False def in_trial(self): """return True if user does not have lifetime licence or an active subscription AND is in trial period""" if self._lifetime_or_active_subscription(): return False if self.trial_end and arrow.now() < self.trial_end: return True return False def should_show_upgrade_button(self): if self._lifetime_or_active_subscription(): # user who has canceled can also re-subscribe sub: Subscription = self.get_subscription() if sub and sub.cancelled: return True return False return True def can_upgrade(self) -> bool: """ The following users can upgrade: - have giveaway lifetime licence - have giveaway manual subscriptions - have a cancelled Paddle subscription - have a expired Apple subscription - have a expired Coinbase subscription """ sub: Subscription = self.get_subscription() # user who has canceled can also re-subscribe if sub and not sub.cancelled: return False apple_sub: AppleSubscription = AppleSubscription.get_by(user_id=self.id) if apple_sub and apple_sub.is_valid(): return False manual_sub: ManualSubscription = ManualSubscription.get_by(user_id=self.id) # user who has giveaway premium can decide to upgrade if manual_sub and manual_sub.is_active() and not manual_sub.is_giveaway: return False coinbase_subscription = CoinbaseSubscription.get_by(user_id=self.id) if coinbase_subscription and coinbase_subscription.is_active(): return False return True def is_premium(self) -> bool: """ user is premium if they: - have a lifetime deal or - in trial period or - active subscription """ if self._lifetime_or_active_subscription(): return True if self.trial_end and arrow.now() < self.trial_end: return True return False @property def upgrade_channel(self) -> str: if self.lifetime: return "Lifetime" sub: Subscription = self.get_subscription() if sub: if sub.cancelled: return f"Cancelled Paddle Subscription {sub.subscription_id}" else: return f"Active Paddle Subscription {sub.subscription_id}" apple_sub: AppleSubscription = AppleSubscription.get_by(user_id=self.id) if apple_sub and apple_sub.is_valid(): return "Apple Subscription" manual_sub: ManualSubscription = ManualSubscription.get_by(user_id=self.id) if manual_sub and manual_sub.is_active(): mode = "Giveaway" if manual_sub.is_giveaway else "Paid" return f"Manual Subscription {manual_sub.comment} {mode}" coinbase_subscription: CoinbaseSubscription = CoinbaseSubscription.get_by( user_id=self.id ) if coinbase_subscription and coinbase_subscription.is_active(): return "Coinbase Subscription" if self.trial_end and arrow.now() < self.trial_end: return "In Trial" return "N/A" @property def subscription_cancelled(self) -> bool: sub: Subscription = self.get_subscription() if sub and sub.cancelled: return True apple_sub: AppleSubscription = AppleSubscription.get_by(user_id=self.id) if apple_sub and not apple_sub.is_valid(): return True manual_sub: ManualSubscription = ManualSubscription.get_by(user_id=self.id) if manual_sub and not manual_sub.is_active(): return True coinbase_subscription: CoinbaseSubscription = CoinbaseSubscription.get_by( user_id=self.id ) if coinbase_subscription and not coinbase_subscription.is_active(): return True return False @property def premium_end(self) -> str: if self.lifetime: return "Forever" sub: Subscription = self.get_subscription() if sub: return str(sub.next_bill_date) apple_sub: AppleSubscription = AppleSubscription.get_by(user_id=self.id) if apple_sub and apple_sub.is_valid(): return apple_sub.expires_date.humanize() manual_sub: ManualSubscription = ManualSubscription.get_by(user_id=self.id) if manual_sub and manual_sub.is_active(): return manual_sub.end_at.humanize() coinbase_subscription: CoinbaseSubscription = CoinbaseSubscription.get_by( user_id=self.id ) if coinbase_subscription and coinbase_subscription.is_active(): return coinbase_subscription.end_at.humanize() return "N/A" def can_create_new_alias(self) -> bool: """ Whether user can create a new alias. User can't create a new alias if - has more than 15 aliases in the free plan, *even in the free trial* """ if self._lifetime_or_active_subscription(): return True else: return Alias.filter_by(user_id=self.id).count() < MAX_NB_EMAIL_FREE_PLAN def profile_picture_url(self): if self.profile_picture_id: return self.profile_picture.get_url() else: return url_for("static", filename="default-avatar.png") def suggested_emails(self, website_name) -> (str, [str]): """return suggested email and other email choices """ website_name = convert_to_id(website_name) all_aliases = [ ge.email for ge in Alias.filter_by(user_id=self.id, enabled=True) ] if self.can_create_new_alias(): suggested_alias = Alias.create_new(self, prefix=website_name).email else: # pick an email from the list of gen emails suggested_alias = random.choice(all_aliases) return ( suggested_alias, list(set(all_aliases).difference({suggested_alias})), ) def suggested_names(self) -> (str, [str]): """return suggested name and other name choices """ other_name = convert_to_id(self.name) return self.name, [other_name, "Anonymous", "whoami"] def get_name_initial(self) -> str: if not self.name: return "" names = self.name.split(" ") return "".join([n[0].upper() for n in names if n]) def get_subscription(self) -> Optional["Subscription"]: """return *active* Paddle subscription Return None if the subscription is already expired TODO: support user unsubscribe and re-subscribe """ sub = Subscription.get_by(user_id=self.id) if sub: # sub is active until the next billing_date + 1 if sub.next_bill_date >= arrow.now().shift(days=-1).date(): return sub # past subscription, user is considered not having a subscription = free plan else: return None else: return sub def verified_custom_domains(self) -> List["CustomDomain"]: return CustomDomain.query.filter_by(user_id=self.id, verified=True).all() def mailboxes(self) -> List["Mailbox"]: """list of mailbox that user own""" mailboxes = [] for mailbox in Mailbox.query.filter_by(user_id=self.id, verified=True): mailboxes.append(mailbox) return mailboxes def nb_directory(self): return Directory.query.filter_by(user_id=self.id).count() def has_custom_domain(self): return CustomDomain.filter_by(user_id=self.id, verified=True).count() > 0 def custom_domains(self): return CustomDomain.filter_by(user_id=self.id, verified=True).all() def available_domains_for_random_alias(self) -> List[Tuple[bool, str]]: """Return available domains for user to create random aliases Each result record contains: - whether the domain belongs to SimpleLogin - the domain """ res = [] for domain in self.available_sl_domains(): res.append((True, domain)) for custom_domain in self.verified_custom_domains(): res.append((False, custom_domain.domain)) return res def default_random_alias_domain(self) -> str: """return the domain used for the random alias""" if self.default_alias_custom_domain_id: custom_domain = CustomDomain.get(self.default_alias_custom_domain_id) # sanity check if ( not custom_domain or not custom_domain.verified or custom_domain.user_id != self.id ): LOG.warning("Problem with %s default random alias domain", self) return FIRST_ALIAS_DOMAIN return custom_domain.domain if self.default_alias_public_domain_id: sl_domain = SLDomain.get(self.default_alias_public_domain_id) # sanity check if not sl_domain: LOG.exception("Problem with %s public random alias domain", self) return FIRST_ALIAS_DOMAIN if sl_domain.premium_only and not self.is_premium(): LOG.warning( "%s is not premium and cannot use %s. Reset default random alias domain setting", self, sl_domain, ) self.default_alias_custom_domain_id = None self.default_alias_public_domain_id = None db.session.commit() return FIRST_ALIAS_DOMAIN return sl_domain.domain return FIRST_ALIAS_DOMAIN def fido_enabled(self) -> bool: if self.fido_uuid is not None: return True return False def two_factor_authentication_enabled(self) -> bool: return self.enable_otp or self.fido_enabled() def get_communication_email(self) -> (Optional[str], str, bool): """ Return - the email that user uses to receive email communication. None if user unsubscribes from newsletter - the unsubscribe URL - whether the unsubscribe method is via sending email (mailto:) or Http POST """ if self.notification and self.activated: if self.newsletter_alias_id: alias = Alias.get(self.newsletter_alias_id) if alias.enabled: unsubscribe_link, via_email = alias.unsubscribe_link() return alias.email, unsubscribe_link, via_email # alias disabled -> user doesn't want to receive newsletter else: return None, None, False else: # do not handle http POST unsubscribe if UNSUBSCRIBER: # use * as suffix instead of = as for alias unsubscribe return self.email, f"mailto:{UNSUBSCRIBER}?subject={self.id}*", True return None, None, False def available_sl_domains(self) -> [str]: """ Return all SimpleLogin domains that user can use when creating a new alias, including: - SimpleLogin public domains, available for all users (ALIAS_DOMAIN) - SimpleLogin premium domains, only available for Premium accounts (PREMIUM_ALIAS_DOMAIN) """ return [sl_domain.domain for sl_domain in self.get_sl_domains()] def get_sl_domains(self) -> List["SLDomain"]: if self.is_premium(): query = SLDomain.query else: query = SLDomain.filter_by(premium_only=False) return query.all() def available_alias_domains(self) -> [str]: """return all domains that user can use when creating a new alias, including: - SimpleLogin public domains, available for all users (ALIAS_DOMAIN) - SimpleLogin premium domains, only available for Premium accounts (PREMIUM_ALIAS_DOMAIN) - Verified custom domains """ domains = self.available_sl_domains() for custom_domain in self.verified_custom_domains(): domains.append(custom_domain.domain) # can have duplicate where a "root" user has a domain that's also listed in SL domains return list(set(domains)) def should_show_app_page(self) -> bool: """whether to show the app page""" return ( # when user has used the "Sign in with SL" button before ClientUser.query.filter(ClientUser.user_id == self.id).count() # or when user has created an app + Client.query.filter(Client.user_id == self.id).count() > 0 ) def get_random_alias_suffix(self): """Get random suffix for an alias based on user's preference. Returns: str: the random suffix generated """ if self.random_alias_suffix == AliasSuffixEnum.random_string.value: return random_string(ALIAS_RANDOM_SUFFIX_LENGTH, include_digits=True) return random_word() def __repr__(self): return f"" def _expiration_1h(): return arrow.now().shift(hours=1) def _expiration_12h(): return arrow.now().shift(hours=12) def _expiration_5m(): return arrow.now().shift(minutes=5) def _expiration_7d(): return arrow.now().shift(days=7) class ActivationCode(db.Model, ModelMixin): """For activate user account""" user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) code = db.Column(db.String(128), unique=True, nullable=False) user = db.relationship(User) expired = db.Column(ArrowType, nullable=False, default=_expiration_1h) def is_expired(self): return self.expired < arrow.now() class ResetPasswordCode(db.Model, ModelMixin): """For resetting password""" user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) code = db.Column(db.String(128), unique=True, nullable=False) user = db.relationship(User) expired = db.Column(ArrowType, nullable=False, default=_expiration_1h) def is_expired(self): return self.expired < arrow.now() class SocialAuth(db.Model, ModelMixin): """Store how user authenticates with social login""" user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) # name of the social login used, could be facebook, google or github social = db.Column(db.String(128), nullable=False) __table_args__ = (db.UniqueConstraint("user_id", "social", name="uq_social_auth"),) # <<< OAUTH models >>> def generate_oauth_client_id(client_name) -> str: oauth_client_id = convert_to_id(client_name) + "-" + random_string() # check that the client does not exist yet if not Client.get_by(oauth_client_id=oauth_client_id): LOG.debug("generate oauth_client_id %s", oauth_client_id) return oauth_client_id # Rerun the function LOG.warning( "client_id %s already exists, generate a new client_id", oauth_client_id ) return generate_oauth_client_id(client_name) class MfaBrowser(db.Model, ModelMixin): user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) token = db.Column(db.String(64), default=False, unique=True, nullable=False) expires = db.Column(ArrowType, default=False, nullable=False) user = db.relationship(User) @classmethod def create_new(cls, user, token_length=64) -> "MfaBrowser": found = False while not found: token = random_string(token_length) if not cls.get_by(token=token): found = True return MfaBrowser.create( user_id=user.id, token=token, expires=arrow.now().shift(days=30), ) @classmethod def delete(cls, token): cls.query.filter(cls.token == token).delete() db.session.commit() @classmethod def delete_expired(cls): cls.query.filter(cls.expires < arrow.now()).delete() db.session.commit() def is_expired(self): return self.expires < arrow.now() def reset_expire(self): self.expires = arrow.now().shift(days=30) class Client(db.Model, ModelMixin): oauth_client_id = db.Column(db.String(128), unique=True, nullable=False) oauth_client_secret = db.Column(db.String(128), nullable=False) name = db.Column(db.String(128), nullable=False) home_url = db.Column(db.String(1024)) # user who created this client user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) icon_id = db.Column(db.ForeignKey(File.id), nullable=True) # an app needs to be approved by SimpleLogin team approved = db.Column(db.Boolean, nullable=False, default=False, server_default="0") description = db.Column(db.Text, nullable=True) icon = db.relationship(File) user = db.relationship(User) def nb_user(self): return ClientUser.filter_by(client_id=self.id).count() def get_scopes(self) -> [Scope]: # todo: client can choose which scopes they want to have access return [Scope.NAME, Scope.EMAIL, Scope.AVATAR_URL] @classmethod def create_new(cls, name, user_id) -> "Client": # generate a client-id oauth_client_id = generate_oauth_client_id(name) oauth_client_secret = random_string(40) client = Client.create( name=name, oauth_client_id=oauth_client_id, oauth_client_secret=oauth_client_secret, user_id=user_id, ) return client def get_icon_url(self): if self.icon_id: return self.icon.get_url() else: return URL + "/static/default-icon.svg" def last_user_login(self) -> "ClientUser": client_user = ( ClientUser.query.filter(ClientUser.client_id == self.id) .order_by(ClientUser.updated_at) .first() ) if client_user: return client_user return None class RedirectUri(db.Model, ModelMixin): """Valid redirect uris for a client""" client_id = db.Column(db.ForeignKey(Client.id, ondelete="cascade"), nullable=False) uri = db.Column(db.String(1024), nullable=False) client = db.relationship(Client, backref="redirect_uris") class AuthorizationCode(db.Model, ModelMixin): code = db.Column(db.String(128), unique=True, nullable=False) client_id = db.Column(db.ForeignKey(Client.id, ondelete="cascade"), nullable=False) user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) scope = db.Column(db.String(128)) redirect_uri = db.Column(db.String(1024)) # what is the input response_type, e.g. "code", "code,id_token", ... response_type = db.Column(db.String(128)) nonce = db.Column(db.Text, nullable=True, default=None, server_default=text("NULL")) user = db.relationship(User, lazy=False) client = db.relationship(Client, lazy=False) expired = db.Column(ArrowType, nullable=False, default=_expiration_5m) def is_expired(self): return self.expired < arrow.now() class OauthToken(db.Model, ModelMixin): access_token = db.Column(db.String(128), unique=True) client_id = db.Column(db.ForeignKey(Client.id, ondelete="cascade"), nullable=False) user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) scope = db.Column(db.String(128)) redirect_uri = db.Column(db.String(1024)) # what is the input response_type, e.g. "token", "token,id_token", ... response_type = db.Column(db.String(128)) user = db.relationship(User) client = db.relationship(Client) expired = db.Column(ArrowType, nullable=False, default=_expiration_1h) def is_expired(self): return self.expired < arrow.now() def generate_email( scheme: int = AliasGeneratorEnum.word.value, in_hex: bool = False, alias_domain=FIRST_ALIAS_DOMAIN, ) -> str: """generate an email address that does not exist before :param alias_domain: the domain used to generate the alias. :param scheme: int, value of AliasGeneratorEnum, indicate how the email is generated :type in_hex: bool, if the generate scheme is uuid, is hex favorable? """ if scheme == AliasGeneratorEnum.uuid.value: name = uuid.uuid4().hex if in_hex else uuid.uuid4().__str__() random_email = name + "@" + alias_domain else: random_email = random_words() + "@" + alias_domain random_email = random_email.lower().strip() # check that the client does not exist yet if not Alias.get_by(email=random_email) and not DeletedAlias.get_by( email=random_email ): LOG.debug("generate email %s", random_email) return random_email # Rerun the function LOG.warning("email %s already exists, generate a new email", random_email) return generate_email(scheme=scheme, in_hex=in_hex) class Alias(db.Model, ModelMixin): user_id = db.Column( db.ForeignKey(User.id, ondelete="cascade"), nullable=False, index=True ) email = db.Column(db.String(128), unique=True, nullable=False) # the name to use when user replies/sends from alias name = db.Column(db.String(128), nullable=True, default=None) enabled = db.Column(db.Boolean(), default=True, nullable=False) custom_domain_id = db.Column( db.ForeignKey("custom_domain.id", ondelete="cascade"), nullable=True ) custom_domain = db.relationship("CustomDomain", foreign_keys=[custom_domain_id]) # To know whether an alias is created "on the fly", i.e. via the custom domain catch-all feature automatic_creation = db.Column( db.Boolean, nullable=False, default=False, server_default="0" ) # to know whether an alias belongs to a directory directory_id = db.Column( db.ForeignKey("directory.id", ondelete="cascade"), nullable=True ) note = db.Column(db.Text, default=None, nullable=True) # an alias can be owned by another mailbox mailbox_id = db.Column( db.ForeignKey("mailbox.id", ondelete="cascade"), nullable=False, index=True ) # prefix _ to avoid this object being used accidentally. # To have the list of all mailboxes, should use AliasInfo instead _mailboxes = db.relationship("Mailbox", secondary="alias_mailbox", lazy="joined") # If the mailbox has PGP-enabled, user can choose disable the PGP on the alias # this is useful when some senders already support PGP disable_pgp = db.Column( db.Boolean, nullable=False, default=False, server_default="0" ) # a way to bypass the bounce automatic disable mechanism cannot_be_disabled = db.Column( db.Boolean, nullable=False, default=False, server_default="0" ) # when a mailbox wants to send an email on behalf of the alias via the reverse-alias # several checks are performed to avoid email spoofing # this option allow disabling these checks disable_email_spoofing_check = db.Column( db.Boolean, nullable=False, default=False, server_default="0" ) # to know whether an alias is added using a batch import batch_import_id = db.Column( db.ForeignKey("batch_import.id", ondelete="SET NULL"), nullable=True, default=None, ) # set in case of alias transfer. original_owner_id = db.Column( db.ForeignKey(User.id, ondelete="SET NULL"), nullable=True ) # alias is pinned on top pinned = db.Column(db.Boolean, nullable=False, default=False, server_default="0") # used to transfer an alias to another user transfer_token = db.Column(db.String(64), default=None, unique=True, nullable=True) # have I been pwned hibp_last_check = db.Column(ArrowType, default=None) hibp_breaches = db.relationship("Hibp", secondary="alias_hibp") user = db.relationship(User, foreign_keys=[user_id]) mailbox = db.relationship("Mailbox", lazy="joined") @property def mailboxes(self): ret = [self.mailbox] for m in self._mailboxes: ret.append(m) ret = [mb for mb in ret if mb.verified] ret = sorted(ret, key=lambda mb: mb.email) return ret def mailbox_support_pgp(self) -> bool: """return True of one of the mailboxes support PGP""" for mb in self.mailboxes: if mb.pgp_enabled(): return True return False def pgp_enabled(self) -> bool: if self.mailbox_support_pgp() and not self.disable_pgp: return True return False @classmethod def create(cls, **kw): # whether should call db.session.commit commit = kw.pop("commit", False) r = cls(**kw) email = kw["email"] # make sure email is lowercase and doesn't have any whitespace email = sanitize_email(email) # make sure alias is not in global trash, i.e. DeletedAlias table if DeletedAlias.get_by(email=email): raise AliasInTrashError if DomainDeletedAlias.get_by(email=email): raise AliasInTrashError db.session.add(r) if commit: db.session.commit() return r @classmethod def create_new(cls, user, prefix, note=None, mailbox_id=None): prefix = prefix.lower().strip().replace(" ", "") if not prefix: raise Exception("alias prefix cannot be empty") # find the right suffix - avoid infinite loop by running this at max 1000 times for i in range(1000): suffix = user.get_random_alias_suffix() email = f"{prefix}.{suffix}@{FIRST_ALIAS_DOMAIN}" if not cls.get_by(email=email) and not DeletedAlias.get_by(email=email): break return Alias.create( user_id=user.id, email=email, note=note, mailbox_id=mailbox_id or user.default_mailbox_id, ) @classmethod def delete(cls, obj_id): raise Exception("should use delete_alias(alias,user) instead") @classmethod def create_new_random( cls, user, scheme: int = AliasGeneratorEnum.word.value, in_hex: bool = False, note: str = None, ): """create a new random alias""" custom_domain = None random_email = None if user.default_alias_custom_domain_id: custom_domain = CustomDomain.get(user.default_alias_custom_domain_id) random_email = generate_email( scheme=scheme, in_hex=in_hex, alias_domain=custom_domain.domain ) elif user.default_alias_public_domain_id: sl_domain: SLDomain = SLDomain.get(user.default_alias_public_domain_id) if sl_domain.premium_only and not user.is_premium(): LOG.warning("%s not premium, cannot use %s", user, sl_domain) else: random_email = generate_email( scheme=scheme, in_hex=in_hex, alias_domain=sl_domain.domain ) if not random_email: random_email = generate_email(scheme=scheme, in_hex=in_hex) alias = Alias.create( user_id=user.id, email=random_email, mailbox_id=user.default_mailbox_id, note=note, ) if custom_domain: alias.custom_domain_id = custom_domain.id return alias def mailbox_email(self): if self.mailbox_id: return self.mailbox.email else: return self.user.email def unsubscribe_link(self) -> (str, bool): """return the unsubscribe link along with whether this is via email (mailto:) or Http POST The mailto: method is preferred """ if UNSUBSCRIBER: return f"mailto:{UNSUBSCRIBER}?subject={self.id}=", True else: return f"{URL}/dashboard/unsubscribe/{self.id}", False def __repr__(self): return f"" class ClientUser(db.Model, ModelMixin): __table_args__ = ( db.UniqueConstraint("user_id", "client_id", name="uq_client_user"), ) user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) client_id = db.Column(db.ForeignKey(Client.id, ondelete="cascade"), nullable=False) # Null means client has access to user original email alias_id = db.Column(db.ForeignKey(Alias.id, ondelete="cascade"), nullable=True) # user can decide to send to client another name name = db.Column( db.String(128), nullable=True, default=None, server_default=text("NULL") ) # user can decide to send to client a default avatar default_avatar = db.Column( db.Boolean, nullable=False, default=False, server_default="0" ) alias = db.relationship(Alias, backref="client_users") user = db.relationship(User) client = db.relationship(Client) def get_email(self): return self.alias.email if self.alias_id else self.user.email def get_user_name(self): if self.name: return self.name else: return self.user.name def get_user_info(self) -> dict: """return user info according to client scope Return dict with key being scope name. For now all the fields are the same for all clients: { "client": "Demo", "email": "test-avk5l@mail-tester.com", "email_verified": true, "id": 1, "name": "Son GM", "avatar_url": "http://s3..." } """ res = { "id": self.id, "client": self.client.name, "email_verified": True, "sub": str(self.id), } for scope in self.client.get_scopes(): if scope == Scope.NAME: if self.name: res[Scope.NAME.value] = self.name or "" else: res[Scope.NAME.value] = self.user.name or "" elif scope == Scope.AVATAR_URL: if self.user.profile_picture_id: if self.default_avatar: res[Scope.AVATAR_URL.value] = URL + "/static/default-avatar.png" else: res[Scope.AVATAR_URL.value] = self.user.profile_picture.get_url( AVATAR_URL_EXPIRATION ) else: res[Scope.AVATAR_URL.value] = None elif scope == Scope.EMAIL: # Use generated email if self.alias_id: LOG.debug( "Use gen email for user %s, client %s", self.user, self.client ) res[Scope.EMAIL.value] = self.alias.email # Use user original email else: res[Scope.EMAIL.value] = self.user.email return res class Contact(db.Model, ModelMixin): """ Store configuration of sender (website-email) and alias. """ __table_args__ = ( db.UniqueConstraint("alias_id", "website_email", name="uq_contact"), ) user_id = db.Column( db.ForeignKey(User.id, ondelete="cascade"), nullable=False, index=True ) alias_id = db.Column( db.ForeignKey(Alias.id, ondelete="cascade"), nullable=False, index=True ) name = db.Column( db.String(512), nullable=True, default=None, server_default=text("NULL") ) website_email = db.Column(db.String(512), nullable=False) # the email from header, e.g. AB CD # nullable as this field is added after website_email website_from = db.Column(db.String(1024), nullable=True) # when user clicks on "reply", they will reply to this address. # This address allows to hide user personal email # this reply email is created every time a website sends an email to user # it has the prefix "reply+" or "ra+" to distinguish with other email reply_email = db.Column(db.String(512), nullable=False, index=True) # whether a contact is created via CC is_cc = db.Column(db.Boolean, nullable=False, default=False, server_default="0") pgp_public_key = db.Column(db.Text, nullable=True) pgp_finger_print = db.Column(db.String(512), nullable=True) alias = db.relationship(Alias, backref="contacts") user = db.relationship(User) # the latest reply sent to this contact latest_reply: Optional[Arrow] = None # to investigate why the website_email is sometimes not correctly parsed # the envelope mail_from mail_from = db.Column(db.Text, nullable=True, default=None) # the message["From"] header from_header = db.Column(db.Text, nullable=True, default=None) # a contact can have an empty email address, in this case it can't receive emails invalid_email = db.Column( db.Boolean, nullable=False, default=False, server_default="0" ) @property def email(self): return self.website_email def website_send_to(self): """return the email address with name. to use when user wants to send an email from the alias Return "First Last | email at example.com" """ # Prefer using contact name if possible user = self.user name = self.name email = self.website_email if ( not user or not SenderFormatEnum.has_value(user.sender_format) or user.sender_format == SenderFormatEnum.AT.value ): email = email.replace("@", " at ") elif user.sender_format == SenderFormatEnum.A.value: email = email.replace("@", "(a)") # if no name, try to parse it from website_from if not name and self.website_from: try: from app.email_utils import parseaddr_unicode name, _ = parseaddr_unicode(self.website_from) except Exception: # Skip if website_from is wrongly formatted LOG.warning( "Cannot parse contact %s website_from %s", self, self.website_from ) name = "" # remove all double quote if name: name = name.replace('"', "") if name: name = name + " | " + email else: name = email # cannot use formataddr here as this field is for email client, not for MTA return f'"{name}" <{self.reply_email}>' def new_addr(self): """ Replace original email by reply_email. Possible formats: - first@example.com via SimpleLogin OR - First Last - first at example.com OR - First Last - first(a)example.com OR - First Last - first@example.com OR And return new address with RFC 2047 format `new_email` is a special reply address """ user = self.user if ( not user or not SenderFormatEnum.has_value(user.sender_format) or user.sender_format == SenderFormatEnum.VIA.value ): new_name = f"{self.website_email} via SimpleLogin" else: if user.sender_format == SenderFormatEnum.AT.value: formatted_email = self.website_email.replace("@", " at ").strip() elif user.sender_format == SenderFormatEnum.A.value: formatted_email = self.website_email.replace("@", "(a)").strip() elif user.sender_format == SenderFormatEnum.FULL.value: formatted_email = self.website_email.strip() # Prefix name to formatted email if available new_name = ( (self.name + " - " + formatted_email) if self.name and self.name != self.website_email.strip() else formatted_email ) new_addr = formataddr((new_name, self.reply_email)).strip() return new_addr.strip() def last_reply(self) -> "EmailLog": """return the most recent reply""" return ( EmailLog.query.filter_by(contact_id=self.id, is_reply=True) .order_by(desc(EmailLog.created_at)) .first() ) def __repr__(self): return f"" class EmailLog(db.Model, ModelMixin): user_id = db.Column( db.ForeignKey(User.id, ondelete="cascade"), nullable=False, index=True ) contact_id = db.Column( db.ForeignKey(Contact.id, ondelete="cascade"), nullable=False, index=True ) alias_id = db.Column( db.ForeignKey(Alias.id, ondelete="cascade"), nullable=True, index=True ) # whether this is a reply is_reply = db.Column(db.Boolean, nullable=False, default=False) # for ex if alias is disabled, this forwarding is blocked blocked = db.Column(db.Boolean, nullable=False, default=False) # can happen when user mailbox refuses the forwarded email # usually because the forwarded email is too spammy bounced = db.Column(db.Boolean, nullable=False, default=False, server_default="0") # happen when an email with auto (holiday) reply auto_replied = db.Column( db.Boolean, nullable=False, default=False, server_default="0" ) # SpamAssassin result is_spam = db.Column(db.Boolean, nullable=False, default=False, server_default="0") spam_score = db.Column(db.Float, nullable=True) spam_status = db.Column(db.Text, nullable=True, default=None) # do not load this column spam_report = deferred(db.Column(db.JSON, nullable=True)) # Point to the email that has been refused refused_email_id = db.Column( db.ForeignKey("refused_email.id", ondelete="SET NULL"), nullable=True ) # in forward phase, this is the mailbox that will receive the email # in reply phase, this is the mailbox (or a mailbox's authorized address) that sends the email mailbox_id = db.Column( db.ForeignKey("mailbox.id", ondelete="cascade"), nullable=True ) # in case of bounce, record on what mailbox the email has been bounced # useful when an alias has several mailboxes bounced_mailbox_id = db.Column( db.ForeignKey("mailbox.id", ondelete="cascade"), nullable=True ) refused_email = db.relationship("RefusedEmail") forward = db.relationship(Contact) contact = db.relationship(Contact, backref="email_logs") mailbox = db.relationship("Mailbox", lazy="joined", foreign_keys=[mailbox_id]) user = db.relationship(User) def bounced_mailbox(self) -> str: if self.bounced_mailbox_id: return Mailbox.get(self.bounced_mailbox_id).email # retro-compatibility return self.contact.alias.mailboxes[0].email def get_action(self) -> str: """return the action name: forward|reply|block|bounced""" if self.is_reply: return "reply" elif self.bounced: return "bounced" elif self.blocked: return "block" else: return "forward" def get_phase(self) -> str: if self.is_reply: return "reply" else: return "forward" def __repr__(self): return f"" class Subscription(db.Model, ModelMixin): """Paddle subscription""" # Come from Paddle cancel_url = db.Column(db.String(1024), nullable=False) update_url = db.Column(db.String(1024), nullable=False) subscription_id = db.Column(db.String(1024), nullable=False, unique=True) event_time = db.Column(ArrowType, nullable=False) next_bill_date = db.Column(db.Date, nullable=False) cancelled = db.Column(db.Boolean, nullable=False, default=False) plan = db.Column(db.Enum(PlanEnum), nullable=False) user_id = db.Column( db.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True ) user = db.relationship(User) def plan_name(self): if self.plan == PlanEnum.monthly: return "Monthly ($4/month)" else: return "Yearly ($30/year)" def __repr__(self): return f"" class ManualSubscription(db.Model, ModelMixin): """ For users who use other forms of payment and therefore not pass by Paddle """ user_id = db.Column( db.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True ) # an reminder is sent several days before the subscription ends end_at = db.Column(ArrowType, nullable=False) # for storing note about this subscription comment = db.Column(db.Text, nullable=True) # manual subscription are also used for Premium giveaways is_giveaway = db.Column( db.Boolean, default=False, nullable=False, server_default="0" ) user = db.relationship(User) def is_active(self): return self.end_at > arrow.now() class CoinbaseSubscription(db.Model, ModelMixin): """ For subscriptions using Coinbase Commerce """ user_id = db.Column( db.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True ) # an reminder is sent several days before the subscription ends end_at = db.Column(ArrowType, nullable=False) # the Coinbase code code = db.Column(db.String(64), nullable=True) user = db.relationship(User) def is_active(self): return self.end_at > arrow.now() # https://help.apple.com/app-store-connect/#/dev58bda3212 _APPLE_GRACE_PERIOD_DAYS = 16 class AppleSubscription(db.Model, ModelMixin): """ For users who have subscribed via Apple in-app payment """ user_id = db.Column( db.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True ) expires_date = db.Column(ArrowType, nullable=False) # to avoid using "Restore Purchase" on another account original_transaction_id = db.Column(db.String(256), nullable=False, unique=True) receipt_data = db.Column(db.Text(), nullable=False) plan = db.Column(db.Enum(PlanEnum), nullable=False) user = db.relationship(User) def is_valid(self): # Todo: take into account grace period? return self.expires_date > arrow.now().shift(days=-_APPLE_GRACE_PERIOD_DAYS) class DeletedAlias(db.Model, ModelMixin): """Store all deleted alias to make sure they are NOT reused""" email = db.Column(db.String(256), unique=True, nullable=False) @classmethod def create(cls, **kw): raise Exception("should use delete_alias(alias,user) instead") def __repr__(self): return f"" class EmailChange(db.Model, ModelMixin): """Used when user wants to update their email""" user_id = db.Column( db.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True, index=True, ) new_email = db.Column(db.String(256), unique=True, nullable=False) code = db.Column(db.String(128), unique=True, nullable=False) expired = db.Column(ArrowType, nullable=False, default=_expiration_12h) user = db.relationship(User) def is_expired(self): return self.expired < arrow.now() def __repr__(self): return f"" class AliasUsedOn(db.Model, ModelMixin): """Used to know where an alias is created""" __table_args__ = ( db.UniqueConstraint("alias_id", "hostname", name="uq_alias_used"), ) alias_id = db.Column(db.ForeignKey(Alias.id, ondelete="cascade"), nullable=False) user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) alias = db.relationship(Alias) hostname = db.Column(db.String(1024), nullable=False) class ApiKey(db.Model, ModelMixin): """used in browser extension to identify user""" user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) code = db.Column(db.String(128), unique=True, nullable=False) name = db.Column(db.String(128), nullable=False) last_used = db.Column(ArrowType, default=None) times = db.Column(db.Integer, default=0, nullable=False) user = db.relationship(User) @classmethod def create(cls, user_id, name): # generate unique code found = False while not found: code = random_string(60) if not cls.get_by(code=code): found = True a = cls(user_id=user_id, code=code, name=name) db.session.add(a) return a class CustomDomain(db.Model, ModelMixin): user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) domain = db.Column(db.String(128), unique=True, nullable=False) # default name to use when user replies/sends from alias name = db.Column(db.String(128), nullable=True, default=None) verified = db.Column(db.Boolean, nullable=False, default=False) dkim_verified = db.Column( db.Boolean, nullable=False, default=False, server_default="0" ) spf_verified = db.Column( db.Boolean, nullable=False, default=False, server_default="0" ) dmarc_verified = db.Column( db.Boolean, nullable=False, default=False, server_default="0" ) _mailboxes = db.relationship("Mailbox", secondary="domain_mailbox", lazy="joined") # an alias is created automatically the first time it receives an email catch_all = db.Column(db.Boolean, nullable=False, default=False, server_default="0") # option to generate random prefix version automatically random_prefix_generation = db.Column( db.Boolean, nullable=False, default=False, server_default="0" ) # incremented when a check is failed on the domain # alert when the number exceeds a threshold # used in check_custom_domain() nb_failed_checks = db.Column( db.Integer, default=0, server_default="0", nullable=False ) user = db.relationship(User, foreign_keys=[user_id]) @property def mailboxes(self): if self._mailboxes: return self._mailboxes else: return [self.user.default_mailbox] def nb_alias(self): return Alias.filter_by(custom_domain_id=self.id).count() def get_trash_url(self): return URL + f"/dashboard/domains/{self.id}/trash" def __repr__(self): return f"" class DomainDeletedAlias(db.Model, ModelMixin): """Store all deleted alias for a domain""" __table_args__ = ( db.UniqueConstraint("domain_id", "email", name="uq_domain_trash"), ) email = db.Column(db.String(256), nullable=False) domain_id = db.Column( db.ForeignKey("custom_domain.id", ondelete="cascade"), nullable=False ) user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) domain = db.relationship(CustomDomain) @classmethod def create(cls, **kw): raise Exception("should use delete_alias(alias,user) instead") def __repr__(self): return f"" class LifetimeCoupon(db.Model, ModelMixin): code = db.Column(db.String(128), nullable=False, unique=True) nb_used = db.Column(db.Integer, nullable=False) paid = db.Column(db.Boolean, default=False, server_default="0", nullable=False) class Coupon(db.Model, ModelMixin): code = db.Column(db.String(128), nullable=False, unique=True) # by default a coupon is for 1 year nb_year = db.Column(db.Integer, nullable=False, server_default="1", default=1) # whether the coupon has been used used = db.Column(db.Boolean, default=False, server_default="0", nullable=False) # the user who uses the code # non-null when the coupon is used used_by_user_id = db.Column( db.ForeignKey(User.id, ondelete="cascade"), nullable=True ) class Directory(db.Model, ModelMixin): user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) name = db.Column(db.String(128), unique=True, nullable=False) # when a directory is disabled, new alias can't be created on the fly disabled = db.Column(db.Boolean, default=False, nullable=False, server_default="0") user = db.relationship(User) _mailboxes = db.relationship( "Mailbox", secondary="directory_mailbox", lazy="joined" ) @property def mailboxes(self): if self._mailboxes: return self._mailboxes else: return [self.user.default_mailbox] def nb_alias(self): return Alias.filter_by(directory_id=self.id).count() @classmethod def delete(cls, obj_id): obj: Directory = cls.get(obj_id) user = obj.user # Put all aliases belonging to this directory to global or domain trash for alias in Alias.query.filter_by(directory_id=obj_id): from app import alias_utils alias_utils.delete_alias(alias, user) cls.query.filter(cls.id == obj_id).delete() db.session.commit() def __repr__(self): return f"" class Job(db.Model, ModelMixin): """Used to schedule one-time job in the future""" name = db.Column(db.String(128), nullable=False) payload = db.Column(db.JSON) # whether the job has been taken by the job runner taken = db.Column(db.Boolean, default=False, nullable=False) run_at = db.Column(ArrowType) def __repr__(self): return f"" class Mailbox(db.Model, ModelMixin): user_id = db.Column( db.ForeignKey(User.id, ondelete="cascade"), nullable=False, index=True ) email = db.Column(db.String(256), nullable=False, index=True) verified = db.Column(db.Boolean, default=False, nullable=False) force_spf = db.Column(db.Boolean, default=True, server_default="1", nullable=False) # used when user wants to update mailbox email new_email = db.Column(db.String(256), unique=True) pgp_public_key = db.Column(db.Text, nullable=True) pgp_finger_print = db.Column(db.String(512), nullable=True) disable_pgp = db.Column( db.Boolean, default=False, nullable=False, server_default="0" ) # incremented when a check is failed on the mailbox # alert when the number exceeds a threshold # used in sanity_check() nb_failed_checks = db.Column( db.Integer, default=0, server_default="0", nullable=False ) # a mailbox can be disabled if it can't be reached disabled = db.Column(db.Boolean, default=False, nullable=False, server_default="0") generic_subject = db.Column(db.String(78), nullable=True) __table_args__ = (db.UniqueConstraint("user_id", "email", name="uq_mailbox_user"),) user = db.relationship(User, foreign_keys=[user_id]) def pgp_enabled(self) -> bool: if self.pgp_finger_print and not self.disable_pgp: return True return False def nb_alias(self): return ( AliasMailbox.filter_by(mailbox_id=self.id).count() + Alias.filter_by(mailbox_id=self.id).count() ) @classmethod def delete(cls, obj_id): mailbox: Mailbox = cls.get(obj_id) user = mailbox.user # Put all aliases belonging to this mailbox to global or domain trash for alias in Alias.query.filter_by(mailbox_id=obj_id): # special handling for alias that has several mailboxes and has mailbox_id=obj_id if len(alias.mailboxes) > 1: # use the first mailbox found in alias._mailboxes first_mb = alias._mailboxes[0] alias.mailbox_id = first_mb.id alias._mailboxes.remove(first_mb) else: from app import alias_utils # only put aliases that have mailbox as a single mailbox into trash alias_utils.delete_alias(alias, user) db.session.commit() cls.query.filter(cls.id == obj_id).delete() db.session.commit() @property def aliases(self) -> [Alias]: ret = Alias.filter_by(mailbox_id=self.id).all() for am in AliasMailbox.filter_by(mailbox_id=self.id): ret.append(am.alias) return ret def __repr__(self): return f"" class AccountActivation(db.Model, ModelMixin): """contains code to activate the user account when they sign up on mobile""" user_id = db.Column( db.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True ) # the activation code is usually 6 digits code = db.Column(db.String(10), nullable=False) # nb tries decrements each time user enters wrong code tries = db.Column(db.Integer, default=3, nullable=False) __table_args__ = ( CheckConstraint(tries >= 0, name="account_activation_tries_positive"), {}, ) class RefusedEmail(db.Model, ModelMixin): """Store emails that have been refused, i.e. bounced or classified as spams""" # Store the full report, including logs from Sending & Receiving MTA full_report_path = db.Column(db.String(128), unique=True, nullable=False) # The original email, to display to user path = db.Column(db.String(128), unique=True, nullable=True) user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) # the email content will be deleted at this date delete_at = db.Column(ArrowType, nullable=False, default=_expiration_7d) # toggle this when email content (stored at full_report_path & path are deleted) deleted = db.Column(db.Boolean, nullable=False, default=False, server_default="0") def get_url(self, expires_in=3600): if self.path: return s3.get_url(self.path, expires_in) else: return s3.get_url(self.full_report_path, expires_in) def __repr__(self): return f"" class Referral(db.Model, ModelMixin): """Referral code so user can invite others""" user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) name = db.Column(db.String(512), nullable=True, default=None) code = db.Column(db.String(128), unique=True, nullable=False) user = db.relationship(User, foreign_keys=[user_id]) @property def nb_user(self) -> int: return User.filter_by(referral_id=self.id, activated=True).count() @property def nb_paid_user(self) -> int: res = 0 for user in User.filter_by(referral_id=self.id, activated=True): if user.is_paid(): res += 1 return res def link(self): return f"{LANDING_PAGE_URL}?slref={self.code}" class SentAlert(db.Model, ModelMixin): """keep track of alerts sent to user. User can receive an alert when there's abnormal activity on their aliases such as - reverse-alias not used by the owning mailbox - SPF fails when using the reverse-alias - bounced email - ... Different rate controls can then be implemented based on SentAlert: - only once alert: an alert type should be sent only once - max number of sent per 24H: an alert type should not be sent more than X times in 24h """ user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) to_email = db.Column(db.String(256), nullable=False) alert_type = db.Column(db.String(256), nullable=False) class AliasMailbox(db.Model, ModelMixin): __table_args__ = ( db.UniqueConstraint("alias_id", "mailbox_id", name="uq_alias_mailbox"), ) alias_id = db.Column( db.ForeignKey(Alias.id, ondelete="cascade"), nullable=False, index=True ) mailbox_id = db.Column( db.ForeignKey(Mailbox.id, ondelete="cascade"), nullable=False, index=True ) alias = db.relationship(Alias) class AliasHibp(db.Model, ModelMixin): __tablename__ = "alias_hibp" __table_args__ = (db.UniqueConstraint("alias_id", "hibp_id", name="uq_alias_hibp"),) alias_id = db.Column( db.Integer(), db.ForeignKey("alias.id", ondelete="cascade"), index=True ) hibp_id = db.Column( db.Integer(), db.ForeignKey("hibp.id", ondelete="cascade"), index=True ) alias = db.relationship( "Alias", backref=db.backref("alias_hibp", cascade="all, delete-orphan") ) hibp = db.relationship( "Hibp", backref=db.backref("alias_hibp", cascade="all, delete-orphan") ) class DirectoryMailbox(db.Model, ModelMixin): __table_args__ = ( db.UniqueConstraint("directory_id", "mailbox_id", name="uq_directory_mailbox"), ) directory_id = db.Column( db.ForeignKey(Directory.id, ondelete="cascade"), nullable=False ) mailbox_id = db.Column( db.ForeignKey(Mailbox.id, ondelete="cascade"), nullable=False ) class DomainMailbox(db.Model, ModelMixin): """store the owning mailboxes for a domain""" __table_args__ = ( db.UniqueConstraint("domain_id", "mailbox_id", name="uq_domain_mailbox"), ) domain_id = db.Column( db.ForeignKey(CustomDomain.id, ondelete="cascade"), nullable=False ) mailbox_id = db.Column( db.ForeignKey(Mailbox.id, ondelete="cascade"), nullable=False ) _NB_RECOVERY_CODE = 8 _RECOVERY_CODE_LENGTH = 8 class RecoveryCode(db.Model, ModelMixin): """allow user to login in case you lose any of your authenticators""" __table_args__ = (db.UniqueConstraint("user_id", "code", name="uq_recovery_code"),) user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) code = db.Column(db.String(16), nullable=False) used = db.Column(db.Boolean, nullable=False, default=False) used_at = db.Column(ArrowType, nullable=True, default=None) user = db.relationship(User) @classmethod def generate(cls, user): """generate recovery codes for user""" # delete all existing codes cls.query.filter_by(user_id=user.id).delete() db.session.flush() nb_code = 0 while nb_code < _NB_RECOVERY_CODE: code = random_string(_RECOVERY_CODE_LENGTH) if not cls.get_by(user_id=user.id, code=code): cls.create(user_id=user.id, code=code) nb_code += 1 LOG.d("Create recovery codes for %s", user) db.session.commit() @classmethod def empty(cls, user): """Delete all recovery codes for user""" cls.query.filter_by(user_id=user.id).delete() db.session.commit() class Notification(db.Model, ModelMixin): user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) message = db.Column(db.Text, nullable=False) # whether user has marked the notification as read read = db.Column(db.Boolean, nullable=False, default=False) class SLDomain(db.Model, ModelMixin): """SimpleLogin domains""" __tablename__ = "public_domain" domain = db.Column(db.String(128), unique=True, nullable=False) # only available for premium accounts premium_only = db.Column( db.Boolean, nullable=False, default=False, server_default="0" ) def __repr__(self): return f"" class AuthorizedAddress(db.Model, ModelMixin): """Authorize other addresses to send emails from aliases that are owned by a mailbox""" user_id = db.Column(db.ForeignKey(User.id, ondelete="cascade"), nullable=False) mailbox_id = db.Column( db.ForeignKey(Mailbox.id, ondelete="cascade"), nullable=False ) email = db.Column(db.String(256), nullable=False) __table_args__ = ( db.UniqueConstraint("mailbox_id", "email", name="uq_authorize_address"), ) mailbox = db.relationship(Mailbox, backref="authorized_addresses") def __repr__(self): return f"" class Metric2(db.Model, ModelMixin): """ For storing different metrics like number of users, etc Store each metric as a column as opposed to having different rows as in Metric """ date = db.Column(ArrowType, default=arrow.utcnow, nullable=False) nb_user = db.Column(db.Float, nullable=True) nb_activated_user = db.Column(db.Float, nullable=True) nb_premium = db.Column(db.Float, nullable=True) nb_apple_premium = db.Column(db.Float, nullable=True) nb_cancelled_premium = db.Column(db.Float, nullable=True) nb_manual_premium = db.Column(db.Float, nullable=True) nb_coinbase_premium = db.Column(db.Float, nullable=True) # nb users who have been referred nb_referred_user = db.Column(db.Float, nullable=True) nb_referred_user_paid = db.Column(db.Float, nullable=True) nb_alias = db.Column(db.Float, nullable=True) # Obsolete as only for the last 14 days nb_forward = db.Column(db.Float, nullable=True) nb_block = db.Column(db.Float, nullable=True) nb_reply = db.Column(db.Float, nullable=True) nb_bounced = db.Column(db.Float, nullable=True) nb_spam = db.Column(db.Float, nullable=True) # should be used instead nb_forward_last_24h = db.Column(db.Float, nullable=True) nb_block_last_24h = db.Column(db.Float, nullable=True) nb_reply_last_24h = db.Column(db.Float, nullable=True) nb_bounced_last_24h = db.Column(db.Float, nullable=True) nb_verified_custom_domain = db.Column(db.Float, nullable=True) nb_app = db.Column(db.Float, nullable=True) class Bounce(db.Model, ModelMixin): """Record all bounces. Deleted after 7 days""" email = db.Column(db.String(256), nullable=False, index=True) class TransactionalEmail(db.Model, ModelMixin): """Storing all email addresses that receive transactional emails, including account email and mailboxes. Deleted after 7 days """ email = db.Column(db.String(256), nullable=False, unique=True) class Payout(db.Model, ModelMixin): """Referral payouts""" user_id = db.Column(db.ForeignKey("users.id", ondelete="cascade"), nullable=False) # in USD amount = db.Column(db.Float, nullable=False) # BTC, PayPal, etc payment_method = db.Column(db.String(256), nullable=False) # number of upgraded user included in this payout number_upgraded_account = db.Column(db.Integer, nullable=False) comment = db.Column(db.Text) user = db.relationship(User) class IgnoredEmail(db.Model, ModelMixin): """If an email has mail_from and rcpt_to present in this table, discard it by returning 250 status.""" mail_from = db.Column(db.String(512), nullable=False) rcpt_to = db.Column(db.String(512), nullable=False)