Allow users to keep the original unsub behaviour (#1148)

* Feature: Preserve original unsubscribe request

* Updated tests

* Updated settings

* PR comments

* reduced prefix length

* Include migrate users for new unsub behaviour

* PR comments

Co-authored-by: Adrià Casajús <adria.casajus@proton.ch>
This commit is contained in:
Adrià Casajús 2022-07-19 17:25:21 +02:00 committed by GitHub
parent 750b6f9038
commit f3d47a1eaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 732 additions and 112 deletions

View File

@ -167,6 +167,7 @@ if not FLASK_SECRET:
SESSION_COOKIE_NAME = "slapp"
MAILBOX_SECRET = FLASK_SECRET + "mailbox"
CUSTOM_ALIAS_SECRET = FLASK_SECRET + "custom_alias"
UNSUBSCRIBE_SECRET = FLASK_SECRET + "unsub"
# AWS
AWS_REGION = os.environ.get("AWS_REGION") or "eu-west-3"

View File

@ -50,6 +50,7 @@ from app.models import (
AppleSubscription,
PartnerUser,
PartnerSubscription,
UnsubscribeBehaviourEnum,
)
from app.proton.utils import is_connect_with_proton_enabled, get_proton_partner
from app.utils import random_string, sanitize_email
@ -328,11 +329,16 @@ def setting():
flash("Your preference has been updated", "success")
return redirect(url_for("dashboard.setting"))
elif request.form.get("form-name") == "one-click-unsubscribe":
choose = request.form.get("enable")
if choose == "on":
current_user.one_click_unsubscribe_block_sender = True
choose = request.form.get("unsubscribe-behaviour")
if choose == UnsubscribeBehaviourEnum.PreserveOriginal.name:
current_user.unsub_behaviour = UnsubscribeBehaviourEnum.PreserveOriginal
elif choose == UnsubscribeBehaviourEnum.DisableAlias.name:
current_user.unsub_behaviour = UnsubscribeBehaviourEnum.DisableAlias
elif choose == UnsubscribeBehaviourEnum.BlockContact.name:
current_user.unsub_behaviour = UnsubscribeBehaviourEnum.BlockContact
else:
current_user.one_click_unsubscribe_block_sender = False
flash("There was an error. Please try again", "warning")
return redirect(url_for("dashboard.setting"))
Session.commit()
flash("Your preference has been updated", "success")
return redirect(url_for("dashboard.setting"))
@ -397,6 +403,7 @@ def setting():
change_email_form=change_email_form,
pending_email=pending_email,
AliasGeneratorEnum=AliasGeneratorEnum,
UnsubscribeBehaviourEnum=UnsubscribeBehaviourEnum,
manual_sub=manual_sub,
partner_sub=partner_sub,
partner_name=partner_name,

View File

@ -9,10 +9,12 @@ from flask import redirect, url_for, flash, request, render_template
from flask_login import login_required, current_user
from app.dashboard.base import dashboard_bp
from app.handler.unsubscribe_encoder import UnsubscribeAction
from app.handler.unsubscribe_handler import UnsubscribeHandler
from app.models import Alias, Contact
@dashboard_bp.route("/unsubscribe/<alias_id>", methods=["GET", "POST"])
@dashboard_bp.route("/unsubscribe/<int:alias_id>", methods=["GET", "POST"])
@login_required
def unsubscribe(alias_id):
alias = Alias.get(alias_id)
@ -38,7 +40,7 @@ def unsubscribe(alias_id):
return render_template("dashboard/unsubscribe.html", alias=alias.email)
@dashboard_bp.route("/block_contact/<contact_id>", methods=["GET", "POST"])
@dashboard_bp.route("/block_contact/<int:contact_id>", methods=["GET", "POST"])
@login_required
def block_contact(contact_id):
contact = Contact.get(contact_id)
@ -68,3 +70,44 @@ def block_contact(contact_id):
)
else: # ask user confirmation
return render_template("dashboard/block_contact.html", contact=contact)
@dashboard_bp.route("/unsubscribe/encoded/<encoded_request>", methods=["GET"])
@login_required
def encoded_unsubscribe(encoded_request: str):
unsub_data = UnsubscribeHandler().handle_unsubscribe_from_request(
current_user, encoded_request
)
if not unsub_data:
flash(f"Invalid unsubscribe request", "error")
return redirect(url_for("dashboard.index"))
if unsub_data.action == UnsubscribeAction.DisableAlias:
alias = Alias.get(unsub_data.data)
flash(f"Alias {alias.email} has been blocked", "success")
return redirect(url_for("dashboard.index", highlight_alias_id=alias.id))
if unsub_data.action == UnsubscribeAction.DisableContact:
contact = Contact.get(unsub_data.data)
flash(f"Emails sent from {contact.website_email} are now blocked", "success")
return redirect(
url_for(
"dashboard.alias_contact_manager",
alias_id=contact.alias_id,
highlight_contact_id=contact.id,
)
)
if unsub_data.action == UnsubscribeAction.UnsubscribeNewsletter:
flash(f"You've unsubscribed from the newsletter", "success")
return redirect(
url_for(
"dashboard.index",
)
)
if unsub_data.action == UnsubscribeAction.OriginalUnsubscribeMailto:
flash(f"The original unsubscribe request has been forwarded", "success")
return redirect(
url_for(
"dashboard.index",
)
)
return redirect(url_for("dashboard.index"))

View File

@ -1,20 +1,36 @@
import base64
import enum
import hashlib
import json
from dataclasses import dataclass
from typing import Optional
from typing import Optional, Union
import itsdangerous
from app import config
from app.log import LOG
UNSUB_PREFIX = "un"
class UnsubscribeAction(enum.Enum):
UnsubscribeNewsletter = 1
DisableAlias = 2
DisableContact = 3
OriginalUnsubscribeMailto = 4
@dataclass
class UnsubscribeOriginalData:
alias_id: int
recipient: str
subject: str
@dataclass
class UnsubscribeData:
action: UnsubscribeAction
data: int
data: Union[UnsubscribeOriginalData, int]
@dataclass
@ -25,52 +41,109 @@ class UnsubscribeLink:
class UnsubscribeEncoder:
@staticmethod
def encode(action: UnsubscribeAction, data: int) -> UnsubscribeLink:
def encode(
action: UnsubscribeAction, data: Union[int, UnsubscribeOriginalData]
) -> UnsubscribeLink:
if config.UNSUBSCRIBER:
return UnsubscribeLink(UnsubscribeEncoder.encode_mailto(action, data), True)
return UnsubscribeLink(UnsubscribeEncoder.encode_url(action, data), False)
@staticmethod
def encode_subject(action: UnsubscribeAction, data: int) -> str:
if action == UnsubscribeAction.DisableAlias:
return f"{data}="
if action == UnsubscribeAction.DisableContact:
return f"{data}_"
if action == UnsubscribeAction.UnsubscribeNewsletter:
return f"{data}*"
@classmethod
def encode_subject(
cls, action: UnsubscribeAction, data: Union[int, UnsubscribeOriginalData]
) -> str:
if (
action != UnsubscribeAction.OriginalUnsubscribeMailto
and type(data) is not int
):
raise ValueError(f"Data has to be an int for an action of type {action}")
if action == UnsubscribeAction.OriginalUnsubscribeMailto:
if type(data) is not UnsubscribeOriginalData:
raise ValueError(
f"Data has to be an UnsubscribeOriginalData for an action of type {action}"
)
# Initial 0 is the version number. If we need to add support for extra use-cases we can bump up this number
data = (0, data.alias_id, data.recipient, data.subject)
payload = (action.value, data)
serialized_data = (
base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8"))
.rstrip(b"=")
.decode("utf-8")
)
signed_data = cls._get_signer().sign(serialized_data).decode("utf-8")
encoded_request = f"{UNSUB_PREFIX}.{signed_data}"
if len(encoded_request) > 256:
LOG.e("Encoded request is longer than 256 chars")
return encoded_request
@staticmethod
def encode_mailto(action: UnsubscribeAction, data: int) -> str:
def encode_mailto(
action: UnsubscribeAction, data: Union[int, UnsubscribeOriginalData]
) -> str:
subject = UnsubscribeEncoder.encode_subject(action, data)
return f"mailto:{config.UNSUBSCRIBER}?subject={subject}"
@staticmethod
def encode_url(action: UnsubscribeAction, data: int) -> str:
def encode_url(
action: UnsubscribeAction, data: Union[int, UnsubscribeOriginalData]
) -> str:
if action == UnsubscribeAction.DisableAlias:
return f"{config.URL}/dashboard/unsubscribe/{data}"
if action == UnsubscribeAction.DisableContact:
return f"{config.URL}/dashboard/block_contact/{data}"
if action == UnsubscribeAction.UnsubscribeNewsletter:
raise Exception("Cannot encode url to disable newsletter")
if action in (
UnsubscribeAction.UnsubscribeNewsletter,
UnsubscribeAction.OriginalUnsubscribeMailto,
):
encoded = UnsubscribeEncoder.encode_subject(action, data)
return f"{config.URL}/dashboard/unsubscribe/encoded?data={encoded}"
@staticmethod
def decode_subject(data: str) -> Optional[UnsubscribeData]:
def _get_signer() -> itsdangerous.Signer:
return itsdangerous.Signer(
config.UNSUBSCRIBE_SECRET, digest_method=hashlib.sha3_224
)
@classmethod
def decode_subject(cls, data: str) -> Optional[UnsubscribeData]:
if data.find(UNSUB_PREFIX) == -1:
try:
# subject has the format {alias.id}=
if data.endswith("="):
alias_id = int(data[:-1])
return UnsubscribeData(UnsubscribeAction.DisableAlias, alias_id)
# {contact.id}_
elif data.endswith("_"):
contact_id = int(data[:-1])
return UnsubscribeData(UnsubscribeAction.DisableContact, contact_id)
# {user.id}*
elif data.endswith("*"):
user_id = int(data[:-1])
return UnsubscribeData(
UnsubscribeAction.UnsubscribeNewsletter, user_id
)
else:
# some email providers might strip off the = suffix
alias_id = int(data)
return UnsubscribeData(UnsubscribeAction.DisableAlias, alias_id)
except ValueError:
return None
signer = cls._get_signer()
try:
# subject has the format {alias.id}=
if data.endswith("="):
alias_id = int(data[:-1])
return UnsubscribeData(UnsubscribeAction.DisableAlias, alias_id)
# {contact.id}_
elif data.endswith("_"):
contact_id = int(data[:-1])
return UnsubscribeData(UnsubscribeAction.DisableContact, contact_id)
# {user.id}*
elif data.endswith("*"):
user_id = int(data[:-1])
return UnsubscribeData(UnsubscribeAction.UnsubscribeNewsletter, user_id)
else:
# some email providers might strip off the = suffix
alias_id = int(data)
return UnsubscribeData(UnsubscribeAction.DisableAlias, alias_id)
verified_data = signer.unsign(data[len(UNSUB_PREFIX) + 1 :])
except itsdangerous.BadSignature:
return None
try:
padded_data = verified_data + (b"=" * (-len(verified_data) % 4))
payload = json.loads(base64.urlsafe_b64decode(padded_data))
except ValueError:
return None
action = UnsubscribeAction(payload[0])
action_data = payload[1]
if action == UnsubscribeAction.OriginalUnsubscribeMailto:
# Skip version number in action_data[0] for now it's always 0
action_data = UnsubscribeOriginalData(
action_data[1], action_data[2], action_data[3]
)
return UnsubscribeData(action, action_data)

View File

@ -1,30 +1,78 @@
import urllib
from email.message import Message
from app.email import headers
from app.email_utils import add_or_replace_header
from app.email_utils import add_or_replace_header, delete_header
from app.handler.unsubscribe_encoder import (
UnsubscribeEncoder,
UnsubscribeAction,
UnsubscribeData,
UnsubscribeOriginalData,
)
from app.models import Alias, Contact
from app.models import Alias, Contact, UnsubscribeBehaviourEnum
class UnsubscribeGenerator:
def add_header_to_message(
self, alias: Alias, contact: Contact, message: Message
def _generate_header_with_original_behaviour(
self, alias: Alias, message: Message
) -> Message:
"""
Add List-Unsubscribe header
Generate a header that will encode the original unsub request. To do so
1. Look if there's an original List_Unsubscribe headers, otherwise do nothing
2. Header has the form <method1>, <method2>, .. where each method is either
- mailto:s@b.c?subject=something
- http(s)://somewhere.com
3. Check if there are http unsub requests in the header. If there are, reserve them and remove all mailto
methods to avoid leaking the real mailbox. We forward the message with only http(s) methods.
4. If there aren't neither https nor mailto methods, strip the header from the message and that's it.
It could happen if the header is malformed.
5. Encode in our unsub request the first original mail and subject to unsub, and use that as our unsub header.
"""
user = alias.user
if user.one_click_unsubscribe_block_sender:
unsub_link = UnsubscribeEncoder.encode(
UnsubscribeAction.DisableContact, contact.id
unsubscribe_data = message[headers.LIST_UNSUBSCRIBE]
if not unsubscribe_data:
return message
raw_methods = [method.strip() for method in unsubscribe_data.split(",")]
mailto_unsubs = None
other_unsubs = []
for raw_method in raw_methods:
start = raw_method.find("<")
end = raw_method.rfind(">")
if start == -1 or end == -1 or start >= end:
continue
method = raw_method[start + 1 : end]
url_data = urllib.parse.urlparse(method)
if url_data.scheme == "mailto":
query_data = urllib.parse.parse_qs(url_data.query)
mailto_unsubs = (url_data.path, query_data.get("subject", [""])[0])
else:
other_unsubs.append(method)
# If there are non mailto unsubscribe methods, use those in the header
if other_unsubs:
add_or_replace_header(
message,
headers.LIST_UNSUBSCRIBE,
", ".join([f"<{method}>" for method in other_unsubs]),
)
else:
unsub_link = UnsubscribeEncoder.encode(
UnsubscribeAction.DisableAlias, alias.id
add_or_replace_header(
message, headers.LIST_UNSUBSCRIBE_POST, "List-Unsubscribe=One-Click"
)
return message
if not mailto_unsubs:
message = delete_header(message, headers.LIST_UNSUBSCRIBE)
message = delete_header(message, headers.LIST_UNSUBSCRIBE_POST)
return message
return self._add_unsubscribe_header(
message,
UnsubscribeData(
UnsubscribeAction.OriginalUnsubscribeMailto,
UnsubscribeOriginalData(alias.id, mailto_unsubs[0], mailto_unsubs[1]),
),
)
def _add_unsubscribe_header(
self, message: Message, unsub: UnsubscribeData
) -> Message:
unsub_link = UnsubscribeEncoder.encode(unsub.action, unsub.data)
add_or_replace_header(message, headers.LIST_UNSUBSCRIBE, f"<{unsub_link.link}>")
if not unsub_link.via_email:
@ -32,3 +80,19 @@ class UnsubscribeGenerator:
message, headers.LIST_UNSUBSCRIBE_POST, "List-Unsubscribe=One-Click"
)
return message
def add_header_to_message(
self, alias: Alias, contact: Contact, message: Message
) -> Message:
"""
Add List-Unsubscribe header based on the user preference.
"""
unsub_behaviour = alias.user.unsub_behaviour
if unsub_behaviour == UnsubscribeBehaviourEnum.PreserveOriginal:
return self._generate_header_with_original_behaviour(alias, message)
elif unsub_behaviour == UnsubscribeBehaviourEnum.DisableAlias:
unsub = UnsubscribeData(UnsubscribeAction.DisableAlias, alias.id)
return self._add_unsubscribe_header(message, unsub)
else:
unsub = UnsubscribeData(UnsubscribeAction.DisableContact, contact.id)
return self._add_unsubscribe_header(message, unsub)

View File

@ -1,4 +1,5 @@
from email.message import Message
from email.message import Message, EmailMessage
from email.utils import make_msgid, formatdate
from typing import Optional
from aiosmtpd.smtp import Envelope
@ -6,14 +7,30 @@ from aiosmtpd.smtp import Envelope
from app import config
from app.db import Session
from app.email import headers, status
from app.email_utils import send_email, render
from app.email_utils import (
send_email,
render,
get_email_domain_part,
add_dkim_signature,
generate_verp_email,
)
from app.handler.unsubscribe_encoder import (
UnsubscribeData,
UnsubscribeEncoder,
UnsubscribeAction,
UnsubscribeOriginalData,
)
from app.log import LOG
from app.models import Alias, Contact, User, Mailbox
from app.mail_sender import sl_sendmail
from app.models import (
Alias,
Contact,
User,
Mailbox,
TransactionalEmail,
VerpType,
)
from app.utils import sanitize_email
class UnsubscribeHandler:
@ -41,9 +58,34 @@ class UnsubscribeHandler:
return self._disable_contact(unsub_data.data, mailbox.user, mailbox)
elif unsub_data.action == UnsubscribeAction.UnsubscribeNewsletter:
return self._unsubscribe_user_from_newsletter(unsub_data.data, mailbox.user)
elif unsub_data.action == UnsubscribeAction.OriginalUnsubscribeMailto:
return self._unsubscribe_original_behaviour(unsub_data.data, mailbox.user)
else:
raise Exception(f"Unknown unsubscribe action {unsub_data.action}")
def handle_unsubscribe_from_request(
self, user: User, unsub_request: str
) -> Optional[UnsubscribeData]:
unsub_data = UnsubscribeEncoder.decode_subject(unsub_request)
if not unsub_data:
LOG.w("Wrong request %s", unsub_request)
return None
if unsub_data.action == UnsubscribeAction.DisableAlias:
response_code = self._disable_alias(unsub_data.data, user)
elif unsub_data.action == UnsubscribeAction.DisableContact:
response_code = self._disable_contact(unsub_data.data, user)
elif unsub_data.action == UnsubscribeAction.UnsubscribeNewsletter:
response_code = self._unsubscribe_user_from_newsletter(
unsub_data.data, user
)
elif unsub_data.action == UnsubscribeAction.OriginalUnsubscribeMailto:
response_code = self._unsubscribe_original_behaviour(unsub_data.data, user)
else:
raise Exception(f"Unknown unsubscribe action {unsub_data.action}")
if response_code == status.E202:
return unsub_data
return None
def _disable_alias(
self, alias_id: int, user: User, mailbox: Optional[Mailbox] = None
) -> str:
@ -173,3 +215,36 @@ class UnsubscribeHandler:
alias.authorized_addresses,
)
return False
def _unsubscribe_original_behaviour(
self, original_unsub_data: UnsubscribeOriginalData, user: User
) -> str:
alias = Alias.get(original_unsub_data.alias_id)
if not alias:
return status.E508
if alias.user_id != user.id:
return status.E509
email_domain = get_email_domain_part(alias.email)
to_email = sanitize_email(original_unsub_data.recipient)
msg = EmailMessage()
msg[headers.TO] = to_email
msg[headers.SUBJECT] = original_unsub_data.subject
msg[headers.FROM] = alias.email
msg[headers.MESSAGE_ID] = make_msgid(domain=email_domain)
msg[headers.DATE] = formatdate()
msg[headers.CONTENT_TYPE] = "text/plain"
msg[headers.MIME_VERSION] = "1.0"
msg.set_payload("")
add_dkim_signature(msg, email_domain)
transaction = TransactionalEmail.create(email=to_email, commit=True)
sl_sendmail(
generate_verp_email(
VerpType.transactional, transaction.id, sender_domain=email_domain
),
to_email,
msg,
retries=3,
ignore_smtp_error=True,
)
return status.E202

View File

@ -252,6 +252,26 @@ class JobState(EnumE):
error = 3
class UnsubscribeBehaviourEnum(EnumE):
DisableAlias = 0
BlockContact = 1
PreserveOriginal = 2
class IntEnumType(sa.types.TypeDecorator):
impl = sa.Integer
def __init__(self, enumtype, *args, **kwargs):
super().__init__(*args, **kwargs)
self._enum_type = enumtype
def process_bind_param(self, enum_obj, dialect):
return enum_obj.value
def process_result_value(self, enum_value, dialect):
return self._enum_type(enum_value)
class Hibp(Base, ModelMixin):
__tablename__ = "hibp"
name = sa.Column(sa.String(), nullable=False, unique=True, index=True)
@ -493,6 +513,14 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
nullable=False,
)
# Keep original unsub behaviour
unsub_behaviour = sa.Column(
IntEnumType(UnsubscribeBehaviourEnum),
default=UnsubscribeBehaviourEnum.PreserveOriginal,
server_default=str(UnsubscribeBehaviourEnum.PreserveOriginal.value),
nullable=False,
)
@property
def directory_quota(self):
return min(

View File

@ -0,0 +1,31 @@
"""Add unsubscribe behaviour
Revision ID: b0101a66bb77
Revises: bd7d032087b2
Create Date: 2022-07-06 19:52:04.324761
"""
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = 'b0101a66bb77'
down_revision = 'bd7d032087b2'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('users', sa.Column('unsub_behaviour', sa.Integer(), default=2, server_default='2', nullable=False))
op.execute("UPDATE users SET unsub_behaviour=0 WHERE one_click_unsubscribe_block_sender=false")
op.execute("UPDATE users SET unsub_behaviour=1 WHERE one_click_unsubscribe_block_sender=true")
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('users', 'unsub_behaviour')
# ### end Alembic commands ###

View File

@ -11,7 +11,7 @@
margin-bottom: 3px;
}
.highlighted{
border: solid 2px #5675E2;
border: solid 2px #5675E2;
}
</style>
@ -560,23 +560,33 @@
<a href="https://simplelogin.io/docs/getting-started/one-click-unsubscribe/">
One-click unsubscribe
</a>
button,
clicking on it will disable the alias that receives the emails.
button, clicking on it will allow you to do one of these actions:
<br />
You can choose to block the sender instead of disabling the alias.
<b>Original action:</b> If a message sent to your alias has an unsubscribe header, that same action
will be preserved in the email forwarded to your mailbox.
<br>
<b>Disable alias:</b> The unsubscribe action will disable the alias that received the email and you will stop receiving emails.
<br>
<b>Block contact:</b> The sender of the email won't be allowed to send emails to this alias.
</div>
<form method="post" action="#one-click-unsubscribe-section">
<form method="post"
action="#one-click-unsubscribe-section"
class="form-inline">
<input type="hidden" name="form-name" value="one-click-unsubscribe">
<div class="form-check">
<input type="checkbox"
id="one-click-unsubscribe"
name="enable"
{% if current_user.one_click_unsubscribe_block_sender %} checked{% endif %}
class="form-check-input">
<label for="one-click-unsubscribe">
Block sender instead
</label>
</div>
<select class="form-control mr-sm-2" name="unsubscribe-behaviour">
<option value="{{ UnsubscribeBehaviourEnum.PreserveOriginal.name }}" {% if current_user.unsub_behaviour.value == UnsubscribeBehaviourEnum.PreserveOriginal.value %}
selected="selected" {% endif %}>
Preserve the original unsubscribe request in the original email
</option>
<option value="{{ UnsubscribeBehaviourEnum.DisableAlias.name }}" {% if current_user.unsub_behaviour.value == UnsubscribeBehaviourEnum.DisableAlias.value %}
selected="selected" {% endif %}>
Disable the alias that received the email
</option>
<option value="{{ UnsubscribeBehaviourEnum.BlockContact.name }}" {% if current_user.unsub_behaviour.value == UnsubscribeBehaviourEnum.BlockContact.value %}
selected="selected" {% endif %}>
Block the sender that sent the original email
</option>
</select>
<button type="submit" class="btn btn-outline-primary">
Update
</button>
@ -724,8 +734,8 @@
{% endblock %}
{% block script %}
<script>
let anchor = window.location.hash;
$(anchor).addClass("highlighted")
let anchor = window.location.hash;
$(anchor).addClass("highlighted")
</script>

View File

@ -5,6 +5,7 @@ from app.handler.unsubscribe_encoder import (
UnsubscribeData,
UnsubscribeAction,
UnsubscribeEncoder,
UnsubscribeOriginalData,
)
legacy_subject_test_data = [
@ -17,11 +18,7 @@ legacy_subject_test_data = [
@pytest.mark.parametrize("expected_subject, expected_deco", legacy_subject_test_data)
def test_legacy_unsub_subject(expected_subject, expected_deco):
info = UnsubscribeEncoder.decode_subject(expected_subject)
assert expected_deco == info
subject = UnsubscribeEncoder.encode_subject(
expected_deco.action, expected_deco.data
)
assert expected_subject == subject
assert info == expected_deco
legacy_url_test_data = [
@ -49,7 +46,7 @@ legacy_mail_or_link_test_data = [
UnsubscribeData(UnsubscribeAction.DisableAlias, 3),
),
(
"mailto:me@nowhere.net?subject=9=",
"mailto:me@nowhere.net?subject=un.WzIsIDld.ONeJMiTW6CosJg4PMR1MPcDs-6GWoTOQFMfA2A",
True,
UnsubscribeData(UnsubscribeAction.DisableAlias, 9),
),
@ -59,15 +56,31 @@ legacy_mail_or_link_test_data = [
UnsubscribeData(UnsubscribeAction.DisableContact, 8),
),
(
"mailto:me@nowhere.net?subject=8_",
"mailto:me@nowhere.net?subject=un.WzMsIDhd.eo_Ynk0eNyPtsHXMpTqw7HMFgYmm1Up_wWUc3g",
True,
UnsubscribeData(UnsubscribeAction.DisableContact, 8),
),
(
"mailto:me@nowhere.net?subject=83*",
"mailto:me@nowhere.net?subject=un.WzEsIDgzXQ.NZAWqfpCmLEszwc5nWuQwDSLJ3TXO3rcOe_73Q",
True,
UnsubscribeData(UnsubscribeAction.UnsubscribeNewsletter, 83),
),
(
f"{config.URL}/dashboard/unsubscribe/encoded?data=un.WzQsIFswLCAxLCAiYUBiLmMiLCAic3ViamVjdCJdXQ.aU3T5XNzJIG4LDm6-pqJk4vxxJxpgVYzc9MEFQ",
False,
UnsubscribeData(
UnsubscribeAction.OriginalUnsubscribeMailto,
UnsubscribeOriginalData(1, "a@b.c", "subject"),
),
),
(
"mailto:me@nowhere.net?subject=un.WzQsIFswLCAxLCAiYUBiLmMiLCAic3ViamVjdCJdXQ.aU3T5XNzJIG4LDm6-pqJk4vxxJxpgVYzc9MEFQ",
True,
UnsubscribeData(
UnsubscribeAction.OriginalUnsubscribeMailto,
UnsubscribeOriginalData(1, "a@b.c", "subject"),
),
),
]
@ -82,3 +95,22 @@ def test_encode_legacy_link(expected_link, via_mail, unsub_data):
link_info = UnsubscribeEncoder.encode(unsub_data.action, unsub_data.data)
assert via_mail == link_info.via_email
assert expected_link == link_info.link
encode_decode_test_data = [
UnsubscribeData(UnsubscribeAction.DisableContact, 3),
UnsubscribeData(UnsubscribeAction.DisableContact, 10),
UnsubscribeData(UnsubscribeAction.DisableAlias, 101),
UnsubscribeData(
UnsubscribeAction.OriginalUnsubscribeMailto,
UnsubscribeOriginalData(323, "a@b.com", "some subject goes here"),
),
]
@pytest.mark.parametrize("unsub_data", encode_decode_test_data)
def test_encode_decode_unsub(unsub_data):
encoded = UnsubscribeEncoder.encode_subject(unsub_data.action, unsub_data.data)
decoded = UnsubscribeEncoder.decode_subject(encoded)
assert unsub_data.action == decoded.action
assert unsub_data.data == decoded.data

View File

@ -6,17 +6,22 @@ import pytest
from app import config
from app.db import Session
from app.email import headers
from app.handler.unsubscribe_encoder import (
UnsubscribeAction,
UnsubscribeEncoder,
UnsubscribeOriginalData,
)
from app.handler.unsubscribe_generator import UnsubscribeGenerator
from app.models import Alias, Contact
from app.models import Alias, Contact, UnsubscribeBehaviourEnum
from tests.utils import create_new_user
TEST_UNSUB_EMAIL = "unsub@sl.com"
def generate_sl_unsub_block_sender_data() -> Iterable:
def generate_unsub_block_contact_data() -> Iterable:
user = create_new_user()
user.one_click_unsubscribe_block_sender = True
user.unsub_behaviour = UnsubscribeBehaviourEnum.BlockContact
alias = Alias.create_new_random(user)
Session.commit()
contact = Contact.create(
@ -27,12 +32,15 @@ def generate_sl_unsub_block_sender_data() -> Iterable:
commit=True,
)
subject = UnsubscribeEncoder.encode_subject(
UnsubscribeAction.DisableContact, contact.id
)
yield (
alias.id,
contact.id,
True,
"<https://lol.com>, <mailto:somewhere@not.net>",
f"<mailto:{TEST_UNSUB_EMAIL}?subject={contact.id}_>",
f"<mailto:{TEST_UNSUB_EMAIL}?subject={subject}>",
)
yield (
alias.id,
@ -52,9 +60,9 @@ def generate_sl_unsub_block_sender_data() -> Iterable:
@pytest.mark.parametrize(
"alias_id, contact_id, unsub_via_mail, original_header, expected_header",
generate_sl_unsub_block_sender_data(),
generate_unsub_block_contact_data(),
)
def test_sl_unsub_block_sender_data(
def test_unsub_disable_contact(
alias_id, contact_id, unsub_via_mail, original_header, expected_header
):
alias = Alias.get(alias_id)
@ -70,9 +78,9 @@ def test_sl_unsub_block_sender_data(
assert "List-Unsubscribe=One-Click" == message[headers.LIST_UNSUBSCRIBE_POST]
def generate_sl_unsub_not_block_sender_data() -> Iterable:
def generate_unsub_disable_alias_data() -> Iterable:
user = create_new_user()
user.one_click_unsubscribe_block_sender = False
user.unsub_behaviour = UnsubscribeBehaviourEnum.DisableAlias
alias = Alias.create_new_random(user)
Session.commit()
contact = Contact.create(
@ -83,12 +91,15 @@ def generate_sl_unsub_not_block_sender_data() -> Iterable:
commit=True,
)
subject = UnsubscribeEncoder.encode_subject(
UnsubscribeAction.DisableAlias, alias.id
)
yield (
alias.id,
contact.id,
True,
"<https://lol.com>, <mailto:somewhere@not.net>",
f"<mailto:{TEST_UNSUB_EMAIL}?subject={alias.id}=>",
f"<mailto:{TEST_UNSUB_EMAIL}?subject={subject}>",
)
yield (
alias.id,
@ -102,15 +113,84 @@ def generate_sl_unsub_not_block_sender_data() -> Iterable:
contact.id,
False,
None,
f"<{config.URL}/dashboard/encoded_unsubscribe?request={alias.id}>",
f"<{config.URL}/dashboard/unsubscribe/{alias.id}>",
)
@pytest.mark.parametrize(
"alias_id, contact_id, unsub_via_mail, original_header, expected_header",
generate_sl_unsub_block_sender_data(),
generate_unsub_disable_alias_data(),
)
def test_sl_unsub_not_block_sender_data(
def test_unsub_disable_alias(
alias_id, contact_id, unsub_via_mail, original_header, expected_header
):
alias = Alias.get(alias_id)
contact = Contact.get(contact_id)
config.UNSUBSCRIBER = TEST_UNSUB_EMAIL if unsub_via_mail else None
message = Message()
message[headers.LIST_UNSUBSCRIBE] = original_header
message = UnsubscribeGenerator().add_header_to_message(alias, contact, message)
assert expected_header == message[headers.LIST_UNSUBSCRIBE]
if not expected_header or expected_header.find("<http") == -1:
assert message[headers.LIST_UNSUBSCRIBE_POST] is None
else:
assert "List-Unsubscribe=One-Click" == message[headers.LIST_UNSUBSCRIBE_POST]
def generate_unsub_preserve_original_data() -> Iterable:
user = create_new_user()
user.unsub_behaviour = UnsubscribeBehaviourEnum.PreserveOriginal
alias = Alias.create_new_random(user)
Session.commit()
contact = Contact.create(
user_id=user.id,
alias_id=alias.id,
website_email="contact@example.com",
reply_email="rep@sl.local",
commit=True,
)
yield (
alias.id,
contact.id,
True,
"<https://lol.com>, <mailto:somewhere@not.net>",
"<https://lol.com>",
)
yield (
alias.id,
contact.id,
False,
"<https://lol.com>, <mailto:somewhere@not.net>",
"<https://lol.com>",
)
unsub_data = UnsubscribeEncoder.encode_subject(
UnsubscribeAction.OriginalUnsubscribeMailto,
UnsubscribeOriginalData(alias.id, "test@test.com", "hello"),
)
yield (
alias.id,
contact.id,
True,
"<mailto:test@test.com?subject=hello>",
f"<mailto:{TEST_UNSUB_EMAIL}?subject={unsub_data}>",
)
yield (
alias.id,
contact.id,
False,
"<mailto:test@test.com?subject=hello>",
f"<{config.URL}/dashboard/unsubscribe/encoded?data={unsub_data}>",
)
yield (alias.id, contact.id, True, None, None)
yield (alias.id, contact.id, False, None, None)
@pytest.mark.parametrize(
"alias_id, contact_id, unsub_via_mail, original_header, expected_header",
generate_unsub_preserve_original_data(),
)
def test_unsub_preserve_original(
alias_id, contact_id, unsub_via_mail, original_header, expected_header
):
alias = Alias.get(alias_id)

View File

@ -1,21 +1,47 @@
from email.message import Message
from random import random
from typing import Iterable
from aiosmtpd.smtp import Envelope
from flask import url_for
from app.db import Session
from app.email import headers, status
from app.email_utils import parse_full_address
from app.handler.unsubscribe_encoder import (
UnsubscribeEncoder,
UnsubscribeAction,
UnsubscribeOriginalData,
)
from app.handler.unsubscribe_handler import (
UnsubscribeHandler,
)
from app.mail_sender import mail_sender
from app.models import Alias, Contact, User
from tests.utils import create_new_user
from tests.utils import create_new_user, login
def test_unsub_email_old_subject() -> Iterable:
mail_sender.store_emails_instead_of_sending()
def _get_envelope_and_message(user: User, subject: str) -> (Envelope, Message):
envelope = Envelope()
envelope.mail_from = user.email
message = Message()
message[headers.SUBJECT] = subject
return envelope, message
@mail_sender.store_emails_test_decorator
def test_old_subject_disable_alias():
user = create_new_user()
alias = Alias.create_new_random(user)
Session.commit()
envelope, message = _get_envelope_and_message(user, f"{alias.id}=")
response = UnsubscribeHandler().handle_unsubscribe_from_message(envelope, message)
assert status.E202 == response
assert not Alias.get(alias.id).enabled
assert 1 == len(mail_sender.get_stored_emails())
@mail_sender.store_emails_test_decorator
def test_old_subject_block_contact():
user = create_new_user()
alias = Alias.create_new_random(user)
Session.commit()
@ -27,29 +53,179 @@ def test_unsub_email_old_subject() -> Iterable:
block_forward=False,
commit=True,
)
envelope = Envelope()
envelope.mail_from = user.email
# Disable alias
message = Message()
message[headers.SUBJECT] = f"{alias.id}="
mail_sender.purge_stored_emails()
response = UnsubscribeHandler().handle_unsubscribe_from_message(envelope, message)
assert status.E202 == response
assert not Alias.get(alias.id).enabled
assert 1 == len(mail_sender.get_stored_emails())
# Disable contact
message = Message()
message[headers.SUBJECT] = f"{contact.id}_"
mail_sender.purge_stored_emails()
envelope, message = _get_envelope_and_message(user, f"{contact.id}_")
response = UnsubscribeHandler().handle_unsubscribe_from_message(envelope, message)
assert status.E202 == response
assert Contact.get(contact.id).block_forward
assert 1 == len(mail_sender.get_stored_emails())
# Disable newsletter
message = Message()
message[headers.SUBJECT] = f"{user.id}*"
mail_sender.purge_stored_emails()
@mail_sender.store_emails_test_decorator
def test_old_subject_disable_newsletter():
user = create_new_user()
envelope, message = _get_envelope_and_message(user, f"{user.id}*")
response = UnsubscribeHandler().handle_unsubscribe_from_message(envelope, message)
assert status.E202 == response
assert not User.get(user.id).notification
assert 1 == len(mail_sender.get_stored_emails())
@mail_sender.store_emails_test_decorator
def test_new_subject_disable_alias():
user = create_new_user()
alias = Alias.create_new_random(user)
Session.commit()
header = UnsubscribeEncoder.encode_subject(UnsubscribeAction.DisableAlias, alias.id)
envelope, message = _get_envelope_and_message(user, header)
response = UnsubscribeHandler().handle_unsubscribe_from_message(envelope, message)
assert status.E202 == response
assert not Alias.get(alias.id).enabled
assert 1 == len(mail_sender.get_stored_emails())
@mail_sender.store_emails_test_decorator
def test_new_subject_block_contact():
user = create_new_user()
alias = Alias.create_new_random(user)
Session.commit()
contact = Contact.create(
user_id=user.id,
alias_id=alias.id,
website_email="contact@example.com",
reply_email=f"{random()}@sl.local",
block_forward=False,
commit=True,
)
header = UnsubscribeEncoder.encode_subject(
UnsubscribeAction.DisableContact, contact.id
)
envelope, message = _get_envelope_and_message(user, header)
response = UnsubscribeHandler().handle_unsubscribe_from_message(envelope, message)
assert status.E202 == response
assert Contact.get(contact.id).block_forward
assert 1 == len(mail_sender.get_stored_emails())
@mail_sender.store_emails_test_decorator
def test_new_subject_disable_newsletter():
user = create_new_user()
header = UnsubscribeEncoder.encode_subject(
UnsubscribeAction.UnsubscribeNewsletter, user.id
)
envelope, message = _get_envelope_and_message(user, header)
response = UnsubscribeHandler().handle_unsubscribe_from_message(envelope, message)
assert status.E202 == response
assert not User.get(user.id).notification
assert 1 == len(mail_sender.get_stored_emails())
@mail_sender.store_emails_test_decorator
def test_new_subject_original_unsub():
user = create_new_user()
alias = Alias.create_new_random(user)
Session.commit()
envelope = Envelope()
envelope.mail_from = user.email
message = Message()
original_recipient = f"{random()}@out.com"
original_subject = f"Unsubsomehow{random()}"
message[headers.SUBJECT] = UnsubscribeEncoder.encode_subject(
UnsubscribeAction.OriginalUnsubscribeMailto,
UnsubscribeOriginalData(alias.id, original_recipient, original_subject),
)
response = UnsubscribeHandler().handle_unsubscribe_from_message(envelope, message)
assert status.E202 == response
assert 1 == len(mail_sender.get_stored_emails())
mail_sent = mail_sender.get_stored_emails()[0]
assert mail_sent.envelope_to == original_recipient
name, address = parse_full_address(mail_sent.msg[headers.FROM])
assert name == ""
assert alias.email == address
assert mail_sent.msg[headers.TO] == original_recipient
assert mail_sent.msg[headers.SUBJECT] == original_subject
@mail_sender.store_emails_test_decorator
def test_request_disable_alias(flask_client):
user = login(flask_client)
alias = Alias.create_new_random(user)
Session.commit()
req_data = UnsubscribeEncoder.encode_subject(
UnsubscribeAction.DisableAlias, alias.id
)
req = flask_client.get(
url_for("dashboard.encoded_unsubscribe", encoded_request=req_data),
follow_redirects=True,
)
assert 200 == req.status_code
assert not Alias.get(alias.id).enabled
assert 1 == len(mail_sender.get_stored_emails())
@mail_sender.store_emails_test_decorator
def test_request_disable_contact(flask_client):
user = login(flask_client)
alias = Alias.create_new_random(user)
Session.commit()
contact = Contact.create(
user_id=user.id,
alias_id=alias.id,
website_email="contact@example.com",
reply_email=f"{random()}@sl.local",
block_forward=False,
commit=True,
)
req_data = UnsubscribeEncoder.encode_subject(
UnsubscribeAction.DisableContact, contact.id
)
req = flask_client.get(
url_for("dashboard.encoded_unsubscribe", encoded_request=req_data),
follow_redirects=True,
)
assert 200 == req.status_code
assert Contact.get(contact.id).block_forward
assert 1 == len(mail_sender.get_stored_emails())
@mail_sender.store_emails_test_decorator
def test_request_disable_newsletter(flask_client):
user = login(flask_client)
req_data = UnsubscribeEncoder.encode_subject(
UnsubscribeAction.UnsubscribeNewsletter, user.id
)
req = flask_client.get(
url_for("dashboard.encoded_unsubscribe", encoded_request=req_data),
follow_redirects=True,
)
assert 200 == req.status_code
assert not User.get(user.id).notification
assert 1 == len(mail_sender.get_stored_emails())
@mail_sender.store_emails_test_decorator
def test_request_original_unsub(flask_client):
user = login(flask_client)
alias = Alias.create_new_random(user)
Session.commit()
original_recipient = f"{random()}@out.com"
original_subject = f"Unsubsomehow{random()}"
mail_sender.purge_stored_emails()
req_data = UnsubscribeEncoder.encode_subject(
UnsubscribeAction.OriginalUnsubscribeMailto,
UnsubscribeOriginalData(alias.id, original_recipient, original_subject),
)
req = flask_client.get(
url_for("dashboard.encoded_unsubscribe", encoded_request=req_data),
follow_redirects=True,
)
assert 200 == req.status_code
assert 1 == len(mail_sender.get_stored_emails())
mail_sent = mail_sender.get_stored_emails()[0]
assert mail_sent.envelope_to == original_recipient
name, address = parse_full_address(mail_sent.msg[headers.FROM])
assert name == ""
assert alias.email == address
assert mail_sent.msg[headers.TO] == original_recipient
assert mail_sent.msg[headers.SUBJECT] == original_subject