take into account Premium domains

This commit is contained in:
Son NK 2020-10-15 16:21:31 +02:00
parent dcbd7baabc
commit e79522b638
11 changed files with 138 additions and 54 deletions

View File

@ -6,7 +6,7 @@ from app.email_utils import (
get_email_domain_part,
send_cannot_create_directory_alias,
send_cannot_create_domain_alias,
email_belongs_to_default_domains,
can_create_directory_for_address,
)
from app.errors import AliasInTrashError
from app.extensions import db
@ -39,7 +39,7 @@ def try_auto_create_directory(address: str) -> Optional[Alias]:
Try to create an alias with directory
"""
# check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format
if email_belongs_to_default_domains(address):
if can_create_directory_for_address(address):
# if there's no directory separator in the alias, no way to auto-create it
if "/" not in address and "+" not in address and "#" not in address:
return None

View File

@ -6,6 +6,7 @@ from app.config import (
DISABLE_ALIAS_SUFFIX,
ALIAS_DOMAINS,
CUSTOM_ALIAS_SECRET,
PREMIUM_ALIAS_DOMAINS,
)
from app.dashboard.base import dashboard_bp
from app.extensions import db
@ -40,8 +41,8 @@ def available_suffixes(user: User) -> [bool, str, str]:
suffix = "." + random_word() + "@" + alias_domain.domain
suffixes.append((True, suffix, signer.sign(suffix).decode()))
# then default domain
for domain in ALIAS_DOMAINS:
# then SimpleLogin domain
for domain in user.available_sl_domains():
suffix = ("" if DISABLE_ALIAS_SUFFIX else "." + random_word()) + "@" + domain
suffixes.append((False, suffix, signer.sign(suffix).decode()))
@ -185,7 +186,7 @@ def custom_alias():
)
def verify_prefix_suffix(user, alias_prefix, alias_suffix) -> bool:
def verify_prefix_suffix(user: User, alias_prefix, alias_suffix) -> bool:
"""verify if user could create an alias with the given prefix and suffix"""
if not alias_prefix or not alias_suffix: # should be caught on frontend
return False
@ -200,14 +201,17 @@ def verify_prefix_suffix(user, alias_prefix, alias_suffix) -> bool:
alias_domain_prefix, alias_domain = alias_suffix.split("@", 1)
# alias_domain must be either one of user custom domains or built-in domains
if alias_domain not in user_custom_domains and alias_domain not in ALIAS_DOMAINS:
if alias_domain not in user.available_alias_domains():
LOG.exception("wrong alias suffix %s, user %s", alias_suffix, user)
return False
# built-in domain case:
# SimpleLogin domain case:
# 1) alias_suffix must start with "." and
# 2) alias_domain_prefix must come from the word list
if alias_domain in ALIAS_DOMAINS and alias_domain not in user_custom_domains:
if (
alias_domain in user.available_sl_domains()
and alias_domain not in user_custom_domains
):
if not alias_domain_prefix.startswith("."):
LOG.exception("User %s submits a wrong alias suffix %s", user, alias_suffix)
return False
@ -226,7 +230,7 @@ def verify_prefix_suffix(user, alias_prefix, alias_suffix) -> bool:
LOG.exception("wrong alias suffix %s, user %s", alias_suffix, user)
return False
if alias_domain not in ALIAS_DOMAINS:
if alias_domain not in user.available_sl_domains():
LOG.exception("wrong alias suffix %s, user %s", alias_suffix, user)
return False

View File

@ -7,7 +7,7 @@ from app.config import EMAIL_SERVERS_WITH_PRIORITY, ALIAS_DOMAINS
from app.dashboard.base import dashboard_bp
from app.email_utils import get_email_domain_part
from app.extensions import db
from app.models import CustomDomain, Mailbox, DomainMailbox
from app.models import CustomDomain, Mailbox, DomainMailbox, PublicDomain
class NewCustomDomainForm(FlaskForm):
@ -40,7 +40,7 @@ def custom_domain():
if new_domain.startswith("https://"):
new_domain = new_domain[len("https://") :]
if new_domain in ALIAS_DOMAINS:
if PublicDomain.get_by(domain=new_domain):
flash("A custom domain cannot be a built-in domain.", "error")
elif CustomDomain.get_by(domain=new_domain):
flash(f"{new_domain} already added", "warning")

View File

@ -201,8 +201,12 @@ def setting():
default_domain = request.form.get("random-alias-default-domain")
if default_domain:
public_domain = PublicDomain.get_by(domain=default_domain)
public_domain: PublicDomain = PublicDomain.get_by(domain=default_domain)
if public_domain:
if public_domain.premium_only and not current_user.is_premium():
flash("You cannot use this domain", "error")
return redirect(url_for("dashboard.setting"))
# make sure only default_random_alias_domain_id or default_random_alias_public_domain_id is set
current_user.default_random_alias_public_domain_id = (
public_domain.id

View File

@ -32,11 +32,12 @@ from app.config import (
SENDER,
URL,
LANDING_PAGE_URL,
PREMIUM_ALIAS_DOMAINS,
)
from app.dns_utils import get_mx_domains
from app.extensions import db
from app.log import LOG
from app.models import Mailbox, User, SentAlert
from app.models import Mailbox, User, SentAlert, CustomDomain
def render(template_name, **kwargs) -> str:
@ -369,8 +370,9 @@ def delete_all_headers_except(msg: Message, headers: [str]):
del msg._headers[i]
def email_belongs_to_default_domains(address: str) -> bool:
def can_create_directory_for_address(address: str) -> bool:
"""return True if an email ends with one of the alias domains provided by SimpleLogin"""
# not allow creating directory with premium domain
for domain in ALIAS_DOMAINS:
if address.endswith("@" + domain):
return True
@ -378,11 +380,28 @@ def email_belongs_to_default_domains(address: str) -> bool:
return False
def email_domain_can_be_used_as_mailbox(email: str) -> bool:
"""return True if an email can be used as a personal email. An email domain can be used if it is not
def is_valid_alias_address_domain(address) -> bool:
"""Return whether an address domain might a domain handled by SimpleLogin"""
domain = get_email_domain_part(address)
if domain in ALIAS_DOMAINS:
return True
if domain in PREMIUM_ALIAS_DOMAINS:
return True
if CustomDomain.get_by(domain=domain, verified=True):
return True
return False
def email_can_be_used_as_mailbox(email: str) -> bool:
"""Return True if an email can be used as a personal email.
Use the email domain as criteria. A domain can be used if it is not:
- one of ALIAS_DOMAINS
- one of PREMIUM_ALIAS_DOMAINS
- one of custom domains
- disposable domain
- a disposable domain
"""
domain = get_email_domain_part(email)
if not domain:
@ -391,6 +410,9 @@ def email_domain_can_be_used_as_mailbox(email: str) -> bool:
if domain in ALIAS_DOMAINS:
return False
if domain in PREMIUM_ALIAS_DOMAINS:
return False
from app.models import CustomDomain
if CustomDomain.get_by(domain=domain, verified=True):

View File

@ -25,6 +25,8 @@ from app.config import (
FIRST_ALIAS_DOMAIN,
DISABLE_ONBOARDING,
PAGE_LIMIT,
ALIAS_DOMAINS,
PREMIUM_ALIAS_DOMAINS,
)
from app.errors import AliasInTrashError
from app.extensions import db
@ -465,7 +467,7 @@ class User(db.Model, ModelMixin, UserMixin):
else:
return sub
def verified_custom_domains(self):
def verified_custom_domains(self) -> ["CustomDomain"]:
return CustomDomain.query.filter_by(user_id=self.id, verified=True).all()
def mailboxes(self) -> List["Mailbox"]:
@ -489,16 +491,14 @@ class User(db.Model, ModelMixin, UserMixin):
def available_domains_for_random_alias(self) -> List[Tuple[bool, str]]:
"""Return available domains for user to create random aliases
Each result record contains:
- whether the domain is public (i.e. belongs to SimpleLogin)
- whether the domain belongs to SimpleLogin
- the domain
"""
res = []
for public_domain in PublicDomain.query.all():
res.append((True, public_domain.domain))
for domain in self.available_sl_domains():
res.append((True, domain))
for custom_domain in CustomDomain.filter_by(
user_id=self.id, verified=True
).all():
for custom_domain in self.verified_custom_domains():
res.append((False, custom_domain.domain))
return res
@ -525,6 +525,12 @@ class User(db.Model, ModelMixin, UserMixin):
LOG.exception("Problem with %s public random alias domain", self)
return FIRST_ALIAS_DOMAIN
if public_domain.premium_only and not self.is_premium():
LOG.exception(
"%s is not premium and cannot use %s", self, public_domain
)
return FIRST_ALIAS_DOMAIN
return public_domain.domain
return FIRST_ALIAS_DOMAIN
@ -553,6 +559,32 @@ class User(db.Model, ModelMixin, UserMixin):
return None
def available_sl_domains(self) -> [str]:
"""
Return all SimpleLogin domains that user can use when creating a new alias, including:
- SimpleLogin public domains, available for all users (ALIAS_DOMAIN)
- SimpleLogin premium domains, only available for Premium accounts (PREMIUM_ALIAS_DOMAIN)
"""
domains = ALIAS_DOMAINS
if self.is_premium():
domains += PREMIUM_ALIAS_DOMAINS
return domains
def available_alias_domains(self) -> [str]:
"""return all domains that user can use when creating a new alias, including:
- SimpleLogin public domains, available for all users (ALIAS_DOMAIN)
- SimpleLogin premium domains, only available for Premium accounts (PREMIUM_ALIAS_DOMAIN)
- Verified custom domains
"""
domains = self.get_sl_domains()
for custom_domain in self.verified_custom_domains():
domains.append(custom_domain.domain)
return domains
def __repr__(self):
return f"<User {self.id} {self.name} {self.email}>"
@ -949,17 +981,25 @@ class Alias(db.Model, ModelMixin):
"""create a new random alias"""
custom_domain = None
random_email = None
if user.default_random_alias_domain_id:
custom_domain = CustomDomain.get(user.default_random_alias_domain_id)
random_email = generate_email(
scheme=scheme, in_hex=in_hex, alias_domain=custom_domain.domain
)
elif user.default_random_alias_public_domain_id:
public_domain = PublicDomain.get(user.default_random_alias_public_domain_id)
random_email = generate_email(
scheme=scheme, in_hex=in_hex, alias_domain=public_domain.domain
public_domain: PublicDomain = PublicDomain.get(
user.default_random_alias_public_domain_id
)
else:
if public_domain.premium_only and not user.is_premium():
LOG.exception("%s not premium, cannot use %s", user, public_domain)
else:
random_email = generate_email(
scheme=scheme, in_hex=in_hex, alias_domain=public_domain.domain
)
if not random_email:
random_email = generate_email(scheme=scheme, in_hex=in_hex)
alias = Alias.create(

View File

@ -22,7 +22,7 @@ from app.email_utils import (
send_email,
send_trial_end_soon_email,
render,
email_domain_can_be_used_as_mailbox,
email_can_be_used_as_mailbox,
send_email_with_rate_control,
)
from app.extensions import db
@ -311,7 +311,7 @@ def sanity_check():
# hack to not query DNS too often
sleep(1)
if not email_domain_can_be_used_as_mailbox(mailbox.email):
if not email_can_be_used_as_mailbox(mailbox.email):
mailbox.nb_failed_checks += 1
nb_email_log = nb_email_log_for_mailbox(mailbox)

View File

@ -76,13 +76,14 @@ from app.config import (
MAX_REPLY_PHASE_SPAM_SCORE,
ALERT_SEND_EMAIL_CYCLE,
ALERT_MAILBOX_IS_ALIAS,
PREMIUM_ALIAS_DOMAINS,
)
from app.email_utils import (
send_email,
add_dkim_signature,
add_or_replace_header,
delete_header,
email_belongs_to_default_domains,
can_create_directory_for_address,
render,
get_orig_message_from_bounce,
delete_all_headers_except,
@ -96,6 +97,7 @@ from app.email_utils import (
to_bytes,
get_header_from_bounce,
send_email_at_most_times,
is_valid_alias_address_domain,
)
from app.extensions import db
from app.greylisting import greylisting_needed
@ -715,10 +717,11 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
address: str = contact.alias.email
alias_domain = address[address.find("@") + 1 :]
# alias must end with one of the ALIAS_DOMAINS or custom-domain
if not email_belongs_to_default_domains(alias.email):
if not CustomDomain.get_by(domain=alias_domain):
return False, "550 SL E5"
# Sanity check: verify alias domain is managed by SimpleLogin
# scenario: a user have removed a domain but due to a bug, the aliases are still there
if not is_valid_alias_address_domain(alias.email):
LOG.exception("%s domain isn't known", alias)
return False, "550 SL E5"
user = alias.user
mail_from = envelope.mail_from
@ -871,7 +874,7 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
else:
msg = replace_str_in_msg(msg, reply_email, contact.website_email)
if alias_domain in ALIAS_DOMAINS:
if alias_domain in ALIAS_DOMAINS or alias_domain in PREMIUM_ALIAS_DOMAINS:
add_dkim_signature(msg, alias_domain)
# add DKIM-Signature for custom-domain alias
else:

View File

@ -1,5 +1,5 @@
"""Initial loading script"""
from app.config import ALIAS_DOMAINS
from app.config import ALIAS_DOMAINS, PREMIUM_ALIAS_DOMAINS
from app.models import Mailbox, Contact, PublicDomain
from app.log import LOG
from app.extensions import db
@ -45,6 +45,13 @@ def add_public_domains():
LOG.info("Add %s to public domain", alias_domain)
PublicDomain.create(domain=alias_domain)
for premium_domain in PREMIUM_ALIAS_DOMAINS:
if PublicDomain.get_by(domain=premium_domain):
LOG.d("%s is already a public domain", premium_domain)
else:
LOG.info("Add %s to public domain", premium_domain)
PublicDomain.create(domain=premium_domain, premium_only=True)
db.session.commit()

View File

@ -7,6 +7,7 @@ from sqlalchemy_utils import create_database, database_exists, drop_database
from app.config import (
DB_URI,
ALIAS_DOMAINS,
PREMIUM_ALIAS_DOMAINS,
)
from app.email_utils import send_email, render, get_email_domain_part
from app.models import *
@ -99,7 +100,10 @@ def migrate_domain_trash():
"""Move aliases from global trash to domain trash if applicable"""
for deleted_alias in DeletedAlias.query.all():
alias_domain = get_email_domain_part(deleted_alias.email)
if alias_domain not in ALIAS_DOMAINS:
if (
alias_domain not in ALIAS_DOMAINS
and alias_domain not in PREMIUM_ALIAS_DOMAINS
):
domain = CustomDomain.get_by(domain=alias_domain)
if domain:
LOG.d("move %s to domain %s trash", deleted_alias, domain)

View File

@ -4,8 +4,8 @@ from email.message import EmailMessage
from app.config import MAX_ALERT_24H
from app.email_utils import (
get_email_domain_part,
email_belongs_to_default_domains,
email_domain_can_be_used_as_mailbox,
can_create_directory_for_address,
email_can_be_used_as_mailbox,
delete_header,
add_or_replace_header,
parseaddr_unicode,
@ -24,19 +24,19 @@ def test_get_email_domain_part():
def test_email_belongs_to_alias_domains():
# default alias domain
assert email_belongs_to_default_domains("ab@sl.local")
assert not email_belongs_to_default_domains("ab@not-exist.local")
assert can_create_directory_for_address("ab@sl.local")
assert not can_create_directory_for_address("ab@not-exist.local")
assert email_belongs_to_default_domains("hey@d1.test")
assert not email_belongs_to_default_domains("hey@d3.test")
assert can_create_directory_for_address("hey@d1.test")
assert not can_create_directory_for_address("hey@d3.test")
def test_can_be_used_as_personal_email(flask_client):
# default alias domain
assert not email_domain_can_be_used_as_mailbox("ab@sl.local")
assert not email_domain_can_be_used_as_mailbox("hey@d1.test")
assert not email_can_be_used_as_mailbox("ab@sl.local")
assert not email_can_be_used_as_mailbox("hey@d1.test")
assert email_domain_can_be_used_as_mailbox("hey@ab.cd")
assert email_can_be_used_as_mailbox("hey@ab.cd")
# custom domain
user = User.create(
email="a@b.c", password="password", name="Test User", activated=True
@ -44,17 +44,17 @@ def test_can_be_used_as_personal_email(flask_client):
db.session.commit()
CustomDomain.create(user_id=user.id, domain="ab.cd", verified=True)
db.session.commit()
assert not email_domain_can_be_used_as_mailbox("hey@ab.cd")
assert not email_can_be_used_as_mailbox("hey@ab.cd")
# disposable domain
assert not email_domain_can_be_used_as_mailbox("abcd@10minutesmail.fr")
assert not email_domain_can_be_used_as_mailbox("abcd@temp-mail.com")
assert not email_can_be_used_as_mailbox("abcd@10minutesmail.fr")
assert not email_can_be_used_as_mailbox("abcd@temp-mail.com")
# subdomain will not work
assert not email_domain_can_be_used_as_mailbox("abcd@sub.temp-mail.com")
assert not email_can_be_used_as_mailbox("abcd@sub.temp-mail.com")
# valid domains should not be affected
assert email_domain_can_be_used_as_mailbox("abcd@protonmail.com")
assert email_domain_can_be_used_as_mailbox("abcd@gmail.com")
assert email_domain_can_be_used_as_mailbox("abcd@example.com")
assert email_can_be_used_as_mailbox("abcd@protonmail.com")
assert email_can_be_used_as_mailbox("abcd@gmail.com")
assert email_can_be_used_as_mailbox("abcd@example.com")
def test_delete_header():