Merge pull request #921 from simple-login/ac-free-no-reverse-alias

Prevent free users from creating reverse-alias
This commit is contained in:
Adrià Casajús 2022-05-13 17:13:48 +02:00 committed by GitHub
commit 34ad81c7c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 286 additions and 126 deletions

View File

@ -1,6 +1,4 @@
from deprecated import deprecated
from flanker.addresslib import address
from flanker.addresslib.address import EmailAddress
from flask import g
from flask import jsonify
from flask import request
@ -17,15 +15,16 @@ from app.api.serializer import (
get_alias_info_v2,
get_alias_infos_with_pagination_v3,
)
from app.dashboard.views.alias_contact_manager import create_contact
from app.dashboard.views.alias_log import get_alias_log
from app.db import Session
from app.email_utils import (
generate_reply_email,
from app.errors import (
CannotCreateContactForReverseAlias,
ErrContactErrorUpgradeNeeded,
ErrContactAlreadyExists,
ErrAddressInvalid,
)
from app.errors import CannotCreateContactForReverseAlias
from app.log import LOG
from app.models import Alias, Contact, Mailbox, AliasMailbox
from app.utils import sanitize_email
@deprecated
@ -407,50 +406,26 @@ def create_contact_route(alias_id):
Output:
201 if success
409 if contact already added
"""
data = request.get_json()
if not data:
return jsonify(error="request body cannot be empty"), 400
user = g.user
alias: Alias = Alias.get(alias_id)
if alias.user_id != user.id:
if alias.user_id != g.user.id:
return jsonify(error="Forbidden"), 403
contact_addr = data.get("contact")
if not contact_addr:
return jsonify(error="Contact cannot be empty"), 400
full_address: EmailAddress = address.parse(contact_addr)
if not full_address:
return jsonify(error=f"invalid contact email {contact_addr}"), 400
contact_name, contact_email = full_address.display_name, full_address.address
contact_email = sanitize_email(contact_email, not_lower=True)
# already been added
contact = Contact.get_by(alias_id=alias.id, website_email=contact_email)
if contact:
return jsonify(**serialize_contact(contact, existed=True)), 200
contact_address = data.get("contact")
try:
contact = Contact.create(
user_id=alias.user_id,
alias_id=alias.id,
website_email=contact_email,
name=contact_name,
reply_email=generate_reply_email(contact_email, user),
)
except CannotCreateContactForReverseAlias:
return jsonify(error="You can't create contact for a reverse alias"), 400
LOG.d("create reverse-alias for %s %s", contact_addr, alias)
Session.commit()
contact = create_contact(g.user, alias, contact_address)
except ErrContactErrorUpgradeNeeded as err:
return jsonify(error=err.error_for_user()), 403
except (ErrAddressInvalid, CannotCreateContactForReverseAlias) as err:
return jsonify(error=err.error_for_user()), 400
except ErrContactAlreadyExists as err:
return jsonify(**serialize_contact(err.contact, existed=True)), 200
return jsonify(**serialize_contact(contact)), 201

View File

@ -463,3 +463,5 @@ def setup_nameservers():
NAMESERVERS = setup_nameservers()
DISABLE_CREATE_CONTACTS_FOR_FREE_USERS = False

View File

@ -8,7 +8,8 @@ from flask_wtf import FlaskForm
from sqlalchemy import and_, func, case
from wtforms import StringField, validators, ValidationError
from app.config import PAGE_LIMIT
# Need to import directly from config to allow modification from the tests
from app import config
from app.dashboard.base import dashboard_bp
from app.db import Session
from app.email_utils import (
@ -16,9 +17,15 @@ from app.email_utils import (
generate_reply_email,
parse_full_address,
)
from app.errors import CannotCreateContactForReverseAlias
from app.errors import (
CannotCreateContactForReverseAlias,
ErrContactErrorUpgradeNeeded,
ErrAddressInvalid,
ErrContactAlreadyExists,
)
from app.log import LOG
from app.models import Alias, Contact, EmailLog
from app.models import Alias, Contact, EmailLog, User
from app.utils import sanitize_email
def email_validator():
@ -44,6 +51,59 @@ def email_validator():
return _check
def user_can_create_contacts(user: User) -> bool:
if user.is_premium():
return True
if user.flags & User.FLAG_FREE_DISABLE_CREATE_ALIAS == 0:
return True
return not config.DISABLE_CREATE_CONTACTS_FOR_FREE_USERS
def create_contact(user: User, alias: Alias, contact_address: str) -> Contact:
"""
Create a contact for a user. Can be restricted for new free users by enabling DISABLE_CREATE_CONTACTS_FOR_FREE_USERS.
Can throw exceptions:
- ErrAddressInvalid
- ErrContactAlreadyExists
- ErrContactUpgradeNeeded - If DISABLE_CREATE_CONTACTS_FOR_FREE_USERS this exception will be raised for new free users
"""
if not contact_address:
raise ErrAddressInvalid("Empty address")
try:
contact_name, contact_email = parse_full_address(contact_address)
except ValueError:
raise ErrAddressInvalid(contact_address)
contact_email = sanitize_email(contact_email)
if not is_valid_email(contact_email):
raise ErrAddressInvalid(contact_email)
contact = Contact.get_by(alias_id=alias.id, website_email=contact_email)
if contact:
raise ErrContactAlreadyExists(contact)
if not user_can_create_contacts(user):
raise ErrContactErrorUpgradeNeeded()
contact = Contact.create(
user_id=alias.user_id,
alias_id=alias.id,
website_email=contact_email,
name=contact_name,
reply_email=generate_reply_email(contact_email, user),
)
LOG.d(
"create reverse-alias for %s %s, reverse alias:%s",
contact_address,
alias,
contact.reply_email,
)
Session.commit()
return contact
class NewContactForm(FlaskForm):
email = StringField(
"Email", validators=[validators.DataRequired(), email_validator()]
@ -135,7 +195,11 @@ def get_contact_infos(
],
else_=Contact.created_at,
)
q = q.order_by(latest_activity.desc()).limit(PAGE_LIMIT).offset(page * PAGE_LIMIT)
q = (
q.order_by(latest_activity.desc())
.limit(config.PAGE_LIMIT)
.offset(page * config.PAGE_LIMIT)
)
ret = []
for contact, latest_email_log, nb_reply, nb_forward in q:
@ -150,6 +214,21 @@ def get_contact_infos(
return ret
def delete_contact(alias: Alias, contact_id: int):
contact = Contact.get(contact_id)
if not contact:
flash("Unknown error. Refresh the page", "warning")
elif contact.alias_id != alias.id:
flash("You cannot delete reverse-alias", "warning")
else:
delete_contact_email = contact.website_email
Contact.delete(contact_id)
Session.commit()
flash(f"Reverse-alias for {delete_contact_email} has been deleted", "success")
@dashboard_bp.route("/alias_contact_manager/<alias_id>/", methods=["GET", "POST"])
@login_required
def alias_contact_manager(alias_id):
@ -179,45 +258,18 @@ def alias_contact_manager(alias_id):
if request.method == "POST":
if request.form.get("form-name") == "create":
if new_contact_form.validate():
contact_addr = new_contact_form.email.data.strip()
contact_address = new_contact_form.email.data.strip()
try:
contact_name, contact_email = parse_full_address(contact_addr)
except Exception:
flash(f"{contact_addr} is invalid", "error")
contact = create_contact(current_user, alias, contact_address)
except (
ErrContactErrorUpgradeNeeded,
ErrAddressInvalid,
ErrContactAlreadyExists,
CannotCreateContactForReverseAlias,
) as excp:
flash(excp.error_for_user(), "error")
return redirect(request.url)
if not is_valid_email(contact_email):
flash(f"{contact_email} is invalid", "error")
return redirect(request.url)
contact = Contact.get_by(alias_id=alias.id, website_email=contact_email)
# already been added
if contact:
flash(f"{contact_email} is already added", "error")
return redirect(request.url)
try:
contact = Contact.create(
user_id=alias.user_id,
alias_id=alias.id,
website_email=contact_email,
name=contact_name,
reply_email=generate_reply_email(contact_email, current_user),
)
except CannotCreateContactForReverseAlias:
flash("You can't create contact for a reverse alias", "error")
return redirect(request.url)
LOG.d(
"create reverse-alias for %s %s, reverse alias:%s",
contact_addr,
alias,
contact.reply_email,
)
Session.commit()
flash(f"Reverse alias for {contact_addr} is created", "success")
flash(f"Reverse alias for {contact_address} is created", "success")
return redirect(
url_for(
"dashboard.alias_contact_manager",
@ -227,27 +279,7 @@ def alias_contact_manager(alias_id):
)
elif request.form.get("form-name") == "delete":
contact_id = request.form.get("contact-id")
contact = Contact.get(contact_id)
if not contact:
flash("Unknown error. Refresh the page", "warning")
return redirect(
url_for("dashboard.alias_contact_manager", alias_id=alias_id)
)
elif contact.alias_id != alias.id:
flash("You cannot delete reverse-alias", "warning")
return redirect(
url_for("dashboard.alias_contact_manager", alias_id=alias_id)
)
delete_contact_email = contact.website_email
Contact.delete(contact_id)
Session.commit()
flash(
f"Reverse-alias for {delete_contact_email} has been deleted", "success"
)
delete_contact(alias, contact_id)
return redirect(
url_for("dashboard.alias_contact_manager", alias_id=alias_id)
)
@ -264,7 +296,7 @@ def alias_contact_manager(alias_id):
)
contact_infos = get_contact_infos(alias, page, query=query)
last_page = len(contact_infos) < PAGE_LIMIT
last_page = len(contact_infos) < config.PAGE_LIMIT
nb_contact = Contact.filter(Contact.alias_id == alias.id).count()
# if highlighted contact isn't included, fetch it
@ -286,4 +318,5 @@ def alias_contact_manager(alias_id):
last_page=last_page,
query=query,
nb_contact=nb_contact,
can_create_contacts=user_can_create_contacts(current_user),
)

View File

@ -3,6 +3,10 @@ class SLException(Exception):
super_str = super().__str__()
return f"{type(self).__name__} {super_str}"
def error_for_user(self) -> str:
"""By default send the exception errror to the user. Should be overloaded by the child exceptions"""
return str(self)
class AliasInTrashError(SLException):
"""raised when alias is deleted before"""
@ -25,7 +29,8 @@ class SubdomainInTrashError(SLException):
class CannotCreateContactForReverseAlias(SLException):
"""raised when a contact is created that has website_email=reverse_alias of another contact"""
pass
def error_for_user(self) -> str:
return "You can't create contact for a reverse alias"
class NonReverseAliasInReplyPhase(SLException):
@ -60,3 +65,31 @@ class MailSentFromReverseAlias(SLException):
class ProtonPartnerNotSetUp(SLException):
pass
class ErrContactErrorUpgradeNeeded(SLException):
"""raised when user cannot create a contact because the plan doesn't allow it"""
def error_for_user(self) -> str:
return f"Please upgrade to premium to create reverse-alias"
class ErrAddressInvalid(SLException):
"""raised when an address is invalid"""
def __init__(self, address: str):
self.address = address
def error_for_user(self) -> str:
return f"{self.address} is not a valid email address"
class ErrContactAlreadyExists(SLException):
"""raised when a contact already exists"""
# TODO: type-hint this as a contact when models are almost dataclasses and don't import errors
def __init__(self, contact: "Contact"): # noqa: F821
self.contact = contact
def error_for_user(self) -> str:
return f"{self.contact.website_email} is already added"

View File

@ -292,6 +292,9 @@ class Fido(Base, ModelMixin):
class User(Base, ModelMixin, UserMixin, PasswordOracle):
__tablename__ = "users"
FLAG_FREE_DISABLE_CREATE_ALIAS = 1
email = sa.Column(sa.String(256), unique=True, nullable=False)
name = sa.Column(sa.String(128), nullable=True)
@ -490,6 +493,14 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
),
)
# bitwise flags. Allow for future expansion
flags = sa.Column(
sa.BigInteger,
default=FLAG_FREE_DISABLE_CREATE_ALIAS,
server_default="0",
nullable=False,
)
@property
def directory_quota(self):
return min(

View File

@ -600,6 +600,14 @@ Return 200 and `existed=true` if contact is already added.
}
```
It can return 403 with an error if the user cannot create reverse alias.
``json
{
"error": "Please upgrade to create a reverse-alias"
}
```
### Mailbox endpoints
#### GET /api/v2/mailboxes

View File

@ -0,0 +1,29 @@
"""add flags to the user model
Revision ID: 088f23324464
Revises: e866ad0e78e1
Create Date: 2022-05-12 13:32:30.898367
"""
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '088f23324464'
down_revision = 'e866ad0e78e1'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('users', sa.Column('flags', sa.BigInteger(), server_default='0', nullable=False))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('users', 'flags')
# ### end Alembic commands ###

10
scripts/new-migration.sh Normal file → Executable file
View File

@ -2,9 +2,11 @@
# To run it:
# sh scripts/new-migration.sh
container_name=sl-db-new-migration
# create a postgres database for SimpleLogin
docker rm -f sl-db
docker run -p 25432:5432 --name sl-db -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=sl -d postgres:13
docker rm -f ${container_name}
docker run -p 25432:5432 --name ${container_name} -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=sl -d postgres:13
# sleep a little bit for the db to be ready
sleep 3
@ -13,7 +15,7 @@ sleep 3
env DB_URI=postgresql://postgres:postgres@127.0.0.1:25432/sl poetry run alembic upgrade head
# generate the migration script.
env DB_URI=postgresql://postgres:postgres@127.0.0.1:25432/sl poetry run alembic revision --autogenerate
env DB_URI=postgresql://postgres:postgres@127.0.0.1:25432/sl poetry run alembic revision --autogenerate $@
# remove the db
docker rm -f sl-db
docker rm -f ${container_name}

View File

@ -64,7 +64,11 @@
<div class="small-text">
Where do you want to send the email?
</div>
<button class="btn btn-primary mt-2">Create reverse-alias</button>
{% if can_create_contacts %}
<button class="btn btn-primary mt-2">Create reverse-alias</button>
{% else %}
<button disabled title='Upgrade to premium to create reverse-aliases' class="btn btn-primary mt-2">Create reverse-alias</button>
{% endif %}
</form>
</div>

View File

@ -1,11 +1,13 @@
from flask import url_for
import arrow
from app.config import PAGE_LIMIT
# Need to import directly from config to allow modification from the tests
from app import config
from app.db import Session
from app.email_utils import is_reverse_alias
from app.models import User, ApiKey, Alias, Contact, EmailLog, Mailbox
from tests.api.utils import get_new_user_and_api_key
from tests.utils import login
from tests.utils import login, random_domain
def test_get_aliases_error_without_pagination(flask_client):
@ -42,17 +44,17 @@ def test_get_aliases_with_pagination(flask_client):
api_key = ApiKey.create(user.id, "for test")
Session.commit()
# create more aliases than PAGE_LIMIT
for _ in range(PAGE_LIMIT + 1):
# create more aliases than config.PAGE_LIMIT
for _ in range(config.PAGE_LIMIT + 1):
Alias.create_new_random(user)
Session.commit()
# get aliases on the 1st page, should return PAGE_LIMIT aliases
# get aliases on the 1st page, should return config.PAGE_LIMIT aliases
r = flask_client.get(
url_for("api.get_aliases", page_id=0), headers={"Authentication": api_key.code}
)
assert r.status_code == 200
assert len(r.json["aliases"]) == PAGE_LIMIT
assert len(r.json["aliases"]) == config.PAGE_LIMIT
# assert returned field
for a in r.json["aliases"]:
@ -67,7 +69,7 @@ def test_get_aliases_with_pagination(flask_client):
assert "note" in a
# get aliases on the 2nd page, should return 2 aliases
# as the total number of aliases is PAGE_LIMIT +2
# as the total number of aliases is config.PAGE_LIMIT +2
# 1 alias is created when user is created
r = flask_client.get(
url_for("api.get_aliases", page_id=1), headers={"Authentication": api_key.code}
@ -86,7 +88,7 @@ def test_get_aliases_query(flask_client):
api_key = ApiKey.create(user.id, "for test")
Session.commit()
# create more aliases than PAGE_LIMIT
# create more aliases than config.PAGE_LIMIT
Alias.create_new(user, "prefix1")
Alias.create_new(user, "prefix2")
Session.commit()
@ -277,7 +279,7 @@ def test_alias_activities(flask_client):
)
Session.commit()
for _ in range(int(PAGE_LIMIT / 2)):
for _ in range(int(config.PAGE_LIMIT / 2)):
EmailLog.create(
contact_id=contact.id,
is_reply=True,
@ -285,7 +287,7 @@ def test_alias_activities(flask_client):
alias_id=contact.alias_id,
)
for _ in range(int(PAGE_LIMIT / 2) + 2):
for _ in range(int(config.PAGE_LIMIT / 2) + 2):
EmailLog.create(
contact_id=contact.id,
blocked=True,
@ -299,7 +301,7 @@ def test_alias_activities(flask_client):
)
assert r.status_code == 200
assert len(r.json["activities"]) == PAGE_LIMIT
assert len(r.json["activities"]) == config.PAGE_LIMIT
for ac in r.json["activities"]:
assert ac["from"]
assert ac["to"]
@ -452,7 +454,7 @@ def test_alias_contacts(flask_client):
Session.commit()
# create some alias log
for i in range(PAGE_LIMIT + 1):
for i in range(config.PAGE_LIMIT + 1):
contact = Contact.create(
website_email=f"marketing-{i}@example.com",
reply_email=f"reply-{i}@a.b",
@ -472,7 +474,7 @@ def test_alias_contacts(flask_client):
r = flask_client.get(f"/api/aliases/{alias.id}/contacts?page_id=0")
assert r.status_code == 200
assert len(r.json["contacts"]) == PAGE_LIMIT
assert len(r.json["contacts"]) == config.PAGE_LIMIT
for ac in r.json["contacts"]:
assert ac["creation_date"]
assert ac["creation_timestamp"]
@ -527,6 +529,67 @@ def test_create_contact_route(flask_client):
assert r.json["existed"]
def test_create_contact_route_invalid_alias(flask_client):
user, api_key = get_new_user_and_api_key()
other_user, other_api_key = get_new_user_and_api_key()
alias = Alias.create_new_random(other_user)
Session.commit()
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"contact": "First Last <first@example.com>"},
)
assert r.status_code == 403
def test_create_contact_route_free_users(flask_client):
user, api_key = get_new_user_and_api_key()
alias = Alias.create_new_random(user)
Session.commit()
# On trial, should be ok
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"contact": f"First Last <first@{random_domain()}>"},
)
assert r.status_code == 201
# End trial but allow via flags for older free users
user.trial_end = arrow.now()
user.flags = 0
Session.commit()
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"contact": f"First Last <first@{random_domain()}>"},
)
assert r.status_code == 201
# End trial and disallow for new free users. Config should allow it
user.flags = User.FLAG_FREE_DISABLE_CREATE_ALIAS
Session.commit()
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"contact": f"First Last <first@{random_domain()}>"},
)
assert r.status_code == 201
# Set the global config to disable free users from create contacts
config.DISABLE_CREATE_CONTACTS_FOR_FREE_USERS = True
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"contact": f"First Last <first@{random_domain()}>"},
)
assert r.status_code == 403
config.DISABLE_CREATE_CONTACTS_FOR_FREE_USERS = False
def test_create_contact_route_empty_contact_address(flask_client):
user = login(flask_client)
alias = Alias.filter_by(user_id=user.id).first()
@ -537,7 +600,7 @@ def test_create_contact_route_empty_contact_address(flask_client):
)
assert r.status_code == 400
assert r.json["error"] == "Contact cannot be empty"
assert r.json["error"] == "Empty address is not a valid email address"
def test_create_contact_route_invalid_contact_email(flask_client):
@ -550,7 +613,7 @@ def test_create_contact_route_invalid_contact_email(flask_client):
)
assert r.status_code == 400
assert r.json["error"] == "invalid contact email @gmail.com"
assert r.json["error"] == "@gmail.com is not a valid email address"
def test_delete_contact(flask_client):
@ -579,11 +642,11 @@ def test_delete_contact(flask_client):
def test_get_alias(flask_client):
user, api_key = get_new_user_and_api_key()
# create more aliases than PAGE_LIMIT
# create more aliases than config.PAGE_LIMIT
alias = Alias.create_new_random(user)
Session.commit()
# get aliases on the 1st page, should return PAGE_LIMIT aliases
# get aliases on the 1st page, should return config.PAGE_LIMIT aliases
r = flask_client.get(
url_for("api.get_alias", alias_id=alias.id),
headers={"Authentication": api_key.code},