Refactor unsubscribe handling (#1090)

* Refactor unsubscribe email handling

* MR comments

* Moved all unsub logic to the encoder

* remove unused

Co-authored-by: Adrià Casajús <adria.casajus@proton.ch>
This commit is contained in:
Adrià Casajús 2022-06-30 11:40:01 +02:00 committed by GitHub
parent c85ed7d29e
commit 21feced342
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 600 additions and 199 deletions

View File

@ -0,0 +1,76 @@
import enum
from dataclasses import dataclass
from typing import Optional
from app import config
class UnsubscribeAction(enum.Enum):
UnsubscribeNewsletter = 1
DisableAlias = 2
DisableContact = 3
@dataclass
class UnsubscribeData:
action: UnsubscribeAction
data: int
@dataclass
class UnsubscribeLink:
link: str
via_email: bool
class UnsubscribeEncoder:
@staticmethod
def encode(action: UnsubscribeAction, data: int) -> UnsubscribeLink:
if config.UNSUBSCRIBER:
return UnsubscribeLink(UnsubscribeEncoder.encode_mailto(action, data), True)
return UnsubscribeLink(UnsubscribeEncoder.encode_url(action, data), False)
@staticmethod
def encode_subject(action: UnsubscribeAction, data: int) -> str:
if action == UnsubscribeAction.DisableAlias:
return f"{data}="
if action == UnsubscribeAction.DisableContact:
return f"{data}_"
if action == UnsubscribeAction.UnsubscribeNewsletter:
return f"{data}*"
@staticmethod
def encode_mailto(action: UnsubscribeAction, data: int) -> str:
subject = UnsubscribeEncoder.encode_subject(action, data)
return f"mailto:{config.UNSUBSCRIBER}?subject={subject}"
@staticmethod
def encode_url(action: UnsubscribeAction, data: int) -> str:
if action == UnsubscribeAction.DisableAlias:
return f"{config.URL}/dashboard/unsubscribe/{data}"
if action == UnsubscribeAction.DisableContact:
return f"{config.URL}/dashboard/block_contact/{data}"
if action == UnsubscribeAction.UnsubscribeNewsletter:
raise Exception("Cannot encode url to disable newsletter")
@staticmethod
def decode_subject(data: str) -> Optional[UnsubscribeData]:
try:
# subject has the format {alias.id}=
if data.endswith("="):
alias_id = int(data[:-1])
return UnsubscribeData(UnsubscribeAction.DisableAlias, alias_id)
# {contact.id}_
elif data.endswith("_"):
contact_id = int(data[:-1])
return UnsubscribeData(UnsubscribeAction.DisableContact, contact_id)
# {user.id}*
elif data.endswith("*"):
user_id = int(data[:-1])
return UnsubscribeData(UnsubscribeAction.UnsubscribeNewsletter, user_id)
else:
# some email providers might strip off the = suffix
alias_id = int(data)
return UnsubscribeData(UnsubscribeAction.DisableAlias, alias_id)
except ValueError:
return None

View File

@ -0,0 +1,34 @@
from email.message import Message
from app.email import headers
from app.email_utils import add_or_replace_header
from app.handler.unsubscribe_encoder import (
UnsubscribeEncoder,
UnsubscribeAction,
)
from app.models import Alias, Contact
class UnsubscribeGenerator:
def add_header_to_message(
self, alias: Alias, contact: Contact, message: Message
) -> Message:
"""
Add List-Unsubscribe header
"""
user = alias.user
if user.one_click_unsubscribe_block_sender:
unsub_link = UnsubscribeEncoder.encode(
UnsubscribeAction.DisableContact, contact.id
)
else:
unsub_link = UnsubscribeEncoder.encode(
UnsubscribeAction.DisableAlias, alias.id
)
add_or_replace_header(message, headers.LIST_UNSUBSCRIBE, f"<{unsub_link.link}>")
if not unsub_link.via_email:
add_or_replace_header(
message, headers.LIST_UNSUBSCRIBE_POST, "List-Unsubscribe=One-Click"
)
return message

View File

@ -0,0 +1,175 @@
from email.message import Message
from typing import Optional
from aiosmtpd.smtp import Envelope
from app import config
from app.db import Session
from app.email import headers, status
from app.email_utils import send_email, render
from app.handler.unsubscribe_encoder import (
UnsubscribeData,
UnsubscribeEncoder,
UnsubscribeAction,
)
from app.log import LOG
from app.models import Alias, Contact, User, Mailbox
class UnsubscribeHandler:
def _extract_unsub_info_from_message(
self, message: Message
) -> Optional[UnsubscribeData]:
header_value = message[headers.SUBJECT]
if not header_value:
return None
return UnsubscribeEncoder.decode_subject(header_value)
def handle_unsubscribe_from_message(self, envelope: Envelope, msg: Message) -> str:
unsub_data = self._extract_unsub_info_from_message(msg)
if not unsub_data:
LOG.w("Wrong format subject %s", msg[headers.SUBJECT])
return status.E507
mailbox = Mailbox.get_by(email=envelope.mail_from)
if not mailbox:
LOG.w("Unknown mailbox %s", msg[headers.SUBJECT])
return status.E507
if unsub_data.action == UnsubscribeAction.DisableAlias:
return self._disable_alias(unsub_data.data, mailbox.user, mailbox)
elif unsub_data.action == UnsubscribeAction.DisableContact:
return self._disable_contact(unsub_data.data, mailbox.user, mailbox)
elif unsub_data.action == UnsubscribeAction.UnsubscribeNewsletter:
return self._unsubscribe_user_from_newsletter(unsub_data.data, mailbox.user)
else:
raise Exception(f"Unknown unsubscribe action {unsub_data.action}")
def _disable_alias(
self, alias_id: int, user: User, mailbox: Optional[Mailbox] = None
) -> str:
alias = Alias.get(alias_id)
if not alias:
return status.E508
if alias.user_id != user.id:
LOG.w("Alias doesn't belong to user")
return status.E508
# Only alias's owning mailbox can send the unsubscribe request
if mailbox and not self._check_email_is_authorized_for_alias(
mailbox.email, alias
):
return status.E509
alias.enabled = False
Session.commit()
enable_alias_url = config.URL + f"/dashboard/?highlight_alias_id={alias.id}"
for mailbox in alias.mailboxes:
send_email(
mailbox.email,
f"Alias {alias.email} has been disabled successfully",
render(
"transactional/unsubscribe-disable-alias.txt",
user=alias.user,
alias=alias.email,
enable_alias_url=enable_alias_url,
),
render(
"transactional/unsubscribe-disable-alias.html",
user=alias.user,
alias=alias.email,
enable_alias_url=enable_alias_url,
),
)
return status.E202
def _disable_contact(
self, contact_id: int, user: User, mailbox: Optional[Mailbox] = None
) -> str:
contact = Contact.get(contact_id)
if not contact:
return status.E508
if contact.user_id != user.id:
LOG.w("Contact doesn't belong to user")
return status.E508
# Only contact's owning mailbox can send the unsubscribe request
if mailbox and not self._check_email_is_authorized_for_alias(
mailbox.email, contact.alias
):
return status.E509
alias = contact.alias
contact.block_forward = True
Session.commit()
unblock_contact_url = (
config.URL
+ f"/dashboard/alias_contact_manager/{alias.id}?highlight_contact_id={contact.id}"
)
for mailbox in alias.mailboxes:
send_email(
mailbox.email,
f"Emails from {contact.website_email} to {alias.email} are now blocked",
render(
"transactional/unsubscribe-block-contact.txt.jinja2",
user=alias.user,
alias=alias,
contact=contact,
unblock_contact_url=unblock_contact_url,
),
)
return status.E202
def _unsubscribe_user_from_newsletter(
self, user_id: int, request_user: User
) -> str:
"""return the SMTP status"""
user = User.get(user_id)
if not user:
LOG.w("No such user %s", user_id)
return status.E510
if user.id != request_user.id:
LOG.w("Unauthorized unsubscribe user from", request_user)
return status.E511
user.notification = False
Session.commit()
send_email(
user.email,
"You have been unsubscribed from SimpleLogin newsletter",
render(
"transactional/unsubscribe-newsletter.txt",
user=user,
),
render(
"transactional/unsubscribe-newsletter.html",
user=user,
),
)
return status.E202
def _check_email_is_authorized_for_alias(
self, email_address: str, alias: Alias
) -> bool:
"""return if the email_address is authorized to unsubscribe from an alias or block a contact
Usually the mail_from=mailbox.email but it can also be one of the authorized address
"""
for mailbox in alias.mailboxes:
if mailbox.email == email_address:
return True
for authorized_address in mailbox.authorized_addresses:
if authorized_address.email == email_address:
LOG.d(
"Found an authorized address for %s %s %s",
alias,
mailbox,
authorized_address,
)
return True
LOG.d(
"%s cannot disable alias %s. Alias authorized addresses:%s",
email_address,
alias,
alias.authorized_addresses,
)
return False

View File

@ -27,25 +27,7 @@ from sqlalchemy.sql import and_
from sqlalchemy_utils import ArrowType
from app import s3
from app.config import (
MAX_NB_EMAIL_FREE_PLAN,
URL,
AVATAR_URL_EXPIRATION,
JOB_ONBOARDING_1,
JOB_ONBOARDING_2,
JOB_ONBOARDING_4,
LANDING_PAGE_URL,
FIRST_ALIAS_DOMAIN,
DISABLE_ONBOARDING,
UNSUBSCRIBER,
ALIAS_RANDOM_SUFFIX_LENGTH,
MAX_NB_SUBDOMAIN,
MAX_NB_DIRECTORY,
ROOT_DIR,
NOREPLY,
PARTNER_API_TOKEN_SECRET,
JOB_SEND_PROTON_WELCOME_1,
)
from app import config
from app.db import Session
from app.errors import (
AliasInTrashError,
@ -53,6 +35,7 @@ from app.errors import (
SubdomainInTrashError,
CannotCreateContactForReverseAlias,
)
from app.handler.unsubscribe_encoder import UnsubscribeAction, UnsubscribeEncoder
from app.log import LOG
from app.oauth_models import Scope
from app.pw_models import PasswordOracle
@ -514,14 +497,14 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
def directory_quota(self):
return min(
self._directory_quota,
MAX_NB_DIRECTORY - Directory.filter_by(user_id=self.id).count(),
config.MAX_NB_DIRECTORY - Directory.filter_by(user_id=self.id).count(),
)
@property
def subdomain_quota(self):
return min(
self._subdomain_quota,
MAX_NB_SUBDOMAIN
config.MAX_NB_SUBDOMAIN
- CustomDomain.filter_by(user_id=self.id, is_sl_subdomain=True).count(),
)
@ -579,30 +562,30 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
user.notification = False
user.trial_end = None
Job.create(
name=JOB_SEND_PROTON_WELCOME_1,
name=config.JOB_SEND_PROTON_WELCOME_1,
payload={"user_id": user.id},
run_at=arrow.now(),
)
Session.flush()
return user
if DISABLE_ONBOARDING:
if config.DISABLE_ONBOARDING:
LOG.d("Disable onboarding emails")
return user
# Schedule onboarding emails
Job.create(
name=JOB_ONBOARDING_1,
name=config.JOB_ONBOARDING_1,
payload={"user_id": user.id},
run_at=arrow.now().shift(days=1),
)
Job.create(
name=JOB_ONBOARDING_2,
name=config.JOB_ONBOARDING_2,
payload={"user_id": user.id},
run_at=arrow.now().shift(days=2),
)
Job.create(
name=JOB_ONBOARDING_4,
name=config.JOB_ONBOARDING_4,
payload={"user_id": user.id},
run_at=arrow.now().shift(days=3),
)
@ -748,7 +731,9 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
if self.lifetime_or_active_subscription():
return True
else:
return Alias.filter_by(user_id=self.id).count() < MAX_NB_EMAIL_FREE_PLAN
return (
Alias.filter_by(user_id=self.id).count() < config.MAX_NB_EMAIL_FREE_PLAN
)
def profile_picture_url(self):
if self.profile_picture_id:
@ -854,7 +839,7 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
or custom_domain.user_id != self.id
):
LOG.w("Problem with %s default random alias domain", self)
return FIRST_ALIAS_DOMAIN
return config.FIRST_ALIAS_DOMAIN
return custom_domain.domain
@ -863,7 +848,7 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
# sanity check
if not sl_domain:
LOG.e("Problem with %s public random alias domain", self)
return FIRST_ALIAS_DOMAIN
return config.FIRST_ALIAS_DOMAIN
if sl_domain.premium_only and not self.is_premium():
LOG.w(
@ -874,11 +859,11 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
self.default_alias_custom_domain_id = None
self.default_alias_public_domain_id = None
Session.commit()
return FIRST_ALIAS_DOMAIN
return config.FIRST_ALIAS_DOMAIN
return sl_domain.domain
return FIRST_ALIAS_DOMAIN
return config.FIRST_ALIAS_DOMAIN
def fido_enabled(self) -> bool:
if self.fido_uuid is not None:
@ -899,16 +884,24 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
if self.newsletter_alias_id:
alias = Alias.get(self.newsletter_alias_id)
if alias.enabled:
unsubscribe_link, via_email = alias.unsubscribe_link()
return alias.email, unsubscribe_link, via_email
unsub = UnsubscribeEncoder.encode(
UnsubscribeAction.DisableAlias, alias.id
)
return alias.email, unsub.link, unsub.via_email
# alias disabled -> user doesn't want to receive newsletter
else:
return None, None, False
else:
# do not handle http POST unsubscribe
if UNSUBSCRIBER:
if config.UNSUBSCRIBER:
# use * as suffix instead of = as for alias unsubscribe
return self.email, f"mailto:{UNSUBSCRIBER}?subject={self.id}*", True
return (
self.email,
UnsubscribeEncoder.encode_mailto(
UnsubscribeAction.UnsubscribeNewsletter, self.id
),
True,
)
return None, None, False
@ -961,7 +954,7 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
str: the random suffix generated
"""
if self.random_alias_suffix == AliasSuffixEnum.random_string.value:
return random_string(ALIAS_RANDOM_SUFFIX_LENGTH, include_digits=True)
return random_string(config.ALIAS_RANDOM_SUFFIX_LENGTH, include_digits=True)
return random_word()
def __repr__(self):
@ -1137,7 +1130,7 @@ class Client(Base, ModelMixin):
if self.icon_id:
return self.icon.get_url()
else:
return URL + "/static/default-icon.svg"
return config.URL + "/static/default-icon.svg"
def last_user_login(self) -> "ClientUser":
client_user = (
@ -1210,7 +1203,7 @@ class OauthToken(Base, ModelMixin):
def generate_email(
scheme: int = AliasGeneratorEnum.word.value,
in_hex: bool = False,
alias_domain=FIRST_ALIAS_DOMAIN,
alias_domain=config.FIRST_ALIAS_DOMAIN,
) -> str:
"""generate an email address that does not exist before
:param alias_domain: the domain used to generate the alias.
@ -1430,7 +1423,7 @@ class Alias(Base, ModelMixin):
# find the right suffix - avoid infinite loop by running this at max 1000 times
for _ in range(1000):
suffix = user.get_random_alias_suffix()
email = f"{prefix}.{suffix}@{FIRST_ALIAS_DOMAIN}"
email = f"{prefix}.{suffix}@{config.FIRST_ALIAS_DOMAIN}"
if not cls.get_by(email=email) and not DeletedAlias.get_by(email=email):
break
@ -1494,22 +1487,6 @@ class Alias(Base, ModelMixin):
else:
return self.user.email
def unsubscribe_link(self, contact: Optional["Contact"] = None) -> (str, bool):
"""
return the unsubscribe link along with whether this is via email (mailto:) or Http POST
The mailto: method is preferred
"""
if contact:
if UNSUBSCRIBER:
return f"mailto:{UNSUBSCRIBER}?subject={contact.id}_", True
else:
return f"{URL}/dashboard/block_contact/{contact.id}", False
else:
if UNSUBSCRIBER:
return f"mailto:{UNSUBSCRIBER}?subject={self.id}=", True
else:
return f"{URL}/dashboard/unsubscribe/{self.id}", False
def __repr__(self):
return f"<Alias {self.id} {self.email}>"
@ -1580,10 +1557,12 @@ class ClientUser(Base, ModelMixin):
elif scope == Scope.AVATAR_URL:
if self.user.profile_picture_id:
if self.default_avatar:
res[Scope.AVATAR_URL.value] = URL + "/static/default-avatar.png"
res[Scope.AVATAR_URL.value] = (
config.URL + "/static/default-avatar.png"
)
else:
res[Scope.AVATAR_URL.value] = self.user.profile_picture.get_url(
AVATAR_URL_EXPIRATION
config.AVATAR_URL_EXPIRATION
)
else:
res[Scope.AVATAR_URL.value] = None
@ -1680,7 +1659,7 @@ class Contact(Base, ModelMixin):
website_email = sanitize_email(website_email)
# make sure contact.website_email isn't a reverse alias
if website_email != NOREPLY:
if website_email != config.NOREPLY:
orig_contact = Contact.get_by(reply_email=website_email)
if orig_contact:
raise CannotCreateContactForReverseAlias(str(orig_contact))
@ -1878,7 +1857,7 @@ class EmailLog(Base, ModelMixin):
return "forward"
def get_dashboard_url(self):
return f"{URL}/dashboard/refused_email?highlight_id={self.id}"
return f"{config.URL}/dashboard/refused_email?highlight_id={self.id}"
def __repr__(self):
return f"<EmailLog {self.id}>"
@ -2159,7 +2138,7 @@ class CustomDomain(Base, ModelMixin):
return Alias.filter_by(custom_domain_id=self.id).count()
def get_trash_url(self):
return URL + f"/dashboard/domains/{self.id}/trash"
return config.URL + f"/dashboard/domains/{self.id}/trash"
def get_ownership_dns_txt_value(self):
return f"sl-verification={self.ownership_txt_token}"
@ -2545,7 +2524,7 @@ class Referral(Base, ModelMixin):
return res
def link(self):
return f"{LANDING_PAGE_URL}?slref={self.code}"
return f"{config.LANDING_PAGE_URL}?slref={self.code}"
def __repr__(self):
return f"<Referral {self.code}>"
@ -2690,14 +2669,14 @@ class Notification(Base, ModelMixin):
@staticmethod
def render(template_name, **kwargs) -> str:
templates_dir = os.path.join(ROOT_DIR, "templates")
templates_dir = os.path.join(config.ROOT_DIR, "templates")
env = Environment(loader=FileSystemLoader(templates_dir))
template = env.get_template(template_name)
return template.render(
URL=URL,
LANDING_PAGE_URL=LANDING_PAGE_URL,
URL=config.URL,
LANDING_PAGE_URL=config.LANDING_PAGE_URL,
YEAR=arrow.now().year,
**kwargs,
)
@ -3165,7 +3144,7 @@ class PartnerApiToken(Base, ModelMixin):
def hmac_token(token: str) -> str:
as_str = base64.b64encode(
hmac.new(
PARTNER_API_TOKEN_SECRET.encode("utf-8"),
config.PARTNER_API_TOKEN_SECRET.encode("utf-8"),
token.encode("utf-8"),
hashlib.sha3_256,
).digest()

View File

@ -142,6 +142,8 @@ from app.handler.provider_complaint import (
handle_hotmail_complaint,
handle_yahoo_complaint,
)
from app.handler.unsubscribe_generator import UnsubscribeGenerator
from app.handler.unsubscribe_handler import UnsubscribeHandler
from app.log import LOG, set_message_id
from app.mail_sender import sl_sendmail
from app.message_utils import message_to_bytes
@ -892,16 +894,7 @@ def forward_email_to_mailbox(
add_alias_to_header_if_needed(msg, alias)
# add List-Unsubscribe header
if user.one_click_unsubscribe_block_sender:
unsubscribe_link, via_email = alias.unsubscribe_link(contact)
else:
unsubscribe_link, via_email = alias.unsubscribe_link()
add_or_replace_header(msg, headers.LIST_UNSUBSCRIBE, f"<{unsubscribe_link}>")
if not via_email:
add_or_replace_header(
msg, headers.LIST_UNSUBSCRIBE_POST, "List-Unsubscribe=One-Click"
)
msg = UnsubscribeGenerator().add_header_to_message(alias, contact, msg)
add_dkim_signature(msg, EMAIL_DOMAIN)
@ -1748,127 +1741,6 @@ def is_bounce(envelope: Envelope, msg: Message):
)
def handle_unsubscribe(envelope: Envelope, msg: Message) -> str:
"""return the SMTP status"""
# format: alias_id:
subject = msg[headers.SUBJECT]
alias, contact = None, None
try:
# subject has the format {alias.id}=
if subject.endswith("="):
alias_id = int(subject[:-1])
alias = Alias.get(alias_id)
# {contact.id}_
elif subject.endswith("_"):
contact_id = int(subject[:-1])
contact = Contact.get(contact_id)
if contact:
alias = contact.alias
# {user.id}*
elif subject.endswith("*"):
user_id = int(subject[:-1])
return handle_unsubscribe_user(user_id, envelope.mail_from)
# some email providers might strip off the = suffix
else:
alias_id = int(subject)
alias = Alias.get(alias_id)
except Exception:
LOG.w("Wrong format subject %s", msg[headers.SUBJECT])
return status.E507
if not alias:
LOG.w("Cannot get alias from subject %s", subject)
return status.E508
mail_from = envelope.mail_from
# Only alias's owning mailbox can send the unsubscribe request
mailbox = get_mailbox_from_mail_from(mail_from, alias)
if not mailbox:
LOG.d(
"%s cannot disable alias %s. Alias authorized addresses:%s",
envelope.mail_from,
alias,
alias.authorized_addresses,
)
return status.E509
user = alias.user
if contact:
contact.block_forward = True
Session.commit()
unblock_contact_url = (
URL
+ f"/dashboard/alias_contact_manager/{alias.id}?highlight_contact_id={contact.id}"
)
for mailbox in alias.mailboxes:
send_email(
mailbox.email,
f"Emails from {contact.website_email} to {alias.email} are now blocked",
render(
"transactional/unsubscribe-block-contact.txt.jinja2",
user=user,
alias=alias,
contact=contact,
unblock_contact_url=unblock_contact_url,
),
)
else:
alias.enabled = False
Session.commit()
enable_alias_url = URL + f"/dashboard/?highlight_alias_id={alias.id}"
for mailbox in alias.mailboxes:
send_email(
mailbox.email,
f"Alias {alias.email} has been disabled successfully",
render(
"transactional/unsubscribe-disable-alias.txt",
user=user,
alias=alias.email,
enable_alias_url=enable_alias_url,
),
render(
"transactional/unsubscribe-disable-alias.html",
user=user,
alias=alias.email,
enable_alias_url=enable_alias_url,
),
)
return status.E202
def handle_unsubscribe_user(user_id: int, mail_from: str) -> str:
"""return the SMTP status"""
user = User.get(user_id)
if not user:
LOG.w("No such user %s %s", user_id, mail_from)
return status.E510
if mail_from != user.email:
LOG.w("Unauthorized mail_from %s %s", user, mail_from)
return status.E511
user.notification = False
Session.commit()
send_email(
user.email,
"You have been unsubscribed from SimpleLogin newsletter",
render(
"transactional/unsubscribe-newsletter.txt",
user=user,
),
render(
"transactional/unsubscribe-newsletter.html",
user=user,
),
)
return status.E202
def handle_transactional_bounce(
envelope: Envelope, msg, rcpt_to, transactional_id=None
):
@ -2066,7 +1938,7 @@ def handle(envelope: Envelope, msg: Message) -> str:
# unsubscribe request
if UNSUBSCRIBER and (rcpt_tos == [UNSUBSCRIBER] or rcpt_tos == [OLD_UNSUBSCRIBER]):
LOG.d("Handle unsubscribe request from %s", mail_from)
return handle_unsubscribe(envelope, msg)
return UnsubscribeHandler().handle_unsubscribe_from_message(envelope, msg)
# region mail sent to VERP
verp_info = get_verp_info_from_email(rcpt_tos[0])

View File

@ -5,7 +5,7 @@ Hi
{{ contact.website_email }} can no longer send emails to {{ alias.email }}
{{ contact.website_email }} is blocked thanks to the "One-click unsubscribe provided by your mailbox service.
{{ contact.website_email }} is blocked thanks to the "One-click unsubscribe" provided by your mailbox service.
When you click on this button on a forwarded email, the sender will be automatically blocked.

View File

@ -62,7 +62,7 @@ def test_create_directory_in_trash(flask_client):
def test_create_directory_out_of_quota(flask_client):
user = login(flask_client)
for i in range(MAX_NB_DIRECTORY):
for i in range(MAX_NB_DIRECTORY - Directory.count()):
Directory.create(name=f"test{i}", user_id=user.id, commit=True)
assert Directory.count() == MAX_NB_DIRECTORY

View File

@ -0,0 +1,84 @@
import pytest
from app import config
from app.handler.unsubscribe_encoder import (
UnsubscribeData,
UnsubscribeAction,
UnsubscribeEncoder,
)
legacy_subject_test_data = [
("3=", UnsubscribeData(UnsubscribeAction.DisableAlias, 3)),
("438_", UnsubscribeData(UnsubscribeAction.DisableContact, 438)),
("4325*", UnsubscribeData(UnsubscribeAction.UnsubscribeNewsletter, 4325)),
]
@pytest.mark.parametrize("expected_subject, expected_deco", legacy_subject_test_data)
def test_legacy_unsub_subject(expected_subject, expected_deco):
info = UnsubscribeEncoder.decode_subject(expected_subject)
assert expected_deco == info
subject = UnsubscribeEncoder.encode_subject(
expected_deco.action, expected_deco.data
)
assert expected_subject == subject
legacy_url_test_data = [
(
f"{config.URL}/dashboard/unsubscribe/3",
UnsubscribeData(UnsubscribeAction.DisableAlias, 3),
),
(
f"{config.URL}/dashboard/block_contact/5",
UnsubscribeData(UnsubscribeAction.DisableContact, 5),
),
]
@pytest.mark.parametrize("expected_url, unsub_data", legacy_url_test_data)
def test_encode_decode_unsub_subject(expected_url, unsub_data):
url = UnsubscribeEncoder.encode_url(unsub_data.action, unsub_data.data)
assert expected_url == url
legacy_mail_or_link_test_data = [
(
f"{config.URL}/dashboard/unsubscribe/3",
False,
UnsubscribeData(UnsubscribeAction.DisableAlias, 3),
),
(
"mailto:me@nowhere.net?subject=9=",
True,
UnsubscribeData(UnsubscribeAction.DisableAlias, 9),
),
(
f"{config.URL}/dashboard/block_contact/8",
False,
UnsubscribeData(UnsubscribeAction.DisableContact, 8),
),
(
"mailto:me@nowhere.net?subject=8_",
True,
UnsubscribeData(UnsubscribeAction.DisableContact, 8),
),
(
"mailto:me@nowhere.net?subject=83*",
True,
UnsubscribeData(UnsubscribeAction.UnsubscribeNewsletter, 83),
),
]
@pytest.mark.parametrize(
"expected_link, via_mail, unsub_data", legacy_mail_or_link_test_data
)
def test_encode_legacy_link(expected_link, via_mail, unsub_data):
if via_mail:
config.UNSUBSCRIBER = "me@nowhere.net"
else:
config.UNSUBSCRIBER = None
link_info = UnsubscribeEncoder.encode(unsub_data.action, unsub_data.data)
assert via_mail == link_info.via_email
assert expected_link == link_info.link

View File

@ -0,0 +1,126 @@
from email.message import Message
from typing import Iterable
import pytest
from app import config
from app.db import Session
from app.email import headers
from app.handler.unsubscribe_generator import UnsubscribeGenerator
from app.models import Alias, Contact
from tests.utils import create_new_user
TEST_UNSUB_EMAIL = "unsub@sl.com"
def generate_sl_unsub_block_sender_data() -> Iterable:
user = create_new_user()
user.one_click_unsubscribe_block_sender = True
alias = Alias.create_new_random(user)
Session.commit()
contact = Contact.create(
user_id=user.id,
alias_id=alias.id,
website_email="contact@example.com",
reply_email="rep@sl.local",
commit=True,
)
yield (
alias.id,
contact.id,
True,
"<https://lol.com>, <mailto:somewhere@not.net>",
f"<mailto:{TEST_UNSUB_EMAIL}?subject={contact.id}_>",
)
yield (
alias.id,
contact.id,
False,
"<https://lol.com>, <mailto:somewhere@not.net>",
f"<{config.URL}/dashboard/block_contact/{contact.id}>",
)
yield (
alias.id,
contact.id,
False,
None,
f"<{config.URL}/dashboard/block_contact/{contact.id}>",
)
@pytest.mark.parametrize(
"alias_id, contact_id, unsub_via_mail, original_header, expected_header",
generate_sl_unsub_block_sender_data(),
)
def test_sl_unsub_block_sender_data(
alias_id, contact_id, unsub_via_mail, original_header, expected_header
):
alias = Alias.get(alias_id)
contact = Contact.get(contact_id)
config.UNSUBSCRIBER = TEST_UNSUB_EMAIL if unsub_via_mail else None
message = Message()
message[headers.LIST_UNSUBSCRIBE] = original_header
message = UnsubscribeGenerator().add_header_to_message(alias, contact, message)
assert expected_header == message[headers.LIST_UNSUBSCRIBE]
if not expected_header or expected_header.find("<http") == -1:
assert message[headers.LIST_UNSUBSCRIBE_POST] is None
else:
assert "List-Unsubscribe=One-Click" == message[headers.LIST_UNSUBSCRIBE_POST]
def generate_sl_unsub_not_block_sender_data() -> Iterable:
user = create_new_user()
user.one_click_unsubscribe_block_sender = False
alias = Alias.create_new_random(user)
Session.commit()
contact = Contact.create(
user_id=user.id,
alias_id=alias.id,
website_email="contact@example.com",
reply_email="rep@sl.local",
commit=True,
)
yield (
alias.id,
contact.id,
True,
"<https://lol.com>, <mailto:somewhere@not.net>",
f"<mailto:{TEST_UNSUB_EMAIL}?subject={alias.id}=>",
)
yield (
alias.id,
contact.id,
False,
"<https://lol.com>, <mailto:somewhere@not.net>",
f"<{config.URL}/dashboard/unsubscribe/{alias.id}>",
)
yield (
alias.id,
contact.id,
False,
None,
f"<{config.URL}/dashboard/encoded_unsubscribe?request={alias.id}>",
)
@pytest.mark.parametrize(
"alias_id, contact_id, unsub_via_mail, original_header, expected_header",
generate_sl_unsub_block_sender_data(),
)
def test_sl_unsub_not_block_sender_data(
alias_id, contact_id, unsub_via_mail, original_header, expected_header
):
alias = Alias.get(alias_id)
contact = Contact.get(contact_id)
config.UNSUBSCRIBER = TEST_UNSUB_EMAIL if unsub_via_mail else None
message = Message()
message[headers.LIST_UNSUBSCRIBE] = original_header
message = UnsubscribeGenerator().add_header_to_message(alias, contact, message)
assert expected_header == message[headers.LIST_UNSUBSCRIBE]
if not expected_header or expected_header.find("<http") == -1:
assert message[headers.LIST_UNSUBSCRIBE_POST] is None
else:
assert "List-Unsubscribe=One-Click" == message[headers.LIST_UNSUBSCRIBE_POST]

View File

@ -0,0 +1,55 @@
from email.message import Message
from random import random
from typing import Iterable
from aiosmtpd.smtp import Envelope
from app.db import Session
from app.email import headers, status
from app.handler.unsubscribe_handler import (
UnsubscribeHandler,
)
from app.mail_sender import mail_sender
from app.models import Alias, Contact, User
from tests.utils import create_new_user
def test_unsub_email_old_subject() -> Iterable:
mail_sender.store_emails_instead_of_sending()
user = create_new_user()
alias = Alias.create_new_random(user)
Session.commit()
contact = Contact.create(
user_id=user.id,
alias_id=alias.id,
website_email="contact@example.com",
reply_email=f"{random()}@sl.local",
block_forward=False,
commit=True,
)
envelope = Envelope()
envelope.mail_from = user.email
# Disable alias
message = Message()
message[headers.SUBJECT] = f"{alias.id}="
mail_sender.purge_stored_emails()
response = UnsubscribeHandler().handle_unsubscribe_from_message(envelope, message)
assert status.E202 == response
assert not Alias.get(alias.id).enabled
assert 1 == len(mail_sender.get_stored_emails())
# Disable contact
message = Message()
message[headers.SUBJECT] = f"{contact.id}_"
mail_sender.purge_stored_emails()
response = UnsubscribeHandler().handle_unsubscribe_from_message(envelope, message)
assert status.E202 == response
assert Contact.get(contact.id).block_forward
assert 1 == len(mail_sender.get_stored_emails())
# Disable newsletter
message = Message()
message[headers.SUBJECT] = f"{user.id}*"
mail_sender.purge_stored_emails()
response = UnsubscribeHandler().handle_unsubscribe_from_message(envelope, message)
assert status.E202 == response
assert not User.get(user.id).notification
assert 1 == len(mail_sender.get_stored_emails())