app-MAIL-temp/app/handler/unsubscribe_handler.py
Adrià Casajús 21feced342
Refactor unsubscribe handling (#1090)
* Refactor unsubscribe email handling

* MR comments

* Moved all unsub logic to the encoder

* remove unused

Co-authored-by: Adrià Casajús <adria.casajus@proton.ch>
2022-06-30 11:40:01 +02:00

176 lines
6.2 KiB
Python

from email.message import Message
from typing import Optional
from aiosmtpd.smtp import Envelope
from app import config
from app.db import Session
from app.email import headers, status
from app.email_utils import send_email, render
from app.handler.unsubscribe_encoder import (
UnsubscribeData,
UnsubscribeEncoder,
UnsubscribeAction,
)
from app.log import LOG
from app.models import Alias, Contact, User, Mailbox
class UnsubscribeHandler:
    """Handle unsubscribe requests whose action is encoded in the email subject.

    The subject is decoded with ``UnsubscribeEncoder`` into an action and a
    target id; the request is then dispatched to the matching private method.
    Every handler returns one of the SMTP status strings from
    ``app.email.status``.
    """

    def _extract_unsub_info_from_message(
        self, message: Message
    ) -> Optional[UnsubscribeData]:
        """Decode the unsubscribe request carried by the message subject.

        Returns ``None`` when the subject header is missing or empty,
        otherwise whatever ``UnsubscribeEncoder.decode_subject`` returns.
        """
        header_value = message[headers.SUBJECT]
        if not header_value:
            return None
        return UnsubscribeEncoder.decode_subject(header_value)

    def handle_unsubscribe_from_message(self, envelope: Envelope, msg: Message) -> str:
        """Process an incoming unsubscribe email and return the SMTP status.

        Args:
            envelope: SMTP envelope; ``mail_from`` is used to look up the
                sending mailbox.
            msg: The message whose subject encodes the unsubscribe request.

        Returns:
            ``status.E507`` when the subject cannot be decoded or the sender
            is not a known mailbox; otherwise the status returned by the
            action-specific handler.

        Raises:
            Exception: if the decoded action is not one of the known
                ``UnsubscribeAction`` values handled here.
        """
        unsub_data = self._extract_unsub_info_from_message(msg)
        if not unsub_data:
            LOG.w("Wrong format subject %s", msg[headers.SUBJECT])
            return status.E507

        mailbox = Mailbox.get_by(email=envelope.mail_from)
        if not mailbox:
            # Log the address that failed the lookup, not the subject.
            LOG.w("Unknown mailbox %s", envelope.mail_from)
            return status.E507

        if unsub_data.action == UnsubscribeAction.DisableAlias:
            return self._disable_alias(unsub_data.data, mailbox.user, mailbox)
        elif unsub_data.action == UnsubscribeAction.DisableContact:
            return self._disable_contact(unsub_data.data, mailbox.user, mailbox)
        elif unsub_data.action == UnsubscribeAction.UnsubscribeNewsletter:
            return self._unsubscribe_user_from_newsletter(unsub_data.data, mailbox.user)
        else:
            raise Exception(f"Unknown unsubscribe action {unsub_data.action}")

    def _disable_alias(
        self, alias_id: int, user: User, mailbox: Optional[Mailbox] = None
    ) -> str:
        """Disable an alias and notify each of its mailboxes.

        Args:
            alias_id: Id of the alias to disable.
            user: User making the request; must own the alias.
            mailbox: Mailbox the request came from. When given, its email
                must be authorized for the alias; when ``None`` that check
                is skipped.

        Returns:
            ``status.E508`` if the alias does not exist or belongs to another
            user, ``status.E509`` if the mailbox is not authorized,
            ``status.E202`` once the alias has been disabled.
        """
        alias = Alias.get(alias_id)
        if not alias:
            return status.E508

        if alias.user_id != user.id:
            LOG.w("Alias doesn't belong to user")
            return status.E508

        # Only alias's owning mailbox can send the unsubscribe request
        if mailbox and not self._check_email_is_authorized_for_alias(
            mailbox.email, alias
        ):
            return status.E509

        alias.enabled = False
        Session.commit()

        enable_alias_url = config.URL + f"/dashboard/?highlight_alias_id={alias.id}"
        # Same template context for the plain-text and the HTML body.
        template_context = {
            "user": alias.user,
            "alias": alias.email,
            "enable_alias_url": enable_alias_url,
        }
        # Distinct loop name so the ``mailbox`` parameter is not rebound.
        for alias_mailbox in alias.mailboxes:
            send_email(
                alias_mailbox.email,
                f"Alias {alias.email} has been disabled successfully",
                render(
                    "transactional/unsubscribe-disable-alias.txt",
                    **template_context,
                ),
                render(
                    "transactional/unsubscribe-disable-alias.html",
                    **template_context,
                ),
            )
        return status.E202

    def _disable_contact(
        self, contact_id: int, user: User, mailbox: Optional[Mailbox] = None
    ) -> str:
        """Block forwarding from a contact and notify the alias's mailboxes.

        Args:
            contact_id: Id of the contact to block.
            user: User making the request; must own the contact.
            mailbox: Mailbox the request came from. When given, its email
                must be authorized for the contact's alias; when ``None``
                that check is skipped.

        Returns:
            ``status.E508`` if the contact does not exist or belongs to
            another user, ``status.E509`` if the mailbox is not authorized,
            ``status.E202`` once the contact has been blocked.
        """
        contact = Contact.get(contact_id)
        if not contact:
            return status.E508

        if contact.user_id != user.id:
            LOG.w("Contact doesn't belong to user")
            return status.E508

        alias = contact.alias
        # Only contact's owning mailbox can send the unsubscribe request
        if mailbox and not self._check_email_is_authorized_for_alias(
            mailbox.email, alias
        ):
            return status.E509

        contact.block_forward = True
        Session.commit()

        unblock_contact_url = (
            config.URL
            + f"/dashboard/alias_contact_manager/{alias.id}?highlight_contact_id={contact.id}"
        )
        # Distinct loop name so the ``mailbox`` parameter is not rebound.
        for alias_mailbox in alias.mailboxes:
            # Only a plain-text body is passed for this notification.
            send_email(
                alias_mailbox.email,
                f"Emails from {contact.website_email} to {alias.email} are now blocked",
                render(
                    "transactional/unsubscribe-block-contact.txt.jinja2",
                    user=alias.user,
                    alias=alias,
                    contact=contact,
                    unblock_contact_url=unblock_contact_url,
                ),
            )
        return status.E202

    def _unsubscribe_user_from_newsletter(
        self, user_id: int, request_user: User
    ) -> str:
        """Turn off newsletter notifications for a user and confirm by email.

        Args:
            user_id: Id of the user to unsubscribe.
            request_user: User making the request; must be the same user.

        Returns:
            The SMTP status: ``status.E510`` if no user has that id,
            ``status.E511`` if the requester is a different user,
            ``status.E202`` on success.
        """
        user = User.get(user_id)
        if not user:
            LOG.w("No such user %s", user_id)
            return status.E510

        if user.id != request_user.id:
            # The placeholder was missing, so the argument had nowhere to go.
            LOG.w("Unauthorized unsubscribe user from %s", request_user)
            return status.E511

        user.notification = False
        Session.commit()

        send_email(
            user.email,
            "You have been unsubscribed from SimpleLogin newsletter",
            render(
                "transactional/unsubscribe-newsletter.txt",
                user=user,
            ),
            render(
                "transactional/unsubscribe-newsletter.html",
                user=user,
            ),
        )
        return status.E202

    def _check_email_is_authorized_for_alias(
        self, email_address: str, alias: Alias
    ) -> bool:
        """Return whether email_address may unsubscribe from an alias or block a contact.

        Usually the mail_from is the email of one of the alias's mailboxes,
        but it can also be one of a mailbox's authorized addresses.
        """
        for mailbox in alias.mailboxes:
            if mailbox.email == email_address:
                return True

            for authorized_address in mailbox.authorized_addresses:
                if authorized_address.email == email_address:
                    LOG.d(
                        "Found an authorized address for %s %s %s",
                        alias,
                        mailbox,
                        authorized_address,
                    )
                    return True

        # NOTE(review): if Alias.authorized_addresses is a method rather than
        # an attribute, this logs the bound method, not the addresses - confirm.
        LOG.d(
            "%s cannot disable alias %s. Alias authorized addresses:%s",
            email_address,
            alias,
            alias.authorized_addresses,
        )
        return False