mirror of
https://github.com/simple-login/app.git
synced 2024-11-14 08:01:13 +01:00
4bcc728222
* origin/master: (34 commits) fix flake8 add link to the anti phishing page improve email wording Move tests Only send enum names Only send enum name for events intead of the full class.enum Also track login and register events from the api routes typo revert changes Added fix for parts that are not messages Add missing formatting place Revert unwanted changes Do not show an error if we receive an unsubscribe from a different address Revert changes to pgp_utils fix import Send newrelic events on login and register PR changes format Move dmarc management to its own file ignore VERPTransactional ...
2992 lines
94 KiB
Python
2992 lines
94 KiB
Python
import enum
|
|
import os
|
|
import random
|
|
import uuid
|
|
from email.utils import formataddr
|
|
from typing import List, Tuple, Optional
|
|
|
|
import arrow
|
|
import sqlalchemy as sa
|
|
from arrow import Arrow
|
|
from email_validator import validate_email
|
|
from flanker.addresslib import address
|
|
from flask import url_for
|
|
from flask_login import UserMixin
|
|
from jinja2 import FileSystemLoader, Environment
|
|
from sqlalchemy import orm
|
|
from sqlalchemy import text, desc, CheckConstraint, Index, Column
|
|
from sqlalchemy.dialects.postgresql import TSVECTOR
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy.orm import deferred
|
|
from sqlalchemy_utils import ArrowType
|
|
|
|
from app import s3
|
|
from app.config import (
|
|
MAX_NB_EMAIL_FREE_PLAN,
|
|
URL,
|
|
AVATAR_URL_EXPIRATION,
|
|
JOB_ONBOARDING_1,
|
|
JOB_ONBOARDING_2,
|
|
JOB_ONBOARDING_4,
|
|
LANDING_PAGE_URL,
|
|
FIRST_ALIAS_DOMAIN,
|
|
DISABLE_ONBOARDING,
|
|
UNSUBSCRIBER,
|
|
ALIAS_RANDOM_SUFFIX_LENGTH,
|
|
MAX_NB_SUBDOMAIN,
|
|
MAX_NB_DIRECTORY,
|
|
ROOT_DIR,
|
|
)
|
|
from app.db import Session
|
|
from app.errors import (
|
|
AliasInTrashError,
|
|
DirectoryInTrashError,
|
|
SubdomainInTrashError,
|
|
CannotCreateContactForReverseAlias,
|
|
)
|
|
from app.log import LOG
|
|
from app.oauth_models import Scope
|
|
from app.pw_models import PasswordOracle
|
|
from app.utils import (
|
|
convert_to_id,
|
|
random_string,
|
|
random_words,
|
|
sanitize_email,
|
|
random_word,
|
|
)
|
|
|
|
Base = declarative_base()
|
|
|
|
|
|
class TSVector(sa.types.TypeDecorator):
|
|
impl = TSVECTOR
|
|
|
|
|
|
class ModelMixin(object):
|
|
id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
|
|
created_at = sa.Column(ArrowType, default=arrow.utcnow, nullable=False)
|
|
updated_at = sa.Column(ArrowType, default=None, onupdate=arrow.utcnow)
|
|
|
|
_repr_hide = ["created_at", "updated_at"]
|
|
|
|
@classmethod
|
|
def query(cls):
|
|
return Session.query(cls)
|
|
|
|
@classmethod
|
|
def yield_per_query(cls, page=1000):
|
|
"""to be used when iterating on a big table to avoid taking all the memory"""
|
|
return Session.query(cls).yield_per(page).enable_eagerloads(False)
|
|
|
|
@classmethod
|
|
def get(cls, id):
|
|
return Session.query(cls).get(id)
|
|
|
|
@classmethod
|
|
def get_by(cls, **kw):
|
|
return Session.query(cls).filter_by(**kw).first()
|
|
|
|
@classmethod
|
|
def filter_by(cls, **kw):
|
|
return Session.query(cls).filter_by(**kw)
|
|
|
|
@classmethod
|
|
def filter(cls, *args, **kw):
|
|
return Session.query(cls).filter(*args, **kw)
|
|
|
|
@classmethod
|
|
def order_by(cls, *args, **kw):
|
|
return Session.query(cls).order_by(*args, **kw)
|
|
|
|
@classmethod
|
|
def all(cls):
|
|
return Session.query(cls).all()
|
|
|
|
@classmethod
|
|
def count(cls):
|
|
return Session.query(cls).count()
|
|
|
|
@classmethod
|
|
def get_or_create(cls, **kw):
|
|
r = cls.get_by(**kw)
|
|
if not r:
|
|
r = cls(**kw)
|
|
Session.add(r)
|
|
|
|
return r
|
|
|
|
@classmethod
|
|
def create(cls, **kw):
|
|
# whether to call Session.commit
|
|
commit = kw.pop("commit", False)
|
|
flush = kw.pop("flush", False)
|
|
|
|
r = cls(**kw)
|
|
Session.add(r)
|
|
|
|
if commit:
|
|
Session.commit()
|
|
|
|
if flush:
|
|
Session.flush()
|
|
|
|
return r
|
|
|
|
def save(self):
|
|
Session.add(self)
|
|
|
|
@classmethod
|
|
def delete(cls, obj_id, commit=False):
|
|
Session.query(cls).filter(cls.id == obj_id).delete()
|
|
|
|
if commit:
|
|
Session.commit()
|
|
|
|
@classmethod
|
|
def first(cls):
|
|
return Session.query(cls).first()
|
|
|
|
def __repr__(self):
|
|
values = ", ".join(
|
|
"%s=%r" % (n, getattr(self, n))
|
|
for n in self.__table__.c.keys()
|
|
if n not in self._repr_hide
|
|
)
|
|
return "%s(%s)" % (self.__class__.__name__, values)
|
|
|
|
|
|
class File(Base, ModelMixin):
|
|
__tablename__ = "file"
|
|
path = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
user_id = sa.Column(sa.ForeignKey("users.id", ondelete="cascade"), nullable=True)
|
|
|
|
def get_url(self, expires_in=3600):
|
|
return s3.get_url(self.path, expires_in)
|
|
|
|
def __repr__(self):
|
|
return f"<File {self.path}>"
|
|
|
|
|
|
class EnumE(enum.Enum):
|
|
@classmethod
|
|
def has_value(cls, value: int) -> bool:
|
|
return value in set(item.value for item in cls)
|
|
|
|
@classmethod
|
|
def get_name(cls, value: int) -> Optional[str]:
|
|
for item in cls:
|
|
if item.value == value:
|
|
return item.name
|
|
|
|
return None
|
|
|
|
@classmethod
|
|
def has_name(cls, name: str) -> bool:
|
|
for item in cls:
|
|
if item.name == name:
|
|
return True
|
|
|
|
return False
|
|
|
|
@classmethod
|
|
def get_value(cls, name: str) -> Optional[int]:
|
|
for item in cls:
|
|
if item.name == name:
|
|
return item.value
|
|
|
|
return None
|
|
|
|
|
|
class PlanEnum(EnumE):
|
|
monthly = 2
|
|
yearly = 3
|
|
|
|
|
|
# Specify the format for sender address
|
|
class SenderFormatEnum(EnumE):
|
|
AT = 0 # John Wick - john at wick.com
|
|
A = 2 # John Wick - john(a)wick.com
|
|
NAME_ONLY = 5 # John Wick
|
|
AT_ONLY = 6 # john at wick.com
|
|
NO_NAME = 7
|
|
|
|
|
|
class AliasGeneratorEnum(EnumE):
|
|
word = 1 # aliases are generated based on random words
|
|
uuid = 2 # aliases are generated based on uuid
|
|
|
|
|
|
class AliasSuffixEnum(EnumE):
|
|
word = 0 # Random word from dictionary file
|
|
random_string = 1 # Completely random string
|
|
|
|
|
|
class BlockBehaviourEnum(EnumE):
|
|
return_2xx = 0
|
|
return_5xx = 1
|
|
|
|
|
|
class AuditLogActionEnum(EnumE):
|
|
create_object = 0
|
|
update_object = 1
|
|
delete_object = 2
|
|
manual_upgrade = 3
|
|
extend_trial = 4
|
|
disable_2fa = 5
|
|
logged_as_user = 6
|
|
extend_subscription = 7
|
|
|
|
|
|
class VerpType(EnumE):
|
|
bounce_forward = 0
|
|
bounce_reply = 1
|
|
transactional = 2
|
|
|
|
|
|
class Hibp(Base, ModelMixin):
|
|
__tablename__ = "hibp"
|
|
name = sa.Column(sa.String(), nullable=False, unique=True, index=True)
|
|
breached_aliases = orm.relationship("Alias", secondary="alias_hibp")
|
|
|
|
description = sa.Column(sa.Text)
|
|
date = sa.Column(ArrowType, nullable=True)
|
|
|
|
def __repr__(self):
|
|
return f"<HIBP Breach {self.id} {self.name}>"
|
|
|
|
|
|
class HibpNotifiedAlias(Base, ModelMixin):
|
|
"""Contain list of aliases that have been notified to users
|
|
So that we can only notify users of new aliases.
|
|
"""
|
|
|
|
__tablename__ = "hibp_notified_alias"
|
|
alias_id = sa.Column(sa.ForeignKey("alias.id", ondelete="cascade"), nullable=False)
|
|
user_id = sa.Column(sa.ForeignKey("users.id", ondelete="cascade"), nullable=False)
|
|
|
|
notified_at = sa.Column(ArrowType, default=arrow.utcnow, nullable=False)
|
|
|
|
|
|
class Fido(Base, ModelMixin):
|
|
__tablename__ = "fido"
|
|
credential_id = sa.Column(sa.String(), nullable=False, unique=True, index=True)
|
|
uuid = sa.Column(
|
|
sa.ForeignKey("users.fido_uuid", ondelete="cascade"),
|
|
unique=False,
|
|
nullable=False,
|
|
)
|
|
public_key = sa.Column(sa.String(), nullable=False, unique=True)
|
|
sign_count = sa.Column(sa.BigInteger(), nullable=False)
|
|
name = sa.Column(sa.String(128), nullable=False, unique=False)
|
|
user_id = sa.Column(sa.ForeignKey("users.id", ondelete="cascade"), nullable=True)
|
|
|
|
|
|
class User(Base, ModelMixin, UserMixin, PasswordOracle):
|
|
__tablename__ = "users"
|
|
email = sa.Column(sa.String(256), unique=True, nullable=False)
|
|
|
|
name = sa.Column(sa.String(128), nullable=True)
|
|
is_admin = sa.Column(sa.Boolean, nullable=False, default=False)
|
|
alias_generator = sa.Column(
|
|
sa.Integer,
|
|
nullable=False,
|
|
default=AliasGeneratorEnum.word.value,
|
|
server_default=str(AliasGeneratorEnum.word.value),
|
|
)
|
|
notification = sa.Column(
|
|
sa.Boolean, default=True, nullable=False, server_default="1"
|
|
)
|
|
|
|
activated = sa.Column(sa.Boolean, default=False, nullable=False)
|
|
|
|
# an account can be disabled if having harmful behavior
|
|
disabled = sa.Column(sa.Boolean, default=False, nullable=False, server_default="0")
|
|
|
|
profile_picture_id = sa.Column(sa.ForeignKey(File.id), nullable=True)
|
|
|
|
otp_secret = sa.Column(sa.String(16), nullable=True)
|
|
enable_otp = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
last_otp = sa.Column(sa.String(12), nullable=True, default=False)
|
|
|
|
# Fields for WebAuthn
|
|
fido_uuid = sa.Column(sa.String(), nullable=True, unique=True)
|
|
|
|
# the default domain that's used when user creates a new random alias
|
|
# default_alias_custom_domain_id XOR default_alias_public_domain_id
|
|
default_alias_custom_domain_id = sa.Column(
|
|
sa.ForeignKey("custom_domain.id", ondelete="SET NULL"),
|
|
nullable=True,
|
|
default=None,
|
|
)
|
|
|
|
default_alias_public_domain_id = sa.Column(
|
|
sa.ForeignKey("public_domain.id", ondelete="SET NULL"),
|
|
nullable=True,
|
|
default=None,
|
|
)
|
|
|
|
# some users could have lifetime premium
|
|
lifetime = sa.Column(sa.Boolean, default=False, nullable=False, server_default="0")
|
|
paid_lifetime = sa.Column(
|
|
sa.Boolean, default=False, nullable=False, server_default="0"
|
|
)
|
|
lifetime_coupon_id = sa.Column(
|
|
sa.ForeignKey("lifetime_coupon.id", ondelete="SET NULL"),
|
|
nullable=True,
|
|
default=None,
|
|
)
|
|
|
|
# user can use all premium features until this date
|
|
trial_end = sa.Column(
|
|
ArrowType, default=lambda: arrow.now().shift(days=7, hours=1), nullable=True
|
|
)
|
|
|
|
# the mailbox used when create random alias
|
|
# this field is nullable but in practice, it's always set
|
|
# it cannot be set to non-nullable though
|
|
# as this will create foreign key cycle between User and Mailbox
|
|
default_mailbox_id = sa.Column(
|
|
sa.ForeignKey("mailbox.id"), nullable=True, default=None
|
|
)
|
|
|
|
profile_picture = orm.relationship(File, foreign_keys=[profile_picture_id])
|
|
|
|
# Specify the format for sender address
|
|
# for the full list, see SenderFormatEnum
|
|
sender_format = sa.Column(
|
|
sa.Integer, default="0", nullable=False, server_default="0"
|
|
)
|
|
# to know whether user has explicitly chosen a sender format as opposed to those who use the default ones.
|
|
# users who haven't chosen a sender format and are using 1 or 3 format, their sender format will be set to 0
|
|
sender_format_updated_at = sa.Column(ArrowType, default=None)
|
|
|
|
replace_reverse_alias = sa.Column(
|
|
sa.Boolean, default=False, nullable=False, server_default="0"
|
|
)
|
|
|
|
referral_id = sa.Column(
|
|
sa.ForeignKey("referral.id", ondelete="SET NULL"), nullable=True, default=None
|
|
)
|
|
|
|
referral = orm.relationship("Referral", foreign_keys=[referral_id])
|
|
|
|
# whether intro has been shown to user
|
|
intro_shown = sa.Column(
|
|
sa.Boolean, default=False, nullable=False, server_default="0"
|
|
)
|
|
|
|
default_mailbox = orm.relationship("Mailbox", foreign_keys=[default_mailbox_id])
|
|
|
|
# user can set a more strict max_spam score to block spams more aggressively
|
|
max_spam_score = sa.Column(sa.Integer, nullable=True)
|
|
|
|
# newsletter is sent to this address
|
|
newsletter_alias_id = sa.Column(
|
|
sa.ForeignKey("alias.id", ondelete="SET NULL"), nullable=True, default=None
|
|
)
|
|
|
|
# whether to include the sender address in reverse-alias
|
|
include_sender_in_reverse_alias = sa.Column(
|
|
sa.Boolean, default=False, nullable=False, server_default="0"
|
|
)
|
|
|
|
# whether to use random string or random word as suffix
|
|
# Random word from dictionary file -> 0
|
|
# Completely random string -> 1
|
|
random_alias_suffix = sa.Column(
|
|
sa.Integer,
|
|
nullable=False,
|
|
default=AliasSuffixEnum.random_string.value,
|
|
server_default=str(AliasSuffixEnum.random_string.value),
|
|
)
|
|
|
|
# always expand the alias info, i.e. without needing to press "More"
|
|
expand_alias_info = sa.Column(
|
|
sa.Boolean, default=False, nullable=False, server_default="0"
|
|
)
|
|
|
|
# ignore emails send from a mailbox to its alias. This can happen when replying all to a forwarded email
|
|
# can automatically re-includes the alias
|
|
ignore_loop_email = sa.Column(
|
|
sa.Boolean, default=False, nullable=False, server_default="0"
|
|
)
|
|
|
|
# used for flask-login as an "alternative token"
|
|
# cf https://flask-login.readthedocs.io/en/latest/#alternative-tokens
|
|
alternative_id = sa.Column(sa.String(128), unique=True, nullable=True)
|
|
|
|
# by default, when an alias is automatically created, a note like "Created with ...." is created
|
|
# If this field is True, the note won't be created.
|
|
disable_automatic_alias_note = sa.Column(
|
|
sa.Boolean, default=False, nullable=False, server_default="0"
|
|
)
|
|
|
|
# By default, the one-click unsubscribe disable the alias
|
|
# If set to true, it will block the sender instead
|
|
one_click_unsubscribe_block_sender = sa.Column(
|
|
sa.Boolean, default=False, nullable=False, server_default="0"
|
|
)
|
|
|
|
# automatically include the website name when user creates an alias via the SimpleLogin icon in the email field
|
|
include_website_in_one_click_alias = sa.Column(
|
|
sa.Boolean,
|
|
# new user will have this option turned on automatically
|
|
default=True,
|
|
nullable=False,
|
|
# old user will have this option turned off
|
|
server_default="0",
|
|
)
|
|
|
|
_directory_quota = sa.Column(
|
|
"directory_quota", sa.Integer, default=50, nullable=False, server_default="50"
|
|
)
|
|
|
|
_subdomain_quota = sa.Column(
|
|
"subdomain_quota", sa.Integer, default=5, nullable=False, server_default="5"
|
|
)
|
|
|
|
# user can use import to import too many aliases
|
|
disable_import = sa.Column(
|
|
sa.Boolean, default=False, nullable=False, server_default="0"
|
|
)
|
|
|
|
# user can use the phone feature
|
|
can_use_phone = sa.Column(
|
|
sa.Boolean, default=False, nullable=False, server_default="0"
|
|
)
|
|
|
|
# in minutes
|
|
phone_quota = sa.Column(sa.Integer, nullable=True)
|
|
|
|
# Status code to return if is blocked
|
|
block_behaviour = sa.Column(
|
|
sa.Enum(BlockBehaviourEnum),
|
|
nullable=False,
|
|
server_default=BlockBehaviourEnum.return_2xx.name,
|
|
)
|
|
|
|
# to keep existing behavior, the server default is TRUE whereas for new user, the default value is FALSE
|
|
include_header_email_header = sa.Column(
|
|
sa.Boolean, default=False, nullable=False, server_default="1"
|
|
)
|
|
|
|
@property
|
|
def directory_quota(self):
|
|
return min(
|
|
self._directory_quota,
|
|
MAX_NB_DIRECTORY - Directory.filter_by(user_id=self.id).count(),
|
|
)
|
|
|
|
@property
|
|
def subdomain_quota(self):
|
|
return min(
|
|
self._subdomain_quota,
|
|
MAX_NB_SUBDOMAIN
|
|
- CustomDomain.filter_by(user_id=self.id, is_sl_subdomain=True).count(),
|
|
)
|
|
|
|
@staticmethod
|
|
def subdomain_is_available():
|
|
return SLDomain.filter_by(can_use_subdomain=True).count() > 0
|
|
|
|
# implement flask-login "alternative token"
|
|
def get_id(self):
|
|
if self.alternative_id:
|
|
return self.alternative_id
|
|
else:
|
|
return str(self.id)
|
|
|
|
@classmethod
|
|
def create(cls, email, name="", password=None, **kwargs):
|
|
user: User = super(User, cls).create(email=email, name=name, **kwargs)
|
|
|
|
if password:
|
|
user.set_password(password)
|
|
|
|
Session.flush()
|
|
|
|
mb = Mailbox.create(user_id=user.id, email=user.email, verified=True)
|
|
Session.flush()
|
|
user.default_mailbox_id = mb.id
|
|
|
|
# create a first alias mail to show user how to use when they login
|
|
alias = Alias.create_new(
|
|
user,
|
|
prefix="simplelogin-newsletter",
|
|
mailbox_id=mb.id,
|
|
note="This is your first alias. It's used to receive SimpleLogin communications "
|
|
"like new features announcements, newsletters.",
|
|
)
|
|
Session.flush()
|
|
|
|
user.newsletter_alias_id = alias.id
|
|
Session.flush()
|
|
|
|
# generate an alternative_id if needed
|
|
if "alternative_id" not in kwargs:
|
|
user.alternative_id = str(uuid.uuid4())
|
|
|
|
if DISABLE_ONBOARDING:
|
|
LOG.d("Disable onboarding emails")
|
|
return user
|
|
|
|
# Schedule onboarding emails
|
|
Job.create(
|
|
name=JOB_ONBOARDING_1,
|
|
payload={"user_id": user.id},
|
|
run_at=arrow.now().shift(days=1),
|
|
)
|
|
Job.create(
|
|
name=JOB_ONBOARDING_2,
|
|
payload={"user_id": user.id},
|
|
run_at=arrow.now().shift(days=2),
|
|
)
|
|
Job.create(
|
|
name=JOB_ONBOARDING_4,
|
|
payload={"user_id": user.id},
|
|
run_at=arrow.now().shift(days=3),
|
|
)
|
|
Session.flush()
|
|
|
|
return user
|
|
|
|
# region Billing
|
|
def lifetime_or_active_subscription(self) -> bool:
|
|
"""True if user has lifetime licence or active subscription"""
|
|
if self.lifetime:
|
|
return True
|
|
|
|
sub: Subscription = self.get_subscription()
|
|
if sub:
|
|
return True
|
|
|
|
apple_sub: AppleSubscription = AppleSubscription.get_by(user_id=self.id)
|
|
if apple_sub and apple_sub.is_valid():
|
|
return True
|
|
|
|
manual_sub: ManualSubscription = ManualSubscription.get_by(user_id=self.id)
|
|
if manual_sub and manual_sub.is_active():
|
|
return True
|
|
|
|
coinbase_subscription: CoinbaseSubscription = CoinbaseSubscription.get_by(
|
|
user_id=self.id
|
|
)
|
|
if coinbase_subscription and coinbase_subscription.is_active():
|
|
return True
|
|
|
|
return False
|
|
|
|
def is_paid(self) -> bool:
|
|
"""same as _lifetime_or_active_subscription but not include free manual subscription"""
|
|
sub: Subscription = self.get_subscription()
|
|
if sub:
|
|
return True
|
|
|
|
apple_sub: AppleSubscription = AppleSubscription.get_by(user_id=self.id)
|
|
if apple_sub and apple_sub.is_valid():
|
|
return True
|
|
|
|
manual_sub: ManualSubscription = ManualSubscription.get_by(user_id=self.id)
|
|
if manual_sub and not manual_sub.is_giveaway and manual_sub.is_active():
|
|
return True
|
|
|
|
coinbase_subscription: CoinbaseSubscription = CoinbaseSubscription.get_by(
|
|
user_id=self.id
|
|
)
|
|
if coinbase_subscription and coinbase_subscription.is_active():
|
|
return True
|
|
|
|
return False
|
|
|
|
def in_trial(self):
|
|
"""return True if user does not have lifetime licence or an active subscription AND is in trial period"""
|
|
if self.lifetime_or_active_subscription():
|
|
return False
|
|
|
|
if self.trial_end and arrow.now() < self.trial_end:
|
|
return True
|
|
|
|
return False
|
|
|
|
def should_show_upgrade_button(self):
|
|
if self.lifetime_or_active_subscription():
|
|
# user who has canceled can also re-subscribe
|
|
sub: Subscription = self.get_subscription()
|
|
if sub and sub.cancelled:
|
|
return True
|
|
|
|
return False
|
|
|
|
return True
|
|
|
|
def is_premium(self) -> bool:
|
|
"""
|
|
user is premium if they:
|
|
- have a lifetime deal or
|
|
- in trial period or
|
|
- active subscription
|
|
"""
|
|
if self.lifetime_or_active_subscription():
|
|
return True
|
|
|
|
if self.trial_end and arrow.now() < self.trial_end:
|
|
return True
|
|
|
|
return False
|
|
|
|
@property
|
|
def upgrade_channel(self) -> str:
|
|
"""Used on admin dashboard"""
|
|
# user can have multiple subscription channel
|
|
channels = []
|
|
if self.lifetime:
|
|
channels.append("Lifetime")
|
|
|
|
sub: Subscription = self.get_subscription()
|
|
if sub:
|
|
if sub.cancelled:
|
|
channels.append(
|
|
f"Cancelled Paddle Subscription {sub.subscription_id} {sub.plan_name()}"
|
|
)
|
|
else:
|
|
channels.append(
|
|
f"Active Paddle Subscription {sub.subscription_id} {sub.plan_name()}"
|
|
)
|
|
|
|
apple_sub: AppleSubscription = AppleSubscription.get_by(user_id=self.id)
|
|
if apple_sub and apple_sub.is_valid():
|
|
channels.append(f"Apple Subscription {apple_sub.expires_date.humanize()}")
|
|
|
|
manual_sub: ManualSubscription = ManualSubscription.get_by(user_id=self.id)
|
|
if manual_sub and manual_sub.is_active():
|
|
mode = "Giveaway" if manual_sub.is_giveaway else "Paid"
|
|
channels.append(
|
|
f"Manual Subscription {manual_sub.comment} {mode} {manual_sub.end_at.humanize()}"
|
|
)
|
|
|
|
coinbase_subscription: CoinbaseSubscription = CoinbaseSubscription.get_by(
|
|
user_id=self.id
|
|
)
|
|
if coinbase_subscription and coinbase_subscription.is_active():
|
|
channels.append(
|
|
f"Coinbase Subscription ends {coinbase_subscription.end_at.humanize()}"
|
|
)
|
|
|
|
return ".\n".join(channels)
|
|
|
|
# endregion
|
|
|
|
def can_create_new_alias(self) -> bool:
|
|
"""
|
|
Whether user can create a new alias. User can't create a new alias if
|
|
- has more than 15 aliases in the free plan, *even in the free trial*
|
|
"""
|
|
if self.lifetime_or_active_subscription():
|
|
return True
|
|
else:
|
|
return Alias.filter_by(user_id=self.id).count() < MAX_NB_EMAIL_FREE_PLAN
|
|
|
|
def profile_picture_url(self):
|
|
if self.profile_picture_id:
|
|
return self.profile_picture.get_url()
|
|
else:
|
|
return url_for("static", filename="default-avatar.png")
|
|
|
|
def suggested_emails(self, website_name) -> (str, [str]):
|
|
"""return suggested email and other email choices"""
|
|
website_name = convert_to_id(website_name)
|
|
|
|
all_aliases = [
|
|
ge.email for ge in Alias.filter_by(user_id=self.id, enabled=True)
|
|
]
|
|
if self.can_create_new_alias():
|
|
suggested_alias = Alias.create_new(self, prefix=website_name).email
|
|
else:
|
|
# pick an email from the list of gen emails
|
|
suggested_alias = random.choice(all_aliases)
|
|
|
|
return (
|
|
suggested_alias,
|
|
list(set(all_aliases).difference({suggested_alias})),
|
|
)
|
|
|
|
def suggested_names(self) -> (str, [str]):
|
|
"""return suggested name and other name choices"""
|
|
other_name = convert_to_id(self.name)
|
|
|
|
return self.name, [other_name, "Anonymous", "whoami"]
|
|
|
|
def get_name_initial(self) -> str:
|
|
if not self.name:
|
|
return ""
|
|
names = self.name.split(" ")
|
|
return "".join([n[0].upper() for n in names if n])
|
|
|
|
def get_subscription(self) -> Optional["Subscription"]:
|
|
"""return *active* Paddle subscription
|
|
Return None if the subscription is already expired
|
|
TODO: support user unsubscribe and re-subscribe
|
|
"""
|
|
sub = Subscription.get_by(user_id=self.id)
|
|
|
|
if sub:
|
|
# sub is active until the next billing_date + 1
|
|
if sub.next_bill_date >= arrow.now().shift(days=-1).date():
|
|
return sub
|
|
# past subscription, user is considered not having a subscription = free plan
|
|
else:
|
|
return None
|
|
else:
|
|
return sub
|
|
|
|
def verified_custom_domains(self) -> List["CustomDomain"]:
|
|
return CustomDomain.filter_by(user_id=self.id, ownership_verified=True).all()
|
|
|
|
def mailboxes(self) -> List["Mailbox"]:
|
|
"""list of mailbox that user own"""
|
|
mailboxes = []
|
|
|
|
for mailbox in Mailbox.filter_by(user_id=self.id, verified=True):
|
|
mailboxes.append(mailbox)
|
|
|
|
return mailboxes
|
|
|
|
def nb_directory(self):
|
|
return Directory.filter_by(user_id=self.id).count()
|
|
|
|
def has_custom_domain(self):
|
|
return CustomDomain.filter_by(user_id=self.id, verified=True).count() > 0
|
|
|
|
def custom_domains(self):
|
|
return CustomDomain.filter_by(user_id=self.id, verified=True).all()
|
|
|
|
def available_domains_for_random_alias(self) -> List[Tuple[bool, str]]:
|
|
"""Return available domains for user to create random aliases
|
|
Each result record contains:
|
|
- whether the domain belongs to SimpleLogin
|
|
- the domain
|
|
"""
|
|
res = []
|
|
for domain in self.available_sl_domains():
|
|
res.append((True, domain))
|
|
|
|
for custom_domain in self.verified_custom_domains():
|
|
res.append((False, custom_domain.domain))
|
|
|
|
return res
|
|
|
|
def default_random_alias_domain(self) -> str:
|
|
"""return the domain used for the random alias"""
|
|
if self.default_alias_custom_domain_id:
|
|
custom_domain = CustomDomain.get(self.default_alias_custom_domain_id)
|
|
# sanity check
|
|
if (
|
|
not custom_domain
|
|
or not custom_domain.verified
|
|
or custom_domain.user_id != self.id
|
|
):
|
|
LOG.w("Problem with %s default random alias domain", self)
|
|
return FIRST_ALIAS_DOMAIN
|
|
|
|
return custom_domain.domain
|
|
|
|
if self.default_alias_public_domain_id:
|
|
sl_domain = SLDomain.get(self.default_alias_public_domain_id)
|
|
# sanity check
|
|
if not sl_domain:
|
|
LOG.e("Problem with %s public random alias domain", self)
|
|
return FIRST_ALIAS_DOMAIN
|
|
|
|
if sl_domain.premium_only and not self.is_premium():
|
|
LOG.w(
|
|
"%s is not premium and cannot use %s. Reset default random alias domain setting",
|
|
self,
|
|
sl_domain,
|
|
)
|
|
self.default_alias_custom_domain_id = None
|
|
self.default_alias_public_domain_id = None
|
|
Session.commit()
|
|
return FIRST_ALIAS_DOMAIN
|
|
|
|
return sl_domain.domain
|
|
|
|
return FIRST_ALIAS_DOMAIN
|
|
|
|
def fido_enabled(self) -> bool:
|
|
if self.fido_uuid is not None:
|
|
return True
|
|
return False
|
|
|
|
def two_factor_authentication_enabled(self) -> bool:
|
|
return self.enable_otp or self.fido_enabled()
|
|
|
|
def get_communication_email(self) -> (Optional[str], str, bool):
|
|
"""
|
|
Return
|
|
- the email that user uses to receive email communication. None if user unsubscribes from newsletter
|
|
- the unsubscribe URL
|
|
- whether the unsubscribe method is via sending email (mailto:) or Http POST
|
|
"""
|
|
if self.notification and self.activated and not self.disabled:
|
|
if self.newsletter_alias_id:
|
|
alias = Alias.get(self.newsletter_alias_id)
|
|
if alias.enabled:
|
|
unsubscribe_link, via_email = alias.unsubscribe_link()
|
|
return alias.email, unsubscribe_link, via_email
|
|
# alias disabled -> user doesn't want to receive newsletter
|
|
else:
|
|
return None, None, False
|
|
else:
|
|
# do not handle http POST unsubscribe
|
|
if UNSUBSCRIBER:
|
|
# use * as suffix instead of = as for alias unsubscribe
|
|
return self.email, f"mailto:{UNSUBSCRIBER}?subject={self.id}*", True
|
|
|
|
return None, None, False
|
|
|
|
def available_sl_domains(self) -> [str]:
|
|
"""
|
|
Return all SimpleLogin domains that user can use when creating a new alias, including:
|
|
- SimpleLogin public domains, available for all users (ALIAS_DOMAIN)
|
|
- SimpleLogin premium domains, only available for Premium accounts (PREMIUM_ALIAS_DOMAIN)
|
|
"""
|
|
return [sl_domain.domain for sl_domain in self.get_sl_domains()]
|
|
|
|
def get_sl_domains(self) -> List["SLDomain"]:
|
|
if self.is_premium():
|
|
return SLDomain.all()
|
|
else:
|
|
return SLDomain.filter_by(premium_only=False).all()
|
|
|
|
def available_alias_domains(self) -> [str]:
|
|
"""return all domains that user can use when creating a new alias, including:
|
|
- SimpleLogin public domains, available for all users (ALIAS_DOMAIN)
|
|
- SimpleLogin premium domains, only available for Premium accounts (PREMIUM_ALIAS_DOMAIN)
|
|
- Verified custom domains
|
|
|
|
"""
|
|
domains = self.available_sl_domains()
|
|
|
|
for custom_domain in self.verified_custom_domains():
|
|
domains.append(custom_domain.domain)
|
|
|
|
# can have duplicate where a "root" user has a domain that's also listed in SL domains
|
|
return list(set(domains))
|
|
|
|
def should_show_app_page(self) -> bool:
|
|
"""whether to show the app page"""
|
|
return (
|
|
# when user has used the "Sign in with SL" button before
|
|
ClientUser.filter(ClientUser.user_id == self.id).count()
|
|
# or when user has created an app
|
|
+ Client.filter(Client.user_id == self.id).count()
|
|
> 0
|
|
)
|
|
|
|
def get_random_alias_suffix(self):
|
|
"""Get random suffix for an alias based on user's preference.
|
|
|
|
|
|
Returns:
|
|
str: the random suffix generated
|
|
"""
|
|
if self.random_alias_suffix == AliasSuffixEnum.random_string.value:
|
|
return random_string(ALIAS_RANDOM_SUFFIX_LENGTH, include_digits=True)
|
|
return random_word()
|
|
|
|
def __repr__(self):
|
|
return f"<User {self.id} {self.name} {self.email}>"
|
|
|
|
|
|
def _expiration_1h():
|
|
return arrow.now().shift(hours=1)
|
|
|
|
|
|
def _expiration_12h():
|
|
return arrow.now().shift(hours=12)
|
|
|
|
|
|
def _expiration_5m():
|
|
return arrow.now().shift(minutes=5)
|
|
|
|
|
|
def _expiration_7d():
|
|
return arrow.now().shift(days=7)
|
|
|
|
|
|
class ActivationCode(Base, ModelMixin):
|
|
"""For activate user account"""
|
|
|
|
__tablename__ = "activation_code"
|
|
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
code = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
|
|
user = orm.relationship(User)
|
|
|
|
expired = sa.Column(ArrowType, nullable=False, default=_expiration_1h)
|
|
|
|
def is_expired(self):
|
|
return self.expired < arrow.now()
|
|
|
|
|
|
class ResetPasswordCode(Base, ModelMixin):
|
|
"""For resetting password"""
|
|
|
|
__tablename__ = "reset_password_code"
|
|
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
code = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
|
|
user = orm.relationship(User)
|
|
|
|
expired = sa.Column(ArrowType, nullable=False, default=_expiration_1h)
|
|
|
|
def is_expired(self):
|
|
return self.expired < arrow.now()
|
|
|
|
|
|
class SocialAuth(Base, ModelMixin):
|
|
"""Store how user authenticates with social login"""
|
|
|
|
__tablename__ = "social_auth"
|
|
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
|
|
# name of the social login used, could be facebook, google or github
|
|
social = sa.Column(sa.String(128), nullable=False)
|
|
|
|
__table_args__ = (sa.UniqueConstraint("user_id", "social", name="uq_social_auth"),)
|
|
|
|
|
|
# <<< OAUTH models >>>
|
|
|
|
|
|
def generate_oauth_client_id(client_name) -> str:
|
|
oauth_client_id = convert_to_id(client_name) + "-" + random_string()
|
|
|
|
# check that the client does not exist yet
|
|
if not Client.get_by(oauth_client_id=oauth_client_id):
|
|
LOG.d("generate oauth_client_id %s", oauth_client_id)
|
|
return oauth_client_id
|
|
|
|
# Rerun the function
|
|
LOG.w("client_id %s already exists, generate a new client_id", oauth_client_id)
|
|
return generate_oauth_client_id(client_name)
|
|
|
|
|
|
class MfaBrowser(Base, ModelMixin):
|
|
__tablename__ = "mfa_browser"
|
|
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
token = sa.Column(sa.String(64), default=False, unique=True, nullable=False)
|
|
expires = sa.Column(ArrowType, default=False, nullable=False)
|
|
|
|
user = orm.relationship(User)
|
|
|
|
@classmethod
|
|
def create_new(cls, user, token_length=64) -> "MfaBrowser":
|
|
found = False
|
|
while not found:
|
|
token = random_string(token_length)
|
|
|
|
if not cls.get_by(token=token):
|
|
found = True
|
|
|
|
return MfaBrowser.create(
|
|
user_id=user.id,
|
|
token=token,
|
|
expires=arrow.now().shift(days=30),
|
|
)
|
|
|
|
@classmethod
|
|
def delete(cls, token):
|
|
cls.filter(cls.token == token).delete()
|
|
Session.commit()
|
|
|
|
@classmethod
|
|
def delete_expired(cls):
|
|
cls.filter(cls.expires < arrow.now()).delete()
|
|
Session.commit()
|
|
|
|
def is_expired(self):
|
|
return self.expires < arrow.now()
|
|
|
|
def reset_expire(self):
|
|
self.expires = arrow.now().shift(days=30)
|
|
|
|
|
|
class Client(Base, ModelMixin):
|
|
__tablename__ = "client"
|
|
oauth_client_id = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
oauth_client_secret = sa.Column(sa.String(128), nullable=False)
|
|
|
|
name = sa.Column(sa.String(128), nullable=False)
|
|
home_url = sa.Column(sa.String(1024))
|
|
|
|
# user who created this client
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
icon_id = sa.Column(sa.ForeignKey(File.id), nullable=True)
|
|
|
|
# an app needs to be approved by SimpleLogin team
|
|
approved = sa.Column(sa.Boolean, nullable=False, default=False, server_default="0")
|
|
description = sa.Column(sa.Text, nullable=True)
|
|
|
|
# a referral can be attached to a client
|
|
# so all users who sign up via the authorize screen are counted towards this referral
|
|
referral_id = sa.Column(
|
|
sa.ForeignKey("referral.id", ondelete="SET NULL"), nullable=True
|
|
)
|
|
|
|
icon = orm.relationship(File)
|
|
user = orm.relationship(User)
|
|
referral = orm.relationship("Referral")
|
|
|
|
def nb_user(self):
|
|
return ClientUser.filter_by(client_id=self.id).count()
|
|
|
|
def get_scopes(self) -> [Scope]:
|
|
# todo: client can choose which scopes they want to have access
|
|
return [Scope.NAME, Scope.EMAIL, Scope.AVATAR_URL]
|
|
|
|
@classmethod
|
|
def create_new(cls, name, user_id) -> "Client":
|
|
# generate a client-id
|
|
oauth_client_id = generate_oauth_client_id(name)
|
|
oauth_client_secret = random_string(40)
|
|
client = Client.create(
|
|
name=name,
|
|
oauth_client_id=oauth_client_id,
|
|
oauth_client_secret=oauth_client_secret,
|
|
user_id=user_id,
|
|
)
|
|
|
|
return client
|
|
|
|
def get_icon_url(self):
|
|
if self.icon_id:
|
|
return self.icon.get_url()
|
|
else:
|
|
return URL + "/static/default-icon.svg"
|
|
|
|
def last_user_login(self) -> "ClientUser":
|
|
client_user = (
|
|
ClientUser.filter(ClientUser.client_id == self.id)
|
|
.order_by(ClientUser.updated_at)
|
|
.first()
|
|
)
|
|
if client_user:
|
|
return client_user
|
|
return None
|
|
|
|
|
|
class RedirectUri(Base, ModelMixin):
|
|
"""Valid redirect uris for a client"""
|
|
|
|
__tablename__ = "redirect_uri"
|
|
|
|
client_id = sa.Column(sa.ForeignKey(Client.id, ondelete="cascade"), nullable=False)
|
|
uri = sa.Column(sa.String(1024), nullable=False)
|
|
|
|
client = orm.relationship(Client, backref="redirect_uris")
|
|
|
|
|
|
class AuthorizationCode(Base, ModelMixin):
|
|
__tablename__ = "authorization_code"
|
|
|
|
code = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
client_id = sa.Column(sa.ForeignKey(Client.id, ondelete="cascade"), nullable=False)
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
|
|
scope = sa.Column(sa.String(128))
|
|
redirect_uri = sa.Column(sa.String(1024))
|
|
|
|
# what is the input response_type, e.g. "code", "code,id_token", ...
|
|
response_type = sa.Column(sa.String(128))
|
|
|
|
nonce = sa.Column(sa.Text, nullable=True, default=None, server_default=text("NULL"))
|
|
|
|
user = orm.relationship(User, lazy=False)
|
|
client = orm.relationship(Client, lazy=False)
|
|
|
|
expired = sa.Column(ArrowType, nullable=False, default=_expiration_5m)
|
|
|
|
def is_expired(self):
|
|
return self.expired < arrow.now()
|
|
|
|
|
|
class OauthToken(Base, ModelMixin):
|
|
__tablename__ = "oauth_token"
|
|
|
|
access_token = sa.Column(sa.String(128), unique=True)
|
|
client_id = sa.Column(sa.ForeignKey(Client.id, ondelete="cascade"), nullable=False)
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
|
|
scope = sa.Column(sa.String(128))
|
|
redirect_uri = sa.Column(sa.String(1024))
|
|
|
|
# what is the input response_type, e.g. "token", "token,id_token", ...
|
|
response_type = sa.Column(sa.String(128))
|
|
|
|
user = orm.relationship(User)
|
|
client = orm.relationship(Client)
|
|
|
|
expired = sa.Column(ArrowType, nullable=False, default=_expiration_1h)
|
|
|
|
def is_expired(self):
|
|
return self.expired < arrow.now()
|
|
|
|
|
|
def generate_email(
|
|
scheme: int = AliasGeneratorEnum.word.value,
|
|
in_hex: bool = False,
|
|
alias_domain=FIRST_ALIAS_DOMAIN,
|
|
) -> str:
|
|
"""generate an email address that does not exist before
|
|
:param alias_domain: the domain used to generate the alias.
|
|
:param scheme: int, value of AliasGeneratorEnum, indicate how the email is generated
|
|
:type in_hex: bool, if the generate scheme is uuid, is hex favorable?
|
|
"""
|
|
if scheme == AliasGeneratorEnum.uuid.value:
|
|
name = uuid.uuid4().hex if in_hex else uuid.uuid4().__str__()
|
|
random_email = name + "@" + alias_domain
|
|
else:
|
|
random_email = random_words() + "@" + alias_domain
|
|
|
|
random_email = random_email.lower().strip()
|
|
|
|
# check that the client does not exist yet
|
|
if not Alias.get_by(email=random_email) and not DeletedAlias.get_by(
|
|
email=random_email
|
|
):
|
|
LOG.d("generate email %s", random_email)
|
|
return random_email
|
|
|
|
# Rerun the function
|
|
LOG.w("email %s already exists, generate a new email", random_email)
|
|
return generate_email(scheme=scheme, in_hex=in_hex)
|
|
|
|
|
|
class Alias(Base, ModelMixin):
|
|
__tablename__ = "alias"
|
|
user_id = sa.Column(
|
|
sa.ForeignKey(User.id, ondelete="cascade"), nullable=False, index=True
|
|
)
|
|
email = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
|
|
# the name to use when user replies/sends from alias
|
|
name = sa.Column(sa.String(128), nullable=True, default=None)
|
|
|
|
enabled = sa.Column(sa.Boolean(), default=True, nullable=False)
|
|
|
|
custom_domain_id = sa.Column(
|
|
sa.ForeignKey("custom_domain.id", ondelete="cascade"), nullable=True, index=True
|
|
)
|
|
|
|
custom_domain = orm.relationship("CustomDomain", foreign_keys=[custom_domain_id])
|
|
|
|
# To know whether an alias is created "on the fly", i.e. via the custom domain catch-all feature
|
|
automatic_creation = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
|
|
# to know whether an alias belongs to a directory
|
|
directory_id = sa.Column(
|
|
sa.ForeignKey("directory.id", ondelete="cascade"), nullable=True, index=True
|
|
)
|
|
|
|
note = sa.Column(sa.Text, default=None, nullable=True)
|
|
|
|
# an alias can be owned by another mailbox
|
|
mailbox_id = sa.Column(
|
|
sa.ForeignKey("mailbox.id", ondelete="cascade"), nullable=False, index=True
|
|
)
|
|
|
|
# prefix _ to avoid this object being used accidentally.
|
|
# To have the list of all mailboxes, should use AliasInfo instead
|
|
_mailboxes = orm.relationship("Mailbox", secondary="alias_mailbox", lazy="joined")
|
|
|
|
# If the mailbox has PGP-enabled, user can choose disable the PGP on the alias
|
|
# this is useful when some senders already support PGP
|
|
disable_pgp = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
|
|
# a way to bypass the bounce automatic disable mechanism
|
|
cannot_be_disabled = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
|
|
# when a mailbox wants to send an email on behalf of the alias via the reverse-alias
|
|
# several checks are performed to avoid email spoofing
|
|
# this option allow disabling these checks
|
|
disable_email_spoofing_check = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
|
|
# to know whether an alias is added using a batch import
|
|
batch_import_id = sa.Column(
|
|
sa.ForeignKey("batch_import.id", ondelete="SET NULL"),
|
|
nullable=True,
|
|
default=None,
|
|
)
|
|
|
|
# set in case of alias transfer.
|
|
original_owner_id = sa.Column(
|
|
sa.ForeignKey(User.id, ondelete="SET NULL"), nullable=True
|
|
)
|
|
|
|
# alias is pinned on top
|
|
pinned = sa.Column(sa.Boolean, nullable=False, default=False, server_default="0")
|
|
|
|
# used to transfer an alias to another user
|
|
transfer_token = sa.Column(sa.String(64), default=None, unique=True, nullable=True)
|
|
|
|
# have I been pwned
|
|
hibp_last_check = sa.Column(ArrowType, default=None)
|
|
hibp_breaches = orm.relationship("Hibp", secondary="alias_hibp")
|
|
|
|
# to use Postgres full text search. Only applied on "note" column for now
|
|
# this is a generated Postgres column
|
|
ts_vector = sa.Column(
|
|
TSVector(), sa.Computed("to_tsvector('english', note)", persisted=True)
|
|
)
|
|
|
|
__table_args__ = (
|
|
Index("ix_video___ts_vector__", ts_vector, postgresql_using="gin"),
|
|
# index on note column using pg_trgm
|
|
Index(
|
|
"note_pg_trgm_index",
|
|
"note",
|
|
postgresql_ops={"note": "gin_trgm_ops"},
|
|
postgresql_using="gin",
|
|
),
|
|
)
|
|
|
|
user = orm.relationship(User, foreign_keys=[user_id])
|
|
mailbox = orm.relationship("Mailbox", lazy="joined")
|
|
|
|
@property
|
|
def mailboxes(self):
|
|
ret = [self.mailbox]
|
|
for m in self._mailboxes:
|
|
ret.append(m)
|
|
|
|
ret = [mb for mb in ret if mb.verified]
|
|
ret = sorted(ret, key=lambda mb: mb.email)
|
|
|
|
return ret
|
|
|
|
def authorized_addresses(self) -> [str]:
|
|
"""return addresses that can send on behalf of this alias, i.e. can send emails to this alias's reverse-aliases
|
|
Including its mailboxes and their authorized addresses
|
|
"""
|
|
mailboxes = self.mailboxes
|
|
ret = [mb.email for mb in mailboxes]
|
|
for mailbox in mailboxes:
|
|
for aa in mailbox.authorized_addresses:
|
|
ret.append(aa.email)
|
|
|
|
return ret
|
|
|
|
def mailbox_support_pgp(self) -> bool:
|
|
"""return True of one of the mailboxes support PGP"""
|
|
for mb in self.mailboxes:
|
|
if mb.pgp_enabled():
|
|
return True
|
|
return False
|
|
|
|
def pgp_enabled(self) -> bool:
|
|
if self.mailbox_support_pgp() and not self.disable_pgp:
|
|
return True
|
|
return False
|
|
|
|
@staticmethod
|
|
def get_custom_domain(alias_address) -> Optional["CustomDomain"]:
|
|
alias_domain = validate_email(
|
|
alias_address, check_deliverability=False, allow_smtputf8=False
|
|
).domain
|
|
|
|
# handle the case a SLDomain is also a CustomDomain
|
|
if SLDomain.get_by(domain=alias_domain) is None:
|
|
custom_domain = CustomDomain.get_by(domain=alias_domain)
|
|
if custom_domain:
|
|
return custom_domain
|
|
|
|
@classmethod
|
|
def create(cls, **kw):
|
|
commit = kw.pop("commit", False)
|
|
flush = kw.pop("flush", False)
|
|
|
|
new_alias = cls(**kw)
|
|
|
|
email = kw["email"]
|
|
# make sure email is lowercase and doesn't have any whitespace
|
|
email = sanitize_email(email)
|
|
|
|
# make sure alias is not in global trash, i.e. DeletedAlias table
|
|
if DeletedAlias.get_by(email=email):
|
|
raise AliasInTrashError
|
|
|
|
if DomainDeletedAlias.get_by(email=email):
|
|
raise AliasInTrashError
|
|
|
|
# detect whether alias should belong to a custom domain
|
|
if "custom_domain_id" not in kw:
|
|
custom_domain = Alias.get_custom_domain(email)
|
|
if custom_domain:
|
|
new_alias.custom_domain_id = custom_domain.id
|
|
|
|
Session.add(new_alias)
|
|
|
|
if commit:
|
|
Session.commit()
|
|
|
|
if flush:
|
|
Session.flush()
|
|
|
|
return new_alias
|
|
|
|
@classmethod
|
|
def create_new(cls, user, prefix, note=None, mailbox_id=None):
|
|
prefix = prefix.lower().strip().replace(" ", "")
|
|
|
|
if not prefix:
|
|
raise Exception("alias prefix cannot be empty")
|
|
|
|
# find the right suffix - avoid infinite loop by running this at max 1000 times
|
|
for _ in range(1000):
|
|
suffix = user.get_random_alias_suffix()
|
|
email = f"{prefix}.{suffix}@{FIRST_ALIAS_DOMAIN}"
|
|
|
|
if not cls.get_by(email=email) and not DeletedAlias.get_by(email=email):
|
|
break
|
|
|
|
return Alias.create(
|
|
user_id=user.id,
|
|
email=email,
|
|
note=note,
|
|
mailbox_id=mailbox_id or user.default_mailbox_id,
|
|
)
|
|
|
|
@classmethod
|
|
def delete(cls, obj_id):
|
|
raise Exception("should use delete_alias(alias,user) instead")
|
|
|
|
@classmethod
|
|
def create_new_random(
|
|
cls,
|
|
user,
|
|
scheme: int = AliasGeneratorEnum.word.value,
|
|
in_hex: bool = False,
|
|
note: str = None,
|
|
):
|
|
"""create a new random alias"""
|
|
custom_domain = None
|
|
|
|
random_email = None
|
|
|
|
if user.default_alias_custom_domain_id:
|
|
custom_domain = CustomDomain.get(user.default_alias_custom_domain_id)
|
|
random_email = generate_email(
|
|
scheme=scheme, in_hex=in_hex, alias_domain=custom_domain.domain
|
|
)
|
|
elif user.default_alias_public_domain_id:
|
|
sl_domain: SLDomain = SLDomain.get(user.default_alias_public_domain_id)
|
|
if sl_domain.premium_only and not user.is_premium():
|
|
LOG.w("%s not premium, cannot use %s", user, sl_domain)
|
|
else:
|
|
random_email = generate_email(
|
|
scheme=scheme, in_hex=in_hex, alias_domain=sl_domain.domain
|
|
)
|
|
|
|
if not random_email:
|
|
random_email = generate_email(scheme=scheme, in_hex=in_hex)
|
|
|
|
alias = Alias.create(
|
|
user_id=user.id,
|
|
email=random_email,
|
|
mailbox_id=user.default_mailbox_id,
|
|
note=note,
|
|
)
|
|
|
|
if custom_domain:
|
|
alias.custom_domain_id = custom_domain.id
|
|
|
|
return alias
|
|
|
|
def mailbox_email(self):
|
|
if self.mailbox_id:
|
|
return self.mailbox.email
|
|
else:
|
|
return self.user.email
|
|
|
|
def unsubscribe_link(self, contact: Optional["Contact"] = None) -> (str, bool):
|
|
"""
|
|
return the unsubscribe link along with whether this is via email (mailto:) or Http POST
|
|
The mailto: method is preferred
|
|
"""
|
|
if contact:
|
|
if UNSUBSCRIBER:
|
|
return f"mailto:{UNSUBSCRIBER}?subject={contact.id}_", True
|
|
else:
|
|
return f"{URL}/dashboard/block_contact/{contact.id}", False
|
|
else:
|
|
if UNSUBSCRIBER:
|
|
return f"mailto:{UNSUBSCRIBER}?subject={self.id}=", True
|
|
else:
|
|
return f"{URL}/dashboard/unsubscribe/{self.id}", False
|
|
|
|
def __repr__(self):
|
|
return f"<Alias {self.id} {self.email}>"
|
|
|
|
|
|
class ClientUser(Base, ModelMixin):
|
|
__tablename__ = "client_user"
|
|
__table_args__ = (
|
|
sa.UniqueConstraint("user_id", "client_id", name="uq_client_user"),
|
|
)
|
|
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
client_id = sa.Column(sa.ForeignKey(Client.id, ondelete="cascade"), nullable=False)
|
|
|
|
# Null means client has access to user original email
|
|
alias_id = sa.Column(sa.ForeignKey(Alias.id, ondelete="cascade"), nullable=True)
|
|
|
|
# user can decide to send to client another name
|
|
name = sa.Column(
|
|
sa.String(128), nullable=True, default=None, server_default=text("NULL")
|
|
)
|
|
|
|
# user can decide to send to client a default avatar
|
|
default_avatar = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
|
|
alias = orm.relationship(Alias, backref="client_users")
|
|
|
|
user = orm.relationship(User)
|
|
client = orm.relationship(Client)
|
|
|
|
def get_email(self):
|
|
return self.alias.email if self.alias_id else self.user.email
|
|
|
|
def get_user_name(self):
|
|
if self.name:
|
|
return self.name
|
|
else:
|
|
return self.user.name
|
|
|
|
def get_user_info(self) -> dict:
|
|
"""return user info according to client scope
|
|
Return dict with key being scope name. For now all the fields are the same for all clients:
|
|
|
|
{
|
|
"client": "Demo",
|
|
"email": "test-avk5l@mail-tester.com",
|
|
"email_verified": true,
|
|
"id": 1,
|
|
"name": "Son GM",
|
|
"avatar_url": "http://s3..."
|
|
}
|
|
|
|
"""
|
|
res = {
|
|
"id": self.id,
|
|
"client": self.client.name,
|
|
"email_verified": True,
|
|
"sub": str(self.id),
|
|
}
|
|
|
|
for scope in self.client.get_scopes():
|
|
if scope == Scope.NAME:
|
|
if self.name:
|
|
res[Scope.NAME.value] = self.name or ""
|
|
else:
|
|
res[Scope.NAME.value] = self.user.name or ""
|
|
elif scope == Scope.AVATAR_URL:
|
|
if self.user.profile_picture_id:
|
|
if self.default_avatar:
|
|
res[Scope.AVATAR_URL.value] = URL + "/static/default-avatar.png"
|
|
else:
|
|
res[Scope.AVATAR_URL.value] = self.user.profile_picture.get_url(
|
|
AVATAR_URL_EXPIRATION
|
|
)
|
|
else:
|
|
res[Scope.AVATAR_URL.value] = None
|
|
elif scope == Scope.EMAIL:
|
|
# Use generated email
|
|
if self.alias_id:
|
|
LOG.d(
|
|
"Use gen email for user %s, client %s", self.user, self.client
|
|
)
|
|
res[Scope.EMAIL.value] = self.alias.email
|
|
# Use user original email
|
|
else:
|
|
res[Scope.EMAIL.value] = self.user.email
|
|
|
|
return res
|
|
|
|
|
|
class Contact(Base, ModelMixin):
|
|
"""
|
|
Store configuration of sender (website-email) and alias.
|
|
"""
|
|
|
|
__tablename__ = "contact"
|
|
|
|
__table_args__ = (
|
|
sa.UniqueConstraint("alias_id", "website_email", name="uq_contact"),
|
|
)
|
|
|
|
user_id = sa.Column(
|
|
sa.ForeignKey(User.id, ondelete="cascade"), nullable=False, index=True
|
|
)
|
|
alias_id = sa.Column(
|
|
sa.ForeignKey(Alias.id, ondelete="cascade"), nullable=False, index=True
|
|
)
|
|
|
|
name = sa.Column(
|
|
sa.String(512), nullable=True, default=None, server_default=text("NULL")
|
|
)
|
|
|
|
website_email = sa.Column(sa.String(512), nullable=False)
|
|
|
|
# the email from header, e.g. AB CD <ab@cd.com>
|
|
# nullable as this field is added after website_email
|
|
website_from = sa.Column(sa.String(1024), nullable=True)
|
|
|
|
# when user clicks on "reply", they will reply to this address.
|
|
# This address allows to hide user personal email
|
|
# this reply email is created every time a website sends an email to user
|
|
# it used to have the prefix "reply+" or "ra+"
|
|
reply_email = sa.Column(sa.String(512), nullable=False, index=True)
|
|
|
|
# whether a contact is created via CC
|
|
is_cc = sa.Column(sa.Boolean, nullable=False, default=False, server_default="0")
|
|
|
|
pgp_public_key = sa.Column(sa.Text, nullable=True)
|
|
pgp_finger_print = sa.Column(sa.String(512), nullable=True)
|
|
|
|
alias = orm.relationship(Alias, backref="contacts")
|
|
user = orm.relationship(User)
|
|
|
|
# the latest reply sent to this contact
|
|
latest_reply: Optional[Arrow] = None
|
|
|
|
# to investigate why the website_email is sometimes not correctly parsed
|
|
# the envelope mail_from
|
|
mail_from = sa.Column(sa.Text, nullable=True, default=None)
|
|
|
|
# a contact can have an empty email address, in this case it can't receive emails
|
|
invalid_email = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
|
|
# emails sent from this contact will be blocked
|
|
block_forward = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
|
|
# whether contact is created automatically during the forward phase
|
|
automatic_created = sa.Column(sa.Boolean, nullable=True, default=False)
|
|
|
|
@property
|
|
def email(self):
|
|
return self.website_email
|
|
|
|
@classmethod
|
|
def create(cls, **kw):
|
|
commit = kw.pop("commit", False)
|
|
flush = kw.pop("flush", False)
|
|
|
|
new_contact = cls(**kw)
|
|
|
|
website_email = kw["website_email"]
|
|
# make sure email is lowercase and doesn't have any whitespace
|
|
website_email = sanitize_email(website_email)
|
|
|
|
# make sure contact.website_email isn't a reverse alias
|
|
orig_contact = Contact.get_by(reply_email=website_email)
|
|
if orig_contact:
|
|
raise CannotCreateContactForReverseAlias(str(orig_contact))
|
|
|
|
Session.add(new_contact)
|
|
|
|
if commit:
|
|
Session.commit()
|
|
|
|
if flush:
|
|
Session.flush()
|
|
|
|
return new_contact
|
|
|
|
def website_send_to(self):
|
|
"""return the email address with name.
|
|
to use when user wants to send an email from the alias
|
|
Return
|
|
"First Last | email at example.com" <reverse-alias@SL>
|
|
"""
|
|
|
|
# Prefer using contact name if possible
|
|
user = self.user
|
|
name = self.name
|
|
email = self.website_email
|
|
|
|
if (
|
|
not user
|
|
or not SenderFormatEnum.has_value(user.sender_format)
|
|
or user.sender_format == SenderFormatEnum.AT.value
|
|
):
|
|
email = email.replace("@", " at ")
|
|
elif user.sender_format == SenderFormatEnum.A.value:
|
|
email = email.replace("@", "(a)")
|
|
|
|
# if no name, try to parse it from website_from
|
|
if not name and self.website_from:
|
|
try:
|
|
name = address.parse(self.website_from).display_name
|
|
except Exception:
|
|
# Skip if website_from is wrongly formatted
|
|
LOG.e(
|
|
"Cannot parse contact %s website_from %s", self, self.website_from
|
|
)
|
|
name = ""
|
|
|
|
# remove all double quote
|
|
if name:
|
|
name = name.replace('"', "")
|
|
|
|
if name:
|
|
name = name + " | " + email
|
|
else:
|
|
name = email
|
|
|
|
# cannot use formataddr here as this field is for email client, not for MTA
|
|
return f'"{name}" <{self.reply_email}>'
|
|
|
|
def new_addr(self):
|
|
"""
|
|
Replace original email by reply_email. Possible formats:
|
|
- First Last - first at example.com <reply_email> OR
|
|
- First Last - first(a)example.com <reply_email> OR
|
|
- First Last <reply_email>
|
|
- first at example.com <reply_email>
|
|
- reply_email
|
|
And return new address with RFC 2047 format
|
|
"""
|
|
user = self.user
|
|
sender_format = user.sender_format if user else SenderFormatEnum.AT.value
|
|
|
|
if sender_format == SenderFormatEnum.NO_NAME.value:
|
|
return self.reply_email
|
|
|
|
if sender_format == SenderFormatEnum.NAME_ONLY.value:
|
|
new_name = self.name
|
|
elif sender_format == SenderFormatEnum.AT_ONLY.value:
|
|
new_name = self.website_email.replace("@", " at ").strip()
|
|
elif sender_format == SenderFormatEnum.AT.value:
|
|
formatted_email = self.website_email.replace("@", " at ").strip()
|
|
new_name = (
|
|
(self.name + " - " + formatted_email)
|
|
if self.name and self.name != self.website_email.strip()
|
|
else formatted_email
|
|
)
|
|
else: # SenderFormatEnum.A.value
|
|
formatted_email = self.website_email.replace("@", "(a)").strip()
|
|
new_name = (
|
|
(self.name + " - " + formatted_email)
|
|
if self.name and self.name != self.website_email.strip()
|
|
else formatted_email
|
|
)
|
|
|
|
new_addr = formataddr((new_name, self.reply_email)).strip()
|
|
return new_addr.strip()
|
|
|
|
def last_reply(self) -> "EmailLog":
|
|
"""return the most recent reply"""
|
|
return (
|
|
EmailLog.filter_by(contact_id=self.id, is_reply=True)
|
|
.order_by(desc(EmailLog.created_at))
|
|
.first()
|
|
)
|
|
|
|
def __repr__(self):
|
|
return f"<Contact {self.id} {self.website_email} {self.alias_id}>"
|
|
|
|
|
|
class EmailLog(Base, ModelMixin):
|
|
__tablename__ = "email_log"
|
|
|
|
user_id = sa.Column(
|
|
sa.ForeignKey(User.id, ondelete="cascade"), nullable=False, index=True
|
|
)
|
|
contact_id = sa.Column(
|
|
sa.ForeignKey(Contact.id, ondelete="cascade"), nullable=False, index=True
|
|
)
|
|
alias_id = sa.Column(
|
|
sa.ForeignKey(Alias.id, ondelete="cascade"), nullable=True, index=True
|
|
)
|
|
|
|
# whether this is a reply
|
|
is_reply = sa.Column(sa.Boolean, nullable=False, default=False)
|
|
|
|
# for ex if alias is disabled, this forwarding is blocked
|
|
blocked = sa.Column(sa.Boolean, nullable=False, default=False)
|
|
|
|
# can happen when user mailbox refuses the forwarded email
|
|
# usually because the forwarded email is too spammy
|
|
bounced = sa.Column(sa.Boolean, nullable=False, default=False, server_default="0")
|
|
|
|
# happen when an email with auto (holiday) reply
|
|
auto_replied = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
|
|
# SpamAssassin result
|
|
is_spam = sa.Column(sa.Boolean, nullable=False, default=False, server_default="0")
|
|
spam_score = sa.Column(sa.Float, nullable=True)
|
|
spam_status = sa.Column(sa.Text, nullable=True, default=None)
|
|
# do not load this column
|
|
spam_report = deferred(sa.Column(sa.JSON, nullable=True))
|
|
|
|
# Point to the email that has been refused
|
|
refused_email_id = sa.Column(
|
|
sa.ForeignKey("refused_email.id", ondelete="SET NULL"), nullable=True
|
|
)
|
|
|
|
# in forward phase, this is the mailbox that will receive the email
|
|
# in reply phase, this is the mailbox (or a mailbox's authorized address) that sends the email
|
|
mailbox_id = sa.Column(
|
|
sa.ForeignKey("mailbox.id", ondelete="cascade"), nullable=True
|
|
)
|
|
|
|
# in case of bounce, record on what mailbox the email has been bounced
|
|
# useful when an alias has several mailboxes
|
|
bounced_mailbox_id = sa.Column(
|
|
sa.ForeignKey("mailbox.id", ondelete="cascade"), nullable=True
|
|
)
|
|
|
|
# the Message ID
|
|
message_id = deferred(sa.Column(sa.String(1024), nullable=True))
|
|
# in the reply phase, the original message_id is replaced by the SL message_id
|
|
sl_message_id = deferred(sa.Column(sa.String(512), nullable=True))
|
|
|
|
refused_email = orm.relationship("RefusedEmail")
|
|
forward = orm.relationship(Contact)
|
|
|
|
contact = orm.relationship(Contact, backref="email_logs")
|
|
alias = orm.relationship(Alias)
|
|
mailbox = orm.relationship("Mailbox", lazy="joined", foreign_keys=[mailbox_id])
|
|
user = orm.relationship(User)
|
|
|
|
def bounced_mailbox(self) -> str:
|
|
if self.bounced_mailbox_id:
|
|
return Mailbox.get(self.bounced_mailbox_id).email
|
|
# retro-compatibility
|
|
return self.contact.alias.mailboxes[0].email
|
|
|
|
def get_action(self) -> str:
|
|
"""return the action name: forward|reply|block|bounced"""
|
|
if self.is_reply:
|
|
return "reply"
|
|
elif self.bounced:
|
|
return "bounced"
|
|
elif self.blocked:
|
|
return "block"
|
|
else:
|
|
return "forward"
|
|
|
|
def get_phase(self) -> str:
|
|
if self.is_reply:
|
|
return "reply"
|
|
else:
|
|
return "forward"
|
|
|
|
def get_dashboard_url(self):
|
|
return f"{URL}/dashboard/refused_email?highlight_id={self.id}"
|
|
|
|
def __repr__(self):
|
|
return f"<EmailLog {self.id}>"
|
|
|
|
|
|
class Subscription(Base, ModelMixin):
|
|
"""Paddle subscription"""
|
|
|
|
__tablename__ = "subscription"
|
|
|
|
# Come from Paddle
|
|
cancel_url = sa.Column(sa.String(1024), nullable=False)
|
|
update_url = sa.Column(sa.String(1024), nullable=False)
|
|
subscription_id = sa.Column(sa.String(1024), nullable=False, unique=True)
|
|
event_time = sa.Column(ArrowType, nullable=False)
|
|
next_bill_date = sa.Column(sa.Date, nullable=False)
|
|
|
|
cancelled = sa.Column(sa.Boolean, nullable=False, default=False)
|
|
|
|
plan = sa.Column(sa.Enum(PlanEnum), nullable=False)
|
|
|
|
user_id = sa.Column(
|
|
sa.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True
|
|
)
|
|
|
|
user = orm.relationship(User)
|
|
|
|
def plan_name(self):
|
|
if self.plan == PlanEnum.monthly:
|
|
return "Monthly"
|
|
else:
|
|
return "Yearly"
|
|
|
|
def __repr__(self):
|
|
return f"<Subscription {self.plan} {self.next_bill_date}>"
|
|
|
|
|
|
class ManualSubscription(Base, ModelMixin):
|
|
"""
|
|
For users who use other forms of payment and therefore not pass by Paddle
|
|
"""
|
|
|
|
__tablename__ = "manual_subscription"
|
|
|
|
user_id = sa.Column(
|
|
sa.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True
|
|
)
|
|
|
|
# an reminder is sent several days before the subscription ends
|
|
end_at = sa.Column(ArrowType, nullable=False)
|
|
|
|
# for storing note about this subscription
|
|
comment = sa.Column(sa.Text, nullable=True)
|
|
|
|
# manual subscription are also used for Premium giveaways
|
|
is_giveaway = sa.Column(
|
|
sa.Boolean, default=False, nullable=False, server_default="0"
|
|
)
|
|
|
|
user = orm.relationship(User)
|
|
|
|
def is_active(self):
|
|
return self.end_at > arrow.now()
|
|
|
|
|
|
class CoinbaseSubscription(Base, ModelMixin):
|
|
"""
|
|
For subscriptions using Coinbase Commerce
|
|
"""
|
|
|
|
__tablename__ = "coinbase_subscription"
|
|
|
|
user_id = sa.Column(
|
|
sa.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True
|
|
)
|
|
|
|
# an reminder is sent several days before the subscription ends
|
|
end_at = sa.Column(ArrowType, nullable=False)
|
|
|
|
# the Coinbase code
|
|
code = sa.Column(sa.String(64), nullable=True)
|
|
|
|
user = orm.relationship(User)
|
|
|
|
def is_active(self):
|
|
return self.end_at > arrow.now()
|
|
|
|
|
|
# https://help.apple.com/app-store-connect/#/dev58bda3212
|
|
_APPLE_GRACE_PERIOD_DAYS = 16
|
|
|
|
|
|
class AppleSubscription(Base, ModelMixin):
|
|
"""
|
|
For users who have subscribed via Apple in-app payment
|
|
"""
|
|
|
|
__tablename__ = "apple_subscription"
|
|
|
|
user_id = sa.Column(
|
|
sa.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True
|
|
)
|
|
|
|
expires_date = sa.Column(ArrowType, nullable=False)
|
|
|
|
# to avoid using "Restore Purchase" on another account
|
|
original_transaction_id = sa.Column(sa.String(256), nullable=False, unique=True)
|
|
receipt_data = sa.Column(sa.Text(), nullable=False)
|
|
|
|
plan = sa.Column(sa.Enum(PlanEnum), nullable=False)
|
|
|
|
# to know what subscription user has bought
|
|
# e.g. io.simplelogin.ios_app.subscription.premium.monthly
|
|
product_id = sa.Column(sa.String(256), nullable=True)
|
|
|
|
user = orm.relationship(User)
|
|
|
|
def is_valid(self):
|
|
# Todo: take into account grace period?
|
|
return self.expires_date > arrow.now().shift(days=-_APPLE_GRACE_PERIOD_DAYS)
|
|
|
|
|
|
class DeletedAlias(Base, ModelMixin):
|
|
"""Store all deleted alias to make sure they are NOT reused"""
|
|
|
|
__tablename__ = "deleted_alias"
|
|
|
|
email = sa.Column(sa.String(256), unique=True, nullable=False)
|
|
|
|
@classmethod
|
|
def create(cls, **kw):
|
|
raise Exception("should use delete_alias(alias,user) instead")
|
|
|
|
def __repr__(self):
|
|
return f"<Deleted Alias {self.email}>"
|
|
|
|
|
|
class EmailChange(Base, ModelMixin):
|
|
"""Used when user wants to update their email"""
|
|
|
|
__tablename__ = "email_change"
|
|
|
|
user_id = sa.Column(
|
|
sa.ForeignKey(User.id, ondelete="cascade"),
|
|
nullable=False,
|
|
unique=True,
|
|
index=True,
|
|
)
|
|
new_email = sa.Column(sa.String(256), unique=True, nullable=False)
|
|
code = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
expired = sa.Column(ArrowType, nullable=False, default=_expiration_12h)
|
|
|
|
user = orm.relationship(User)
|
|
|
|
def is_expired(self):
|
|
return self.expired < arrow.now()
|
|
|
|
def __repr__(self):
|
|
return f"<EmailChange {self.id} {self.new_email} {self.user_id}>"
|
|
|
|
|
|
class AliasUsedOn(Base, ModelMixin):
|
|
"""Used to know where an alias is created"""
|
|
|
|
__tablename__ = "alias_used_on"
|
|
|
|
__table_args__ = (
|
|
sa.UniqueConstraint("alias_id", "hostname", name="uq_alias_used"),
|
|
)
|
|
|
|
alias_id = sa.Column(sa.ForeignKey(Alias.id, ondelete="cascade"), nullable=False)
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
|
|
alias = orm.relationship(Alias)
|
|
|
|
hostname = sa.Column(sa.String(1024), nullable=False)
|
|
|
|
|
|
class ApiKey(Base, ModelMixin):
|
|
"""used in browser extension to identify user"""
|
|
|
|
__tablename__ = "api_key"
|
|
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
code = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
name = sa.Column(sa.String(128), nullable=True)
|
|
last_used = sa.Column(ArrowType, default=None)
|
|
times = sa.Column(sa.Integer, default=0, nullable=False)
|
|
|
|
user = orm.relationship(User)
|
|
|
|
@classmethod
|
|
def create(cls, user_id, name=None, **kwargs):
|
|
code = random_string(60)
|
|
if cls.get_by(code=code):
|
|
code = str(uuid.uuid4())
|
|
|
|
return super().create(user_id=user_id, name=name, code=code, **kwargs)
|
|
|
|
@classmethod
|
|
def delete_all(cls, user_id):
|
|
Session.query(cls).filter(cls.user_id == user_id).delete()
|
|
|
|
|
|
class CustomDomain(Base, ModelMixin):
|
|
__tablename__ = "custom_domain"
|
|
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
domain = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
|
|
# default name to use when user replies/sends from alias
|
|
name = sa.Column(sa.String(128), nullable=True, default=None)
|
|
|
|
# mx verified
|
|
verified = sa.Column(sa.Boolean, nullable=False, default=False)
|
|
dkim_verified = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
spf_verified = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
dmarc_verified = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
|
|
_mailboxes = orm.relationship("Mailbox", secondary="domain_mailbox", lazy="joined")
|
|
|
|
# an alias is created automatically the first time it receives an email
|
|
catch_all = sa.Column(sa.Boolean, nullable=False, default=False, server_default="0")
|
|
|
|
# option to generate random prefix version automatically
|
|
random_prefix_generation = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
|
|
# incremented when a check is failed on the domain
|
|
# alert when the number exceeds a threshold
|
|
# used in check_custom_domain()
|
|
nb_failed_checks = sa.Column(
|
|
sa.Integer, default=0, server_default="0", nullable=False
|
|
)
|
|
|
|
# only domain has the ownership verified can go the next DNS step
|
|
# MX verified domains before this change don't have to do the TXT check
|
|
# and therefore have ownership_verified=True
|
|
ownership_verified = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
|
|
# randomly generated TXT value for verifying domain ownership
|
|
# the TXT record should be sl-verification=txt_token
|
|
ownership_txt_token = sa.Column(sa.String(128), nullable=True)
|
|
|
|
# if the domain is SimpleLogin subdomain, no need for the ownership, SPF, DKIM, DMARC check
|
|
is_sl_subdomain = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
|
|
__table_args__ = (
|
|
Index(
|
|
"ix_unique_domain", # Index name
|
|
"domain", # Columns which are part of the index
|
|
unique=True,
|
|
postgresql_where=Column("ownership_verified"),
|
|
), # The condition
|
|
)
|
|
|
|
user = orm.relationship(User, foreign_keys=[user_id], backref="custom_domains")
|
|
|
|
@property
|
|
def mailboxes(self):
|
|
if self._mailboxes:
|
|
return self._mailboxes
|
|
else:
|
|
return [self.user.default_mailbox]
|
|
|
|
def nb_alias(self):
|
|
return Alias.filter_by(custom_domain_id=self.id).count()
|
|
|
|
def get_trash_url(self):
|
|
return URL + f"/dashboard/domains/{self.id}/trash"
|
|
|
|
def get_ownership_dns_txt_value(self):
|
|
return f"sl-verification={self.ownership_txt_token}"
|
|
|
|
@classmethod
|
|
def create(cls, **kwargs):
|
|
domain = kwargs.get("domain")
|
|
if DeletedSubdomain.get_by(domain=domain):
|
|
raise SubdomainInTrashError
|
|
|
|
domain: CustomDomain = super(CustomDomain, cls).create(**kwargs)
|
|
|
|
# generate a domain ownership txt token
|
|
if not domain.ownership_txt_token:
|
|
domain.ownership_txt_token = random_string(30)
|
|
Session.commit()
|
|
|
|
if domain.is_sl_subdomain:
|
|
user = domain.user
|
|
user._subdomain_quota -= 1
|
|
Session.flush()
|
|
|
|
return domain
|
|
|
|
@classmethod
|
|
def delete(cls, obj_id):
|
|
obj: CustomDomain = cls.get(obj_id)
|
|
if obj.is_sl_subdomain:
|
|
DeletedSubdomain.create(domain=obj.domain)
|
|
|
|
return super(CustomDomain, cls).delete(obj_id)
|
|
|
|
@property
|
|
def auto_create_rules(self):
|
|
return sorted(self._auto_create_rules, key=lambda rule: rule.order)
|
|
|
|
def __repr__(self):
|
|
return f"<Custom Domain {self.domain}>"
|
|
|
|
|
|
class AutoCreateRule(Base, ModelMixin):
|
|
"""Alias auto creation rule for custom domain"""
|
|
|
|
__tablename__ = "auto_create_rule"
|
|
|
|
__table_args__ = (
|
|
sa.UniqueConstraint(
|
|
"custom_domain_id", "order", name="uq_auto_create_rule_order"
|
|
),
|
|
)
|
|
|
|
custom_domain_id = sa.Column(
|
|
sa.ForeignKey(CustomDomain.id, ondelete="cascade"), nullable=False
|
|
)
|
|
# an alias is auto created if it matches the regex
|
|
regex = sa.Column(sa.String(512), nullable=False)
|
|
|
|
# the order in which rules are evaluated in case there are multiple rules
|
|
order = sa.Column(sa.Integer, default=0, nullable=False)
|
|
|
|
custom_domain = orm.relationship(CustomDomain, backref="_auto_create_rules")
|
|
|
|
mailboxes = orm.relationship(
|
|
"Mailbox", secondary="auto_create_rule__mailbox", lazy="joined"
|
|
)
|
|
|
|
|
|
class AutoCreateRuleMailbox(Base, ModelMixin):
|
|
"""store auto create rule - mailbox association"""
|
|
|
|
__tablename__ = "auto_create_rule__mailbox"
|
|
__table_args__ = (
|
|
sa.UniqueConstraint(
|
|
"auto_create_rule_id", "mailbox_id", name="uq_auto_create_rule_mailbox"
|
|
),
|
|
)
|
|
|
|
auto_create_rule_id = sa.Column(
|
|
sa.ForeignKey(AutoCreateRule.id, ondelete="cascade"), nullable=False
|
|
)
|
|
mailbox_id = sa.Column(
|
|
sa.ForeignKey("mailbox.id", ondelete="cascade"), nullable=False
|
|
)
|
|
|
|
|
|
class DomainDeletedAlias(Base, ModelMixin):
|
|
"""Store all deleted alias for a domain"""
|
|
|
|
__tablename__ = "domain_deleted_alias"
|
|
|
|
__table_args__ = (
|
|
sa.UniqueConstraint("domain_id", "email", name="uq_domain_trash"),
|
|
)
|
|
|
|
email = sa.Column(sa.String(256), nullable=False)
|
|
domain_id = sa.Column(
|
|
sa.ForeignKey("custom_domain.id", ondelete="cascade"), nullable=False
|
|
)
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
|
|
domain = orm.relationship(CustomDomain)
|
|
|
|
@classmethod
|
|
def create(cls, **kw):
|
|
raise Exception("should use delete_alias(alias,user) instead")
|
|
|
|
def __repr__(self):
|
|
return f"<DomainDeletedAlias {self.id} {self.email}>"
|
|
|
|
|
|
class LifetimeCoupon(Base, ModelMixin):
|
|
__tablename__ = "lifetime_coupon"
|
|
|
|
code = sa.Column(sa.String(128), nullable=False, unique=True)
|
|
nb_used = sa.Column(sa.Integer, nullable=False)
|
|
paid = sa.Column(sa.Boolean, default=False, server_default="0", nullable=False)
|
|
comment = sa.Column(sa.Text, nullable=True)
|
|
|
|
|
|
class Coupon(Base, ModelMixin):
|
|
__tablename__ = "coupon"
|
|
|
|
code = sa.Column(sa.String(128), nullable=False, unique=True)
|
|
|
|
# by default a coupon is for 1 year
|
|
nb_year = sa.Column(sa.Integer, nullable=False, server_default="1", default=1)
|
|
|
|
# whether the coupon has been used
|
|
used = sa.Column(sa.Boolean, default=False, server_default="0", nullable=False)
|
|
|
|
# the user who uses the code
|
|
# non-null when the coupon is used
|
|
used_by_user_id = sa.Column(
|
|
sa.ForeignKey(User.id, ondelete="cascade"), nullable=True
|
|
)
|
|
|
|
is_giveaway = sa.Column(
|
|
sa.Boolean, default=False, nullable=False, server_default="0"
|
|
)
|
|
|
|
comment = sa.Column(sa.Text, nullable=True)
|
|
|
|
# a coupon can have an expiration
|
|
expires_date = sa.Column(ArrowType, nullable=True)
|
|
|
|
|
|
class Directory(Base, ModelMixin):
|
|
__tablename__ = "directory"
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
name = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
# when a directory is disabled, new alias can't be created on the fly
|
|
disabled = sa.Column(sa.Boolean, default=False, nullable=False, server_default="0")
|
|
|
|
user = orm.relationship(User, backref="directories")
|
|
|
|
_mailboxes = orm.relationship(
|
|
"Mailbox", secondary="directory_mailbox", lazy="joined"
|
|
)
|
|
|
|
@property
|
|
def mailboxes(self):
|
|
if self._mailboxes:
|
|
return self._mailboxes
|
|
else:
|
|
return [self.user.default_mailbox]
|
|
|
|
def nb_alias(self):
|
|
return Alias.filter_by(directory_id=self.id).count()
|
|
|
|
@classmethod
|
|
def create(cls, *args, **kwargs):
|
|
name = kwargs.get("name")
|
|
if DeletedDirectory.get_by(name=name):
|
|
raise DirectoryInTrashError
|
|
|
|
directory = super(Directory, cls).create(*args, **kwargs)
|
|
Session.flush()
|
|
|
|
user = directory.user
|
|
user._directory_quota -= 1
|
|
|
|
Session.flush()
|
|
return directory
|
|
|
|
@classmethod
|
|
def delete(cls, obj_id):
|
|
obj: Directory = cls.get(obj_id)
|
|
user = obj.user
|
|
# Put all aliases belonging to this directory to global or domain trash
|
|
for alias in Alias.filter_by(directory_id=obj_id):
|
|
from app import alias_utils
|
|
|
|
alias_utils.delete_alias(alias, user)
|
|
|
|
DeletedDirectory.create(name=obj.name)
|
|
cls.filter(cls.id == obj_id).delete()
|
|
|
|
Session.commit()
|
|
|
|
def __repr__(self):
|
|
return f"<Directory {self.name}>"
|
|
|
|
|
|
class Job(Base, ModelMixin):
|
|
"""Used to schedule one-time job in the future"""
|
|
|
|
__tablename__ = "job"
|
|
|
|
name = sa.Column(sa.String(128), nullable=False)
|
|
payload = sa.Column(sa.JSON)
|
|
|
|
# whether the job has been taken by the job runner
|
|
taken = sa.Column(sa.Boolean, default=False, nullable=False)
|
|
run_at = sa.Column(ArrowType)
|
|
|
|
def __repr__(self):
|
|
return f"<Job {self.id} {self.name} {self.payload}>"
|
|
|
|
|
|
class Mailbox(Base, ModelMixin):
|
|
__tablename__ = "mailbox"
|
|
user_id = sa.Column(
|
|
sa.ForeignKey(User.id, ondelete="cascade"), nullable=False, index=True
|
|
)
|
|
email = sa.Column(sa.String(256), nullable=False, index=True)
|
|
verified = sa.Column(sa.Boolean, default=False, nullable=False)
|
|
force_spf = sa.Column(sa.Boolean, default=True, server_default="1", nullable=False)
|
|
|
|
# used when user wants to update mailbox email
|
|
new_email = sa.Column(sa.String(256), unique=True)
|
|
|
|
pgp_public_key = sa.Column(sa.Text, nullable=True)
|
|
pgp_finger_print = sa.Column(sa.String(512), nullable=True)
|
|
disable_pgp = sa.Column(
|
|
sa.Boolean, default=False, nullable=False, server_default="0"
|
|
)
|
|
|
|
# incremented when a check is failed on the mailbox
|
|
# alert when the number exceeds a threshold
|
|
# used in sanity_check()
|
|
nb_failed_checks = sa.Column(
|
|
sa.Integer, default=0, server_default="0", nullable=False
|
|
)
|
|
|
|
# a mailbox can be disabled if it can't be reached
|
|
disabled = sa.Column(sa.Boolean, default=False, nullable=False, server_default="0")
|
|
|
|
generic_subject = sa.Column(sa.String(78), nullable=True)
|
|
|
|
__table_args__ = (sa.UniqueConstraint("user_id", "email", name="uq_mailbox_user"),)
|
|
|
|
user = orm.relationship(User, foreign_keys=[user_id])
|
|
|
|
def pgp_enabled(self) -> bool:
|
|
if self.pgp_finger_print and not self.disable_pgp:
|
|
return True
|
|
|
|
return False
|
|
|
|
def nb_alias(self):
|
|
return (
|
|
AliasMailbox.filter_by(mailbox_id=self.id).count()
|
|
+ Alias.filter_by(mailbox_id=self.id).count()
|
|
)
|
|
|
|
@classmethod
|
|
def delete(cls, obj_id):
|
|
mailbox: Mailbox = cls.get(obj_id)
|
|
user = mailbox.user
|
|
|
|
# Put all aliases belonging to this mailbox to global or domain trash
|
|
for alias in Alias.filter_by(mailbox_id=obj_id):
|
|
# special handling for alias that has several mailboxes and has mailbox_id=obj_id
|
|
if len(alias.mailboxes) > 1:
|
|
# use the first mailbox found in alias._mailboxes
|
|
first_mb = alias._mailboxes[0]
|
|
alias.mailbox_id = first_mb.id
|
|
alias._mailboxes.remove(first_mb)
|
|
else:
|
|
from app import alias_utils
|
|
|
|
# only put aliases that have mailbox as a single mailbox into trash
|
|
alias_utils.delete_alias(alias, user)
|
|
Session.commit()
|
|
|
|
cls.filter(cls.id == obj_id).delete()
|
|
Session.commit()
|
|
|
|
@property
|
|
def aliases(self) -> [Alias]:
|
|
ret = Alias.filter_by(mailbox_id=self.id).all()
|
|
|
|
for am in AliasMailbox.filter_by(mailbox_id=self.id):
|
|
ret.append(am.alias)
|
|
|
|
return ret
|
|
|
|
def __repr__(self):
|
|
return f"<Mailbox {self.id} {self.email}>"
|
|
|
|
|
|
class AccountActivation(Base, ModelMixin):
|
|
"""contains code to activate the user account when they sign up on mobile"""
|
|
|
|
__tablename__ = "account_activation"
|
|
|
|
user_id = sa.Column(
|
|
sa.ForeignKey(User.id, ondelete="cascade"), nullable=False, unique=True
|
|
)
|
|
# the activation code is usually 6 digits
|
|
code = sa.Column(sa.String(10), nullable=False)
|
|
|
|
# nb tries decrements each time user enters wrong code
|
|
tries = sa.Column(sa.Integer, default=3, nullable=False)
|
|
|
|
__table_args__ = (
|
|
CheckConstraint(tries >= 0, name="account_activation_tries_positive"),
|
|
{},
|
|
)
|
|
|
|
|
|
class RefusedEmail(Base, ModelMixin):
|
|
"""Store emails that have been refused, i.e. bounced or classified as spams"""
|
|
|
|
__tablename__ = "refused_email"
|
|
|
|
# Store the full report, including logs from Sending & Receiving MTA
|
|
full_report_path = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
|
|
# The original email, to display to user
|
|
path = sa.Column(sa.String(128), unique=True, nullable=True)
|
|
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
|
|
# the email content will be deleted at this date
|
|
delete_at = sa.Column(ArrowType, nullable=False, default=_expiration_7d)
|
|
|
|
# toggle this when email content (stored at full_report_path & path are deleted)
|
|
deleted = sa.Column(sa.Boolean, nullable=False, default=False, server_default="0")
|
|
|
|
def get_url(self, expires_in=3600):
|
|
if self.path:
|
|
return s3.get_url(self.path, expires_in)
|
|
else:
|
|
return s3.get_url(self.full_report_path, expires_in)
|
|
|
|
def __repr__(self):
|
|
return f"<Refused Email {self.id} {self.path} {self.delete_at}>"
|
|
|
|
|
|
class Referral(Base, ModelMixin):
|
|
"""Referral code so user can invite others"""
|
|
|
|
__tablename__ = "referral"
|
|
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
name = sa.Column(sa.String(512), nullable=True, default=None)
|
|
|
|
code = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
|
|
user = orm.relationship(User, foreign_keys=[user_id], backref="referrals")
|
|
|
|
@property
|
|
def nb_user(self) -> int:
|
|
return User.filter_by(referral_id=self.id, activated=True).count()
|
|
|
|
@property
|
|
def nb_paid_user(self) -> int:
|
|
res = 0
|
|
for user in User.filter_by(referral_id=self.id, activated=True):
|
|
if user.is_paid():
|
|
res += 1
|
|
|
|
return res
|
|
|
|
def link(self):
|
|
return f"{LANDING_PAGE_URL}?slref={self.code}"
|
|
|
|
def __repr__(self):
|
|
return f"<Referral {self.code}>"
|
|
|
|
|
|
class SentAlert(Base, ModelMixin):
|
|
"""keep track of alerts sent to user.
|
|
User can receive an alert when there's abnormal activity on their aliases such as
|
|
- reverse-alias not used by the owning mailbox
|
|
- SPF fails when using the reverse-alias
|
|
- bounced email
|
|
- ...
|
|
|
|
Different rate controls can then be implemented based on SentAlert:
|
|
- only once alert: an alert type should be sent only once
|
|
- max number of sent per 24H: an alert type should not be sent more than X times in 24h
|
|
"""
|
|
|
|
__tablename__ = "sent_alert"
|
|
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
to_email = sa.Column(sa.String(256), nullable=False)
|
|
alert_type = sa.Column(sa.String(256), nullable=False)
|
|
|
|
|
|
class AliasMailbox(Base, ModelMixin):
|
|
__tablename__ = "alias_mailbox"
|
|
__table_args__ = (
|
|
sa.UniqueConstraint("alias_id", "mailbox_id", name="uq_alias_mailbox"),
|
|
)
|
|
|
|
alias_id = sa.Column(
|
|
sa.ForeignKey(Alias.id, ondelete="cascade"), nullable=False, index=True
|
|
)
|
|
mailbox_id = sa.Column(
|
|
sa.ForeignKey(Mailbox.id, ondelete="cascade"), nullable=False, index=True
|
|
)
|
|
|
|
alias = orm.relationship(Alias)
|
|
|
|
|
|
class AliasHibp(Base, ModelMixin):
|
|
__tablename__ = "alias_hibp"
|
|
|
|
__table_args__ = (sa.UniqueConstraint("alias_id", "hibp_id", name="uq_alias_hibp"),)
|
|
|
|
alias_id = sa.Column(
|
|
sa.Integer(), sa.ForeignKey("alias.id", ondelete="cascade"), index=True
|
|
)
|
|
hibp_id = sa.Column(
|
|
sa.Integer(), sa.ForeignKey("hibp.id", ondelete="cascade"), index=True
|
|
)
|
|
|
|
alias = orm.relationship(
|
|
"Alias", backref=orm.backref("alias_hibp", cascade="all, delete-orphan")
|
|
)
|
|
hibp = orm.relationship(
|
|
"Hibp", backref=orm.backref("alias_hibp", cascade="all, delete-orphan")
|
|
)
|
|
|
|
|
|
class DirectoryMailbox(Base, ModelMixin):
|
|
__tablename__ = "directory_mailbox"
|
|
__table_args__ = (
|
|
sa.UniqueConstraint("directory_id", "mailbox_id", name="uq_directory_mailbox"),
|
|
)
|
|
|
|
directory_id = sa.Column(
|
|
sa.ForeignKey(Directory.id, ondelete="cascade"), nullable=False
|
|
)
|
|
mailbox_id = sa.Column(
|
|
sa.ForeignKey(Mailbox.id, ondelete="cascade"), nullable=False
|
|
)
|
|
|
|
|
|
class DomainMailbox(Base, ModelMixin):
|
|
"""store the owning mailboxes for a domain"""
|
|
|
|
__tablename__ = "domain_mailbox"
|
|
|
|
__table_args__ = (
|
|
sa.UniqueConstraint("domain_id", "mailbox_id", name="uq_domain_mailbox"),
|
|
)
|
|
|
|
domain_id = sa.Column(
|
|
sa.ForeignKey(CustomDomain.id, ondelete="cascade"), nullable=False
|
|
)
|
|
mailbox_id = sa.Column(
|
|
sa.ForeignKey(Mailbox.id, ondelete="cascade"), nullable=False
|
|
)
|
|
|
|
|
|
_NB_RECOVERY_CODE = 8
|
|
_RECOVERY_CODE_LENGTH = 8
|
|
|
|
|
|
class RecoveryCode(Base, ModelMixin):
|
|
"""allow user to login in case you lose any of your authenticators"""
|
|
|
|
__tablename__ = "recovery_code"
|
|
__table_args__ = (sa.UniqueConstraint("user_id", "code", name="uq_recovery_code"),)
|
|
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
code = sa.Column(sa.String(16), nullable=False)
|
|
used = sa.Column(sa.Boolean, nullable=False, default=False)
|
|
used_at = sa.Column(ArrowType, nullable=True, default=None)
|
|
|
|
user = orm.relationship(User)
|
|
|
|
@classmethod
|
|
def generate(cls, user):
|
|
"""generate recovery codes for user"""
|
|
# delete all existing codes
|
|
cls.filter_by(user_id=user.id).delete()
|
|
Session.flush()
|
|
|
|
nb_code = 0
|
|
while nb_code < _NB_RECOVERY_CODE:
|
|
code = random_string(_RECOVERY_CODE_LENGTH)
|
|
if not cls.get_by(user_id=user.id, code=code):
|
|
cls.create(user_id=user.id, code=code)
|
|
nb_code += 1
|
|
|
|
LOG.d("Create recovery codes for %s", user)
|
|
Session.commit()
|
|
|
|
@classmethod
|
|
def empty(cls, user):
|
|
"""Delete all recovery codes for user"""
|
|
cls.filter_by(user_id=user.id).delete()
|
|
Session.commit()
|
|
|
|
|
|
class Notification(Base, ModelMixin):
|
|
__tablename__ = "notification"
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
message = sa.Column(sa.Text, nullable=False)
|
|
title = sa.Column(sa.String(512))
|
|
|
|
# whether user has marked the notification as read
|
|
read = sa.Column(sa.Boolean, nullable=False, default=False)
|
|
|
|
@staticmethod
|
|
def render(template_name, **kwargs) -> str:
|
|
templates_dir = os.path.join(ROOT_DIR, "templates")
|
|
env = Environment(loader=FileSystemLoader(templates_dir))
|
|
|
|
template = env.get_template(template_name)
|
|
|
|
return template.render(
|
|
URL=URL,
|
|
LANDING_PAGE_URL=LANDING_PAGE_URL,
|
|
YEAR=arrow.now().year,
|
|
**kwargs,
|
|
)
|
|
|
|
|
|
class SLDomain(Base, ModelMixin):
|
|
"""SimpleLogin domains"""
|
|
|
|
__tablename__ = "public_domain"
|
|
|
|
domain = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
|
|
# only available for premium accounts
|
|
premium_only = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
|
|
# if True, the domain can be used for the subdomain feature
|
|
can_use_subdomain = sa.Column(
|
|
sa.Boolean, nullable=False, default=False, server_default="0"
|
|
)
|
|
|
|
def __repr__(self):
|
|
return f"<SLDomain {self.domain} {'Premium' if self.premium_only else 'Free'}"
|
|
|
|
|
|
class Monitoring(Base, ModelMixin):
|
|
"""
|
|
Store different host information over the time in order to
|
|
- alert issues in (almost) real time
|
|
- analyze data trending
|
|
"""
|
|
|
|
__tablename__ = "monitoring"
|
|
|
|
host = sa.Column(sa.String(256), nullable=False)
|
|
|
|
# Postfix stats
|
|
incoming_queue = sa.Column(sa.Integer, nullable=False)
|
|
active_queue = sa.Column(sa.Integer, nullable=False)
|
|
deferred_queue = sa.Column(sa.Integer, nullable=False)
|
|
|
|
|
|
class BatchImport(Base, ModelMixin):
|
|
__tablename__ = "batch_import"
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
file_id = sa.Column(sa.ForeignKey(File.id, ondelete="cascade"), nullable=False)
|
|
processed = sa.Column(sa.Boolean, nullable=False, default=False)
|
|
summary = sa.Column(sa.Text, nullable=True, default=None)
|
|
|
|
file = orm.relationship(File)
|
|
user = orm.relationship(User)
|
|
|
|
def nb_alias(self):
|
|
return Alias.filter_by(batch_import_id=self.id).count()
|
|
|
|
def __repr__(self):
|
|
return f"<BatchImport {self.id}>"
|
|
|
|
|
|
class AuthorizedAddress(Base, ModelMixin):
|
|
"""Authorize other addresses to send emails from aliases that are owned by a mailbox"""
|
|
|
|
__tablename__ = "authorized_address"
|
|
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
mailbox_id = sa.Column(
|
|
sa.ForeignKey(Mailbox.id, ondelete="cascade"), nullable=False
|
|
)
|
|
email = sa.Column(sa.String(256), nullable=False)
|
|
|
|
__table_args__ = (
|
|
sa.UniqueConstraint("mailbox_id", "email", name="uq_authorize_address"),
|
|
)
|
|
|
|
mailbox = orm.relationship(Mailbox, backref="authorized_addresses")
|
|
|
|
def __repr__(self):
|
|
return f"<AuthorizedAddress {self.id} {self.email} {self.mailbox_id}>"
|
|
|
|
|
|
class Metric2(Base, ModelMixin):
|
|
"""
|
|
For storing different metrics like number of users, etc
|
|
Store each metric as a column as opposed to having different rows as in Metric
|
|
"""
|
|
|
|
__tablename__ = "metric2"
|
|
date = sa.Column(ArrowType, default=arrow.utcnow, nullable=False)
|
|
|
|
nb_user = sa.Column(sa.Float, nullable=True)
|
|
nb_activated_user = sa.Column(sa.Float, nullable=True)
|
|
|
|
nb_premium = sa.Column(sa.Float, nullable=True)
|
|
nb_apple_premium = sa.Column(sa.Float, nullable=True)
|
|
nb_cancelled_premium = sa.Column(sa.Float, nullable=True)
|
|
nb_manual_premium = sa.Column(sa.Float, nullable=True)
|
|
nb_coinbase_premium = sa.Column(sa.Float, nullable=True)
|
|
|
|
# nb users who have been referred
|
|
nb_referred_user = sa.Column(sa.Float, nullable=True)
|
|
nb_referred_user_paid = sa.Column(sa.Float, nullable=True)
|
|
|
|
nb_alias = sa.Column(sa.Float, nullable=True)
|
|
|
|
# Obsolete as only for the last 14 days
|
|
nb_forward = sa.Column(sa.Float, nullable=True)
|
|
nb_block = sa.Column(sa.Float, nullable=True)
|
|
nb_reply = sa.Column(sa.Float, nullable=True)
|
|
nb_bounced = sa.Column(sa.Float, nullable=True)
|
|
nb_spam = sa.Column(sa.Float, nullable=True)
|
|
|
|
# should be used instead
|
|
nb_forward_last_24h = sa.Column(sa.Float, nullable=True)
|
|
nb_block_last_24h = sa.Column(sa.Float, nullable=True)
|
|
nb_reply_last_24h = sa.Column(sa.Float, nullable=True)
|
|
nb_bounced_last_24h = sa.Column(sa.Float, nullable=True)
|
|
# includes bounces for both forwarding and transactional email
|
|
nb_total_bounced_last_24h = sa.Column(sa.Float, nullable=True)
|
|
|
|
nb_verified_custom_domain = sa.Column(sa.Float, nullable=True)
|
|
nb_subdomain = sa.Column(sa.Float, nullable=True)
|
|
nb_directory = sa.Column(sa.Float, nullable=True)
|
|
|
|
nb_deleted_directory = sa.Column(sa.Float, nullable=True)
|
|
nb_deleted_subdomain = sa.Column(sa.Float, nullable=True)
|
|
|
|
nb_app = sa.Column(sa.Float, nullable=True)
|
|
|
|
|
|
class Bounce(Base, ModelMixin):
|
|
"""Record all bounces. Deleted after 7 days"""
|
|
|
|
__tablename__ = "bounce"
|
|
email = sa.Column(sa.String(256), nullable=False, index=True)
|
|
info = sa.Column(sa.Text, nullable=True)
|
|
|
|
|
|
class TransactionalEmail(Base, ModelMixin):
|
|
"""Storing all email addresses that receive transactional emails, including account email and mailboxes.
|
|
Deleted after 7 days
|
|
"""
|
|
|
|
__tablename__ = "transactional_email"
|
|
email = sa.Column(sa.String(256), nullable=False, unique=False)
|
|
|
|
|
|
class Payout(Base, ModelMixin):
|
|
"""Referral payouts"""
|
|
|
|
__tablename__ = "payout"
|
|
user_id = sa.Column(sa.ForeignKey("users.id", ondelete="cascade"), nullable=False)
|
|
|
|
# in USD
|
|
amount = sa.Column(sa.Float, nullable=False)
|
|
|
|
# BTC, PayPal, etc
|
|
payment_method = sa.Column(sa.String(256), nullable=False)
|
|
|
|
# number of upgraded user included in this payout
|
|
number_upgraded_account = sa.Column(sa.Integer, nullable=False)
|
|
|
|
comment = sa.Column(sa.Text)
|
|
|
|
user = orm.relationship(User)
|
|
|
|
|
|
class IgnoredEmail(Base, ModelMixin):
|
|
"""If an email has mail_from and rcpt_to present in this table, discard it by returning 250 status."""
|
|
|
|
__tablename__ = "ignored_email"
|
|
|
|
mail_from = sa.Column(sa.String(512), nullable=False)
|
|
rcpt_to = sa.Column(sa.String(512), nullable=False)
|
|
|
|
|
|
class IgnoreBounceSender(Base, ModelMixin):
|
|
"""Ignore sender that doesn't correctly handle bounces, for example noreply@github.com"""
|
|
|
|
__tablename__ = "ignore_bounce_sender"
|
|
|
|
mail_from = sa.Column(sa.String(512), nullable=False, unique=True)
|
|
|
|
def __repr__(self):
|
|
return f"<NoReplySender {self.mail_from}"
|
|
|
|
|
|
class MessageIDMatching(Base, ModelMixin):
|
|
"""Store the SL Message ID and the original Message ID"""
|
|
|
|
__tablename__ = "message_id_matching"
|
|
|
|
# SimpleLogin Message ID
|
|
sl_message_id = sa.Column(sa.String(512), unique=True, nullable=False)
|
|
original_message_id = sa.Column(sa.String(1024), unique=True, nullable=False)
|
|
|
|
# to track what email_log that has created this matching
|
|
email_log_id = sa.Column(
|
|
sa.ForeignKey("email_log.id", ondelete="cascade"), nullable=True
|
|
)
|
|
|
|
email_log = orm.relationship("EmailLog")
|
|
|
|
|
|
class DeletedDirectory(Base, ModelMixin):
|
|
"""To avoid directory from being reused"""
|
|
|
|
__tablename__ = "deleted_directory"
|
|
|
|
name = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
|
|
|
|
class DeletedSubdomain(Base, ModelMixin):
|
|
"""To avoid directory from being reused"""
|
|
|
|
__tablename__ = "deleted_subdomain"
|
|
|
|
domain = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
|
|
|
|
class InvalidMailboxDomain(Base, ModelMixin):
|
|
"""Domains that can't be used as mailbox"""
|
|
|
|
__tablename__ = "invalid_mailbox_domain"
|
|
|
|
domain = sa.Column(sa.String(256), unique=True, nullable=False)
|
|
|
|
|
|
# region Phone
|
|
class PhoneCountry(Base, ModelMixin):
|
|
__tablename__ = "phone_country"
|
|
|
|
name = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
|
|
|
|
class PhoneNumber(Base, ModelMixin):
|
|
__tablename__ = "phone_number"
|
|
|
|
country_id = sa.Column(
|
|
sa.ForeignKey(PhoneCountry.id, ondelete="cascade"), nullable=False
|
|
)
|
|
|
|
# with country code, e.g. +33612345678
|
|
number = sa.Column(sa.String(128), unique=True, nullable=False)
|
|
|
|
active = sa.Column(sa.Boolean, nullable=False, default=True)
|
|
|
|
# do not load this column
|
|
comment = deferred(sa.Column(sa.Text, nullable=True))
|
|
|
|
country = orm.relationship(PhoneCountry)
|
|
|
|
|
|
class PhoneReservation(Base, ModelMixin):
|
|
__tablename__ = "phone_reservation"
|
|
|
|
number_id = sa.Column(
|
|
sa.ForeignKey(PhoneNumber.id, ondelete="cascade"), nullable=False
|
|
)
|
|
|
|
user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)
|
|
|
|
number = orm.relationship(PhoneNumber)
|
|
|
|
start = sa.Column(ArrowType, nullable=False)
|
|
end = sa.Column(ArrowType, nullable=False)
|
|
|
|
|
|
class PhoneMessage(Base, ModelMixin):
|
|
__tablename__ = "phone_message"
|
|
|
|
number_id = sa.Column(
|
|
sa.ForeignKey(PhoneNumber.id, ondelete="cascade"), nullable=False
|
|
)
|
|
|
|
from_number = sa.Column(sa.String(128), nullable=False)
|
|
body = sa.Column(sa.Text)
|
|
|
|
number = orm.relationship(PhoneNumber)
|
|
|
|
|
|
class AdminAuditLog(Base):
|
|
__tablename__ = "admin_audit_log"
|
|
|
|
id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
|
|
created_at = sa.Column(ArrowType, default=arrow.utcnow, nullable=False)
|
|
admin_user_id = sa.Column(sa.ForeignKey("users.id"), nullable=False)
|
|
action = sa.Column(sa.Integer, nullable=False)
|
|
model = sa.Column(sa.Text, nullable=False)
|
|
model_id = sa.Column(sa.Integer, nullable=True)
|
|
data = sa.Column(sa.JSON, nullable=True)
|
|
|
|
admin = orm.relationship(User, foreign_keys=[admin_user_id])
|
|
|
|
@classmethod
|
|
def create(cls, **kw):
|
|
r = cls(**kw)
|
|
Session.add(r)
|
|
|
|
return r
|
|
|
|
@classmethod
|
|
def create_manual_upgrade(
|
|
cls, admin_user_id: int, upgrade_type: str, user_id: int, giveaway: bool
|
|
):
|
|
cls.create(
|
|
admin_user_id=admin_user_id,
|
|
action=AuditLogActionEnum.manual_upgrade.value,
|
|
model="User",
|
|
model_id=user_id,
|
|
data={
|
|
"upgrade_type": upgrade_type,
|
|
"giveaway": giveaway,
|
|
},
|
|
)
|
|
|
|
@classmethod
|
|
def extend_trial(
|
|
cls, admin_user_id: int, user_id: int, trial_end: arrow.Arrow, extend_time: str
|
|
):
|
|
cls.create(
|
|
admin_user_id=admin_user_id,
|
|
action=AuditLogActionEnum.extend_trial.value,
|
|
model="User",
|
|
model_id=user_id,
|
|
data={
|
|
"trial_end": trial_end.format(arrow.FORMAT_RFC3339),
|
|
"extend_time": extend_time,
|
|
},
|
|
)
|
|
|
|
@classmethod
|
|
def disable_otp_fido(
|
|
cls, admin_user_id: int, user_id: int, had_otp: bool, had_fido: bool
|
|
):
|
|
cls.create(
|
|
admin_user_id=admin_user_id,
|
|
action=AuditLogActionEnum.disable_2fa.value,
|
|
model="User",
|
|
model_id=user_id,
|
|
data={"had_otp": had_otp, "had_fido": had_fido},
|
|
)
|
|
|
|
@classmethod
|
|
def logged_as_user(cls, admin_user_id: int, user_id: int):
|
|
cls.create(
|
|
admin_user_id=admin_user_id,
|
|
action=AuditLogActionEnum.logged_as_user.value,
|
|
model="User",
|
|
model_id=user_id,
|
|
)
|
|
|
|
@classmethod
|
|
def extend_subscription(
|
|
cls,
|
|
admin_user_id: int,
|
|
user_id: int,
|
|
subscription_end: arrow.Arrow,
|
|
extend_time: str,
|
|
):
|
|
cls.create(
|
|
admin_user_id=admin_user_id,
|
|
action=AuditLogActionEnum.extend_subscription.value,
|
|
model="User",
|
|
model_id=user_id,
|
|
data={
|
|
"subscription_end": subscription_end.format(arrow.FORMAT_RFC3339),
|
|
"extend_time": extend_time,
|
|
},
|
|
)
|
|
|
|
|
|
# endregion
|