diff --git a/app/alias_utils.py b/app/alias_utils.py index 43c2d448..109366cd 100644 --- a/app/alias_utils.py +++ b/app/alias_utils.py @@ -6,7 +6,7 @@ from app.email_utils import ( get_email_domain_part, send_cannot_create_directory_alias, send_cannot_create_domain_alias, - email_belongs_to_default_domains, + can_create_directory_for_address, ) from app.errors import AliasInTrashError from app.extensions import db @@ -39,7 +39,7 @@ def try_auto_create_directory(address: str) -> Optional[Alias]: Try to create an alias with directory """ # check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format - if email_belongs_to_default_domains(address): + if can_create_directory_for_address(address): # if there's no directory separator in the alias, no way to auto-create it if "/" not in address and "+" not in address and "#" not in address: return None diff --git a/app/dashboard/views/custom_alias.py b/app/dashboard/views/custom_alias.py index ad89c0f4..670676d1 100644 --- a/app/dashboard/views/custom_alias.py +++ b/app/dashboard/views/custom_alias.py @@ -6,6 +6,7 @@ from app.config import ( DISABLE_ALIAS_SUFFIX, ALIAS_DOMAINS, CUSTOM_ALIAS_SECRET, + PREMIUM_ALIAS_DOMAINS, ) from app.dashboard.base import dashboard_bp from app.extensions import db @@ -40,8 +41,8 @@ def available_suffixes(user: User) -> [bool, str, str]: suffix = "." + random_word() + "@" + alias_domain.domain suffixes.append((True, suffix, signer.sign(suffix).decode())) - # then default domain - for domain in ALIAS_DOMAINS: + # then SimpleLogin domain + for domain in user.available_sl_domains(): suffix = ("" if DISABLE_ALIAS_SUFFIX else "." + random_word()) + "@" + domain suffixes.append((False, suffix, signer.sign(suffix).decode())) @@ -185,7 +186,7 @@ def custom_alias(): ) -def verify_prefix_suffix(user, alias_prefix, alias_suffix) -> bool: +def verify_prefix_suffix(user: User, alias_prefix, alias_suffix) -> bool: """verify if user could create an alias with the given prefix and suffix""" if not alias_prefix or not alias_suffix: # should be caught on frontend return False @@ -200,14 +201,17 @@ def verify_prefix_suffix(user, alias_prefix, alias_suffix) -> bool: alias_domain_prefix, alias_domain = alias_suffix.split("@", 1) # alias_domain must be either one of user custom domains or built-in domains - if alias_domain not in user_custom_domains and alias_domain not in ALIAS_DOMAINS: + if alias_domain not in user.available_alias_domains(): LOG.exception("wrong alias suffix %s, user %s", alias_suffix, user) return False - # built-in domain case: + # SimpleLogin domain case: # 1) alias_suffix must start with "." and # 2) alias_domain_prefix must come from the word list - if alias_domain in ALIAS_DOMAINS and alias_domain not in user_custom_domains: + if ( + alias_domain in user.available_sl_domains() + and alias_domain not in user_custom_domains + ): if not alias_domain_prefix.startswith("."): LOG.exception("User %s submits a wrong alias suffix %s", user, alias_suffix) return False @@ -226,7 +230,7 @@ def verify_prefix_suffix(user, alias_prefix, alias_suffix) -> bool: LOG.exception("wrong alias suffix %s, user %s", alias_suffix, user) return False - if alias_domain not in ALIAS_DOMAINS: + if alias_domain not in user.available_sl_domains(): LOG.exception("wrong alias suffix %s, user %s", alias_suffix, user) return False diff --git a/app/dashboard/views/custom_domain.py b/app/dashboard/views/custom_domain.py index d1f0ad12..32d0c044 100644 --- a/app/dashboard/views/custom_domain.py +++ b/app/dashboard/views/custom_domain.py @@ -7,7 +7,7 @@ from app.config import EMAIL_SERVERS_WITH_PRIORITY, ALIAS_DOMAINS from app.dashboard.base import dashboard_bp from app.email_utils import get_email_domain_part from app.extensions import db -from app.models import CustomDomain, Mailbox, DomainMailbox +from app.models import CustomDomain, Mailbox, DomainMailbox, PublicDomain class NewCustomDomainForm(FlaskForm): @@ -40,7 +40,7 @@ def custom_domain(): if new_domain.startswith("https://"): new_domain = new_domain[len("https://") :] - if new_domain in ALIAS_DOMAINS: + if PublicDomain.get_by(domain=new_domain): flash("A custom domain cannot be a built-in domain.", "error") elif CustomDomain.get_by(domain=new_domain): flash(f"{new_domain} already added", "warning") diff --git a/app/dashboard/views/setting.py b/app/dashboard/views/setting.py index 5b7730d3..62868afa 100644 --- a/app/dashboard/views/setting.py +++ b/app/dashboard/views/setting.py @@ -201,8 +201,12 @@ def setting(): default_domain = request.form.get("random-alias-default-domain") if default_domain: - public_domain = PublicDomain.get_by(domain=default_domain) + public_domain: PublicDomain = PublicDomain.get_by(domain=default_domain) if public_domain: + if public_domain.premium_only and not current_user.is_premium(): + flash("You cannot use this domain", "error") + return redirect(url_for("dashboard.setting")) + # make sure only default_random_alias_domain_id or default_random_alias_public_domain_id is set current_user.default_random_alias_public_domain_id = ( public_domain.id diff --git a/app/email_utils.py b/app/email_utils.py index 3e899431..17b115c9 100644 --- a/app/email_utils.py +++ b/app/email_utils.py @@ -32,11 +32,12 @@ from app.config import ( SENDER, URL, LANDING_PAGE_URL, + PREMIUM_ALIAS_DOMAINS, ) from app.dns_utils import get_mx_domains from app.extensions import db from app.log import LOG -from app.models import Mailbox, User, SentAlert +from app.models import Mailbox, User, SentAlert, CustomDomain def render(template_name, **kwargs) -> str: @@ -369,8 +370,9 @@ def delete_all_headers_except(msg: Message, headers: [str]): del msg._headers[i] -def email_belongs_to_default_domains(address: str) -> bool: +def can_create_directory_for_address(address: str) -> bool: """return True if an email ends with one of the alias domains provided by SimpleLogin""" + # not allow creating directory with premium domain for domain in ALIAS_DOMAINS: if address.endswith("@" + domain): return True @@ -378,11 +380,28 @@ def email_belongs_to_default_domains(address: str) -> bool: return False -def email_domain_can_be_used_as_mailbox(email: str) -> bool: - """return True if an email can be used as a personal email. An email domain can be used if it is not +def is_valid_alias_address_domain(address) -> bool: + """Return whether an address domain might a domain handled by SimpleLogin""" + domain = get_email_domain_part(address) + if domain in ALIAS_DOMAINS: + return True + + if domain in PREMIUM_ALIAS_DOMAINS: + return True + + if CustomDomain.get_by(domain=domain, verified=True): + return True + + return False + + +def email_can_be_used_as_mailbox(email: str) -> bool: + """Return True if an email can be used as a personal email. + Use the email domain as criteria. A domain can be used if it is not: - one of ALIAS_DOMAINS + - one of PREMIUM_ALIAS_DOMAINS - one of custom domains - - disposable domain + - a disposable domain """ domain = get_email_domain_part(email) if not domain: @@ -391,6 +410,9 @@ def email_domain_can_be_used_as_mailbox(email: str) -> bool: if domain in ALIAS_DOMAINS: return False + if domain in PREMIUM_ALIAS_DOMAINS: + return False + from app.models import CustomDomain if CustomDomain.get_by(domain=domain, verified=True): diff --git a/app/models.py b/app/models.py index 8fc54336..017047d6 100644 --- a/app/models.py +++ b/app/models.py @@ -25,6 +25,8 @@ from app.config import ( FIRST_ALIAS_DOMAIN, DISABLE_ONBOARDING, PAGE_LIMIT, + ALIAS_DOMAINS, + PREMIUM_ALIAS_DOMAINS, ) from app.errors import AliasInTrashError from app.extensions import db @@ -465,7 +467,7 @@ class User(db.Model, ModelMixin, UserMixin): else: return sub - def verified_custom_domains(self): + def verified_custom_domains(self) -> ["CustomDomain"]: return CustomDomain.query.filter_by(user_id=self.id, verified=True).all() def mailboxes(self) -> List["Mailbox"]: @@ -489,16 +491,14 @@ class User(db.Model, ModelMixin, UserMixin): def available_domains_for_random_alias(self) -> List[Tuple[bool, str]]: """Return available domains for user to create random aliases Each result record contains: - - whether the domain is public (i.e. belongs to SimpleLogin) + - whether the domain belongs to SimpleLogin - the domain """ res = [] - for public_domain in PublicDomain.query.all(): - res.append((True, public_domain.domain)) + for domain in self.available_sl_domains(): + res.append((True, domain)) - for custom_domain in CustomDomain.filter_by( - user_id=self.id, verified=True - ).all(): + for custom_domain in self.verified_custom_domains(): res.append((False, custom_domain.domain)) return res @@ -525,6 +525,12 @@ class User(db.Model, ModelMixin, UserMixin): LOG.exception("Problem with %s public random alias domain", self) return FIRST_ALIAS_DOMAIN + if public_domain.premium_only and not self.is_premium(): + LOG.exception( + "%s is not premium and cannot use %s", self, public_domain + ) + return FIRST_ALIAS_DOMAIN + return public_domain.domain return FIRST_ALIAS_DOMAIN @@ -553,6 +559,32 @@ class User(db.Model, ModelMixin, UserMixin): return None + def available_sl_domains(self) -> [str]: + """ + Return all SimpleLogin domains that user can use when creating a new alias, including: + - SimpleLogin public domains, available for all users (ALIAS_DOMAIN) + - SimpleLogin premium domains, only available for Premium accounts (PREMIUM_ALIAS_DOMAIN) + """ + domains = ALIAS_DOMAINS + if self.is_premium(): + domains += PREMIUM_ALIAS_DOMAINS + + return domains + + def available_alias_domains(self) -> [str]: + """return all domains that user can use when creating a new alias, including: + - SimpleLogin public domains, available for all users (ALIAS_DOMAIN) + - SimpleLogin premium domains, only available for Premium accounts (PREMIUM_ALIAS_DOMAIN) + - Verified custom domains + + """ + domains = self.get_sl_domains() + + for custom_domain in self.verified_custom_domains(): + domains.append(custom_domain.domain) + + return domains + def __repr__(self): return f"" @@ -949,17 +981,25 @@ class Alias(db.Model, ModelMixin): """create a new random alias""" custom_domain = None + random_email = None + if user.default_random_alias_domain_id: custom_domain = CustomDomain.get(user.default_random_alias_domain_id) random_email = generate_email( scheme=scheme, in_hex=in_hex, alias_domain=custom_domain.domain ) elif user.default_random_alias_public_domain_id: - public_domain = PublicDomain.get(user.default_random_alias_public_domain_id) - random_email = generate_email( - scheme=scheme, in_hex=in_hex, alias_domain=public_domain.domain + public_domain: PublicDomain = PublicDomain.get( + user.default_random_alias_public_domain_id ) - else: + if public_domain.premium_only and not user.is_premium(): + LOG.exception("%s not premium, cannot use %s", user, public_domain) + else: + random_email = generate_email( + scheme=scheme, in_hex=in_hex, alias_domain=public_domain.domain + ) + + if not random_email: random_email = generate_email(scheme=scheme, in_hex=in_hex) alias = Alias.create( diff --git a/cron.py b/cron.py index 4d6dd6e0..6a09352a 100644 --- a/cron.py +++ b/cron.py @@ -22,7 +22,7 @@ from app.email_utils import ( send_email, send_trial_end_soon_email, render, - email_domain_can_be_used_as_mailbox, + email_can_be_used_as_mailbox, send_email_with_rate_control, ) from app.extensions import db @@ -311,7 +311,7 @@ def sanity_check(): # hack to not query DNS too often sleep(1) - if not email_domain_can_be_used_as_mailbox(mailbox.email): + if not email_can_be_used_as_mailbox(mailbox.email): mailbox.nb_failed_checks += 1 nb_email_log = nb_email_log_for_mailbox(mailbox) diff --git a/email_handler.py b/email_handler.py index 8f8090fd..ff41fcb3 100644 --- a/email_handler.py +++ b/email_handler.py @@ -76,13 +76,14 @@ from app.config import ( MAX_REPLY_PHASE_SPAM_SCORE, ALERT_SEND_EMAIL_CYCLE, ALERT_MAILBOX_IS_ALIAS, + PREMIUM_ALIAS_DOMAINS, ) from app.email_utils import ( send_email, add_dkim_signature, add_or_replace_header, delete_header, - email_belongs_to_default_domains, + can_create_directory_for_address, render, get_orig_message_from_bounce, delete_all_headers_except, @@ -96,6 +97,7 @@ from app.email_utils import ( to_bytes, get_header_from_bounce, send_email_at_most_times, + is_valid_alias_address_domain, ) from app.extensions import db from app.greylisting import greylisting_needed @@ -715,10 +717,11 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str): address: str = contact.alias.email alias_domain = address[address.find("@") + 1 :] - # alias must end with one of the ALIAS_DOMAINS or custom-domain - if not email_belongs_to_default_domains(alias.email): - if not CustomDomain.get_by(domain=alias_domain): - return False, "550 SL E5" + # Sanity check: verify alias domain is managed by SimpleLogin + # scenario: a user have removed a domain but due to a bug, the aliases are still there + if not is_valid_alias_address_domain(alias.email): + LOG.exception("%s domain isn't known", alias) + return False, "550 SL E5" user = alias.user mail_from = envelope.mail_from @@ -871,7 +874,7 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str): else: msg = replace_str_in_msg(msg, reply_email, contact.website_email) - if alias_domain in ALIAS_DOMAINS: + if alias_domain in ALIAS_DOMAINS or alias_domain in PREMIUM_ALIAS_DOMAINS: add_dkim_signature(msg, alias_domain) # add DKIM-Signature for custom-domain alias else: diff --git a/init_app.py b/init_app.py index c12d9d08..3a78b26f 100644 --- a/init_app.py +++ b/init_app.py @@ -1,5 +1,5 @@ """Initial loading script""" -from app.config import ALIAS_DOMAINS +from app.config import ALIAS_DOMAINS, PREMIUM_ALIAS_DOMAINS from app.models import Mailbox, Contact, PublicDomain from app.log import LOG from app.extensions import db @@ -45,6 +45,13 @@ def add_public_domains(): LOG.info("Add %s to public domain", alias_domain) PublicDomain.create(domain=alias_domain) + for premium_domain in PREMIUM_ALIAS_DOMAINS: + if PublicDomain.get_by(domain=premium_domain): + LOG.d("%s is already a public domain", premium_domain) + else: + LOG.info("Add %s to public domain", premium_domain) + PublicDomain.create(domain=premium_domain, premium_only=True) + db.session.commit() diff --git a/shell.py b/shell.py index a0f85ef9..c72044cb 100644 --- a/shell.py +++ b/shell.py @@ -7,6 +7,7 @@ from sqlalchemy_utils import create_database, database_exists, drop_database from app.config import ( DB_URI, ALIAS_DOMAINS, + PREMIUM_ALIAS_DOMAINS, ) from app.email_utils import send_email, render, get_email_domain_part from app.models import * @@ -99,7 +100,10 @@ def migrate_domain_trash(): """Move aliases from global trash to domain trash if applicable""" for deleted_alias in DeletedAlias.query.all(): alias_domain = get_email_domain_part(deleted_alias.email) - if alias_domain not in ALIAS_DOMAINS: + if ( + alias_domain not in ALIAS_DOMAINS + and alias_domain not in PREMIUM_ALIAS_DOMAINS + ): domain = CustomDomain.get_by(domain=alias_domain) if domain: LOG.d("move %s to domain %s trash", deleted_alias, domain) diff --git a/tests/test_email_utils.py b/tests/test_email_utils.py index bceda205..75c250bd 100644 --- a/tests/test_email_utils.py +++ b/tests/test_email_utils.py @@ -4,8 +4,8 @@ from email.message import EmailMessage from app.config import MAX_ALERT_24H from app.email_utils import ( get_email_domain_part, - email_belongs_to_default_domains, - email_domain_can_be_used_as_mailbox, + can_create_directory_for_address, + email_can_be_used_as_mailbox, delete_header, add_or_replace_header, parseaddr_unicode, @@ -24,19 +24,19 @@ def test_get_email_domain_part(): def test_email_belongs_to_alias_domains(): # default alias domain - assert email_belongs_to_default_domains("ab@sl.local") - assert not email_belongs_to_default_domains("ab@not-exist.local") + assert can_create_directory_for_address("ab@sl.local") + assert not can_create_directory_for_address("ab@not-exist.local") - assert email_belongs_to_default_domains("hey@d1.test") - assert not email_belongs_to_default_domains("hey@d3.test") + assert can_create_directory_for_address("hey@d1.test") + assert not can_create_directory_for_address("hey@d3.test") def test_can_be_used_as_personal_email(flask_client): # default alias domain - assert not email_domain_can_be_used_as_mailbox("ab@sl.local") - assert not email_domain_can_be_used_as_mailbox("hey@d1.test") + assert not email_can_be_used_as_mailbox("ab@sl.local") + assert not email_can_be_used_as_mailbox("hey@d1.test") - assert email_domain_can_be_used_as_mailbox("hey@ab.cd") + assert email_can_be_used_as_mailbox("hey@ab.cd") # custom domain user = User.create( email="a@b.c", password="password", name="Test User", activated=True @@ -44,17 +44,17 @@ def test_can_be_used_as_personal_email(flask_client): db.session.commit() CustomDomain.create(user_id=user.id, domain="ab.cd", verified=True) db.session.commit() - assert not email_domain_can_be_used_as_mailbox("hey@ab.cd") + assert not email_can_be_used_as_mailbox("hey@ab.cd") # disposable domain - assert not email_domain_can_be_used_as_mailbox("abcd@10minutesmail.fr") - assert not email_domain_can_be_used_as_mailbox("abcd@temp-mail.com") + assert not email_can_be_used_as_mailbox("abcd@10minutesmail.fr") + assert not email_can_be_used_as_mailbox("abcd@temp-mail.com") # subdomain will not work - assert not email_domain_can_be_used_as_mailbox("abcd@sub.temp-mail.com") + assert not email_can_be_used_as_mailbox("abcd@sub.temp-mail.com") # valid domains should not be affected - assert email_domain_can_be_used_as_mailbox("abcd@protonmail.com") - assert email_domain_can_be_used_as_mailbox("abcd@gmail.com") - assert email_domain_can_be_used_as_mailbox("abcd@example.com") + assert email_can_be_used_as_mailbox("abcd@protonmail.com") + assert email_can_be_used_as_mailbox("abcd@gmail.com") + assert email_can_be_used_as_mailbox("abcd@example.com") def test_delete_header():