Merge branch 'master' into master

This commit is contained in:
Raymond Nook 2021-06-05 22:57:27 -07:00 committed by GitHub
commit e40c276a68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 261 additions and 534 deletions

View File

@ -117,7 +117,12 @@ def manual_upgrade(way: str, ids: [int], is_giveaway: bool):
flash(f"Subscription extended to {manual_sub.end_at.humanize()}", "success")
continue
if user.is_premium() and not user.in_trial():
# user can have manual subscription applied if their current subscription is canceled
if (
user.is_premium()
and not user.in_trial()
and not user.subscription_cancelled
):
flash(f"User {user} is already premium", "warning")
continue

View File

@ -2,245 +2,13 @@ from flask import jsonify, request, g
from sqlalchemy import desc
from app.api.base import api_bp, require_api_auth
from app.config import ALIAS_DOMAINS, DISABLE_ALIAS_SUFFIX
from app.dashboard.views.custom_alias import (
get_available_suffixes,
)
from app.extensions import db
from app.log import LOG
from app.models import AliasUsedOn, Alias, User
from app.utils import convert_to_id, get_suffix
@api_bp.route("/alias/options")
@require_api_auth
def options():
"""
Return what options user has when creating new alias.
Input:
a valid api-key in "Authentication" header and
optional "hostname" in args
Output: cf README
optional recommendation:
optional custom
can_create_custom: boolean
existing: array of existing aliases
"""
LOG.e("/alias/options is obsolete")
user = g.user
hostname = request.args.get("hostname")
ret = {
"existing": [ge.email for ge in Alias.query.filter_by(user_id=user.id)],
"can_create_custom": user.can_create_new_alias(),
}
# recommendation alias if exist
if hostname:
# put the latest used alias first
q = (
db.session.query(AliasUsedOn, Alias, User)
.filter(
AliasUsedOn.alias_id == Alias.id,
Alias.user_id == user.id,
AliasUsedOn.hostname == hostname,
)
.order_by(desc(AliasUsedOn.created_at))
)
r = q.first()
if r:
_, alias, _ = r
LOG.d("found alias %s %s %s", alias, hostname, user)
ret["recommendation"] = {"alias": alias.email, "hostname": hostname}
# custom alias suggestion and suffix
ret["custom"] = {}
if hostname:
# keep only the domain name of hostname, ignore TLD and subdomain
# for ex www.groupon.com -> groupon
domain_name = hostname
if "." in hostname:
parts = hostname.split(".")
domain_name = parts[-2]
domain_name = convert_to_id(domain_name)
ret["custom"]["suggestion"] = domain_name
else:
ret["custom"]["suggestion"] = ""
ret["custom"]["suffixes"] = []
# maybe better to make sure the suffix is never used before
# but this is ok as there's a check when creating a new custom alias
for domain in ALIAS_DOMAINS:
if DISABLE_ALIAS_SUFFIX:
ret["custom"]["suffixes"].append(f"@{domain}")
else:
ret["custom"]["suffixes"].append(f".{get_suffix(user)}@{domain}")
for custom_domain in user.verified_custom_domains():
ret["custom"]["suffixes"].append("@" + custom_domain.domain)
# custom domain should be put first
ret["custom"]["suffixes"] = list(reversed(ret["custom"]["suffixes"]))
return jsonify(ret)
@api_bp.route("/v2/alias/options")
@require_api_auth
def options_v2():
"""
Return what options user has when creating new alias.
Input:
a valid api-key in "Authentication" header and
optional "hostname" in args
Output: cf README
can_create: bool
suffixes: [str]
prefix_suggestion: str
existing: [str]
recommendation: Optional dict
alias: str
hostname: str
"""
LOG.e("/v2/alias/options is obsolete")
user = g.user
hostname = request.args.get("hostname")
ret = {
"existing": [
ge.email for ge in Alias.query.filter_by(user_id=user.id, enabled=True)
],
"can_create": user.can_create_new_alias(),
"suffixes": [],
"prefix_suggestion": "",
}
# recommendation alias if exist
if hostname:
# put the latest used alias first
q = (
db.session.query(AliasUsedOn, Alias, User)
.filter(
AliasUsedOn.alias_id == Alias.id,
Alias.user_id == user.id,
AliasUsedOn.hostname == hostname,
)
.order_by(desc(AliasUsedOn.created_at))
)
r = q.first()
if r:
_, alias, _ = r
LOG.d("found alias %s %s %s", alias, hostname, user)
ret["recommendation"] = {"alias": alias.email, "hostname": hostname}
# custom alias suggestion and suffix
if hostname:
# keep only the domain name of hostname, ignore TLD and subdomain
# for ex www.groupon.com -> groupon
domain_name = hostname
if "." in hostname:
parts = hostname.split(".")
domain_name = parts[-2]
domain_name = convert_to_id(domain_name)
ret["prefix_suggestion"] = domain_name
# maybe better to make sure the suffix is never used before
# but this is ok as there's a check when creating a new custom alias
for domain in ALIAS_DOMAINS:
if DISABLE_ALIAS_SUFFIX:
ret["suffixes"].append(f"@{domain}")
else:
ret["suffixes"].append(f".{get_suffix(user)}@{domain}")
for custom_domain in user.verified_custom_domains():
ret["suffixes"].append("@" + custom_domain.domain)
# custom domain should be put first
ret["suffixes"] = list(reversed(ret["suffixes"]))
return jsonify(ret)
@api_bp.route("/v3/alias/options")
@require_api_auth
def options_v3():
"""
Return what options user has when creating new alias.
Same as v2 but do NOT return existing alias
Input:
a valid api-key in "Authentication" header and
optional "hostname" in args
Output: cf README
can_create: bool
suffixes: [str]
prefix_suggestion: str
recommendation: Optional dict
alias: str
hostname: str
"""
LOG.e("/v3/alias/options is obsolete")
user = g.user
hostname = request.args.get("hostname")
ret = {
"can_create": user.can_create_new_alias(),
"suffixes": [],
"prefix_suggestion": "",
}
# recommendation alias if exist
if hostname:
# put the latest used alias first
q = (
db.session.query(AliasUsedOn, Alias, User)
.filter(
AliasUsedOn.alias_id == Alias.id,
Alias.user_id == user.id,
AliasUsedOn.hostname == hostname,
)
.order_by(desc(AliasUsedOn.created_at))
)
r = q.first()
if r:
_, alias, _ = r
LOG.d("found alias %s %s %s", alias, hostname, user)
ret["recommendation"] = {"alias": alias.email, "hostname": hostname}
# custom alias suggestion and suffix
if hostname:
# keep only the domain name of hostname, ignore TLD and subdomain
# for ex www.groupon.com -> groupon
domain_name = hostname
if "." in hostname:
parts = hostname.split(".")
domain_name = parts[-2]
domain_name = convert_to_id(domain_name)
ret["prefix_suggestion"] = domain_name
# maybe better to make sure the suffix is never used before
# but this is ok as there's a check when creating a new custom alias
for domain in ALIAS_DOMAINS:
if DISABLE_ALIAS_SUFFIX:
ret["suffixes"].append(f"@{domain}")
else:
ret["suffixes"].append(f".{get_suffix(user)}@{domain}")
for custom_domain in user.verified_custom_domains():
ret["suffixes"].append("@" + custom_domain.domain)
# custom domain should be put first
ret["suffixes"] = list(reversed(ret["suffixes"]))
return jsonify(ret)
from app.utils import convert_to_id
@api_bp.route("/v4/alias/options")

View File

@ -41,6 +41,9 @@ def create_mailbox():
user = g.user
mailbox_email = sanitize_email(request.get_json().get("email"))
if not user.is_premium():
return jsonify(error=f"Only premium plan can add additional mailbox"), 400
if not is_valid_email(mailbox_email):
return jsonify(error=f"{mailbox_email} invalid"), 400
elif mailbox_already_used(mailbox_email, user):

View File

@ -7,8 +7,6 @@ from app.api.base import api_bp, require_api_auth
from app.api.serializer import (
serialize_alias_info_v2,
get_alias_info_v2,
serialize_alias_info,
get_alias_info,
)
from app.config import MAX_NB_EMAIL_FREE_PLAN, ALIAS_LIMIT
from app.dashboard.views.custom_alias import verify_prefix_suffix, signer
@ -27,78 +25,6 @@ from app.models import (
from app.utils import convert_to_id
@api_bp.route("/alias/custom/new", methods=["POST"])
@limiter.limit(ALIAS_LIMIT)
@require_api_auth
def new_custom_alias():
"""
Currently used by Safari extension.
Create a new custom alias
Input:
alias_prefix, for ex "www_groupon_com"
alias_suffix, either .random_letters@simplelogin.co or @my-domain.com
optional "hostname" in args
optional "note"
Output:
201 if success
409 if the alias already exists
"""
LOG.e("/alias/custom/new is obsolete")
user: User = g.user
if not user.can_create_new_alias():
LOG.d("user %s cannot create any custom alias", user)
return (
jsonify(
error="You have reached the limitation of a free account with the maximum of "
f"{MAX_NB_EMAIL_FREE_PLAN} aliases, please upgrade your plan to create more aliases"
),
400,
)
hostname = request.args.get("hostname")
data = request.get_json()
if not data:
return jsonify(error="request body cannot be empty"), 400
alias_prefix = data.get("alias_prefix", "").strip().lower().replace(" ", "")
alias_suffix = data.get("alias_suffix", "").strip().lower().replace(" ", "")
note = data.get("note")
alias_prefix = convert_to_id(alias_prefix)
if not verify_prefix_suffix(user, alias_prefix, alias_suffix):
return jsonify(error="wrong alias prefix or suffix"), 400
full_alias = alias_prefix + alias_suffix
if (
Alias.get_by(email=full_alias)
or DeletedAlias.get_by(email=full_alias)
or DomainDeletedAlias.get_by(email=full_alias)
):
LOG.d("full alias already used %s", full_alias)
return jsonify(error=f"alias {full_alias} already exists"), 409
alias = Alias.create(
user_id=user.id, email=full_alias, mailbox_id=user.default_mailbox_id, note=note
)
if alias_suffix.startswith("@"):
alias_domain = alias_suffix[1:]
domain = CustomDomain.get_by(domain=alias_domain)
if domain:
LOG.d("set alias %s to domain %s", full_alias, domain)
alias.custom_domain_id = domain.id
db.session.commit()
if hostname:
AliasUsedOn.create(alias_id=alias.id, hostname=hostname, user_id=alias.user_id)
db.session.commit()
return jsonify(alias=full_alias, **serialize_alias_info(get_alias_info(alias))), 201
@api_bp.route("/v2/alias/custom/new", methods=["POST"])
@limiter.limit(ALIAS_LIMIT)
@require_api_auth

View File

@ -13,7 +13,7 @@ from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import make_msgid, formatdate, parseaddr
from smtplib import SMTP, SMTPServerDisconnected
from typing import Tuple, List
from typing import Tuple, List, Optional
import arrow
import dkim
@ -1198,3 +1198,20 @@ def sl_sendmail(
)
else:
raise
def get_queue_id(msg: Message) -> Optional[str]:
"""Get the Postfix queue-id from a message"""
received_header = msg["Received"]
if not received_header:
return
# received_header looks like 'from mail-wr1-x434.google.com (mail-wr1-x434.google.com [IPv6:2a00:1450:4864:20::434])\r\n\t(using TLSv1.3 with cipher TLS_AES_128_GCM_SHA256 (128/128 bits))\r\n\t(No client certificate requested)\r\n\tby mx1.simplelogin.co (Postfix) with ESMTPS id 4FxQmw1DXdz2vK2\r\n\tfor <jglfdjgld@alias.com>; Fri, 4 Jun 2021 14:55:43 +0000 (UTC)'
search_result = re.search("with ESMTPS id [0-9a-zA-Z]{1,}", received_header)
if not search_result:
return
# the "with ESMTPS id 4FxQmw1DXdz2vK2" part
with_esmtps = received_header[search_result.start() : search_result.end()]
return with_esmtps[len("with ESMTPS id ") :]

View File

@ -18,7 +18,7 @@ _MESSAGE_ID = ""
def set_message_id(message_id):
global _MESSAGE_ID
print("set message_id", message_id)
LOG.d("set message_id %s", message_id)
_MESSAGE_ID = message_id

View File

@ -5,7 +5,6 @@ from email.utils import formataddr
from typing import List, Tuple, Optional
import arrow
import bcrypt
from arrow import Arrow
from flask import url_for
from flask_login import UserMixin
@ -30,6 +29,7 @@ from app.errors import AliasInTrashError
from app.extensions import db
from app.log import LOG
from app.oauth_models import Scope
from app.pw_models import PasswordOracle
from app.utils import (
convert_to_id,
random_string,
@ -189,13 +189,10 @@ class Fido(db.Model, ModelMixin):
name = db.Column(db.String(128), nullable=False, unique=False)
class User(db.Model, ModelMixin, UserMixin):
class User(db.Model, ModelMixin, UserMixin, PasswordOracle):
__tablename__ = "users"
email = db.Column(db.String(256), unique=True, nullable=False)
salt = db.Column(db.String(128), nullable=True)
password = db.Column(db.String(128), nullable=True)
name = db.Column(db.String(128), nullable=True)
is_admin = db.Column(db.Boolean, nullable=False, default=False)
alias_generator = db.Column(
@ -480,7 +477,10 @@ class User(db.Model, ModelMixin, UserMixin):
sub: Subscription = self.get_subscription()
if sub:
return f"Paddle Subscription {sub.subscription_id}"
if sub.cancelled:
return f"Cancelled Paddle Subscription {sub.subscription_id}"
else:
return f"Active Paddle Subscription {sub.subscription_id}"
apple_sub: AppleSubscription = AppleSubscription.get_by(user_id=self.id)
if apple_sub and apple_sub.is_valid():
@ -502,6 +502,28 @@ class User(db.Model, ModelMixin, UserMixin):
return "N/A"
@property
def subscription_cancelled(self) -> bool:
sub: Subscription = self.get_subscription()
if sub and sub.cancelled:
return True
apple_sub: AppleSubscription = AppleSubscription.get_by(user_id=self.id)
if apple_sub and not apple_sub.is_valid():
return True
manual_sub: ManualSubscription = ManualSubscription.get_by(user_id=self.id)
if manual_sub and not manual_sub.is_active():
return True
coinbase_subscription: CoinbaseSubscription = CoinbaseSubscription.get_by(
user_id=self.id
)
if coinbase_subscription and not coinbase_subscription.is_active():
return True
return False
@property
def premium_end(self) -> str:
if self.lifetime:
@ -537,18 +559,6 @@ class User(db.Model, ModelMixin, UserMixin):
else:
return Alias.filter_by(user_id=self.id).count() < MAX_NB_EMAIL_FREE_PLAN
def set_password(self, password):
salt = bcrypt.gensalt()
password_hash = bcrypt.hashpw(password.encode(), salt).decode()
self.salt = salt.decode()
self.password = password_hash
def check_password(self, password) -> bool:
if not self.password:
return False
password_hash = bcrypt.hashpw(password.encode(), self.salt.encode())
return self.password.encode() == password_hash
def profile_picture_url(self):
if self.profile_picture_id:
return self.profile_picture.get_url()
@ -1525,6 +1535,12 @@ class EmailLog(db.Model, ModelMixin):
else:
return "forward"
def get_phase(self) -> str:
if self.is_reply:
return "reply"
else:
return "forward"
def __repr__(self):
return f"<EmailLog {self.id}>"
@ -2037,8 +2053,12 @@ class AliasHibp(db.Model, ModelMixin):
__table_args__ = (db.UniqueConstraint("alias_id", "hibp_id", name="uq_alias_hibp"),)
alias_id = db.Column(db.Integer(), db.ForeignKey("alias.id", ondelete="cascade"))
hibp_id = db.Column(db.Integer(), db.ForeignKey("hibp.id", ondelete="cascade"))
alias_id = db.Column(
db.Integer(), db.ForeignKey("alias.id", ondelete="cascade"), index=True
)
hibp_id = db.Column(
db.Integer(), db.ForeignKey("hibp.id", ondelete="cascade"), index=True
)
alias = db.relationship(
"Alias", backref=db.backref("alias_hibp", cascade="all, delete-orphan")

24
app/pw_models.py Normal file
View File

@ -0,0 +1,24 @@
import unicodedata
import bcrypt
from app.extensions import db
_NORMALIZATION_FORM = "NFKC"
class PasswordOracle:
password = db.Column(db.String(128), nullable=True)
def set_password(self, password):
password = unicodedata.normalize(_NORMALIZATION_FORM, password)
salt = bcrypt.gensalt()
self.password = bcrypt.hashpw(password.encode(), salt).decode()
def check_password(self, password) -> bool:
if not self.password:
return False
password = unicodedata.normalize(_NORMALIZATION_FORM, password)
return bcrypt.checkpw(password.encode(), self.password.encode())

View File

@ -83,3 +83,8 @@ def sanitize_email(email_address: str) -> str:
if email_address:
return email_address.lower().strip().replace(" ", "").replace("\n", " ")
return email_address
def query2str(query):
"""Useful utility method to print out a SQLAlchemy query"""
return query.statement.compile(compile_kwargs={"literal_binds": True})

View File

@ -816,7 +816,7 @@ async def _hibp_check(api_key, queue):
LOG.d("Updated breaches info for %s", alias)
await asyncio.sleep(1.5)
await asyncio.sleep(1.6)
async def check_hibp():

View File

@ -106,6 +106,7 @@ from app.email_utils import (
spf_pass,
sl_sendmail,
sanitize_header,
get_queue_id,
)
from app.extensions import db
from app.greylisting import greylisting_needed
@ -491,13 +492,13 @@ def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str
LOG.w("User %s disabled, disable forwarding emails for %s", user, alias)
return [(False, "550 SL E20 Account disabled")]
mail_from = envelope.mail_from
for mb in alias.mailboxes:
# email send from a mailbox to alias
if mb.email == mail_from:
LOG.w("cycle email sent from %s to %s", mb, alias)
handle_email_sent_to_ourself(alias, mb, msg, user)
return [(True, "250 Message accepted for delivery")]
# mail_from = envelope.mail_from
# for mb in alias.mailboxes:
# # email send from a mailbox to alias
# if mb.email == mail_from:
# LOG.w("cycle email sent from %s to %s", mb, alias)
# handle_email_sent_to_ourself(alias, mb, msg, user)
# return [(True, "250 Message accepted for delivery")]
LOG.d("Create or get contact for from:%s reply-to:%s", msg["From"], msg["Reply-To"])
# prefer using Reply-To when creating contact
@ -1463,30 +1464,34 @@ def handle_transactional_bounce(envelope: Envelope, rcpt_to):
Bounce.create(email=transactional.email, commit=True)
def handle_bounce(envelope, rcpt_to, msg: Message) -> str:
def handle_bounce(envelope, email_log: EmailLog, msg: Message) -> str:
"""
Return SMTP status, e.g. "500 Error"
"""
LOG.d("handle bounce sent to %s", rcpt_to)
# parse the EmailLog
email_log_id = parse_id_from_bounce(rcpt_to)
email_log = EmailLog.get(email_log_id)
if not email_log:
LOG.w("No such email log")
return "550 SL E27 No such email log"
contact: Contact = email_log.contact
alias = contact.alias
LOG.d(
"handle bounce for %s, phase=%s, contact=%s, alias=%s",
email_log,
email_log.get_phase(),
contact,
alias,
)
if email_log.is_reply:
content_type = msg.get_content_type().lower()
if content_type != "multipart/report" or envelope.mail_from != "<>":
# forward the email again to the alias
# todo: remove logging
LOG.w(
"Handle auto reply %s %s. Msg:\n%s",
LOG.i(
"Handle auto reply %s %s",
content_type,
envelope.mail_from,
msg,
)
contact: Contact = email_log.contact
@ -1531,6 +1536,9 @@ def handle(envelope: Envelope) -> str:
envelope.rcpt_tos = rcpt_tos
msg = email.message_from_bytes(envelope.original_content)
postfix_queue_id = get_queue_id(msg)
if postfix_queue_id:
set_message_id(postfix_queue_id)
# sanitize email headers
sanitize_header(msg, "from")
@ -1575,29 +1583,53 @@ def handle(envelope: Envelope) -> str:
handle_transactional_bounce(envelope, rcpt_tos[0])
return "250 bounce handled"
# whether this is a bounce report
is_bounce = False
# Handle bounce
if (
len(rcpt_tos) == 1
and rcpt_tos[0].startswith(BOUNCE_PREFIX)
and rcpt_tos[0].endswith(BOUNCE_SUFFIX)
):
is_bounce = True
email_log_id = parse_id_from_bounce(rcpt_tos[0])
email_log = EmailLog.get(email_log_id)
return handle_bounce(envelope, email_log, msg)
if len(rcpt_tos) == 1 and rcpt_tos[0].startswith(
f"{BOUNCE_PREFIX_FOR_REPLY_PHASE}+"
):
is_bounce = True
email_log_id = parse_id_from_bounce(rcpt_tos[0])
email_log = EmailLog.get(email_log_id)
return handle_bounce(envelope, email_log, msg)
if is_bounce:
return handle_bounce(envelope, rcpt_tos[0], msg)
# iCloud returns the bounce with mail_from=bounce+{email_log_id}+@simplelogin.co, rcpt_to=alias
if (
len(rcpt_tos) == 1
and mail_from.startswith(BOUNCE_PREFIX)
and mail_from.endswith(BOUNCE_SUFFIX)
):
email_log_id = parse_id_from_bounce(mail_from)
email_log = EmailLog.get(email_log_id)
alias = Alias.get_by(email=rcpt_tos[0])
LOG.w(
"iCloud bounces %s %s msg=%s",
email_log,
alias,
msg.as_string(),
)
return handle_bounce(envelope, email_log, msg)
# Whether it's necessary to apply greylisting
if greylisting_needed(mail_from, rcpt_tos):
LOG.w("Grey listing applied for mail_from:%s rcpt_tos:%s", mail_from, rcpt_tos)
return "421 SL Retry later"
# Handle "out of office" auto notice. An automatic response is sent for every forwarded email
# todo: remove logging
if len(rcpt_tos) == 1 and is_reply_email(rcpt_tos[0]) and mail_from == "<>":
LOG.w(
"out-of-office email to reverse alias %s. %s", rcpt_tos[0], msg.as_string()
)
return "250 SL E28"
# result of all deliveries
# each element is a couple of whether the delivery is successful and the smtp status
res: [(bool, str)] = []
@ -1620,14 +1652,6 @@ def handle(envelope: Envelope) -> str:
# recipient starts with "reply+" or "ra+" (ra=reverse-alias) prefix
if is_reply_email(rcpt_to):
LOG.d("Reply phase %s(%s) -> %s", mail_from, copy_msg["From"], rcpt_to)
# for debugging. A reverse alias should never receive a bounce report from MTA
# as emails are sent with VERP
# but it's possible that some MTA don't send the bounce report correctly
# todo: to remove once this issue is understood
if mail_from == "<>":
LOG.exception("email from <> to reverse alias %s. \n%s", rcpt_to, msg)
is_delivered, smtp_status = handle_reply(envelope, copy_msg, rcpt_to)
res.append((is_delivered, smtp_status))
else: # Forward case

View File

@ -0,0 +1,31 @@
"""empty message
Revision ID: fc2eb1d7e4fc
Revises: 68e2f38e33f4
Create Date: 2021-05-28 19:59:04.259149
"""
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'fc2eb1d7e4fc'
down_revision = '68e2f38e33f4'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_index(op.f('ix_alias_hibp_alias_id'), 'alias_hibp', ['alias_id'], unique=False)
op.create_index(op.f('ix_alias_hibp_hibp_id'), 'alias_hibp', ['hibp_id'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_alias_hibp_hibp_id'), table_name='alias_hibp')
op.drop_index(op.f('ix_alias_hibp_alias_id'), table_name='alias_hibp')
# ### end Alembic commands ###

View File

@ -0,0 +1,29 @@
"""empty message
Revision ID: a5e643d562c9
Revises: fc2eb1d7e4fc
Create Date: 2021-06-02 18:50:39.611746
"""
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'a5e643d562c9'
down_revision = 'fc2eb1d7e4fc'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('users', 'salt')
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('users', sa.Column('salt', sa.VARCHAR(length=128), autoincrement=False, nullable=True))
# ### end Alembic commands ###

View File

@ -102,16 +102,6 @@
<body class="">
<div class="page">
{% block announcement %}
<div class="text-center mb-0" role="alert" style="color: white; background-color: #e96a48; font-weight: 500;">
SimpleLogin is on ProductHunt
<img src="/static/images/producthunt.svg" style="width: 18px">
,
<a href="https://www.producthunt.com/posts/simplelogin" target="_blank"
style="font-weight: 500; color: white; text-decoration: underline;">
Support Us
<i class="fe fe-external-link"></i>
</a>
</div>
{% endblock %}
<div class="container">

View File

@ -6,59 +6,7 @@ from app.extensions import db
from app.models import User, ApiKey, AliasUsedOn, Alias
def test_different_scenarios(flask_client):
user = User.create(
email="a@b.c",
password="password",
name="Test User",
activated=True,
commit=True,
)
# create api_key
api_key = ApiKey.create(user.id, "for test")
db.session.commit()
# <<< without hostname >>>
r = flask_client.get(
url_for("api.options"), headers={"Authentication": api_key.code}
)
# {
# "can_create_custom": True,
# "custom": {"suffixes": ["azdwbw@sl.local"], "suggestion": ""},
# "existing": ["cat_cat_cat@sl.local"],
# }
assert r.status_code == 200
assert r.json["can_create_custom"]
assert len(r.json["existing"]) == 1
assert len(r.json["custom"]["suffixes"]) == 4
assert r.json["custom"]["suggestion"] == "" # no hostname => no suggestion
# <<< with hostname >>>
r = flask_client.get(
url_for("api.options", hostname="www.test.com"),
headers={"Authentication": api_key.code},
)
assert r.json["custom"]["suggestion"] == "test"
# <<< with recommendation >>>
alias = Alias.create_new(user, prefix="test")
db.session.commit()
AliasUsedOn.create(alias_id=alias.id, hostname="www.test.com", user_id=user.id)
db.session.commit()
r = flask_client.get(
url_for("api.options", hostname="www.test.com"),
headers={"Authentication": api_key.code},
)
assert r.json["recommendation"]["alias"] == alias.email
assert r.json["recommendation"]["hostname"] == "www.test.com"
def test_different_scenarios_v2(flask_client):
def test_different_scenarios_v4(flask_client):
user = User.create(
email="a@b.c", password="password", name="Test User", activated=True
)
@ -70,54 +18,7 @@ def test_different_scenarios_v2(flask_client):
# <<< without hostname >>>
r = flask_client.get(
url_for("api.options_v2"), headers={"Authentication": api_key.code}
)
assert r.status_code == 200
# {'can_create': True, 'existing': ['my-first-alias.chat@sl.local'], 'prefix_suggestion': '', 'suffixes': ['.meo@sl.local']}
assert r.json["can_create"]
assert len(r.json["existing"]) == 1
assert r.json["suffixes"]
assert r.json["prefix_suggestion"] == "" # no hostname => no suggestion
# <<< with hostname >>>
r = flask_client.get(
url_for("api.options_v2", hostname="www.test.com"),
headers={"Authentication": api_key.code},
)
assert r.json["prefix_suggestion"] == "test"
# <<< with recommendation >>>
alias = Alias.create_new(user, prefix="test")
db.session.commit()
AliasUsedOn.create(
alias_id=alias.id, hostname="www.test.com", user_id=alias.user_id
)
db.session.commit()
r = flask_client.get(
url_for("api.options_v2", hostname="www.test.com"),
headers={"Authentication": api_key.code},
)
assert r.json["recommendation"]["alias"] == alias.email
assert r.json["recommendation"]["hostname"] == "www.test.com"
def test_different_scenarios_v3(flask_client):
user = User.create(
email="a@b.c", password="password", name="Test User", activated=True
)
db.session.commit()
# create api_key
api_key = ApiKey.create(user.id, "for test")
db.session.commit()
# <<< without hostname >>>
r = flask_client.get(
url_for("api.options_v3"), headers={"Authentication": api_key.code}
"/api/v4/alias/options", headers={"Authentication": api_key.code}
)
assert r.status_code == 200

View File

@ -1,39 +1,24 @@
import unicodedata
import pytest
from flask import url_for
from app.extensions import db
from app.models import User, AccountActivation
def test_auth_login_success_mfa_disabled(flask_client):
User.create(
email="abcd@gmail.com", password="password", name="Test User", activated=True
)
db.session.commit()
r = flask_client.post(
url_for("api.auth_login"),
json={
"email": "abcd@gmail.com",
"password": "password",
"device": "Test Device",
},
)
assert r.status_code == 200
assert r.json["api_key"]
assert r.json["email"]
assert not r.json["mfa_enabled"]
assert r.json["mfa_key"] is None
assert r.json["name"] == "Test User"
PASSWORD_1 = "Aurélie"
PASSWORD_2 = unicodedata.normalize("NFKD", PASSWORD_1)
assert PASSWORD_1 != PASSWORD_2
def test_auth_login_success_mfa_enabled(flask_client):
@pytest.mark.parametrize("mfa", (True, False), ids=("MFA", "no MFA"))
def test_auth_login_success(flask_client, mfa: bool):
User.create(
email="abcd@gmail.com",
password="password",
password=PASSWORD_1,
name="Test User",
activated=True,
enable_otp=True,
enable_otp=mfa,
)
db.session.commit()
@ -41,16 +26,23 @@ def test_auth_login_success_mfa_enabled(flask_client):
url_for("api.auth_login"),
json={
"email": "abcd@gmail.com",
"password": "password",
"password": PASSWORD_2,
"device": "Test Device",
},
)
assert r.status_code == 200
assert r.json["api_key"] is None
assert r.json["mfa_enabled"]
assert r.json["mfa_key"]
assert r.json["name"] == "Test User"
assert r.json["email"]
if mfa:
assert r.json["api_key"] is None
assert r.json["mfa_enabled"]
assert r.json["mfa_key"]
else:
assert r.json["api_key"]
assert not r.json["mfa_enabled"]
assert r.json["mfa_key"] is None
def test_auth_login_device_exist(flask_client):

View File

@ -31,6 +31,20 @@ def test_create_mailbox(flask_client):
assert r.json == {"error": "gmail.com invalid"}
def test_create_mailbox_fail_for_free_user(flask_client):
user = login(flask_client)
user.trial_end = None
db.session.commit()
r = flask_client.post(
"/api/mailboxes",
json={"email": "mailbox@gmail.com"},
)
assert r.status_code == 400
assert r.json == {"error": "Only premium plan can add additional mailbox"}
def test_delete_mailbox(flask_client):
user = login(flask_client)

View File

@ -9,37 +9,6 @@ from app.utils import random_word
from tests.utils import login
def test_v1(flask_client):
login(flask_client)
word = random_word()
suffix = f".{word}@{EMAIL_DOMAIN}"
r = flask_client.post(
"/api/alias/custom/new",
json={
"alias_prefix": "prefix",
"alias_suffix": suffix,
},
)
assert r.status_code == 201
assert r.json["alias"] == f"prefix.{word}@{EMAIL_DOMAIN}"
res = r.json
assert "id" in res
assert "email" in res
assert "creation_date" in res
assert "creation_timestamp" in res
assert "nb_forward" in res
assert "nb_block" in res
assert "nb_reply" in res
assert "enabled" in res
new_alias: Alias = Alias.get_by(email=r.json["alias"])
assert len(new_alias.mailboxes) == 1
def test_v2(flask_client):
login(flask_client)

View File

@ -27,6 +27,7 @@ from app.email_utils import (
should_disable,
decode_text,
parse_id_from_bounce,
get_queue_id,
)
from app.extensions import db
from app.models import User, CustomDomain, Alias, Contact, EmailLog
@ -720,3 +721,11 @@ def test_parse_id_from_bounce():
assert parse_id_from_bounce("bounces+1234+@local") == 1234
assert parse_id_from_bounce("anything+1234+@local") == 1234
assert parse_id_from_bounce(BOUNCE_EMAIL.format(1234)) == 1234
def test_get_queue_id():
msg = email.message_from_string(
"Received: from mail-wr1-x434.google.com (mail-wr1-x434.google.com [IPv6:2a00:1450:4864:20::434])\r\n\t(using TLSv1.3 with cipher TLS_AES_128_GCM_SHA256 (128/128 bits))\r\n\t(No client certificate requested)\r\n\tby mx1.simplelogin.co (Postfix) with ESMTPS id 4FxQmw1DXdz2vK2\r\n\tfor <jglfdjgld@alias.com>; Fri, 4 Jun 2021 14:55:43 +0000 (UTC)"
)
assert get_queue_id(msg) == "4FxQmw1DXdz2vK2"