From 5dde39eb371b4253a55042002337d840e1997be2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Casaj=C3=BAs?= Date: Thu, 21 Apr 2022 18:40:24 +0200 Subject: [PATCH] Prevent free users from creating reverse-alias --- app/api/views/alias.py | 55 ++----- app/dashboard/views/alias_contact_manager.py | 134 ++++++++++-------- app/errors.py | 35 ++++- app/models.py | 11 ++ ...2_050317_98040e190381_add_flags_to_user.py | 29 ++++ pytest.ini | 2 +- .../dashboard/alias_contact_manager.html | 6 +- tests/api/test_alias.py | 58 +++++++- 8 files changed, 224 insertions(+), 106 deletions(-) create mode 100644 migrations/versions/2022_050317_98040e190381_add_flags_to_user.py diff --git a/app/api/views/alias.py b/app/api/views/alias.py index 8c1a4575..eef24ba1 100644 --- a/app/api/views/alias.py +++ b/app/api/views/alias.py @@ -1,6 +1,4 @@ from deprecated import deprecated -from flanker.addresslib import address -from flanker.addresslib.address import EmailAddress from flask import g from flask import jsonify from flask import request @@ -17,15 +15,16 @@ from app.api.serializer import ( get_alias_info_v2, get_alias_infos_with_pagination_v3, ) +from app.dashboard.views.alias_contact_manager import create_contact from app.dashboard.views.alias_log import get_alias_log from app.db import Session -from app.email_utils import ( - generate_reply_email, +from app.errors import ( + CannotCreateContactForReverseAlias, + ErrContactErrorUpgradeNeeded, + ErrContactAlreadyExists, + ErrAddressInvalid, ) -from app.errors import CannotCreateContactForReverseAlias -from app.log import LOG from app.models import Alias, Contact, Mailbox, AliasMailbox -from app.utils import sanitize_email @deprecated @@ -407,50 +406,26 @@ def create_contact_route(alias_id): Output: 201 if success 409 if contact already added - - """ data = request.get_json() if not data: return jsonify(error="request body cannot be empty"), 400 - user = g.user alias: Alias = Alias.get(alias_id) - if alias.user_id != user.id: + if alias.user_id != g.user.id: return jsonify(error="Forbidden"), 403 - contact_addr = data.get("contact") - - if not contact_addr: - return jsonify(error="Contact cannot be empty"), 400 - - full_address: EmailAddress = address.parse(contact_addr) - if not full_address: - return jsonify(error=f"invalid contact email {contact_addr}"), 400 - - contact_name, contact_email = full_address.display_name, full_address.address - - contact_email = sanitize_email(contact_email, not_lower=True) - - # already been added - contact = Contact.get_by(alias_id=alias.id, website_email=contact_email) - if contact: - return jsonify(**serialize_contact(contact, existed=True)), 200 + contact_address = data.get("contact") try: - contact = Contact.create( - user_id=alias.user_id, - alias_id=alias.id, - website_email=contact_email, - name=contact_name, - reply_email=generate_reply_email(contact_email, user), - ) - except CannotCreateContactForReverseAlias: - return jsonify(error="You can't create contact for a reverse alias"), 400 - - LOG.d("create reverse-alias for %s %s", contact_addr, alias) - Session.commit() + contact = create_contact(g.user, alias, contact_address) + except ErrContactErrorUpgradeNeeded as err: + return jsonify(error=err.error_for_user()), 403 + except (ErrAddressInvalid, CannotCreateContactForReverseAlias) as err: + return jsonify(error=err.error_for_user()), 400 + except ErrContactAlreadyExists as err: + return jsonify(**serialize_contact(err.contact, existed=True)), 200 return jsonify(**serialize_contact(contact)), 201 diff --git a/app/dashboard/views/alias_contact_manager.py b/app/dashboard/views/alias_contact_manager.py index adaf8694..dbe78ea1 100644 --- a/app/dashboard/views/alias_contact_manager.py +++ b/app/dashboard/views/alias_contact_manager.py @@ -16,9 +16,14 @@ from app.email_utils import ( generate_reply_email, parse_full_address, ) -from app.errors import CannotCreateContactForReverseAlias +from app.errors import ( + CannotCreateContactForReverseAlias, + ErrContactErrorUpgradeNeeded, + ErrAddressInvalid, + ErrContactAlreadyExists, +) from app.log import LOG -from app.models import Alias, Contact, EmailLog +from app.models import Alias, Contact, EmailLog, User def email_validator(): @@ -44,6 +49,47 @@ def email_validator(): return _check +def create_contact( + user: User, alias: Alias, contact_address: str, fail_if_already_exist: bool = True +) -> Contact: + if not contact_address: + raise ErrAddressInvalid("Empty address") + try: + contact_name, contact_email = parse_full_address(contact_address) + except ValueError: + raise ErrAddressInvalid(contact_address) + + if not is_valid_email(contact_email): + raise ErrAddressInvalid(contact_email) + + contact = Contact.get_by(alias_id=alias.id, website_email=contact_email) + if contact: + if fail_if_already_exist: + raise ErrContactAlreadyExists(contact) + return contact + + if not user.is_premium() and user.flags & User.FLAG_FREE_DISABLE_CREATE_ALIAS > 0: + raise ErrContactErrorUpgradeNeeded() + + contact = Contact.create( + user_id=alias.user_id, + alias_id=alias.id, + website_email=contact_email, + name=contact_name, + reply_email=generate_reply_email(contact_email, user), + ) + + LOG.d( + "create reverse-alias for %s %s, reverse alias:%s", + contact_address, + alias, + contact.reply_email, + ) + Session.commit() + + return contact + + class NewContactForm(FlaskForm): email = StringField( "Email", validators=[validators.DataRequired(), email_validator()] @@ -150,6 +196,21 @@ def get_contact_infos( return ret +def delete_contact(alias: Alias, contact_id: int): + contact = Contact.get(contact_id) + + if not contact: + flash("Unknown error. Refresh the page", "warning") + elif contact.alias_id != alias.id: + flash("You cannot delete reverse-alias", "warning") + else: + delete_contact_email = contact.website_email + Contact.delete(contact_id) + Session.commit() + + flash(f"Reverse-alias for {delete_contact_email} has been deleted", "success") + + @dashboard_bp.route("/alias_contact_manager//", methods=["GET", "POST"]) @login_required def alias_contact_manager(alias_id): @@ -179,45 +240,18 @@ def alias_contact_manager(alias_id): if request.method == "POST": if request.form.get("form-name") == "create": if new_contact_form.validate(): - contact_addr = new_contact_form.email.data.strip() - + contact_address = new_contact_form.email.data.strip() try: - contact_name, contact_email = parse_full_address(contact_addr) - except Exception: - flash(f"{contact_addr} is invalid", "error") + contact = create_contact(current_user, alias, contact_address) + except ( + ErrContactErrorUpgradeNeeded, + ErrAddressInvalid, + ErrContactAlreadyExists, + CannotCreateContactForReverseAlias, + ) as excp: + flash(excp.error_for_user(), "error") return redirect(request.url) - - if not is_valid_email(contact_email): - flash(f"{contact_email} is invalid", "error") - return redirect(request.url) - - contact = Contact.get_by(alias_id=alias.id, website_email=contact_email) - # already been added - if contact: - flash(f"{contact_email} is already added", "error") - return redirect(request.url) - - try: - contact = Contact.create( - user_id=alias.user_id, - alias_id=alias.id, - website_email=contact_email, - name=contact_name, - reply_email=generate_reply_email(contact_email, current_user), - ) - except CannotCreateContactForReverseAlias: - flash("You can't create contact for a reverse alias", "error") - return redirect(request.url) - - LOG.d( - "create reverse-alias for %s %s, reverse alias:%s", - contact_addr, - alias, - contact.reply_email, - ) - Session.commit() - flash(f"Reverse alias for {contact_addr} is created", "success") - + flash(f"Reverse alias for {contact_address} is created", "success") return redirect( url_for( "dashboard.alias_contact_manager", @@ -227,27 +261,7 @@ def alias_contact_manager(alias_id): ) elif request.form.get("form-name") == "delete": contact_id = request.form.get("contact-id") - contact = Contact.get(contact_id) - - if not contact: - flash("Unknown error. Refresh the page", "warning") - return redirect( - url_for("dashboard.alias_contact_manager", alias_id=alias_id) - ) - elif contact.alias_id != alias.id: - flash("You cannot delete reverse-alias", "warning") - return redirect( - url_for("dashboard.alias_contact_manager", alias_id=alias_id) - ) - - delete_contact_email = contact.website_email - Contact.delete(contact_id) - Session.commit() - - flash( - f"Reverse-alias for {delete_contact_email} has been deleted", "success" - ) - + delete_contact(alias, contact_id) return redirect( url_for("dashboard.alias_contact_manager", alias_id=alias_id) ) diff --git a/app/errors.py b/app/errors.py index f75e83d1..9d14d272 100644 --- a/app/errors.py +++ b/app/errors.py @@ -3,6 +3,10 @@ class SLException(Exception): super_str = super().__str__() return f"{type(self).__name__} {super_str}" + def error_for_user(self) -> str: + """By default send the exception errror to the user. Should be overloaded by the child exceptions""" + return str(self) + class AliasInTrashError(SLException): """raised when alias is deleted before""" @@ -25,7 +29,8 @@ class SubdomainInTrashError(SLException): class CannotCreateContactForReverseAlias(SLException): """raised when a contact is created that has website_email=reverse_alias of another contact""" - pass + def error_for_user(self) -> str: + return "You can't create contact for a reverse alias" class NonReverseAliasInReplyPhase(SLException): @@ -60,3 +65,31 @@ class MailSentFromReverseAlias(SLException): class ProtonPartnerNotSetUp(SLException): pass + + +class ErrContactErrorUpgradeNeeded(SLException): + """raised when user cannot create a contact because the plan doesn't allow it""" + + def error_for_user(self) -> str: + return f"Please upgrade to premium to create reverse-alias" + + +class ErrAddressInvalid(SLException): + """raised when an address is invalid""" + + def __init__(self, address: str): + self.address = address + + def error_for_user(self) -> str: + return f"{self.address} is not a valid email address" + + +class ErrContactAlreadyExists(SLException): + """raised when a contact already exists""" + + # TODO: type-hint this as a contact when models are almost dataclasses and don't import errors + def __init__(self, contact): + self.contact = contact + + def error_for_user(self) -> str: + return f"{self.contact.website_email} is already added" diff --git a/app/models.py b/app/models.py index d0bb193a..e57a8d08 100644 --- a/app/models.py +++ b/app/models.py @@ -292,6 +292,9 @@ class Fido(Base, ModelMixin): class User(Base, ModelMixin, UserMixin, PasswordOracle): __tablename__ = "users" + + FLAG_FREE_DISABLE_CREATE_ALIAS = 1 + email = sa.Column(sa.String(256), unique=True, nullable=False) name = sa.Column(sa.String(128), nullable=True) @@ -490,6 +493,14 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle): ), ) + # bitwise flags. Allow for future expansion + flags = sa.Column( + sa.BigInteger, + default=FLAG_FREE_DISABLE_CREATE_ALIAS, + server_default="0", + nullable=False, + ) + @property def directory_quota(self): return min( diff --git a/migrations/versions/2022_050317_98040e190381_add_flags_to_user.py b/migrations/versions/2022_050317_98040e190381_add_flags_to_user.py new file mode 100644 index 00000000..b32733c4 --- /dev/null +++ b/migrations/versions/2022_050317_98040e190381_add_flags_to_user.py @@ -0,0 +1,29 @@ +"""add flags to user + +Revision ID: 98040e190381 +Revises: 0aaad1740797 +Create Date: 2022-05-03 17:31:58.559032 + +""" +import sqlalchemy_utils +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '98040e190381' +down_revision = '0aaad1740797' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('users', sa.Column('flags', sa.BigInteger(), server_default='0', nullable=False)) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('users', 'flags') + # ### end Alembic commands ### diff --git a/pytest.ini b/pytest.ini index 3d362baf..c0f5472c 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,5 +1,5 @@ [pytest] -addopts = +xaddopts = --cov --cov-config coverage.ini --cov-report=html:htmlcov diff --git a/templates/dashboard/alias_contact_manager.html b/templates/dashboard/alias_contact_manager.html index 386a8989..5061406c 100644 --- a/templates/dashboard/alias_contact_manager.html +++ b/templates/dashboard/alias_contact_manager.html @@ -64,7 +64,11 @@
Where do you want to send the email?
- + {% if alias.user.is_premium() %} + + {% else %} + + {% endif %} diff --git a/tests/api/test_alias.py b/tests/api/test_alias.py index d69ea018..1d770baf 100644 --- a/tests/api/test_alias.py +++ b/tests/api/test_alias.py @@ -1,11 +1,12 @@ from flask import url_for +import arrow from app.config import PAGE_LIMIT from app.db import Session from app.email_utils import is_reverse_alias from app.models import User, ApiKey, Alias, Contact, EmailLog, Mailbox from tests.api.utils import get_new_user_and_api_key -from tests.utils import login +from tests.utils import login, random_domain def test_get_aliases_error_without_pagination(flask_client): @@ -527,6 +528,57 @@ def test_create_contact_route(flask_client): assert r.json["existed"] +def test_create_contact_route_invalid_alias(flask_client): + user, api_key = get_new_user_and_api_key() + other_user, other_api_key = get_new_user_and_api_key() + + alias = Alias.create_new_random(other_user) + Session.commit() + + r = flask_client.post( + url_for("api.create_contact_route", alias_id=alias.id), + headers={"Authentication": api_key.code}, + json={"contact": "First Last "}, + ) + + assert r.status_code == 403 + + +def test_create_contact_route_free_users(flask_client): + user, api_key = get_new_user_and_api_key() + + alias = Alias.create_new_random(user) + Session.commit() + # On trial, should be ok + r = flask_client.post( + url_for("api.create_contact_route", alias_id=alias.id), + headers={"Authentication": api_key.code}, + json={"contact": f"First Last "}, + ) + assert r.status_code == 201 + + # End trial but allow via flags for older free users + user.trial_end = arrow.now() + user.flags = 0 + Session.commit() + r = flask_client.post( + url_for("api.create_contact_route", alias_id=alias.id), + headers={"Authentication": api_key.code}, + json={"contact": f"First Last "}, + ) + assert r.status_code == 201 + + # End trial and disallow for new free users + user.flags = User.FLAG_FREE_DISABLE_CREATE_ALIAS + Session.commit() + r = flask_client.post( + url_for("api.create_contact_route", alias_id=alias.id), + headers={"Authentication": api_key.code}, + json={"contact": f"First Last "}, + ) + assert r.status_code == 403 + + def test_create_contact_route_empty_contact_address(flask_client): user = login(flask_client) alias = Alias.filter_by(user_id=user.id).first() @@ -537,7 +589,7 @@ def test_create_contact_route_empty_contact_address(flask_client): ) assert r.status_code == 400 - assert r.json["error"] == "Contact cannot be empty" + assert r.json["error"] == "Empty address is not a valid email address" def test_create_contact_route_invalid_contact_email(flask_client): @@ -550,7 +602,7 @@ def test_create_contact_route_invalid_contact_email(flask_client): ) assert r.status_code == 400 - assert r.json["error"] == "invalid contact email @gmail.com" + assert r.json["error"] == "@gmail.com is not a valid email address" def test_delete_contact(flask_client):