This reverts commit 851ba0a99a
.
This commit is contained in:
parent
851ba0a99a
commit
7b24cdd98a
|
@ -1,4 +1,4 @@
|
|||
from flask import request, redirect, url_for, flash, render_template
|
||||
from flask import request, redirect, url_for, flash, render_template, g
|
||||
from flask_login import login_user, current_user
|
||||
|
||||
from app import email_utils
|
||||
|
@ -11,7 +11,9 @@ from app.utils import sanitize_next_url
|
|||
|
||||
|
||||
@auth_bp.route("/activate", methods=["GET", "POST"])
|
||||
@limiter.limit("10/minute")
|
||||
@limiter.limit(
|
||||
"10/minute", deduct_when=lambda r: hasattr(g, "deduct_limit") and g.deduct_limit
|
||||
)
|
||||
def activate():
|
||||
if current_user.is_authenticated:
|
||||
return (
|
||||
|
@ -24,6 +26,8 @@ def activate():
|
|||
activation_code: ActivationCode = ActivationCode.get_by(code=code)
|
||||
|
||||
if not activation_code:
|
||||
# Trigger rate limiter
|
||||
g.deduct_limit = True
|
||||
return (
|
||||
render_template(
|
||||
"auth/activate.html", error="Activation code cannot be found"
|
||||
|
|
|
@ -11,6 +11,7 @@ from flask import (
|
|||
flash,
|
||||
session,
|
||||
make_response,
|
||||
g,
|
||||
)
|
||||
from flask_login import login_user
|
||||
from flask_wtf import FlaskForm
|
||||
|
@ -34,7 +35,9 @@ class FidoTokenForm(FlaskForm):
|
|||
|
||||
|
||||
@auth_bp.route("/fido", methods=["GET", "POST"])
|
||||
@limiter.limit("10/minute")
|
||||
@limiter.limit(
|
||||
"10/minute", deduct_when=lambda r: hasattr(g, "deduct_limit") and g.deduct_limit
|
||||
)
|
||||
def fido():
|
||||
# passed from login page
|
||||
user_id = session.get(MFA_USER_ID)
|
||||
|
@ -62,6 +65,9 @@ def fido():
|
|||
flash(f"Welcome back!", "success")
|
||||
# Redirect user to correct page
|
||||
return redirect(next_url or url_for("dashboard.index"))
|
||||
else:
|
||||
# Trigger rate limiter
|
||||
g.deduct_limit = True
|
||||
|
||||
# Handling POST requests
|
||||
if fido_token_form.validate_on_submit():
|
||||
|
@ -94,6 +100,8 @@ def fido():
|
|||
except Exception as e:
|
||||
LOG.w(f"An error occurred in WebAuthn verification process: {e}")
|
||||
flash("Key verification failed.", "warning")
|
||||
# Trigger rate limiter
|
||||
g.deduct_limit = True
|
||||
auto_activate = False
|
||||
else:
|
||||
user.fido_sign_count = new_sign_count
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from flask import request, render_template, redirect, url_for, flash
|
||||
from flask import request, render_template, redirect, url_for, flash, g
|
||||
from flask_wtf import FlaskForm
|
||||
from wtforms import StringField, validators
|
||||
|
||||
|
@ -15,11 +15,16 @@ class ForgotPasswordForm(FlaskForm):
|
|||
|
||||
|
||||
@auth_bp.route("/forgot_password", methods=["GET", "POST"])
|
||||
@limiter.limit("10/minute")
|
||||
@limiter.limit(
|
||||
"10/minute", deduct_when=lambda r: hasattr(g, "deduct_limit") and g.deduct_limit
|
||||
)
|
||||
def forgot_password():
|
||||
form = ForgotPasswordForm(request.form)
|
||||
|
||||
if form.validate_on_submit():
|
||||
# Trigger rate limiter
|
||||
g.deduct_limit = True
|
||||
|
||||
email = sanitize_email(form.email.data)
|
||||
flash(
|
||||
"If your email is correct, you are going to receive an email to reset your password",
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from flask import request, render_template, redirect, url_for, flash
|
||||
from flask import request, render_template, redirect, url_for, flash, g
|
||||
from flask_login import current_user
|
||||
from flask_wtf import FlaskForm
|
||||
from wtforms import StringField, validators
|
||||
|
@ -19,7 +19,9 @@ class LoginForm(FlaskForm):
|
|||
|
||||
|
||||
@auth_bp.route("/login", methods=["GET", "POST"])
|
||||
@limiter.limit("10/minute")
|
||||
@limiter.limit(
|
||||
"10/minute", deduct_when=lambda r: hasattr(g, "deduct_limit") and g.deduct_limit
|
||||
)
|
||||
def login():
|
||||
next_url = sanitize_next_url(request.args.get("next"))
|
||||
|
||||
|
@ -39,6 +41,8 @@ def login():
|
|||
user = User.filter_by(email=sanitize_email(form.email.data)).first()
|
||||
|
||||
if not user or not user.check_password(form.password.data):
|
||||
# Trigger rate limiter
|
||||
g.deduct_limit = True
|
||||
form.password.data = None
|
||||
flash("Email or password incorrect", "error")
|
||||
LoginEvent(LoginEvent.ActionType.failed).send()
|
||||
|
|
|
@ -7,6 +7,7 @@ from flask import (
|
|||
session,
|
||||
make_response,
|
||||
request,
|
||||
g,
|
||||
)
|
||||
from flask_login import login_user
|
||||
from flask_wtf import FlaskForm
|
||||
|
@ -29,7 +30,9 @@ class OtpTokenForm(FlaskForm):
|
|||
|
||||
|
||||
@auth_bp.route("/mfa", methods=["GET", "POST"])
|
||||
@limiter.limit("10/minute")
|
||||
@limiter.limit(
|
||||
"10/minute", deduct_when=lambda r: hasattr(g, "deduct_limit") and g.deduct_limit
|
||||
)
|
||||
def mfa():
|
||||
# passed from login page
|
||||
user_id = session.get(MFA_USER_ID)
|
||||
|
@ -55,6 +58,9 @@ def mfa():
|
|||
flash(f"Welcome back!", "success")
|
||||
# Redirect user to correct page
|
||||
return redirect(next_url or url_for("dashboard.index"))
|
||||
else:
|
||||
# Trigger rate limiter
|
||||
g.deduct_limit = True
|
||||
|
||||
if otp_token_form.validate_on_submit():
|
||||
totp = pyotp.TOTP(user.otp_secret)
|
||||
|
@ -88,6 +94,8 @@ def mfa():
|
|||
|
||||
else:
|
||||
flash("Incorrect token", "warning")
|
||||
# Trigger rate limiter
|
||||
g.deduct_limit = True
|
||||
otp_token_form.token.data = None
|
||||
send_invalid_totp_login_email(user, "TOTP")
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import arrow
|
||||
from flask import request, render_template, redirect, url_for, flash, session
|
||||
from flask import request, render_template, redirect, url_for, flash, session, g
|
||||
from flask_login import login_user
|
||||
from flask_wtf import FlaskForm
|
||||
from wtforms import StringField, validators
|
||||
|
@ -19,7 +19,9 @@ class RecoveryForm(FlaskForm):
|
|||
|
||||
|
||||
@auth_bp.route("/recovery", methods=["GET", "POST"])
|
||||
@limiter.limit("10/minute")
|
||||
@limiter.limit(
|
||||
"10/minute", deduct_when=lambda r: hasattr(g, "deduct_limit") and g.deduct_limit
|
||||
)
|
||||
def recovery_route():
|
||||
# passed from login page
|
||||
user_id = session.get(MFA_USER_ID)
|
||||
|
@ -44,6 +46,8 @@ def recovery_route():
|
|||
|
||||
if recovery_code:
|
||||
if recovery_code.used:
|
||||
# Trigger rate limiter
|
||||
g.deduct_limit = True
|
||||
flash("Code already used", "error")
|
||||
else:
|
||||
del session[MFA_USER_ID]
|
||||
|
@ -63,6 +67,8 @@ def recovery_route():
|
|||
LOG.d("redirect user to dashboard")
|
||||
return redirect(url_for("dashboard.index"))
|
||||
else:
|
||||
# Trigger rate limiter
|
||||
g.deduct_limit = True
|
||||
flash("Incorrect code", "error")
|
||||
send_invalid_totp_login_email(user, "recovery")
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import uuid
|
||||
|
||||
from flask import request, flash, render_template, url_for
|
||||
from flask import request, flash, render_template, url_for, g
|
||||
from flask_wtf import FlaskForm
|
||||
from wtforms import StringField, validators
|
||||
|
||||
|
@ -19,7 +19,9 @@ class ResetPasswordForm(FlaskForm):
|
|||
|
||||
|
||||
@auth_bp.route("/reset_password", methods=["GET", "POST"])
|
||||
@limiter.limit("10/minute")
|
||||
@limiter.limit(
|
||||
"10/minute", deduct_when=lambda r: hasattr(g, "deduct_limit") and g.deduct_limit
|
||||
)
|
||||
def reset_password():
|
||||
form = ResetPasswordForm(request.form)
|
||||
|
||||
|
@ -30,6 +32,8 @@ def reset_password():
|
|||
)
|
||||
|
||||
if not reset_password_code:
|
||||
# Trigger rate limiter
|
||||
g.deduct_limit = True
|
||||
error = (
|
||||
"The reset password link can be used only once. "
|
||||
"Please request a new link to reset password."
|
||||
|
|
|
@ -8,6 +8,7 @@ from urllib.parse import urlparse
|
|||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
ROOT_DIR = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
|
||||
|
||||
|
||||
|
@ -522,5 +523,3 @@ if ENABLE_ALL_REVERSE_ALIAS_REPLACEMENT:
|
|||
MAX_NB_REVERSE_ALIAS_REPLACEMENT = int(
|
||||
os.environ["MAX_NB_REVERSE_ALIAS_REPLACEMENT"]
|
||||
)
|
||||
|
||||
DISABLE_RATE_LIMIT = "DISABLE_RATE_LIMIT" in os.environ
|
||||
|
|
|
@ -3,7 +3,7 @@ import urllib.parse
|
|||
from typing import Union
|
||||
|
||||
import requests
|
||||
from flask import render_template, request, flash, url_for, redirect
|
||||
from flask import render_template, request, flash, url_for, redirect, g
|
||||
from flask_login import login_required, current_user
|
||||
from werkzeug.datastructures import FileStorage
|
||||
|
||||
|
@ -83,6 +83,7 @@ def create_zendesk_request(email: str, content: str, files: [FileStorage]) -> bo
|
|||
@limiter.limit(
|
||||
"2/hour",
|
||||
methods=["POST"],
|
||||
deduct_when=lambda r: hasattr(g, "deduct_limit") and g.deduct_limit,
|
||||
)
|
||||
def support_route():
|
||||
if not ZENDESK_HOST:
|
||||
|
@ -112,6 +113,8 @@ def support_route():
|
|||
"dashboard/support.html", ticket_email=email, ticket_content=content
|
||||
)
|
||||
|
||||
# only enable rate limiting for successful Zendesk ticket creation
|
||||
g.deduct_limit = True
|
||||
flash(
|
||||
"Support ticket is created. You will receive an email about its status.",
|
||||
"success",
|
||||
|
|
|
@ -2,8 +2,6 @@ from flask_limiter import Limiter
|
|||
from flask_limiter.util import get_remote_address
|
||||
from flask_login import current_user, LoginManager
|
||||
|
||||
from app import config
|
||||
|
||||
login_manager = LoginManager()
|
||||
login_manager.session_protection = "strong"
|
||||
|
||||
|
@ -22,12 +20,6 @@ def __key_func():
|
|||
# Setup rate limit facility
|
||||
limiter = Limiter(key_func=__key_func)
|
||||
|
||||
|
||||
@limiter.request_filter
|
||||
def disable_rate_limit():
|
||||
return config.DISABLE_RATE_LIMIT
|
||||
|
||||
|
||||
# @limiter.request_filter
|
||||
# def ip_whitelist():
|
||||
# # Uncomment line to test rate limit in dev environment
|
||||
|
|
|
@ -114,7 +114,7 @@ def test_import_no_mailboxes_no_domains(flask_client):
|
|||
"ebay@my-domain.com,Used on eBay",
|
||||
'facebook@my-domain.com,"Used on Facebook, Instagram."',
|
||||
]
|
||||
file = File.create(path=f"/{random_token()}", commit=True)
|
||||
file = File.create(path="/test", commit=True)
|
||||
batch_import = BatchImport.create(user_id=user.id, file_id=file.id, commit=True)
|
||||
|
||||
import_from_csv(batch_import, user, alias_data)
|
||||
|
@ -130,18 +130,19 @@ def test_import_no_mailboxes(flask_client):
|
|||
# Check start state
|
||||
assert len(Alias.filter_by(user_id=user.id).all()) == 1 # Onboarding alias
|
||||
|
||||
domain = random_domain()
|
||||
# Create domain
|
||||
CustomDomain.create(user_id=user.id, domain=domain, ownership_verified=True)
|
||||
CustomDomain.create(
|
||||
user_id=user.id, domain="my-domain.com", ownership_verified=True
|
||||
)
|
||||
Session.commit()
|
||||
|
||||
alias_data = [
|
||||
"alias,note",
|
||||
f"ebay@{domain},Used on eBay",
|
||||
f'facebook@{domain},"Used on Facebook, Instagram."',
|
||||
"ebay@my-domain.com,Used on eBay",
|
||||
'facebook@my-domain.com,"Used on Facebook, Instagram."',
|
||||
]
|
||||
|
||||
file = File.create(path=f"/{random_token()}", commit=True)
|
||||
file = File.create(path="/test", commit=True)
|
||||
batch_import = BatchImport.create(user_id=user.id, file_id=file.id)
|
||||
|
||||
import_from_csv(batch_import, user, alias_data)
|
||||
|
@ -162,7 +163,7 @@ def test_import_no_domains(flask_client):
|
|||
'facebook@my-domain.com,"Used on Facebook, Instagram.",destination1@my-destination-domain.com destination2@my-destination-domain.com',
|
||||
]
|
||||
|
||||
file = File.create(path=f"/{random_token()}", commit=True)
|
||||
file = File.create(path="/test", commit=True)
|
||||
batch_import = BatchImport.create(user_id=user.id, file_id=file.id)
|
||||
|
||||
import_from_csv(batch_import, user, alias_data)
|
||||
|
@ -178,29 +179,31 @@ def test_import(flask_client):
|
|||
# Check start state
|
||||
assert len(Alias.filter_by(user_id=user.id).all()) == 1 # Onboarding alias
|
||||
|
||||
domain1 = random_domain()
|
||||
domain2 = random_domain()
|
||||
# Create domains
|
||||
CustomDomain.create(user_id=user.id, domain=domain1, ownership_verified=True)
|
||||
CustomDomain.create(user_id=user.id, domain=domain2, ownership_verified=True)
|
||||
CustomDomain.create(
|
||||
user_id=user.id, domain="my-domain.com", ownership_verified=True
|
||||
)
|
||||
CustomDomain.create(
|
||||
user_id=user.id, domain="my-destination-domain.com", ownership_verified=True
|
||||
)
|
||||
Session.commit()
|
||||
|
||||
# Create mailboxes
|
||||
mailbox1 = Mailbox.create(
|
||||
user_id=user.id, email=f"destination@{domain2}", verified=True
|
||||
user_id=user.id, email="destination@my-destination-domain.com", verified=True
|
||||
)
|
||||
mailbox2 = Mailbox.create(
|
||||
user_id=user.id, email=f"destination2@{domain2}", verified=True
|
||||
user_id=user.id, email="destination2@my-destination-domain.com", verified=True
|
||||
)
|
||||
Session.commit()
|
||||
|
||||
alias_data = [
|
||||
"alias,note,mailboxes",
|
||||
f"ebay@{domain1},Used on eBay,destination@{domain2}",
|
||||
f'facebook@{domain1},"Used on Facebook, Instagram.",destination@{domain2} destination2@{domain2}',
|
||||
"ebay@my-domain.com,Used on eBay,destination@my-destination-domain.com",
|
||||
'facebook@my-domain.com,"Used on Facebook, Instagram.",destination@my-destination-domain.com destination2@my-destination-domain.com',
|
||||
]
|
||||
|
||||
file = File.create(path=f"/{random_token()}", commit=True)
|
||||
file = File.create(path="/test", commit=True)
|
||||
batch_import = BatchImport.create(user_id=user.id, file_id=file.id)
|
||||
|
||||
import_from_csv(batch_import, user, alias_data)
|
||||
|
@ -211,7 +214,7 @@ def test_import(flask_client):
|
|||
# aliases[0] is the onboarding alias, skip it
|
||||
|
||||
# eBay alias
|
||||
assert aliases[1].email == f"ebay@{domain1}"
|
||||
assert aliases[1].email == "ebay@my-domain.com"
|
||||
assert len(aliases[1].mailboxes) == 1
|
||||
# First one should be primary
|
||||
assert aliases[1].mailbox_id == mailbox1.id
|
||||
|
@ -219,7 +222,7 @@ def test_import(flask_client):
|
|||
assert aliases[1].mailboxes[0] == mailbox1
|
||||
|
||||
# Facebook alias
|
||||
assert aliases[2].email == f"facebook@{domain1}"
|
||||
assert aliases[2].email == "facebook@my-domain.com"
|
||||
assert len(aliases[2].mailboxes) == 2
|
||||
# First one should be primary
|
||||
assert aliases[2].mailbox_id == mailbox1.id
|
||||
|
|
|
@ -1,13 +1,12 @@
|
|||
from flask import g
|
||||
|
||||
from app import config
|
||||
from app.alias_suffix import signer
|
||||
from app.alias_utils import delete_alias
|
||||
from app.config import EMAIL_DOMAIN, MAX_NB_EMAIL_FREE_PLAN
|
||||
from app.db import Session
|
||||
from app.models import Alias, CustomDomain, Mailbox, AliasUsedOn
|
||||
from app.utils import random_word
|
||||
from tests.utils import login, random_domain, random_token
|
||||
from tests.utils import login, random_domain
|
||||
|
||||
|
||||
def test_v2(flask_client):
|
||||
|
@ -93,14 +92,12 @@ def test_full_payload(flask_client):
|
|||
suffix = f".{word}@{EMAIL_DOMAIN}"
|
||||
signed_suffix = signer.sign(suffix).decode()
|
||||
|
||||
prefix = random_token()
|
||||
|
||||
assert AliasUsedOn.filter(AliasUsedOn.user_id == user.id).count() == 0
|
||||
assert AliasUsedOn.count() == 0
|
||||
|
||||
r = flask_client.post(
|
||||
"/api/v3/alias/custom/new?hostname=example.com",
|
||||
json={
|
||||
"alias_prefix": prefix,
|
||||
"alias_prefix": "prefix",
|
||||
"signed_suffix": signed_suffix,
|
||||
"note": "test note",
|
||||
"mailbox_ids": [user.default_mailbox_id, mb.id],
|
||||
|
@ -109,7 +106,7 @@ def test_full_payload(flask_client):
|
|||
)
|
||||
|
||||
assert r.status_code == 201
|
||||
assert r.json["alias"] == f"{prefix}.{word}@{EMAIL_DOMAIN}"
|
||||
assert r.json["alias"] == f"prefix.{word}@{EMAIL_DOMAIN}"
|
||||
|
||||
# assert returned field
|
||||
res = r.json
|
||||
|
@ -120,7 +117,7 @@ def test_full_payload(flask_client):
|
|||
assert new_alias.note == "test note"
|
||||
assert len(new_alias.mailboxes) == 2
|
||||
|
||||
alias_used_on = AliasUsedOn.filter(AliasUsedOn.user_id == user.id).first()
|
||||
alias_used_on = AliasUsedOn.first()
|
||||
assert alias_used_on.alias_id == new_alias.id
|
||||
assert alias_used_on.hostname == "example.com"
|
||||
|
||||
|
@ -253,8 +250,6 @@ def test_cannot_create_alias_in_trash(flask_client):
|
|||
|
||||
|
||||
def test_too_many_requests(flask_client):
|
||||
config.DISABLE_RATE_LIMIT = False
|
||||
|
||||
user = login(flask_client)
|
||||
|
||||
# create a custom domain
|
||||
|
|
|
@ -2,7 +2,6 @@ import uuid
|
|||
|
||||
from flask import url_for, g
|
||||
|
||||
from app import config
|
||||
from app.config import EMAIL_DOMAIN, MAX_NB_EMAIL_FREE_PLAN
|
||||
from app.db import Session
|
||||
from app.models import Alias, CustomDomain, AliasUsedOn
|
||||
|
@ -123,7 +122,6 @@ def test_out_of_quota(flask_client):
|
|||
|
||||
|
||||
def test_too_many_requests(flask_client):
|
||||
config.DISABLE_RATE_LIMIT = False
|
||||
login(flask_client)
|
||||
|
||||
# can't create more than 5 aliases in 1 minute
|
||||
|
|
|
@ -42,22 +42,15 @@ def flask_app():
|
|||
yield app
|
||||
|
||||
|
||||
from app import config
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def flask_client():
|
||||
transaction = connection.begin()
|
||||
|
||||
with app.app_context():
|
||||
# disable rate limit during test
|
||||
config.DISABLE_RATE_LIMIT = True
|
||||
try:
|
||||
client = app.test_client()
|
||||
yield client
|
||||
finally:
|
||||
# disable rate limit again as some tests might enable rate limit
|
||||
config.DISABLE_RATE_LIMIT = True
|
||||
# roll back all commits made during a test
|
||||
transaction.rollback()
|
||||
Session.rollback()
|
||||
|
|
|
@ -17,7 +17,7 @@ def test_api_key_page_requires_password(flask_client):
|
|||
|
||||
def test_create_delete_api_key(flask_client):
|
||||
user = login(flask_client)
|
||||
nb_api_key = ApiKey.count()
|
||||
Session.commit()
|
||||
|
||||
# to bypass sudo mode
|
||||
with flask_client.session_transaction() as session:
|
||||
|
@ -31,7 +31,7 @@ def test_create_delete_api_key(flask_client):
|
|||
)
|
||||
assert create_r.status_code == 200
|
||||
api_key = ApiKey.get_by(user_id=user.id)
|
||||
assert ApiKey.filter(ApiKey.user_id == user.id).count() == 1
|
||||
assert ApiKey.count() == 1
|
||||
assert api_key.name == "for test"
|
||||
|
||||
# delete api_key
|
||||
|
@ -41,12 +41,10 @@ def test_create_delete_api_key(flask_client):
|
|||
follow_redirects=True,
|
||||
)
|
||||
assert delete_r.status_code == 200
|
||||
assert ApiKey.count() == nb_api_key
|
||||
assert ApiKey.count() == 0
|
||||
|
||||
|
||||
def test_delete_all_api_keys(flask_client):
|
||||
nb_api_keys = ApiKey.count()
|
||||
|
||||
# create two test users
|
||||
user_1 = login(flask_client)
|
||||
user_2 = User.create(
|
||||
|
@ -61,7 +59,7 @@ def test_delete_all_api_keys(flask_client):
|
|||
Session.commit()
|
||||
|
||||
assert (
|
||||
ApiKey.count() == nb_api_keys + 3
|
||||
ApiKey.count() == 3
|
||||
) # assert that the total number of API keys for all users is 3.
|
||||
# assert that each user has the API keys created
|
||||
assert ApiKey.filter(ApiKey.user_id == user_1.id).count() == 2
|
||||
|
@ -79,7 +77,7 @@ def test_delete_all_api_keys(flask_client):
|
|||
)
|
||||
assert r.status_code == 200
|
||||
assert (
|
||||
ApiKey.count() == nb_api_keys + 1
|
||||
ApiKey.count() == 1
|
||||
) # assert that the total number of API keys for all users is now 1.
|
||||
assert (
|
||||
ApiKey.filter(ApiKey.user_id == user_1.id).count() == 0
|
||||
|
|
|
@ -2,7 +2,6 @@ from random import random
|
|||
|
||||
from flask import url_for, g
|
||||
|
||||
from app import config
|
||||
from app.alias_suffix import (
|
||||
get_alias_suffixes,
|
||||
AliasSuffix,
|
||||
|
@ -329,7 +328,6 @@ def test_add_alias_in_custom_domain_trash(flask_client):
|
|||
|
||||
|
||||
def test_too_many_requests(flask_client):
|
||||
config.DISABLE_RATE_LIMIT = False
|
||||
user = login(flask_client)
|
||||
|
||||
# create a custom domain
|
||||
|
|
|
@ -61,7 +61,6 @@ def test_create_directory_in_trash(flask_client):
|
|||
|
||||
def test_create_directory_out_of_quota(flask_client):
|
||||
user = login(flask_client)
|
||||
nb_directory = Directory.count()
|
||||
|
||||
for i in range(MAX_NB_DIRECTORY - Directory.count()):
|
||||
Directory.create(name=f"test{i}", user_id=user.id, commit=True)
|
||||
|
@ -75,4 +74,4 @@ def test_create_directory_out_of_quota(flask_client):
|
|||
)
|
||||
|
||||
# no new directory is created
|
||||
assert Directory.count() == MAX_NB_DIRECTORY + nb_directory
|
||||
assert Directory.count() == MAX_NB_DIRECTORY
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
from flask import url_for, g
|
||||
|
||||
from app import config
|
||||
from app.models import (
|
||||
Alias,
|
||||
)
|
||||
|
@ -21,7 +20,6 @@ def test_create_random_alias_success(flask_client):
|
|||
|
||||
|
||||
def test_too_many_requests(flask_client):
|
||||
config.DISABLE_RATE_LIMIT = False
|
||||
login(flask_client)
|
||||
|
||||
# can't create more than 5 aliases in 1 minute
|
||||
|
|
|
@ -1,12 +1,10 @@
|
|||
from flask import g
|
||||
from http import HTTPStatus
|
||||
from random import Random
|
||||
|
||||
from flask import g
|
||||
|
||||
from app import config
|
||||
from app.extensions import limiter
|
||||
from tests.conftest import app as test_app
|
||||
from tests.utils import login
|
||||
from tests.conftest import app as test_app
|
||||
|
||||
# IMPORTANT NOTICE
|
||||
# ----------------
|
||||
|
@ -43,7 +41,6 @@ def request_headers(source_ip: str) -> dict:
|
|||
|
||||
|
||||
def test_rate_limit_limits_by_source_ip(flask_client):
|
||||
config.DISABLE_RATE_LIMIT = False
|
||||
source_ip = random_ip()
|
||||
|
||||
for _ in range(_MAX_PER_MINUTE):
|
||||
|
@ -62,7 +59,6 @@ def test_rate_limit_limits_by_source_ip(flask_client):
|
|||
|
||||
|
||||
def test_rate_limit_limits_by_user_id(flask_client):
|
||||
config.DISABLE_RATE_LIMIT = False
|
||||
# Login with a user
|
||||
login(flask_client)
|
||||
fix_rate_limit_after_request()
|
||||
|
@ -79,7 +75,6 @@ def test_rate_limit_limits_by_user_id(flask_client):
|
|||
|
||||
|
||||
def test_rate_limit_limits_by_user_id_ignoring_ip(flask_client):
|
||||
config.DISABLE_RATE_LIMIT = False
|
||||
source_ip = random_ip()
|
||||
|
||||
# Login with a user
|
||||
|
|
|
@ -18,7 +18,7 @@ from app.models import (
|
|||
PlanEnum,
|
||||
PADDLE_SUBSCRIPTION_GRACE_DAYS,
|
||||
)
|
||||
from tests.utils import login, create_new_user, random_token
|
||||
from tests.utils import login, create_new_user
|
||||
|
||||
|
||||
def test_generate_email(flask_client):
|
||||
|
@ -71,48 +71,45 @@ def test_website_send_to(flask_client):
|
|||
alias = Alias.create_new_random(user)
|
||||
Session.commit()
|
||||
|
||||
prefix = random_token()
|
||||
|
||||
# non-empty name
|
||||
c1 = Contact.create(
|
||||
user_id=user.id,
|
||||
alias_id=alias.id,
|
||||
website_email=f"{prefix}@example.com",
|
||||
website_email="abcd@example.com",
|
||||
reply_email="rep@SL",
|
||||
name="First Last",
|
||||
)
|
||||
assert c1.website_send_to() == f'"First Last | {prefix} at example.com" <rep@SL>'
|
||||
assert c1.website_send_to() == '"First Last | abcd at example.com" <rep@SL>'
|
||||
|
||||
# empty name, ascii website_from, easy case
|
||||
c1.name = None
|
||||
c1.website_from = f"First Last <{prefix}@example.com>"
|
||||
assert c1.website_send_to() == f'"First Last | {prefix} at example.com" <rep@SL>'
|
||||
c1.website_from = "First Last <abcd@example.com>"
|
||||
assert c1.website_send_to() == '"First Last | abcd at example.com" <rep@SL>'
|
||||
|
||||
# empty name, RFC 2047 website_from
|
||||
c1.name = None
|
||||
c1.website_from = f"=?UTF-8?B?TmjGoW4gTmd1eeG7hW4=?= <{prefix}@example.com>"
|
||||
assert c1.website_send_to() == f'"Nhơn Nguyễn | {prefix} at example.com" <rep@SL>'
|
||||
c1.website_from = "=?UTF-8?B?TmjGoW4gTmd1eeG7hW4=?= <abcd@example.com>"
|
||||
assert c1.website_send_to() == '"Nhơn Nguyễn | abcd at example.com" <rep@SL>'
|
||||
|
||||
|
||||
def test_new_addr_default_sender_format(flask_client):
|
||||
user = login(flask_client)
|
||||
alias = Alias.first()
|
||||
prefix = random_token()
|
||||
|
||||
contact = Contact.create(
|
||||
user_id=user.id,
|
||||
alias_id=alias.id,
|
||||
website_email=f"{prefix}@example.com",
|
||||
website_email="abcd@example.com",
|
||||
reply_email="rep@SL",
|
||||
name="First Last",
|
||||
commit=True,
|
||||
)
|
||||
|
||||
assert contact.new_addr() == f'"First Last - {prefix} at example.com" <rep@SL>'
|
||||
assert contact.new_addr() == '"First Last - abcd at example.com" <rep@SL>'
|
||||
|
||||
# Make sure email isn't duplicated if sender name equals email
|
||||
contact.name = f"{prefix}@example.com"
|
||||
assert contact.new_addr() == f'"{prefix} at example.com" <rep@SL>'
|
||||
contact.name = "abcd@example.com"
|
||||
assert contact.new_addr() == '"abcd at example.com" <rep@SL>'
|
||||
|
||||
|
||||
def test_new_addr_a_sender_format(flask_client):
|
||||
|
@ -120,18 +117,17 @@ def test_new_addr_a_sender_format(flask_client):
|
|||
user.sender_format = SenderFormatEnum.A.value
|
||||
Session.commit()
|
||||
alias = Alias.first()
|
||||
prefix = random_token()
|
||||
|
||||
contact = Contact.create(
|
||||
user_id=user.id,
|
||||
alias_id=alias.id,
|
||||
website_email=f"{prefix}@example.com",
|
||||
website_email="abcd@example.com",
|
||||
reply_email="rep@SL",
|
||||
name="First Last",
|
||||
commit=True,
|
||||
)
|
||||
|
||||
assert contact.new_addr() == f'"First Last - {prefix}(a)example.com" <rep@SL>'
|
||||
assert contact.new_addr() == '"First Last - abcd(a)example.com" <rep@SL>'
|
||||
|
||||
|
||||
def test_new_addr_no_name_sender_format(flask_client):
|
||||
|
@ -139,12 +135,11 @@ def test_new_addr_no_name_sender_format(flask_client):
|
|||
user.sender_format = SenderFormatEnum.NO_NAME.value
|
||||
Session.commit()
|
||||
alias = Alias.first()
|
||||
prefix = random_token()
|
||||
|
||||
contact = Contact.create(
|
||||
user_id=user.id,
|
||||
alias_id=alias.id,
|
||||
website_email=f"{prefix}@example.com",
|
||||
website_email="abcd@example.com",
|
||||
reply_email="rep@SL",
|
||||
name="First Last",
|
||||
commit=True,
|
||||
|
@ -158,12 +153,11 @@ def test_new_addr_name_only_sender_format(flask_client):
|
|||
user.sender_format = SenderFormatEnum.NAME_ONLY.value
|
||||
Session.commit()
|
||||
alias = Alias.first()
|
||||
prefix = random_token()
|
||||
|
||||
contact = Contact.create(
|
||||
user_id=user.id,
|
||||
alias_id=alias.id,
|
||||
website_email=f"{prefix}@example.com",
|
||||
website_email="abcd@example.com",
|
||||
reply_email="rep@SL",
|
||||
name="First Last",
|
||||
commit=True,
|
||||
|
@ -177,29 +171,27 @@ def test_new_addr_at_only_sender_format(flask_client):
|
|||
user.sender_format = SenderFormatEnum.AT_ONLY.value
|
||||
Session.commit()
|
||||
alias = Alias.first()
|
||||
prefix = random_token()
|
||||
|
||||
contact = Contact.create(
|
||||
user_id=user.id,
|
||||
alias_id=alias.id,
|
||||
website_email=f"{prefix}@example.com",
|
||||
website_email="abcd@example.com",
|
||||
reply_email="rep@SL",
|
||||
name="First Last",
|
||||
commit=True,
|
||||
)
|
||||
|
||||
assert contact.new_addr() == f'"{prefix} at example.com" <rep@SL>'
|
||||
assert contact.new_addr() == '"abcd at example.com" <rep@SL>'
|
||||
|
||||
|
||||
def test_new_addr_unicode(flask_client):
|
||||
user = login(flask_client)
|
||||
alias = Alias.first()
|
||||
|
||||
random_prefix = random_token()
|
||||
contact = Contact.create(
|
||||
user_id=user.id,
|
||||
alias_id=alias.id,
|
||||
website_email=f"{random_prefix}@example.com",
|
||||
website_email="abcd@example.com",
|
||||
reply_email="rep@SL",
|
||||
name="Nhơn Nguyễn",
|
||||
commit=True,
|
||||
|
@ -207,12 +199,12 @@ def test_new_addr_unicode(flask_client):
|
|||
|
||||
assert (
|
||||
contact.new_addr()
|
||||
== f"=?utf-8?q?Nh=C6=A1n_Nguy=E1=BB=85n_-_{random_prefix}_at_example=2Ecom?= <rep@SL>"
|
||||
== "=?utf-8?q?Nh=C6=A1n_Nguy=E1=BB=85n_-_abcd_at_example=2Ecom?= <rep@SL>"
|
||||
)
|
||||
|
||||
# sanity check
|
||||
assert parse_full_address(contact.new_addr()) == (
|
||||
f"Nhơn Nguyễn - {random_prefix} at example.com",
|
||||
"Nhơn Nguyễn - abcd at example.com",
|
||||
"rep@sl",
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in New Issue