Prevent free users from creating reverse-alias

This commit is contained in:
Adrià Casajús 2022-04-21 18:40:24 +02:00
parent b17af67614
commit 5dde39eb37
No known key found for this signature in database
GPG Key ID: F0033226A5AFC9B9
8 changed files with 224 additions and 106 deletions

View File

@ -1,6 +1,4 @@
from deprecated import deprecated
from flanker.addresslib import address
from flanker.addresslib.address import EmailAddress
from flask import g
from flask import jsonify
from flask import request
@ -17,15 +15,16 @@ from app.api.serializer import (
get_alias_info_v2,
get_alias_infos_with_pagination_v3,
)
from app.dashboard.views.alias_contact_manager import create_contact
from app.dashboard.views.alias_log import get_alias_log
from app.db import Session
from app.email_utils import (
generate_reply_email,
from app.errors import (
CannotCreateContactForReverseAlias,
ErrContactErrorUpgradeNeeded,
ErrContactAlreadyExists,
ErrAddressInvalid,
)
from app.errors import CannotCreateContactForReverseAlias
from app.log import LOG
from app.models import Alias, Contact, Mailbox, AliasMailbox
from app.utils import sanitize_email
@deprecated
@ -407,50 +406,26 @@ def create_contact_route(alias_id):
Output:
201 if success
409 if contact already added
"""
data = request.get_json()
if not data:
return jsonify(error="request body cannot be empty"), 400
user = g.user
alias: Alias = Alias.get(alias_id)
if alias.user_id != user.id:
if alias.user_id != g.user.id:
return jsonify(error="Forbidden"), 403
contact_addr = data.get("contact")
if not contact_addr:
return jsonify(error="Contact cannot be empty"), 400
full_address: EmailAddress = address.parse(contact_addr)
if not full_address:
return jsonify(error=f"invalid contact email {contact_addr}"), 400
contact_name, contact_email = full_address.display_name, full_address.address
contact_email = sanitize_email(contact_email, not_lower=True)
# already been added
contact = Contact.get_by(alias_id=alias.id, website_email=contact_email)
if contact:
return jsonify(**serialize_contact(contact, existed=True)), 200
contact_address = data.get("contact")
try:
contact = Contact.create(
user_id=alias.user_id,
alias_id=alias.id,
website_email=contact_email,
name=contact_name,
reply_email=generate_reply_email(contact_email, user),
)
except CannotCreateContactForReverseAlias:
return jsonify(error="You can't create contact for a reverse alias"), 400
LOG.d("create reverse-alias for %s %s", contact_addr, alias)
Session.commit()
contact = create_contact(g.user, alias, contact_address)
except ErrContactErrorUpgradeNeeded as err:
return jsonify(error=err.error_for_user()), 403
except (ErrAddressInvalid, CannotCreateContactForReverseAlias) as err:
return jsonify(error=err.error_for_user()), 400
except ErrContactAlreadyExists as err:
return jsonify(**serialize_contact(err.contact, existed=True)), 200
return jsonify(**serialize_contact(contact)), 201

View File

@ -16,9 +16,14 @@ from app.email_utils import (
generate_reply_email,
parse_full_address,
)
from app.errors import CannotCreateContactForReverseAlias
from app.errors import (
CannotCreateContactForReverseAlias,
ErrContactErrorUpgradeNeeded,
ErrAddressInvalid,
ErrContactAlreadyExists,
)
from app.log import LOG
from app.models import Alias, Contact, EmailLog
from app.models import Alias, Contact, EmailLog, User
def email_validator():
@ -44,6 +49,47 @@ def email_validator():
return _check
def create_contact(
user: User, alias: Alias, contact_address: str, fail_if_already_exist: bool = True
) -> Contact:
if not contact_address:
raise ErrAddressInvalid("Empty address")
try:
contact_name, contact_email = parse_full_address(contact_address)
except ValueError:
raise ErrAddressInvalid(contact_address)
if not is_valid_email(contact_email):
raise ErrAddressInvalid(contact_email)
contact = Contact.get_by(alias_id=alias.id, website_email=contact_email)
if contact:
if fail_if_already_exist:
raise ErrContactAlreadyExists(contact)
return contact
if not user.is_premium() and user.flags & User.FLAG_FREE_DISABLE_CREATE_ALIAS > 0:
raise ErrContactErrorUpgradeNeeded()
contact = Contact.create(
user_id=alias.user_id,
alias_id=alias.id,
website_email=contact_email,
name=contact_name,
reply_email=generate_reply_email(contact_email, user),
)
LOG.d(
"create reverse-alias for %s %s, reverse alias:%s",
contact_address,
alias,
contact.reply_email,
)
Session.commit()
return contact
class NewContactForm(FlaskForm):
email = StringField(
"Email", validators=[validators.DataRequired(), email_validator()]
@ -150,6 +196,21 @@ def get_contact_infos(
return ret
def delete_contact(alias: Alias, contact_id: int):
contact = Contact.get(contact_id)
if not contact:
flash("Unknown error. Refresh the page", "warning")
elif contact.alias_id != alias.id:
flash("You cannot delete reverse-alias", "warning")
else:
delete_contact_email = contact.website_email
Contact.delete(contact_id)
Session.commit()
flash(f"Reverse-alias for {delete_contact_email} has been deleted", "success")
@dashboard_bp.route("/alias_contact_manager/<alias_id>/", methods=["GET", "POST"])
@login_required
def alias_contact_manager(alias_id):
@ -179,45 +240,18 @@ def alias_contact_manager(alias_id):
if request.method == "POST":
if request.form.get("form-name") == "create":
if new_contact_form.validate():
contact_addr = new_contact_form.email.data.strip()
contact_address = new_contact_form.email.data.strip()
try:
contact_name, contact_email = parse_full_address(contact_addr)
except Exception:
flash(f"{contact_addr} is invalid", "error")
contact = create_contact(current_user, alias, contact_address)
except (
ErrContactErrorUpgradeNeeded,
ErrAddressInvalid,
ErrContactAlreadyExists,
CannotCreateContactForReverseAlias,
) as excp:
flash(excp.error_for_user(), "error")
return redirect(request.url)
if not is_valid_email(contact_email):
flash(f"{contact_email} is invalid", "error")
return redirect(request.url)
contact = Contact.get_by(alias_id=alias.id, website_email=contact_email)
# already been added
if contact:
flash(f"{contact_email} is already added", "error")
return redirect(request.url)
try:
contact = Contact.create(
user_id=alias.user_id,
alias_id=alias.id,
website_email=contact_email,
name=contact_name,
reply_email=generate_reply_email(contact_email, current_user),
)
except CannotCreateContactForReverseAlias:
flash("You can't create contact for a reverse alias", "error")
return redirect(request.url)
LOG.d(
"create reverse-alias for %s %s, reverse alias:%s",
contact_addr,
alias,
contact.reply_email,
)
Session.commit()
flash(f"Reverse alias for {contact_addr} is created", "success")
flash(f"Reverse alias for {contact_address} is created", "success")
return redirect(
url_for(
"dashboard.alias_contact_manager",
@ -227,27 +261,7 @@ def alias_contact_manager(alias_id):
)
elif request.form.get("form-name") == "delete":
contact_id = request.form.get("contact-id")
contact = Contact.get(contact_id)
if not contact:
flash("Unknown error. Refresh the page", "warning")
return redirect(
url_for("dashboard.alias_contact_manager", alias_id=alias_id)
)
elif contact.alias_id != alias.id:
flash("You cannot delete reverse-alias", "warning")
return redirect(
url_for("dashboard.alias_contact_manager", alias_id=alias_id)
)
delete_contact_email = contact.website_email
Contact.delete(contact_id)
Session.commit()
flash(
f"Reverse-alias for {delete_contact_email} has been deleted", "success"
)
delete_contact(alias, contact_id)
return redirect(
url_for("dashboard.alias_contact_manager", alias_id=alias_id)
)

View File

@ -3,6 +3,10 @@ class SLException(Exception):
super_str = super().__str__()
return f"{type(self).__name__} {super_str}"
def error_for_user(self) -> str:
"""By default send the exception errror to the user. Should be overloaded by the child exceptions"""
return str(self)
class AliasInTrashError(SLException):
"""raised when alias is deleted before"""
@ -25,7 +29,8 @@ class SubdomainInTrashError(SLException):
class CannotCreateContactForReverseAlias(SLException):
"""raised when a contact is created that has website_email=reverse_alias of another contact"""
pass
def error_for_user(self) -> str:
return "You can't create contact for a reverse alias"
class NonReverseAliasInReplyPhase(SLException):
@ -60,3 +65,31 @@ class MailSentFromReverseAlias(SLException):
class ProtonPartnerNotSetUp(SLException):
pass
class ErrContactErrorUpgradeNeeded(SLException):
"""raised when user cannot create a contact because the plan doesn't allow it"""
def error_for_user(self) -> str:
return f"Please upgrade to premium to create reverse-alias"
class ErrAddressInvalid(SLException):
"""raised when an address is invalid"""
def __init__(self, address: str):
self.address = address
def error_for_user(self) -> str:
return f"{self.address} is not a valid email address"
class ErrContactAlreadyExists(SLException):
"""raised when a contact already exists"""
# TODO: type-hint this as a contact when models are almost dataclasses and don't import errors
def __init__(self, contact):
self.contact = contact
def error_for_user(self) -> str:
return f"{self.contact.website_email} is already added"

View File

@ -292,6 +292,9 @@ class Fido(Base, ModelMixin):
class User(Base, ModelMixin, UserMixin, PasswordOracle):
__tablename__ = "users"
FLAG_FREE_DISABLE_CREATE_ALIAS = 1
email = sa.Column(sa.String(256), unique=True, nullable=False)
name = sa.Column(sa.String(128), nullable=True)
@ -490,6 +493,14 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
),
)
# bitwise flags. Allow for future expansion
flags = sa.Column(
sa.BigInteger,
default=FLAG_FREE_DISABLE_CREATE_ALIAS,
server_default="0",
nullable=False,
)
@property
def directory_quota(self):
return min(

View File

@ -0,0 +1,29 @@
"""add flags to user
Revision ID: 98040e190381
Revises: 0aaad1740797
Create Date: 2022-05-03 17:31:58.559032
"""
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '98040e190381'
down_revision = '0aaad1740797'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('users', sa.Column('flags', sa.BigInteger(), server_default='0', nullable=False))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('users', 'flags')
# ### end Alembic commands ###

View File

@ -1,5 +1,5 @@
[pytest]
addopts =
xaddopts =
--cov
--cov-config coverage.ini
--cov-report=html:htmlcov

View File

@ -64,7 +64,11 @@
<div class="small-text">
Where do you want to send the email?
</div>
<button class="btn btn-primary mt-2">Create reverse-alias</button>
{% if alias.user.is_premium() %}
<button class="btn btn-primary mt-2">Create reverse-alias</button>
{% else %}
<button disabled title='Upgrade to premium to create reverse-aliases' class="btn btn-primary mt-2">Create reverse-alias</button>
{% endif %}
</form>
</div>

View File

@ -1,11 +1,12 @@
from flask import url_for
import arrow
from app.config import PAGE_LIMIT
from app.db import Session
from app.email_utils import is_reverse_alias
from app.models import User, ApiKey, Alias, Contact, EmailLog, Mailbox
from tests.api.utils import get_new_user_and_api_key
from tests.utils import login
from tests.utils import login, random_domain
def test_get_aliases_error_without_pagination(flask_client):
@ -527,6 +528,57 @@ def test_create_contact_route(flask_client):
assert r.json["existed"]
def test_create_contact_route_invalid_alias(flask_client):
user, api_key = get_new_user_and_api_key()
other_user, other_api_key = get_new_user_and_api_key()
alias = Alias.create_new_random(other_user)
Session.commit()
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"contact": "First Last <first@example.com>"},
)
assert r.status_code == 403
def test_create_contact_route_free_users(flask_client):
user, api_key = get_new_user_and_api_key()
alias = Alias.create_new_random(user)
Session.commit()
# On trial, should be ok
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"contact": f"First Last <first@{random_domain()}>"},
)
assert r.status_code == 201
# End trial but allow via flags for older free users
user.trial_end = arrow.now()
user.flags = 0
Session.commit()
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"contact": f"First Last <first@{random_domain()}>"},
)
assert r.status_code == 201
# End trial and disallow for new free users
user.flags = User.FLAG_FREE_DISABLE_CREATE_ALIAS
Session.commit()
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
headers={"Authentication": api_key.code},
json={"contact": f"First Last <first@{random_domain()}>"},
)
assert r.status_code == 403
def test_create_contact_route_empty_contact_address(flask_client):
user = login(flask_client)
alias = Alias.filter_by(user_id=user.id).first()
@ -537,7 +589,7 @@ def test_create_contact_route_empty_contact_address(flask_client):
)
assert r.status_code == 400
assert r.json["error"] == "Contact cannot be empty"
assert r.json["error"] == "Empty address is not a valid email address"
def test_create_contact_route_invalid_contact_email(flask_client):
@ -550,7 +602,7 @@ def test_create_contact_route_invalid_contact_email(flask_client):
)
assert r.status_code == 400
assert r.json["error"] == "invalid contact email @gmail.com"
assert r.json["error"] == "@gmail.com is not a valid email address"
def test_delete_contact(flask_client):