2019-07-02 09:20:12 +02:00
import enum
import random
2019-12-26 12:21:28 +01:00
import uuid
2020-04-25 11:30:09 +02:00
from email.utils import formataddr
from typing import List
2019-07-02 09:20:12 +02:00
import arrow
import bcrypt
from flask import url_for
from flask_login import UserMixin
2020-02-28 13:00:45 +01:00
from sqlalchemy import text, desc, CheckConstraint
from sqlalchemy.exc import IntegrityError
2019-07-02 09:20:12 +02:00
from sqlalchemy_utils import ArrowType
2019-07-02 09:20:12 +02:00
from app import s3
from app.config import (
2020-03-24 21:19:45 +01:00
2020-04-02 23:26:17 +02:00
2020-04-09 22:19:45 +02:00
2020-05-10 14:43:41 +02:00
from app.errors import AliasInTrashError
from app.extensions import db
2019-07-02 09:20:12 +02:00
from app.log import LOG
2019-07-03 12:13:28 +02:00
from app.oauth_models import Scope
from app.utils import convert_to_id, random_string, random_words, random_word
class ModelMixin(object):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
2019-07-02 09:20:12 +02:00
created_at = db.Column(ArrowType, default=arrow.utcnow, nullable=False)
updated_at = db.Column(ArrowType, default=None, onupdate=arrow.utcnow)
2019-07-02 09:20:12 +02:00
_repr_hide = ["created_at", "updated_at"]
def query(cls):
return db.session.query(cls)
def get(cls, id):
return cls.query.get(id)
def get_by(cls, **kw):
return cls.query.filter_by(**kw).first()
def filter_by(cls, **kw):
return cls.query.filter_by(**kw)
def get_or_create(cls, **kw):
r = cls.get_by(**kw)
if not r:
r = cls(**kw)
return r
def create(cls, **kw):
r = cls(**kw)
return r
def save(self):
def delete(cls, obj_id):
cls.query.filter( == obj_id).delete()
2019-07-02 09:20:12 +02:00
def __repr__(self):
values = ", ".join(
"%s=%r" % (n, getattr(self, n))
for n in self.__table__.c.keys()
if n not in self._repr_hide
return "%s(%s)" % (self.__class__.__name__, values)
2019-07-02 09:20:12 +02:00
class File(db.Model, ModelMixin):
path = db.Column(db.String(128), unique=True, nullable=False)
user_id = db.Column(db.ForeignKey("", ondelete="cascade"), nullable=True)
2019-07-02 09:20:12 +02:00
def get_url(self, expires_in=3600):
return s3.get_url(self.path, expires_in)
2019-07-02 09:20:12 +02:00
class PlanEnum(enum.Enum):
    """Billing period of a paid plan (monthly or yearly)."""

    monthly = 2
    yearly = 3
2019-12-26 12:21:28 +01:00
class AliasGeneratorEnum(enum.Enum):
    """Schemes that can be used to generate a random alias."""

    word = 1  # aliases are generated based on random words
    uuid = 2  # aliases are generated based on uuid

    @classmethod
    def has_value(cls, value: int) -> bool:
        """Return True if ``value`` is the value of one of the enum members.

        :param value: candidate integer, e.g. coming from user input
        """
        # Equality scan instead of building a set: no hashing requirement on
        # ``value`` and no throw-away container for a two-member enum.
        return any(value == item.value for item in cls)
class User(db.Model, ModelMixin, UserMixin):
2019-07-02 09:20:12 +02:00
__tablename__ = "users"
2020-01-26 17:22:16 +01:00
email = db.Column(db.String(256), unique=True, nullable=False)
2020-02-27 16:18:26 +01:00
salt = db.Column(db.String(128), nullable=True)
password = db.Column(db.String(128), nullable=True)
2020-02-27 16:18:26 +01:00
2019-07-02 09:20:12 +02:00
name = db.Column(db.String(128), nullable=False)
is_admin = db.Column(db.Boolean, nullable=False, default=False)
2019-12-28 01:03:59 +01:00
alias_generator = db.Column(
2019-12-30 00:37:07 +01:00
notification = db.Column(
2019-12-30 00:47:55 +01:00
db.Boolean, default=True, nullable=False, server_default="1"
2019-12-30 00:37:07 +01:00
2019-07-02 09:20:12 +02:00
activated = db.Column(db.Boolean, default=False, nullable=False)
profile_picture_id = db.Column(db.ForeignKey(, nullable=True)
2019-12-27 15:20:10 +01:00
otp_secret = db.Column(db.String(16), nullable=True)
enable_otp = db.Column(
db.Boolean, nullable=False, default=False, server_default="0"
2020-05-05 10:32:49 +02:00
# Fields for WebAuthn
fido_uuid = db.Column(db.String(), nullable=True, unique=True)
2020-05-05 10:58:42 +02:00
fido_credential_id = db.Column(db.String(), nullable=True, unique=True)
fido_pk = db.Column(db.String(), nullable=True, unique=True)
2020-05-05 12:16:52 +02:00
fido_sign_count = db.Column(db.Integer(), nullable=True)
2020-05-05 10:32:49 +02:00
2020-05-07 17:56:25 +02:00
# whether user can use Fido
can_use_fido = db.Column(
db.Boolean, default=False, nullable=False, server_default="0"
2020-05-07 14:32:52 +02:00
def fido_enabled(self) -> bool:
    """Return True when the user is allowed to use FIDO and has a FIDO uuid set.

    Both conditions are required: ``can_use_fido`` must be truthy and
    ``fido_uuid`` must not be None.
    """
    return bool(self.can_use_fido and self.fido_uuid is not None)
# some users could have lifetime premium
lifetime = db.Column(db.Boolean, default=False, nullable=False, server_default="0")
2020-01-30 04:10:28 +01:00
# user can use all premium features until this date
trial_end = db.Column(
ArrowType, default=lambda:, hours=1), nullable=True
2020-01-30 04:10:28 +01:00
2020-02-23 09:40:41 +01:00
# the mailbox used when create random alias
2020-03-05 17:00:43 +01:00
# this field is nullable but in practice, it's always set
# it cannot be set to non-nullable though
# as this will create foreign key cycle between User and Mailbox
2020-02-23 09:40:41 +01:00
default_mailbox_id = db.Column(
db.ForeignKey(""), nullable=True, default=None
profile_picture = db.relationship(File, foreign_keys=[profile_picture_id])
2019-07-02 09:20:12 +02:00
# Use the "via" format for sender address, i.e. " via SimpleLogin"
# If False, use the format "Name - name at"
use_via_format_for_sender = db.Column(
db.Boolean, default=True, nullable=False, server_default="1"
2020-05-02 18:08:05 +02:00
referral_id = db.Column(
db.ForeignKey("", ondelete="SET NULL"), nullable=True, default=None
2020-04-09 22:19:45 +02:00
referral = db.relationship("Referral", foreign_keys=[referral_id])
2020-04-13 13:22:52 +02:00
# whether intro has been shown to user
intro_shown = db.Column(
db.Boolean, default=False, nullable=False, server_default="0"
2020-05-03 15:54:19 +02:00
default_mailbox = db.relationship("Mailbox", foreign_keys=[default_mailbox_id])
def create(cls, email, name, password=None, **kwargs):
2019-12-26 23:29:40 +01:00
user: User = super(User, cls).create(email=email, name=name, **kwargs)
if password:
2020-02-27 16:31:38 +01:00
mb = Mailbox.create(,, verified=True)
user.default_mailbox_id =
2020-03-05 20:32:08 +01:00
# create a first alias mail to show user how to use when they login
Alias.create_new(user, prefix="my-first-alias",
2020-03-05 20:32:08 +01:00
2020-05-10 14:43:41 +02:00
LOG.d("Disable onboarding emails")
return user
# Schedule onboarding emails
2020-03-24 21:19:45 +01:00
2020-04-02 23:26:17 +02:00
return user
2020-04-12 19:27:14 +02:00
def _lifetime_or_active_subscription(self) -> bool:
"""True if user has lifetime licence or active subscription"""
if self.lifetime:
return True
sub: Subscription = self.get_subscription()
if sub:
return True
apple_sub: AppleSubscription = AppleSubscription.get_by(
if apple_sub and apple_sub.is_valid():
return True
manual_sub: ManualSubscription = ManualSubscription.get_by(
if manual_sub and manual_sub.end_at >
return True
return False
2019-07-02 09:20:12 +02:00
def in_trial(self):
"""return True if user does not have lifetime licence or an active subscription AND is in trial period"""
2020-04-12 19:27:14 +02:00
if self._lifetime_or_active_subscription():
return False
2020-01-30 09:08:26 +01:00
if self.trial_end and < self.trial_end:
return True
return False
def should_show_upgrade_button(self):
    """Return True if the upgrade button should be displayed to this user.

    The button is shown when the user has neither a lifetime licence nor an
    active subscription, or when their subscription has been cancelled.
    """
    if not self._lifetime_or_active_subscription():
        return True

    # user who has canceled can also re-subscribe
    sub = self.get_subscription()
    return bool(sub and sub.cancelled)
def can_upgrade(self):
"""User who has lifetime licence or giveaway manual subscriptions can decide to upgrade to a paid plan"""
sub: Subscription = self.get_subscription()
# user who has canceled can also re-subscribe
if sub and not sub.cancelled:
return False
apple_sub: AppleSubscription = AppleSubscription.get_by(
if apple_sub and apple_sub.is_valid():
return False
manual_sub: ManualSubscription = ManualSubscription.get_by(
# user who has giveaway premium can decide to upgrade
2020-04-13 20:51:29 +02:00
if (
and manual_sub.end_at >
and not manual_sub.is_giveaway
return False
return True
def is_premium(self) -> bool:
user is premium if they:
- have a lifetime deal or
- in trial period or
- active subscription
2020-04-12 19:27:14 +02:00
if self._lifetime_or_active_subscription():
return True
2020-01-30 07:20:32 +01:00
if self.trial_end and < self.trial_end:
return True
return False
def can_create_new_alias(self) -> bool:
2019-07-06 23:25:52 +02:00
if self.is_premium():
return True
2019-11-21 22:44:24 +01:00
return Alias.filter_by( < MAX_NB_EMAIL_FREE_PLAN
2019-07-06 23:25:52 +02:00
def set_password(self, password):
    """Hash ``password`` with a freshly generated bcrypt salt and store both.

    :param password: clear-text password (str)

    Sets ``self.salt`` and ``self.password`` to the decoded salt and hash;
    nothing is returned.
    """
    salt = bcrypt.gensalt()
    password_hash = bcrypt.hashpw(password.encode(), salt).decode()
    self.salt = salt.decode()
    self.password = password_hash
def check_password(self, password) -> bool:
    """Return True if ``password`` matches the stored bcrypt hash.

    :param password: clear-text password (str) to verify
    :return: False when no password is stored, otherwise the result of the
        bcrypt verification
    """
    if not self.password:
        return False

    # bcrypt.checkpw re-hashes with the salt embedded in the stored hash and
    # compares in constant time, unlike a plain ``==`` on the two hashes.
    # NOTE(review): assumes self.password was produced with self.salt (as
    # set_password does), so the embedded salt equals the stored one.
    return bcrypt.checkpw(password.encode(), self.password.encode())
2019-07-02 09:20:12 +02:00
def profile_picture_url(self):
    """Return the URL of the user's profile picture.

    Falls back to the static ``default-avatar.png`` when the user has no
    profile picture.
    """
    if self.profile_picture_id:
        return self.profile_picture.get_url()

    return url_for("static", filename="default-avatar.png")
2019-07-02 09:20:12 +02:00
2019-12-08 17:01:09 +01:00
def suggested_emails(self, website_name) -> (str, [str]):
2019-07-22 18:57:35 +02:00
"""return suggested email and other email choices """
2019-12-08 17:01:09 +01:00
website_name = convert_to_id(website_name)
all_aliases = [ for ge in Alias.filter_by(, enabled=True)
if self.can_create_new_alias():
suggested_alias = Alias.create_new(self, prefix=website_name).email
2019-07-22 18:57:35 +02:00
# pick an email from the list of gen emails
suggested_alias = random.choice(all_aliases)
2019-07-22 18:57:35 +02:00
return (
2019-07-22 18:57:35 +02:00
2019-07-22 21:28:17 +02:00
def suggested_names(self) -> (str, [str]):
"""return suggested name and other name choices """
other_name = convert_to_id(
return, [other_name, "Anonymous", "whoami"]
def get_name_initial(self) -> str:
    """Return the upper-cased first letter of each word of the user's name.

    Empty words (produced by consecutive spaces) are ignored, so an empty
    name yields an empty string.
    """
    # NOTE(review): the call on this line was damaged in the source; it is
    # reconstructed as a split of ``self.name`` on single spaces - confirm.
    names = self.name.split(" ")
    return "".join([n[0].upper() for n in names if n])
def get_subscription(self) -> "Subscription":
"""return *active* subscription
TODO: support user unsubscribe and re-subscribe
sub = Subscription.get_by(
# TODO: sub is active only if sub.next_bill_date > now
# due to a bug on next_bill_date, wait until next month (May 8)
# when all next_bill_date are correctly updated to add this check
if sub and sub.cancelled:
# sub is active until the next billing_date + 1
if sub.next_bill_date >=
return sub
# past subscription, user is considered not having a subscription = free plan
return None
return sub
2019-12-02 01:13:39 +01:00
def verified_custom_domains(self):
return CustomDomain.query.filter_by(, verified=True).all()
def mailboxes(self) -> List["Mailbox"]:
"""list of mailbox that user own"""
mailboxes = []
2020-02-23 07:41:27 +01:00
for mailbox in Mailbox.query.filter_by(, verified=True):
2020-02-23 07:41:27 +01:00
return mailboxes
def nb_directory(self):
return Directory.query.filter_by(
2019-08-30 22:12:31 +02:00
def __repr__(self):
return f"<User {} {} {}>"
2019-07-02 09:20:12 +02:00
def _expiration_1h():
2019-11-18 19:32:58 +01:00
def _expiration_12h():
def _expiration_5m():
2020-03-14 16:07:34 +01:00
def _expiration_7d():
2019-07-02 09:20:12 +02:00
class ActivationCode(db.Model, ModelMixin):
"""For activate user account"""
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
code = db.Column(db.String(128), unique=True, nullable=False)
user = db.relationship(User)
expired = db.Column(ArrowType, nullable=False, default=_expiration_1h)
def is_expired(self):
return self.expired <
2019-07-02 09:20:12 +02:00
class ResetPasswordCode(db.Model, ModelMixin):
"""For resetting password"""
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
code = db.Column(db.String(128), unique=True, nullable=False)
user = db.relationship(User)
expired = db.Column(ArrowType, nullable=False, default=_expiration_1h)
2019-07-02 09:20:12 +02:00
def is_expired(self):
return self.expired <
2019-07-02 09:20:12 +02:00
class SocialAuth(db.Model, ModelMixin):
"""Store how user authenticates with social login"""
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
# name of the social login used, could be facebook, google or github
social = db.Column(db.String(128), nullable=False)
__table_args__ = (db.UniqueConstraint("user_id", "social", name="uq_social_auth"),)
2019-07-02 09:20:12 +02:00
# <<< OAUTH models >>>
def generate_oauth_client_id(client_name) -> str:
oauth_client_id = convert_to_id(client_name) + "-" + random_string()
# check that the client does not exist yet
if not Client.get_by(oauth_client_id=oauth_client_id):
LOG.debug("generate oauth_client_id %s", oauth_client_id)
return oauth_client_id
# Rerun the function
"client_id %s already exists, generate a new client_id", oauth_client_id
return generate_oauth_client_id(client_name)
class Client(db.Model, ModelMixin):
oauth_client_id = db.Column(db.String(128), unique=True, nullable=False)
oauth_client_secret = db.Column(db.String(128), nullable=False)
name = db.Column(db.String(128), nullable=False)
home_url = db.Column(db.String(1024))
published = db.Column(db.Boolean, default=False, nullable=False)
# user who created this client
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
icon_id = db.Column(db.ForeignKey(, nullable=True)
icon = db.relationship(File)
def nb_user(self):
return ClientUser.filter_by(
2019-07-03 12:13:28 +02:00
def get_scopes(self) -> [Scope]:
    """Return the scopes this client has access to.

    Currently the same fixed list for every client: name, email, avatar URL.
    """
    # todo: client can choose which scopes they want to have access
    return [Scope.NAME, Scope.EMAIL, Scope.AVATAR_URL]
2019-07-02 09:20:12 +02:00
def create_new(cls, name, user_id) -> "Client":
# generate a client-id
oauth_client_id = generate_oauth_client_id(name)
oauth_client_secret = random_string(40)
client = Client.create(
return client
def get_icon_url(self):
    """Return the URL of the client's icon.

    Falls back to the static ``default-icon.svg`` under ``URL`` when the
    client has no icon.
    """
    if self.icon_id:
        return self.icon.get_url()

    return URL + "/static/default-icon.svg"
2019-08-16 12:56:13 +02:00
def last_user_login(self) -> "ClientUser":
client_user = (
ClientUser.query.filter(ClientUser.client_id ==
2019-12-28 01:03:59 +01:00
2019-08-16 12:56:13 +02:00
if client_user:
return client_user
return None
2019-07-02 09:20:12 +02:00
class RedirectUri(db.Model, ModelMixin):
"""Valid redirect uris for a client"""
client_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
uri = db.Column(db.String(1024), nullable=False)
client = db.relationship(Client, backref="redirect_uris")
class AuthorizationCode(db.Model, ModelMixin):
2019-07-02 09:20:12 +02:00
code = db.Column(db.String(128), unique=True, nullable=False)
client_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
scope = db.Column(db.String(128))
redirect_uri = db.Column(db.String(1024))
# what is the input response_type, e.g. "code", "code,id_token", ...
response_type = db.Column(db.String(128))
2019-07-02 09:20:12 +02:00
user = db.relationship(User, lazy=False)
client = db.relationship(Client, lazy=False)
expired = db.Column(ArrowType, nullable=False, default=_expiration_5m)
def is_expired(self):
return self.expired <
class OauthToken(db.Model, ModelMixin):
access_token = db.Column(db.String(128), unique=True)
2019-07-02 09:20:12 +02:00
client_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
scope = db.Column(db.String(128))
redirect_uri = db.Column(db.String(1024))
# what is the input response_type, e.g. "token", "token,id_token", ...
response_type = db.Column(db.String(128))
user = db.relationship(User)
2019-07-02 09:20:12 +02:00
client = db.relationship(Client)
expired = db.Column(ArrowType, nullable=False, default=_expiration_1h)
def is_expired(self):
return self.expired <
2019-07-02 09:20:12 +02:00
2019-12-28 01:03:59 +01:00
def generate_email(
scheme: int = AliasGeneratorEnum.word.value, in_hex: bool = False
) -> str:
2019-12-26 12:21:28 +01:00
"""generate an email address that does not exist before
:param scheme: int, value of AliasGeneratorEnum, indicate how the email is generated
:type in_hex: bool, if the generate scheme is uuid, is hex favorable?
if scheme == AliasGeneratorEnum.uuid.value:
name = uuid.uuid4().hex if in_hex else uuid.uuid4().__str__()
random_email = name + "@" + EMAIL_DOMAIN
random_email = random_words() + "@" + EMAIL_DOMAIN
2019-07-02 09:20:12 +02:00
# check that the client does not exist yet
if not Alias.get_by(email=random_email) and not DeletedAlias.get_by(
2019-12-28 01:03:59 +01:00
2019-11-18 15:10:16 +01:00
2019-07-02 09:20:12 +02:00
LOG.debug("generate email %s", random_email)
return random_email
# Rerun the function
LOG.warning("email %s already exists, generate a new email", random_email)
2019-12-26 12:21:28 +01:00
return generate_email(scheme=scheme, in_hex=in_hex)
2019-07-02 09:20:12 +02:00
class Alias(db.Model, ModelMixin):
2019-07-02 09:20:12 +02:00
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
email = db.Column(db.String(128), unique=True, nullable=False)
2020-04-26 10:37:40 +02:00
# the name to use when user replies/sends from alias
name = db.Column(db.String(128), nullable=True, default=None)
2019-07-02 09:20:12 +02:00
enabled = db.Column(db.Boolean(), default=True, nullable=False)
custom_domain_id = db.Column(
db.ForeignKey("", ondelete="cascade"), nullable=True
custom_domain = db.relationship("CustomDomain", foreign_keys=[custom_domain_id])
# To know whether an alias is created "on the fly", i.e. via the custom domain catch-all feature
automatic_creation = db.Column(
db.Boolean, nullable=False, default=False, server_default="0"
# to know whether an alias belongs to a directory
directory_id = db.Column(
db.ForeignKey("", ondelete="cascade"), nullable=True
2020-02-05 09:45:29 +01:00
note = db.Column(db.Text, default=None, nullable=True)
# an alias can be owned by another mailbox
mailbox_id = db.Column(
2020-03-05 17:00:43 +01:00
db.ForeignKey("", ondelete="cascade"), nullable=False
2020-05-03 15:54:19 +02:00
# prefix _ to avoid this object being used accidentally.
# To have the list of all mailboxes, should use AliasInfo instead
_mailboxes = db.relationship("Mailbox", secondary="alias_mailbox")
user = db.relationship(User)
2020-02-10 17:20:25 +01:00
mailbox = db.relationship("Mailbox")
def mailboxes(self):
ret = [self.mailbox]
for m in self._mailboxes:
return ret
def create(cls, **kw):
r = cls(**kw)
# make sure alias is not in global trash, i.e. DeletedAlias table
email = kw["email"]
if DeletedAlias.get_by(email=email):
raise AliasInTrashError
return r
2020-03-05 20:32:08 +01:00
def create_new(cls, user, prefix, note=None, mailbox_id=None):
if not prefix:
raise Exception("alias prefix cannot be empty")
# find the right suffix - avoid infinite loop by running this at max 1000 times
for i in range(1000):
suffix = random_word()
email = f"{prefix}.{suffix}@{FIRST_ALIAS_DOMAIN}"
if not cls.get_by(email=email) and not DeletedAlias.get_by(email=email):
return Alias.create(
2020-03-05 20:32:08 +01:00,
mailbox_id=mailbox_id or user.default_mailbox_id,
2019-12-28 01:03:59 +01:00
def create_new_random(
scheme: int = AliasGeneratorEnum.word.value,
in_hex: bool = False,
note: str = None,
2019-12-28 01:03:59 +01:00
"""create a new random alias"""
2019-12-26 12:21:28 +01:00
random_email = generate_email(scheme=scheme, in_hex=in_hex)
return Alias.create(,
2020-02-22 15:09:07 +01:00
def mailbox_email(self):
if self.mailbox_id:
2019-07-02 09:20:12 +02:00
def __repr__(self):
return f"<Alias {} {}>"
2019-07-02 09:20:12 +02:00
class ClientUser(db.Model, ModelMixin):
__table_args__ = (
db.UniqueConstraint("user_id", "client_id", name="uq_client_user"),
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
client_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
# Null means client has access to user original email
2020-03-17 12:01:18 +01:00
alias_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=True)
2019-07-02 09:20:12 +02:00
# user can decide to send to client another name
name = db.Column(
db.String(128), nullable=True, default=None, server_default=text("NULL")
# user can decide to send to client a default avatar
default_avatar = db.Column(
db.Boolean, nullable=False, default=False, server_default="0"
alias = db.relationship(Alias, backref="client_users")
2019-07-02 09:20:12 +02:00
user = db.relationship(User)
client = db.relationship(Client)
def get_email(self):
2020-03-17 12:01:18 +01:00
return if self.alias_id else
TODO: add migration file add migration for user.promo_codes move email alias on top of apps in dashboard add introjs move encode_url to utils create GenEmail.create_new_gen_email create a first alias mail to show user how to use when they login show intro when user visits the website the first time fix register
2019-07-02 09:20:12 +02:00
2019-08-16 12:56:13 +02:00
def get_user_name(self):
2019-07-02 09:20:12 +02:00
def get_user_info(self) -> dict:
"""return user info according to client scope
Return dict with key being scope name. For now all the fields are the same for all clients:
"client": "Demo",
"email": "",
"email_verified": true,
"id": 1,
"name": "Son GM",
"avatar_url": "http://s3..."
2019-07-02 09:20:12 +02:00
2019-08-11 00:32:00 +02:00
res = {
"email_verified": True,
"sub": str(,
2019-07-02 09:20:12 +02:00
for scope in self.client.get_scopes():
2019-07-03 12:13:28 +02:00
if scope == Scope.NAME:
2019-07-22 21:28:17 +02:00
res[Scope.NAME.value] =
res[Scope.NAME.value] =
2019-07-03 12:13:28 +02:00
elif scope == Scope.AVATAR_URL:
if self.user.profile_picture_id:
2019-07-22 22:24:57 +02:00
if self.default_avatar:
res[Scope.AVATAR_URL.value] = URL + "/static/default-avatar.png"
res[Scope.AVATAR_URL.value] = self.user.profile_picture.get_url(
2019-07-03 12:13:28 +02:00
res[Scope.AVATAR_URL.value] = None
elif scope == Scope.EMAIL:
2019-07-02 09:20:12 +02:00
# Use generated email
2020-03-17 12:01:18 +01:00
if self.alias_id:
2019-07-02 09:20:12 +02:00
"Use gen email for user %s, client %s", self.user, self.client
res[Scope.EMAIL.value] =
2019-07-02 09:20:12 +02:00
# Use user original email
2019-07-03 12:13:28 +02:00
res[Scope.EMAIL.value] =
2019-07-02 09:20:12 +02:00
return res
2019-11-07 17:34:18 +01:00
2020-03-17 10:56:59 +01:00
class Contact(db.Model, ModelMixin):
2019-11-07 17:34:18 +01:00
2020-03-14 12:22:43 +01:00
Store configuration of sender (website-email) and alias.
2019-11-07 17:34:18 +01:00
2019-11-07 17:34:18 +01:00
__table_args__ = (
2020-03-17 12:01:18 +01:00
db.UniqueConstraint("alias_id", "website_email", name="uq_contact"),
2019-11-07 17:34:18 +01:00
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
2020-03-17 12:01:18 +01:00
alias_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
2019-11-07 17:34:18 +01:00
name = db.Column(
db.String(512), nullable=True, default=None, server_default=text("NULL")
2020-04-05 12:18:18 +02:00
2020-03-08 11:33:54 +01:00
website_email = db.Column(db.String(512), nullable=False)
2019-11-07 17:34:18 +01:00
2019-12-09 22:40:31 +01:00
# the email from header, e.g. AB CD <>
# nullable as this field is added after website_email
2020-03-08 11:33:54 +01:00
website_from = db.Column(db.String(1024), nullable=True)
2019-12-09 22:40:31 +01:00
2019-11-07 17:34:18 +01:00
# when user clicks on "reply", they will reply to this address.
# This address allows to hide user personal email
# this reply email is created every time a website sends an email to user
# it has the prefix "reply+" to distinguish with other email
2020-03-08 11:33:54 +01:00
reply_email = db.Column(db.String(512), nullable=False)
2019-11-08 07:55:29 +01:00
2020-03-28 19:05:27 +01:00
# whether a contact is created via CC
is_cc = db.Column(db.Boolean, nullable=False, default=False, server_default="0")
alias = db.relationship(Alias, backref="contacts")
2020-04-05 15:21:04 +02:00
user = db.relationship(User)
2019-12-15 16:17:37 +01:00
def website_send_to(self):
"""return the email address with name.
to use when user wants to send an email from the alias
"First Last | email at" <ra+random_string@SL>
# Prefer using contact name if possible
name =
2020-03-15 23:10:20 +01:00
# if no name, try to parse it from website_from
if not name and self.website_from:
from app.email_utils import parseaddr_unicode
name, _ = parseaddr_unicode(self.website_from)
except Exception:
# Skip if website_from is wrongly formatted
"Cannot parse contact %s website_from %s", self, self.website_from
name = ""
# remove all double quote
if name:
name = name.replace('"', "")
if name:
name = name + " | " + self.website_email.replace("@", " at ")
name = self.website_email.replace("@", " at ")
2020-03-15 23:10:20 +01:00
# cannot use formataddr here as this field is for email client, not for MTA
return f'"{name}" <{self.reply_email}>'
2019-12-15 16:17:37 +01:00
2020-04-05 15:21:04 +02:00
def new_addr(self):
Replace original email by reply_email. 2 possible formats:
- by SimpleLogin <reply_email> OR
- First Last - first at <reply_email>
And return new address with RFC 2047 format
`new_email` is a special reply address
user = self.user
2020-04-14 22:42:20 +02:00
if user and user.use_via_format_for_sender:
2020-04-05 15:21:04 +02:00
new_name = f"{self.website_email} via SimpleLogin"
name = or ""
new_name = (
name + (" - " if name else "") + self.website_email.replace("@", " at ")
new_addr = formataddr((new_name, self.reply_email)).strip()
return new_addr.strip()
2020-03-17 11:10:50 +01:00
def last_reply(self) -> "EmailLog":
2019-12-15 16:17:37 +01:00
"""return the most recent reply"""
return (
2020-03-17 11:10:50 +01:00
EmailLog.query.filter_by(, is_reply=True)
2019-12-28 01:03:59 +01:00
2019-12-15 16:17:37 +01:00
2019-11-14 14:54:17 +01:00
2020-03-28 19:05:27 +01:00
def __repr__(self):
return f"<Contact {} {self.website_email} {self.alias_id}>"
2019-11-14 14:54:17 +01:00
2020-03-17 11:10:50 +01:00
class EmailLog(db.Model, ModelMixin):
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
2020-03-17 11:05:53 +01:00
contact_id = db.Column(
2020-03-17 10:56:59 +01:00
db.ForeignKey(, ondelete="cascade"), nullable=False
2019-11-16 17:06:59 +01:00
# whether this is a reply
is_reply = db.Column(db.Boolean, nullable=False, default=False)
# for ex if alias is disabled, this forwarding is blocked
blocked = db.Column(db.Boolean, nullable=False, default=False)
2020-02-22 06:53:05 +01:00
# can happen when user email service refuses the forwarded email
# usually because the forwarded email is too spammy
bounced = db.Column(db.Boolean, nullable=False, default=False, server_default="0")
# SpamAssassin result
is_spam = db.Column(db.Boolean, nullable=False, default=False, server_default="0")
spam_status = db.Column(db.Text, nullable=True, default=None)
2020-03-14 16:07:34 +01:00
# Point to the email that has been refused
refused_email_id = db.Column(
db.ForeignKey("", ondelete="SET NULL"), nullable=True
2020-05-10 18:34:57 +02:00
# in case of bounce, record on what mailbox the email has been bounced
# useful when an alias has several mailboxes
bounced_mailbox_id = db.Column(
db.ForeignKey("", ondelete="cascade"), nullable=True
2020-03-14 16:07:34 +01:00
refused_email = db.relationship("RefusedEmail")
2020-03-17 10:56:59 +01:00
forward = db.relationship(Contact)
2020-03-14 16:07:34 +01:00
contact = db.relationship(Contact)
2020-05-10 18:41:22 +02:00
def bounced_mailbox(self) -> str:
if self.bounced_mailbox_id:
return Mailbox.get(self.bounced_mailbox_id).email
# retro-compatibility
2020-04-06 22:26:35 +02:00
def get_action(self) -> str:
    """return the action name: forward|reply|block|bounced

    The flags are checked in priority order: reply, then bounced, then
    blocked; an entry with none of them set is a plain forward.
    """
    if self.is_reply:
        return "reply"
    if self.bounced:
        return "bounced"
    if self.blocked:
        return "block"
    return "forward"
2019-11-16 17:06:59 +01:00
2019-11-14 14:54:17 +01:00
class Subscription(db.Model, ModelMixin):
# Come from Paddle
cancel_url = db.Column(db.String(1024), nullable=False)
update_url = db.Column(db.String(1024), nullable=False)
subscription_id = db.Column(db.String(1024), nullable=False, unique=True)
event_time = db.Column(ArrowType, nullable=False)
next_bill_date = db.Column(db.Date, nullable=False)
cancelled = db.Column(db.Boolean, nullable=False, default=False)
plan = db.Column(db.Enum(PlanEnum), nullable=False)
user_id = db.Column(
db.ForeignKey(, ondelete="cascade"), nullable=False, unique=True
user = db.relationship(User)
2019-11-18 15:09:04 +01:00
def plan_name(self):
    """Return the display name (with price) of the subscription's plan.

    Any plan other than ``PlanEnum.monthly`` is reported as the yearly plan.
    """
    if self.plan == PlanEnum.monthly:
        return "Monthly ($2.99/month)"

    return "Yearly ($29.99/year)"
2019-11-18 15:09:04 +01:00
2020-02-23 10:31:14 +01:00
class ManualSubscription(db.Model, ModelMixin):
For users who use other forms of payment and therefore not pass by Paddle
user_id = db.Column(
db.ForeignKey(, ondelete="cascade"), nullable=False, unique=True
# an reminder is sent several days before the subscription ends
end_at = db.Column(ArrowType, nullable=False)
# for storing note about this subscription
comment = db.Column(db.Text, nullable=True)
# manual subscription are also used for Premium giveaways
is_giveaway = db.Column(
db.Boolean, default=False, nullable=False, server_default="0"
user = db.relationship(User)
2020-02-23 10:31:14 +01:00
2020-04-20 23:31:25 +02:00
2020-04-18 20:47:33 +02:00
class AppleSubscription(db.Model, ModelMixin):
For users who have subscribed via Apple in-app payment
user_id = db.Column(
db.ForeignKey(, ondelete="cascade"), nullable=False, unique=True
expires_date = db.Column(ArrowType, nullable=False)
# to avoid using "Restore Purchase" on another account
original_transaction_id = db.Column(db.String(256), nullable=False, unique=True)
2020-04-18 20:47:33 +02:00
receipt_data = db.Column(db.Text(), nullable=False)
plan = db.Column(db.Enum(PlanEnum), nullable=False)
user = db.relationship(User)
def is_valid(self):
# Todo: take into account grace period?
2020-04-20 23:31:25 +02:00
return self.expires_date >
2020-04-18 20:47:33 +02:00
2019-11-18 15:09:04 +01:00
class DeletedAlias(db.Model, ModelMixin):
    """Store all deleted alias to make sure they are NOT reused"""

    # address of the deleted alias; unique so an address is recorded only once
    email = db.Column(db.String(256), unique=True, nullable=False)
2019-11-18 19:32:58 +01:00
class EmailChange(db.Model, ModelMixin):
"""Used when user wants to update their email"""
user_id = db.Column(
db.ForeignKey(, ondelete="cascade"),
2020-01-26 17:22:16 +01:00
new_email = db.Column(db.String(256), unique=True, nullable=False)
2019-11-18 19:32:58 +01:00
code = db.Column(db.String(128), unique=True, nullable=False)
expired = db.Column(ArrowType, nullable=False, default=_expiration_12h)
user = db.relationship(User)
def is_expired(self):
return self.expired <
2019-11-28 23:00:19 +01:00
class AliasUsedOn(db.Model, ModelMixin):
"""Used to know where an alias is created"""
__table_args__ = (
2020-03-17 12:01:18 +01:00
db.UniqueConstraint("alias_id", "hostname", name="uq_alias_used"),
2019-11-28 23:00:19 +01:00
2020-03-17 12:01:18 +01:00
alias_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
2020-03-20 12:29:37 +01:00
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
2020-03-20 12:13:00 +01:00
alias = db.relationship(Alias)
2019-11-28 23:00:19 +01:00
hostname = db.Column(db.String(1024), nullable=False)
class ApiKey(db.Model, ModelMixin):
"""used in browser extension to identify user"""
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
code = db.Column(db.String(128), unique=True, nullable=False)
name = db.Column(db.String(128), nullable=False)
last_used = db.Column(ArrowType, default=None)
times = db.Column(db.Integer, default=0, nullable=False)
user = db.relationship(User)
def create(cls, user_id, name):
# generate unique code
found = False
while not found:
code = random_string(60)
if not cls.get_by(code=code):
found = True
a = cls(user_id=user_id, code=code, name=name)
return a
2019-11-30 00:40:07 +01:00
class CustomDomain(db.Model, ModelMixin):
# Model for a domain attached to a user, with per-domain verification flags.
# NOTE(review): the ForeignKey target is missing; presumably the User primary key — confirm.
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
# Domain name; unique across all rows.
domain = db.Column(db.String(128), unique=True, nullable=False)
# default name to use when user replies/sends from alias
name = db.Column(db.String(128), nullable=True, default=None)
2019-11-30 00:40:07 +01:00
verified = db.Column(db.Boolean, nullable=False, default=False)
2019-12-25 18:22:46 +01:00
# The three flags below default to False both in Python (default=False) and at the
# database level (server_default="0").
# NOTE(review): none of these three Column(...) calls is closed in this section.
dkim_verified = db.Column(
db.Boolean, nullable=False, default=False, server_default="0"
2019-12-27 23:44:53 +01:00
spf_verified = db.Column(
db.Boolean, nullable=False, default=False, server_default="0"
2020-05-03 11:51:22 +02:00
dmarc_verified = db.Column(
db.Boolean, nullable=False, default=False, server_default="0"
2019-12-30 18:17:45 +01:00
# an alias is created automatically the first time it receives an email
catch_all = db.Column(db.Boolean, nullable=False, default=False, server_default="0")
user = db.relationship(User)
# Delete the domain with id `obj_id`, after handling the aliases attached to it.
# NOTE(review): takes `cls` but no @classmethod decorator is visible; the loop body and
# the `try:` matching the `except` below are missing, and so is the left-hand side of
# the final filter comparison. Restore from the original file before relying on this.
def delete(cls, obj_id):
# Put all aliases belonging to this domain to global trash
for alias in Alias.query.filter_by(custom_domain_id=obj_id):
except IntegrityError:
# The IntegrityError is logged and not re-raised.
LOG.error("Some aliases have been added before to DeletedAlias")
cls.query.filter( == obj_id).delete()
# NOTE(review): the filter_by(...) call is truncated; presumably it counts the aliases
# of this domain — confirm.
def nb_alias(self):
return Alias.filter_by(
def __repr__(self):
# Short representation showing only the domain name.
return f"<Custom Domain {self.domain}>"
class LifetimeCoupon(db.Model, ModelMixin):
    """A coupon identified by a unique code, stored with a usage counter."""

    # Coupon code; unique and required.
    code = db.Column(db.String(128), unique=True, nullable=False)

    # Required integer counter with no default, so a value must be supplied
    # when the row is created.
    nb_used = db.Column(db.Integer, nullable=False)
class Directory(db.Model, ModelMixin):
# Model for a named directory owned by a user.
# NOTE(review): the ForeignKey target is missing; presumably the User primary key — confirm.
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
# Directory name; unique across all rows, not only per user.
name = db.Column(db.String(128), unique=True, nullable=False)
user = db.relationship(User)
# NOTE(review): the filter_by(...) call is truncated; presumably it counts the aliases
# of this directory — confirm.
def nb_alias(self):
return Alias.filter_by(
# Delete the directory with id `obj_id`, after handling the aliases attached to it.
# NOTE(review): takes `cls` but no @classmethod decorator is visible; the loop body and
# the `try:` matching the `except` below are missing, and so is the left-hand side of
# the final filter comparison. Restore from the original file before relying on this.
def delete(cls, obj_id):
# Put all aliases belonging to this directory to global trash
for alias in Alias.query.filter_by(directory_id=obj_id):
# this can happen when a previously deleted alias is re-created via catch-all or directory feature
except IntegrityError:
# The IntegrityError is logged and not re-raised.
LOG.error("Some aliases have been added before to DeletedAlias")
cls.query.filter( == obj_id).delete()
def __repr__(self):
# NOTE(review): the f-string field is empty (a syntax error as written); the
# interpolated attribute is missing.
return f"<Directory {}>"
2020-02-03 07:09:48 +01:00
class Job(db.Model, ModelMixin):
"""Used to schedule one-time job in the future"""
# Required job name (presumably selects which handler runs — confirm against the job runner).
name = db.Column(db.String(128), nullable=False)
# Optional JSON payload stored with the job.
payload = db.Column(db.JSON)
# whether the job has been taken by the job runner
taken = db.Column(db.Boolean, default=False, nullable=False)
# Nullable timestamp (presumably the earliest time the job should run — confirm).
run_at = db.Column(ArrowType)
def __repr__(self):
# NOTE(review): the first two f-string fields are empty (a syntax error as written);
# the interpolated attributes are missing.
return f"<Job {} {} {self.payload}>"
class Mailbox(db.Model, ModelMixin):
# Model for an email address owned by a user, with verification and PGP fields.
# NOTE(review): the ForeignKey target is missing; presumably the User primary key — confirm.
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
# Mailbox address; unique per user via the "uq_mailbox_user" constraint below.
email = db.Column(db.String(256), nullable=False)
verified = db.Column(db.Boolean, default=False, nullable=False)
2020-05-07 13:28:04 +02:00
# Defaults to True both in Python (default=True) and at the database level (server_default="1").
force_spf = db.Column(db.Boolean, default=True, server_default="1", nullable=False)
2020-02-23 08:02:02 +01:00
# used when user wants to update mailbox email
new_email = db.Column(db.String(256), unique=True)
# Optional PGP public key and its fingerprint.
pgp_public_key = db.Column(db.Text, nullable=True)
pgp_finger_print = db.Column(db.String(512), nullable=True)
__table_args__ = (db.UniqueConstraint("user_id", "email", name="uq_mailbox_user"),)
# NOTE(review): the filter_by(...) call is truncated; presumably it counts the aliases
# of this mailbox — confirm.
def nb_alias(self):
return Alias.filter_by(
# Delete the mailbox with id `obj_id`, after handling the aliases attached to it.
# NOTE(review): takes `cls` but no @classmethod decorator is visible; the loop body and
# the `try:` matching the `except` below are missing, and so is the left-hand side of
# the final filter comparison. Restore from the original file before relying on this.
def delete(cls, obj_id):
# Put all aliases belonging to this mailbox to global trash
for alias in Alias.query.filter_by(mailbox_id=obj_id):
# this can happen when a previously deleted alias is re-created via catch-all or directory feature
except IntegrityError:
# The IntegrityError is logged and not re-raised.
LOG.error("Some aliases have been added before to DeletedAlias")
cls.query.filter( == obj_id).delete()
def __repr__(self):
# NOTE(review): the f-string field is empty (a syntax error as written); the
# interpolated attribute is missing.
return f"<Mailbox {}>"
2020-02-28 13:00:45 +01:00
class AccountActivation(db.Model, ModelMixin):
"""contains code to activate the user account when they sign up on mobile"""
# unique=True allows at most one activation row per referenced user.
# NOTE(review): the ForeignKey target is missing and this Column(...) call is never
# closed in this section; presumably it references the User primary key — confirm.
user_id = db.Column(
db.ForeignKey(, ondelete="cascade"), nullable=False, unique=True
# the activation code is usually 6 digits
2020-02-28 13:09:01 +01:00
code = db.Column(db.String(10), nullable=False)
2020-02-28 13:00:45 +01:00
# nb tries decrements each time user enters wrong code
tries = db.Column(db.Integer, default=3, nullable=False)
# Database-level check that `tries` never goes below zero. Despite the "positive" in
# the constraint name, zero is allowed (the condition is >= 0).
# NOTE(review): the closing parenthesis of this tuple is not present in this section.
__table_args__ = (
CheckConstraint(tries >= 0, name="account_activation_tries_positive"),
2020-03-14 16:07:34 +01:00
class RefusedEmail(db.Model, ModelMixin):
"""Store emails that have been refused, i.e. bounced or classified as spams"""
# Store the full report, including logs from Sending & Receiving MTA
full_report_path = db.Column(db.String(128), unique=True, nullable=False)
# The original email, to display to user
2020-03-22 16:51:21 +01:00
# Nullable: get_url() falls back to full_report_path when this is not set.
path = db.Column(db.String(128), unique=True, nullable=True)
2020-03-14 16:07:34 +01:00
# NOTE(review): the ForeignKey target is missing; presumably the User primary key — confirm.
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
# the email content will be deleted at this date
# Defaults to the result of _expiration_7d, which is defined outside this section.
delete_at = db.Column(ArrowType, nullable=False, default=_expiration_7d)
2020-03-15 11:10:37 +01:00
# toggle this when email content (stored at full_report_path & path are deleted)
deleted = db.Column(db.Boolean, nullable=False, default=False, server_default="0")
2020-03-14 16:07:34 +01:00
# Return the value of s3.get_url(...) for this email: the original email when `path`
# is set, otherwise the full report. `expires_in` is passed through to s3.get_url
# unchanged (default 3600; presumably seconds — confirm against app.s3).
def get_url(self, expires_in=3600):
2020-03-22 16:51:21 +01:00
if self.path:
return s3.get_url(self.path, expires_in)
return s3.get_url(self.full_report_path, expires_in)
2020-03-14 16:07:34 +01:00
def __repr__(self):
# NOTE(review): the first f-string field is empty (a syntax error as written); the
# interpolated attribute is missing.
return f"<Refused Email {} {self.path} {self.delete_at}>"
2020-04-09 22:19:45 +02:00
class Referral(db.Model, ModelMixin):
"""Referral code so user can invite others"""
# NOTE(review): the ForeignKey target is missing; presumably the User primary key — confirm.
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
2020-05-02 18:08:05 +02:00
# Optional label for the referral.
name = db.Column(db.String(512), nullable=True, default=None)
2020-04-09 22:19:45 +02:00
# Unique referral code; link() embeds it in the `slref` query parameter.
code = db.Column(db.String(128), unique=True, nullable=False)
# Count users matching the filter with activated=True.
# NOTE(review): the first filter_by argument is missing; presumably it restricts the
# count to users who signed up through this referral — confirm and restore.
def nb_user(self):
return User.filter_by(, activated=True).count()
# Build the referral URL: LANDING_PAGE_URL followed by "?slref=<code>".
# LANDING_PAGE_URL is not defined in this section (presumably imported from app.config).
def link(self):
return f"{LANDING_PAGE_URL}?slref={self.code}"
class SentAlert(db.Model, ModelMixin):
# Each row stores the user (user_id), the destination address (to_email) and the alert
# category (alert_type); all three columns are required.
# NOTE(review): the docstring below is never terminated in this section (its closing
# triple quote is missing), and the ForeignKey target of user_id is missing —
# presumably the User primary key. Restore both from the original file.
"""keep track of alerts sent to user.
User can receive an alert when there's abnormal activity on their aliases such as
- reverse-alias not used by the owning mailbox
- SPF fails when using the reverse-alias
- bounced email
- ...
Different rate controls can then be implemented based on SentAlert:
- only once alert: an alert type should be sent only once
- max number of sent per 24H: an alert type should not be sent more than X times in 24h
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
to_email = db.Column(db.String(256), nullable=False)
alert_type = db.Column(db.String(256), nullable=False)
2020-05-03 15:54:19 +02:00
class AliasMailbox(db.Model, ModelMixin):
__table_args__ = (
db.UniqueConstraint("alias_id", "mailbox_id", name="uq_alias_mailbox"),
user_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
alias_id = db.Column(db.ForeignKey(, ondelete="cascade"), nullable=False)
mailbox_id = db.Column(
db.ForeignKey(, ondelete="cascade"), nullable=False