diff --git a/app/alias_utils.py b/app/alias_utils.py
index 40cc607c..d4cafa18 100644
--- a/app/alias_utils.py
+++ b/app/alias_utils.py
@@ -25,6 +25,8 @@ from app.email_utils import (
     render,
 )
 from app.errors import AliasInTrashError
+from app.events.event_dispatcher import EventDispatcher
+from app.events.generated.event_pb2 import AliasDeleted, AliasStatusChange, EventContent
 from app.log import LOG
 from app.models import (
     Alias,
@@ -334,6 +336,10 @@ def delete_alias(alias: Alias, user: User):
     Alias.filter(Alias.id == alias.id).delete()
     Session.commit()
 
+    EventDispatcher.send_event(
+        user, EventContent(alias_deleted=AliasDeleted(alias_id=alias.id))
+    )
+
 
 def aliases_for_mailbox(mailbox: Mailbox) -> [Alias]:
     """
@@ -459,3 +465,15 @@ def transfer_alias(alias, new_user, new_mailboxes: [Mailbox]):
     alias.pinned = False
 
     Session.commit()
+
+
+def change_alias_status(alias: Alias, enabled: bool, commit: bool = False):
+    alias.enabled = enabled
+
+    event = AliasStatusChange(
+        alias_id=alias.id, alias_email=alias.email, enabled=enabled
+    )
+    EventDispatcher.send_event(alias.user, EventContent(alias_status_change=event))
+
+    if commit:
+        Session.commit()
diff --git a/app/api/views/alias.py b/app/api/views/alias.py
index 1b553c85..bd4587cf 100644
--- a/app/api/views/alias.py
+++ b/app/api/views/alias.py
@@ -184,7 +184,7 @@ def toggle_alias(alias_id):
     if not alias or alias.user_id != user.id:
         return jsonify(error="Forbidden"), 403
 
-    alias.enabled = not alias.enabled
+    alias_utils.change_alias_status(alias, enabled=not alias.enabled)
     Session.commit()
 
     return jsonify(enabled=alias.enabled), 200
diff --git a/app/config.py b/app/config.py
index 45ba4a00..2af562be 100644
--- a/app/config.py
+++ b/app/config.py
@@ -581,3 +581,5 @@
 UPCLOUD_PASSWORD = os.environ.get("UPCLOUD_PASSWORD", None)
 UPCLOUD_DB_ID = os.environ.get("UPCLOUD_DB_ID", None)
 STORE_TRANSACTIONAL_EMAILS = "STORE_TRANSACTIONAL_EMAILS" in os.environ
+
+EVENT_WEBHOOK = os.environ.get("EVENT_WEBHOOK", None)
diff --git a/app/dashboard/views/index.py b/app/dashboard/views/index.py
index cadcaeea..a5eaed7f 100644
--- a/app/dashboard/views/index.py
+++ b/app/dashboard/views/index.py
@@ -146,7 +146,7 @@ def index():
             alias_utils.delete_alias(alias, current_user)
             flash(f"Alias {email} has been deleted", "success")
         elif request.form.get("form-name") == "disable-alias":
-            alias.enabled = False
+            alias_utils.change_alias_status(alias, enabled=False)
             Session.commit()
             flash(f"Alias {alias.email} has been disabled", "success")
diff --git a/app/dashboard/views/unsubscribe.py b/app/dashboard/views/unsubscribe.py
index f3287441..47286f1d 100644
--- a/app/dashboard/views/unsubscribe.py
+++ b/app/dashboard/views/unsubscribe.py
@@ -8,6 +8,7 @@ from app.db import Session
 from flask import redirect, url_for, flash, request, render_template
 from flask_login import login_required, current_user
 
+from app import alias_utils
 from app.dashboard.base import dashboard_bp
 from app.handler.unsubscribe_encoder import UnsubscribeAction
 from app.handler.unsubscribe_handler import UnsubscribeHandler
@@ -31,7 +32,7 @@ def unsubscribe(alias_id):
     # automatic unsubscribe, according to https://tools.ietf.org/html/rfc8058
     if request.method == "POST":
-        alias.enabled = False
+        alias_utils.change_alias_status(alias, False)
         flash(f"Alias {alias.email} has been blocked", "success")
         Session.commit()
diff --git a/app/events/__init__.py b/app/events/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/app/events/event_dispatcher.py b/app/events/event_dispatcher.py
new file mode 100644
index 00000000..c62bf425
--- /dev/null
+++ b/app/events/event_dispatcher.py
@@ -0,0 +1,63 @@
+from abc import ABC, abstractmethod
+from app import config
+from app.db import Session
+from app.errors import ProtonPartnerNotSetUp
+from app.events.generated import event_pb2
+from app.models import User, PartnerUser, SyncEvent
+from app.proton.utils import get_proton_partner
+from typing import Optional
+
+NOTIFICATION_CHANNEL = "simplelogin_sync_events"
+
+
+class Dispatcher(ABC):
+    @abstractmethod
+    def send(self, event: bytes):
+        pass
+
+
+class PostgresDispatcher(Dispatcher):
+    def send(self, event: bytes):
+        instance = SyncEvent.create(content=event, flush=True)
+        Session.execute(f"NOTIFY {NOTIFICATION_CHANNEL}, '{instance.id}';")
+
+    @staticmethod
+    def get():
+        return PostgresDispatcher()
+
+
+class EventDispatcher:
+    @staticmethod
+    def send_event(
+        user: User,
+        content: event_pb2.EventContent,
+        dispatcher: Dispatcher = PostgresDispatcher.get(),
+        skip_if_webhook_missing: bool = True,
+    ):
+        if not config.EVENT_WEBHOOK and skip_if_webhook_missing:
+            return
+
+        partner_user = EventDispatcher.__partner_user(user.id)
+        if not partner_user:
+            return
+
+        event = event_pb2.Event(
+            user_id=user.id,
+            external_user_id=partner_user.external_user_id,
+            partner_id=partner_user.partner_id,
+            content=content,
+        )
+
+        serialized = event.SerializeToString()
+        dispatcher.send(serialized)
+
+    @staticmethod
+    def __partner_user(user_id: int) -> Optional[PartnerUser]:
+        # Check if the current user has a partner_id
+        try:
+            proton_partner_id = get_proton_partner().id
+        except ProtonPartnerNotSetUp:
+            return None
+
+        # It has. Retrieve the information for the PartnerUser
+        return PartnerUser.get_by(user_id=user_id, partner_id=proton_partner_id)
diff --git a/app/events/generated/event_pb2.py b/app/events/generated/event_pb2.py
new file mode 100644
index 00000000..23424f30
--- /dev/null
+++ b/app/events/generated/event_pb2.py
@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: event.proto
+# Protobuf Python Version: 5.26.1
+"""Generated protocol buffer code."""
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf.internal import builder as _builder
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x65vent.proto\x12\x12simplelogin_events\"\'\n\x0eUserPlanChange\x12\x15\n\rplan_end_time\x18\x01 \x01(\r\"\r\n\x0bUserDeleted\"Z\n\x0c\x41liasCreated\x12\x10\n\x08\x61lias_id\x18\x01 \x01(\r\x12\x13\n\x0b\x61lias_email\x18\x02 \x01(\t\x12\x12\n\nalias_note\x18\x03 \x01(\t\x12\x0f\n\x07\x65nabled\x18\x04 \x01(\x08\"K\n\x11\x41liasStatusChange\x12\x10\n\x08\x61lias_id\x18\x01 \x01(\r\x12\x13\n\x0b\x61lias_email\x18\x02 \x01(\t\x12\x0f\n\x07\x65nabled\x18\x03 \x01(\x08\"5\n\x0c\x41liasDeleted\x12\x10\n\x08\x61lias_id\x18\x01 \x01(\r\x12\x13\n\x0b\x61lias_email\x18\x02 \x01(\t\"\xce\x02\n\x0c\x45ventContent\x12>\n\x10user_plan_change\x18\x01 \x01(\x0b\x32\".simplelogin_events.UserPlanChangeH\x00\x12\x37\n\x0cuser_deleted\x18\x02 \x01(\x0b\x32\x1f.simplelogin_events.UserDeletedH\x00\x12\x39\n\ralias_created\x18\x03 \x01(\x0b\x32 .simplelogin_events.AliasCreatedH\x00\x12\x44\n\x13\x61lias_status_change\x18\x04 \x01(\x0b\x32%.simplelogin_events.AliasStatusChangeH\x00\x12\x39\n\ralias_deleted\x18\x05 \x01(\x0b\x32 .simplelogin_events.AliasDeletedH\x00\x42\t\n\x07\x63ontent\"y\n\x05\x45vent\x12\x0f\n\x07user_id\x18\x01 \x01(\r\x12\x18\n\x10\x65xternal_user_id\x18\x02 \x01(\t\x12\x12\n\npartner_id\x18\x03 \x01(\r\x12\x31\n\x07\x63ontent\x18\x04 \x01(\x0b\x32 .simplelogin_events.EventContentb\x06proto3')
+
+_globals = globals()
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'event_pb2', _globals)
+if not _descriptor._USE_C_DESCRIPTORS:
+  DESCRIPTOR._loaded_options = None
+  _globals['_USERPLANCHANGE']._serialized_start=35
+  _globals['_USERPLANCHANGE']._serialized_end=74
+  _globals['_USERDELETED']._serialized_start=76
+  _globals['_USERDELETED']._serialized_end=89
+  _globals['_ALIASCREATED']._serialized_start=91
+  _globals['_ALIASCREATED']._serialized_end=181
+  _globals['_ALIASSTATUSCHANGE']._serialized_start=183
+  _globals['_ALIASSTATUSCHANGE']._serialized_end=258
+  _globals['_ALIASDELETED']._serialized_start=260
+  _globals['_ALIASDELETED']._serialized_end=313
+  _globals['_EVENTCONTENT']._serialized_start=316
+  _globals['_EVENTCONTENT']._serialized_end=650
+  _globals['_EVENT']._serialized_start=652
+  _globals['_EVENT']._serialized_end=773
+# @@protoc_insertion_point(module_scope)
diff --git a/app/events/generated/event_pb2.pyi b/app/events/generated/event_pb2.pyi
new file mode 100644
index 00000000..78da50ee
--- /dev/null
+++ b/app/events/generated/event_pb2.pyi
@@ -0,0 +1,71 @@
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from typing import ClassVar as _ClassVar, Mapping as _Mapping, Optional as _Optional, Union as _Union
+
+DESCRIPTOR: _descriptor.FileDescriptor
+
+class UserPlanChange(_message.Message):
+    __slots__ = ("plan_end_time",)
+    PLAN_END_TIME_FIELD_NUMBER: _ClassVar[int]
+    plan_end_time: int
+    def __init__(self, plan_end_time: _Optional[int] = ...) -> None: ...
+
+class UserDeleted(_message.Message):
+    __slots__ = ()
+    def __init__(self) -> None: ...
+
+class AliasCreated(_message.Message):
+    __slots__ = ("alias_id", "alias_email", "alias_note", "enabled")
+    ALIAS_ID_FIELD_NUMBER: _ClassVar[int]
+    ALIAS_EMAIL_FIELD_NUMBER: _ClassVar[int]
+    ALIAS_NOTE_FIELD_NUMBER: _ClassVar[int]
+    ENABLED_FIELD_NUMBER: _ClassVar[int]
+    alias_id: int
+    alias_email: str
+    alias_note: str
+    enabled: bool
+    def __init__(self, alias_id: _Optional[int] = ..., alias_email: _Optional[str] = ..., alias_note: _Optional[str] = ..., enabled: bool = ...) -> None: ...
+
+class AliasStatusChange(_message.Message):
+    __slots__ = ("alias_id", "alias_email", "enabled")
+    ALIAS_ID_FIELD_NUMBER: _ClassVar[int]
+    ALIAS_EMAIL_FIELD_NUMBER: _ClassVar[int]
+    ENABLED_FIELD_NUMBER: _ClassVar[int]
+    alias_id: int
+    alias_email: str
+    enabled: bool
+    def __init__(self, alias_id: _Optional[int] = ..., alias_email: _Optional[str] = ..., enabled: bool = ...) -> None: ...
+
+class AliasDeleted(_message.Message):
+    __slots__ = ("alias_id", "alias_email")
+    ALIAS_ID_FIELD_NUMBER: _ClassVar[int]
+    ALIAS_EMAIL_FIELD_NUMBER: _ClassVar[int]
+    alias_id: int
+    alias_email: str
+    def __init__(self, alias_id: _Optional[int] = ..., alias_email: _Optional[str] = ...) -> None: ...
+
+class EventContent(_message.Message):
+    __slots__ = ("user_plan_change", "user_deleted", "alias_created", "alias_status_change", "alias_deleted")
+    USER_PLAN_CHANGE_FIELD_NUMBER: _ClassVar[int]
+    USER_DELETED_FIELD_NUMBER: _ClassVar[int]
+    ALIAS_CREATED_FIELD_NUMBER: _ClassVar[int]
+    ALIAS_STATUS_CHANGE_FIELD_NUMBER: _ClassVar[int]
+    ALIAS_DELETED_FIELD_NUMBER: _ClassVar[int]
+    user_plan_change: UserPlanChange
+    user_deleted: UserDeleted
+    alias_created: AliasCreated
+    alias_status_change: AliasStatusChange
+    alias_deleted: AliasDeleted
+    def __init__(self, user_plan_change: _Optional[_Union[UserPlanChange, _Mapping]] = ..., user_deleted: _Optional[_Union[UserDeleted, _Mapping]] = ..., alias_created: _Optional[_Union[AliasCreated, _Mapping]] = ..., alias_status_change: _Optional[_Union[AliasStatusChange, _Mapping]] = ..., alias_deleted: _Optional[_Union[AliasDeleted, _Mapping]] = ...) -> None: ...
+
+class Event(_message.Message):
+    __slots__ = ("user_id", "external_user_id", "partner_id", "content")
+    USER_ID_FIELD_NUMBER: _ClassVar[int]
+    EXTERNAL_USER_ID_FIELD_NUMBER: _ClassVar[int]
+    PARTNER_ID_FIELD_NUMBER: _ClassVar[int]
+    CONTENT_FIELD_NUMBER: _ClassVar[int]
+    user_id: int
+    external_user_id: str
+    partner_id: int
+    content: EventContent
+    def __init__(self, user_id: _Optional[int] = ..., external_user_id: _Optional[str] = ..., partner_id: _Optional[int] = ..., content: _Optional[_Union[EventContent, _Mapping]] = ...) -> None: ...
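
Note on the generated API above: the messages behave like ordinary protobuf classes, and the serialized bytes are exactly what ends up in sync_event.content. A minimal round-trip sketch (not part of the patch; the identifiers and values below are made up for illustration):

    from app.events.generated.event_pb2 import (
        AliasStatusChange,
        Event,
        EventContent,
    )

    # Build an event the same way EventDispatcher.send_event does
    event = Event(
        user_id=1,
        external_user_id="external-id",  # hypothetical value
        partner_id=1,
        content=EventContent(
            alias_status_change=AliasStatusChange(
                alias_id=42, alias_email="hello@alias.example", enabled=False
            )
        ),
    )

    payload = event.SerializeToString()  # bytes, as persisted by PostgresDispatcher
    decoded = Event.FromString(payload)  # what a webhook consumer would call
    assert decoded.content.WhichOneof("content") == "alias_status_change"
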
diff --git a/app/handler/unsubscribe_handler.py b/app/handler/unsubscribe_handler.py
index 143cde6e..88e4831d 100644
--- a/app/handler/unsubscribe_handler.py
+++ b/app/handler/unsubscribe_handler.py
@@ -5,6 +5,7 @@ from typing import Optional
 from aiosmtpd.smtp import Envelope
 
 from app import config
+from app import alias_utils
 from app.db import Session
 from app.email import headers, status
 from app.email_utils import (
@@ -101,7 +102,7 @@ class UnsubscribeHandler:
             mailbox.email, alias
         ):
             return status.E509
-        alias.enabled = False
+        alias_utils.change_alias_status(alias, enabled=False)
         Session.commit()
         enable_alias_url = config.URL + f"/dashboard/?highlight_alias_id={alias.id}"
         for mailbox in alias.mailboxes:
diff --git a/app/models.py b/app/models.py
index ea3defdf..9783312b 100644
--- a/app/models.py
+++ b/app/models.py
@@ -657,6 +657,21 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
 
         return user
 
+    @classmethod
+    def delete(cls, obj_id, commit=False):
+        # Internal import to avoid global import cycles
+        from app.events.event_dispatcher import EventDispatcher
+        from app.events.generated.event_pb2 import UserDeleted, EventContent
+
+        user: User = cls.get(obj_id)
+        EventDispatcher.send_event(user, EventContent(user_deleted=UserDeleted()))
+
+        res = super(User, cls).delete(obj_id)
+        if commit:
+            Session.commit()
+
+        return res
+
     def get_active_subscription(
         self, include_partner_subscription: bool = True
     ) -> Optional[
@@ -1619,6 +1634,18 @@ class Alias(Base, ModelMixin):
         Session.add(new_alias)
         DailyMetric.get_or_create_today_metric().nb_alias += 1
 
+        # Internal import to avoid global import cycles
+        from app.events.event_dispatcher import EventDispatcher
+        from app.events.generated.event_pb2 import AliasCreated, EventContent
+
+        event = AliasCreated(
+            alias_id=new_alias.id,
+            alias_email=new_alias.email,
+            alias_note=new_alias.note,
+            enabled=True,
+        )
+        EventDispatcher.send_event(user, EventContent(alias_created=event))
+
         if commit:
             Session.commit()
 
@@ -3648,3 +3675,49 @@ class ApiToCookieToken(Base, ModelMixin):
             code = secrets.token_urlsafe(32)
 
         return super().create(code=code, **kwargs)
+
+
+class SyncEvent(Base, ModelMixin):
+    """This model holds the events that need to be sent to the webhook"""
+
+    __tablename__ = "sync_event"
+    content = sa.Column(sa.LargeBinary, unique=False, nullable=False)
+    taken_time = sa.Column(
+        ArrowType, default=None, nullable=True, server_default=None, index=True
+    )
+
+    __table_args__ = (
+        sa.Index("ix_sync_event_created_at", "created_at"),
+        sa.Index("ix_sync_event_taken_time", "taken_time"),
+    )
+
+    def mark_as_taken(self) -> bool:
+        sql = """
+            UPDATE sync_event
+            SET taken_time = :taken_time
+            WHERE id = :sync_event_id
+            AND taken_time IS NULL
+        """
+        args = {"taken_time": arrow.now().datetime, "sync_event_id": self.id}
+        res = Session.execute(sql, args)
+        return res.rowcount > 0
+
+    @classmethod
+    def get_dead_letter(cls, older_than: Arrow) -> [SyncEvent]:
+        return (
+            SyncEvent.filter(
+                (
+                    (
+                        SyncEvent.taken_time.isnot(None)
+                        & (SyncEvent.taken_time < older_than)
+                    )
+                    | (
+                        SyncEvent.taken_time.is_(None)
+                        & (SyncEvent.created_at < older_than)
+                    )
+                )
+            )
+            .order_by(SyncEvent.id)
+            .limit(100)
+            .all()
+        )
diff --git a/app/subscription_webhook.py b/app/subscription_webhook.py
index 70a447ed..4b4ce075 100644
--- a/app/subscription_webhook.py
+++ b/app/subscription_webhook.py
@@ -2,6 +2,8 @@ import requests
 from requests import RequestException
 
 from app import config
+from app.events.event_dispatcher import EventDispatcher
+from app.events.generated.event_pb2 import EventContent, UserPlanChange
 from app.log import LOG
 from app.models import User
 
@@ -31,3 +33,6 @@ def execute_subscription_webhook(user: User):
         )
     except RequestException as e:
         LOG.error(f"Subscription request exception: {e}")
+
+    event = UserPlanChange(plan_end_time=sl_subscription_end)
+    EventDispatcher.send_event(user, EventContent(user_plan_change=event))
diff --git a/email_handler.py b/email_handler.py
index 303db53a..0319cf4d 100644
--- a/email_handler.py
+++ b/email_handler.py
@@ -53,7 +53,7 @@ from flanker.addresslib.address import EmailAddress
 from sqlalchemy.exc import IntegrityError
 
 from app import pgp_utils, s3, config
-from app.alias_utils import try_auto_create
+from app.alias_utils import try_auto_create, change_alias_status
 from app.config import (
     EMAIL_DOMAIN,
     URL,
@@ -1585,7 +1585,7 @@ def handle_bounce_forward_phase(msg: Message, email_log: EmailLog):
         LOG.w(
             f"Disable alias {alias} because {reason}. {alias.mailboxes} {alias.user}. Last contact {contact}"
         )
-        alias.enabled = False
+        change_alias_status(alias, enabled=False)
 
         Notification.create(
             user_id=user.id,
diff --git a/event_listener.py b/event_listener.py
new file mode 100644
index 00000000..440f106d
--- /dev/null
+++ b/event_listener.py
@@ -0,0 +1,61 @@
+import argparse
+from enum import Enum
+from sys import argv, exit
+
+from app.config import DB_URI
+from events.runner import Runner
+from events.event_source import DeadLetterEventSource, PostgresEventSource
+from events.event_sink import ConsoleEventSink
+
+
+class Mode(Enum):
+    DEAD_LETTER = "dead_letter"
+    LISTENER = "listener"
+
+    @staticmethod
+    def from_str(value: str):
+        if value == Mode.DEAD_LETTER.value:
+            return Mode.DEAD_LETTER
+        elif value == Mode.LISTENER.value:
+            return Mode.LISTENER
+        else:
+            raise ValueError(f"Invalid mode: {value}")
+
+
+def main(mode: Mode, dry_run: bool):
+    if mode == Mode.DEAD_LETTER:
+        source = DeadLetterEventSource()
+    elif mode == Mode.LISTENER:
+        source = PostgresEventSource(DB_URI)
+    else:
+        raise ValueError(f"Invalid mode: {mode}")
+
+    if dry_run:
+        sink = ConsoleEventSink()
+    else:
+        # TODO: switch to HttpEventSink once it is implemented; for now both
+        # modes log to the console
+        sink = ConsoleEventSink()
+
+    runner = Runner(source=source, sink=sink)
+    runner.run()
+
+
+def args():
+    parser = argparse.ArgumentParser(description="Run event listener")
+    parser.add_argument(
+        "mode",
+        help="Mode to run",
+        choices=[Mode.DEAD_LETTER.value, Mode.LISTENER.value],
+    )
+    parser.add_argument("--dry-run", help="Dry run mode", action="store_true")
+    return parser.parse_args()
+
+
+if __name__ == "__main__":
+    if len(argv) < 2:
+        print("Invalid usage. Pass 'listener' or 'dead_letter' as argument")
+        exit(1)
+
+    args = args()
+    main(Mode.from_str(args.mode), args.dry_run)
diff --git a/events/__init__.py b/events/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/events/event_sink.py b/events/event_sink.py
new file mode 100644
index 00000000..ce5ae986
--- /dev/null
+++ b/events/event_sink.py
@@ -0,0 +1,19 @@
+from abc import ABC, abstractmethod
+from app.log import LOG
+from app.models import SyncEvent
+
+
+class EventSink(ABC):
+    @abstractmethod
+    def process(self, event: SyncEvent):
+        pass
+
+
+class HttpEventSink(EventSink):
+    def process(self, event: SyncEvent):
+        pass
+
+
+class ConsoleEventSink(EventSink):
+    def process(self, event: SyncEvent):
+        LOG.info(f"Handling event {event.id}")
diff --git a/events/event_source.py b/events/event_source.py
new file mode 100644
index 00000000..f23ea3f3
--- /dev/null
+++ b/events/event_source.py
@@ -0,0 +1,74 @@
+import arrow
+import newrelic.agent
+import psycopg2
+import select
+
+from abc import ABC, abstractmethod
+from app.log import LOG
+from app.models import SyncEvent
+from app.events.event_dispatcher import NOTIFICATION_CHANNEL
+from time import sleep
+from typing import Callable, NoReturn
+
+_DEAD_LETTER_THRESHOLD_MINUTES = 10
+_DEAD_LETTER_INTERVAL_SECONDS = 30
+
+
+class EventSource(ABC):
+    @abstractmethod
+    def run(self, on_event: Callable[[SyncEvent], NoReturn]):
+        pass
+
+
+class PostgresEventSource(EventSource):
+    def __init__(self, connection_string: str):
+        self.__connection = psycopg2.connect(connection_string)
+
+    def run(self, on_event: Callable[[SyncEvent], NoReturn]):
+        self.__connection.set_isolation_level(
+            psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
+        )
+
+        cursor = self.__connection.cursor()
+        cursor.execute(f"LISTEN {NOTIFICATION_CHANNEL};")
+
+        while True:
+            if select.select([self.__connection], [], [], 5) != ([], [], []):
+                self.__connection.poll()
+                while self.__connection.notifies:
+                    notify = self.__connection.notifies.pop(0)
+                    LOG.debug(
+                        f"Got NOTIFY: pid={notify.pid} channel={notify.channel} payload={notify.payload}"
+                    )
+                    try:
+                        webhook_id = int(notify.payload)
+                        event = SyncEvent.get_by(id=webhook_id)
+                        if event is not None:
+                            on_event(event)
+                        else:
+                            LOG.info(f"Could not find event with id={notify.payload}")
+                    except Exception as e:
+                        LOG.warn(f"Error getting event: {e}")
+
+
+class DeadLetterEventSource(EventSource):
+    @newrelic.agent.background_task()
+    def run(self, on_event: Callable[[SyncEvent], NoReturn]):
+        while True:
+            try:
+                threshold = arrow.utcnow().shift(
+                    minutes=-_DEAD_LETTER_THRESHOLD_MINUTES
+                )
+                events = SyncEvent.get_dead_letter(older_than=threshold)
+                if events:
+                    LOG.info(f"Got {len(events)} dead letter events")
+                    newrelic.agent.record_custom_metric(
+                        "Custom/dead_letter_events_to_process", len(events)
+                    )
+                    for event in events:
+                        on_event(event)
+                else:
+                    LOG.debug("No dead letter events")
+                sleep(_DEAD_LETTER_INTERVAL_SECONDS)
+            except Exception as e:
+                LOG.warn(f"Error getting dead letter event: {e}")
diff --git a/events/runner.py b/events/runner.py
new file mode 100644
index 00000000..7d0b4b70
--- /dev/null
+++ b/events/runner.py
@@ -0,0 +1,45 @@
+import arrow
+import newrelic.agent
+
+from app.log import LOG
+from app.models import SyncEvent
+from events.event_sink import EventSink
+from events.event_source import EventSource
+
+
+class Runner:
+    def __init__(self, source: EventSource, sink: EventSink):
+        self.__source = source
+        self.__sink = sink
+
+    def run(self):
+        self.__source.run(self.__on_event)
+
+    @newrelic.agent.background_task()
+    def __on_event(self, event: SyncEvent):
+        try:
+            can_process = event.mark_as_taken()
+            if can_process:
+                event_created_at = event.created_at
+                start_time = arrow.now()
+                self.__sink.process(event)
+                event_id = event.id
+                SyncEvent.delete(event.id, commit=True)
+                LOG.info(f"Marked {event_id} as done")
+
+                end_time = arrow.now() - start_time
+                time_between_taken_and_created = start_time - event_created_at
+
+                newrelic.agent.record_custom_metric("Custom/sync_event_processed", 1)
+                newrelic.agent.record_custom_metric(
+                    "Custom/sync_event_process_time", end_time.total_seconds()
+                )
+                newrelic.agent.record_custom_metric(
+                    "Custom/sync_event_elapsed_time",
+                    time_between_taken_and_created.total_seconds(),
+                )
+            else:
+                LOG.info(f"{event.id} was handled by another runner")
+        except Exception as e:
+            LOG.warn(f"Exception processing event [id={event.id}]: {e}")
+            newrelic.agent.record_custom_metric("Custom/sync_event_failed", 1)
diff --git a/migrations/versions/2024_051713_06a9a7133445_.py b/migrations/versions/2024_051713_06a9a7133445_.py
new file mode 100644
index 00000000..61379e88
--- /dev/null
+++ b/migrations/versions/2024_051713_06a9a7133445_.py
@@ -0,0 +1,38 @@
+"""Create sync_event table
+
+Revision ID: 06a9a7133445
+Revises: fa2f19bb4e5a
+Create Date: 2024-05-17 13:11:20.402259
+
+"""
+import sqlalchemy_utils
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '06a9a7133445'
+down_revision = 'fa2f19bb4e5a'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table('sync_event',
+    sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
+    sa.Column('created_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=False),
+    sa.Column('updated_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=True),
+    sa.Column('content', sa.LargeBinary(), nullable=False),
+    sa.Column('taken_time', sqlalchemy_utils.types.arrow.ArrowType(), nullable=True),
+    sa.PrimaryKeyConstraint('id')
+    )
+    op.create_index(op.f('ix_sync_event_created_at'), 'sync_event', ['created_at'], unique=False)
+    op.create_index(op.f('ix_sync_event_taken_time'), 'sync_event', ['taken_time'], unique=False)
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_table('sync_event')
+    # ### end Alembic commands ###
diff --git a/proto/event.proto b/proto/event.proto
new file mode 100644
index 00000000..66d3cf2a
--- /dev/null
+++ b/proto/event.proto
@@ -0,0 +1,45 @@
+syntax = "proto3";
+
+package simplelogin_events;
+
+message UserPlanChange {
+  uint32 plan_end_time = 1;
+}
+
+message UserDeleted {
+}
+
+message AliasCreated {
+  uint32 alias_id = 1;
+  string alias_email = 2;
+  string alias_note = 3;
+  bool enabled = 4;
+}
+
+message AliasStatusChange {
+  uint32 alias_id = 1;
+  string alias_email = 2;
+  bool enabled = 3;
+}
+
+message AliasDeleted {
+  uint32 alias_id = 1;
+  string alias_email = 2;
+}
+
+message EventContent {
+  oneof content {
+    UserPlanChange user_plan_change = 1;
+    UserDeleted user_deleted = 2;
+    AliasCreated alias_created = 3;
+    AliasStatusChange alias_status_change = 4;
+    AliasDeleted alias_deleted = 5;
+  }
+}
+
+message Event {
+  uint32 user_id = 1;
+  string external_user_id = 2;
+  uint32 partner_id = 3;
+  EventContent content = 4;
+}
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 9a6dff89..8abae46d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -14,13 +14,14 @@ exclude = '''
     | build
     | dist
     | migrations # migrations/ is generated by alembic
+    | app/events/generated
   )/
 )
 '''
 
 [tool.ruff]
 ignore-init-module-imports = true
-exclude = [".venv", "migrations"]
+exclude = [".venv", "migrations", "app/events/generated"]
 
 [tool.djlint]
 indent = 2
diff --git a/scripts/generate-proto-files.sh b/scripts/generate-proto-files.sh
new file mode 100755
index 00000000..8d8b43c1
--- /dev/null
+++ b/scripts/generate-proto-files.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+
+set -euxo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "$0")" || exit 1; pwd -P)"
+REPO_ROOT=$(echo "${SCRIPT_DIR}" | sed 's:scripts::g')
+
+DEST_DIR="${REPO_ROOT}/app/events/generated"
+
+PROTOC=${PROTOC:-"protoc"}
+
+if ! eval "${PROTOC} --version" &> /dev/null ; then
+  echo "Cannot find $PROTOC"
+  exit 1
+fi
+
+rm -rf "${DEST_DIR}"
+mkdir -p "${DEST_DIR}"
+
+pushd $REPO_ROOT || exit 1
+
+eval "${PROTOC} --proto_path=proto --python_out=\"${DEST_DIR}\" --pyi_out=\"${DEST_DIR}\" proto/event.proto"
+
+popd || exit 1
diff --git a/tests/events/__init__.py b/tests/events/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/events/test_event_dispatcher.py b/tests/events/test_event_dispatcher.py
new file mode 100644
index 00000000..810e147e
--- /dev/null
+++ b/tests/events/test_event_dispatcher.py
@@ -0,0 +1,56 @@
+from app.events.event_dispatcher import EventDispatcher, Dispatcher
+from app.events.generated.event_pb2 import EventContent, UserDeleted
+from app.models import PartnerUser, User
+from app.proton.utils import get_proton_partner
+from tests.utils import create_new_user, random_token
+from typing import Tuple
+
+
+class OnMemoryDispatcher(Dispatcher):
+    def __init__(self):
+        self.memory = []
+
+    def send(self, event: bytes):
+        self.memory.append(event)
+
+
+def _create_unlinked_user() -> User:
+    return create_new_user()
+
+
+def _create_linked_user() -> Tuple[User, PartnerUser]:
+    user = _create_unlinked_user()
+    partner_user = PartnerUser.create(
+        partner_id=get_proton_partner().id,
+        user_id=user.id,
+        external_user_id=random_token(10),
+        flush=True,
+    )
+
+    return user, partner_user
+
+
+def test_event_dispatcher_stores_events():
+    dispatcher = OnMemoryDispatcher()
+
+    (user, partner) = _create_linked_user()
+    content = EventContent(user_deleted=UserDeleted())
+    EventDispatcher.send_event(user, content, dispatcher, skip_if_webhook_missing=False)
+    assert len(dispatcher.memory) == 1
+
+    content = EventContent(user_deleted=UserDeleted())
+    EventDispatcher.send_event(user, content, dispatcher, skip_if_webhook_missing=False)
+    assert len(dispatcher.memory) == 2
+
+
+def test_event_dispatcher_does_not_send_event_if_user_not_linked():
+    dispatcher = OnMemoryDispatcher()
+
+    user = _create_unlinked_user()
+    content = EventContent(user_deleted=UserDeleted())
+    EventDispatcher.send_event(user, content, dispatcher, skip_if_webhook_missing=False)
+    assert len(dispatcher.memory) == 0
+
+    content = EventContent(user_deleted=UserDeleted())
+    EventDispatcher.send_event(user, content, dispatcher, skip_if_webhook_missing=False)
+    assert len(dispatcher.memory) == 0
diff --git a/tests/test_models.py b/tests/test_models.py
index 21b72c0b..eaaacb10 100644
--- a/tests/test_models.py
+++ b/tests/test_models.py
@@ -17,6 +17,7 @@ from app.models import (
     Subscription,
     PlanEnum,
     PADDLE_SUBSCRIPTION_GRACE_DAYS,
+    SyncEvent,
 )
 from tests.utils import login, create_new_user, random_token
@@ -325,3 +326,51 @@ def test_user_can_send_receive():
     user.disabled = False
     user.delete_on = arrow.now()
     assert not user.can_send_or_receive()
+
+
+def test_sync_event_dead_letter():
+    # remove all SyncEvents before the test
+    all_events = SyncEvent.all()
+    for event in all_events:
+        SyncEvent.delete(event.id, commit=True)
+
+    # create an expired not taken event
+    e1 = SyncEvent.create(
+        content=b"content",
+        created_at=arrow.now().shift(minutes=-15),
+        taken_time=None,
+        commit=True,
+    )
+
+    # create an expired taken event (but too long ago)
+    e2 = SyncEvent.create(
+        content=b"content",
+        created_at=arrow.now().shift(minutes=-15),
+        taken_time=arrow.now().shift(minutes=-14),
+        commit=True,
+    )
+
+    # create an expired taken event (but recently)
+    e3 = SyncEvent.create(
+        content=b"content",
+        created_at=arrow.now().shift(minutes=-15),
+        taken_time=arrow.now().shift(minutes=-1),
+        commit=True,
+    )
+
+    # create a normal event
+    e4 = SyncEvent.create(
+        content=b"content",
+        created_at=arrow.now(),
+        commit=True,
+    )
+
+    # get dead letter events
+    dead_letter_events = SyncEvent.get_dead_letter(
+        older_than=arrow.now().shift(minutes=-10)
+    )
+    assert len(dead_letter_events) == 2
+    assert e1 in dead_letter_events
+    assert e2 in dead_letter_events
+    assert e3 not in dead_letter_events
+    assert e4 not in dead_letter_events
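
A closing note on the sinks: HttpEventSink.process is left as a stub in this patch, which is why event_listener.py builds a ConsoleEventSink in both the dry-run and the normal path. A minimal sketch of what the HTTP sink could look like once it posts to config.EVENT_WEBHOOK (the content-type header, timeout, and error handling here are assumptions, not part of the patch):

    import requests

    from app import config
    from app.log import LOG
    from app.models import SyncEvent
    from events.event_sink import EventSink


    class HttpEventSink(EventSink):
        def process(self, event: SyncEvent):
            if not config.EVENT_WEBHOOK:
                LOG.warn("EVENT_WEBHOOK not configured, skipping event")
                return
            try:
                res = requests.post(
                    url=config.EVENT_WEBHOOK,
                    data=event.content,  # serialized protobuf bytes
                    headers={"Content-Type": "application/x-protobuf"},  # assumed
                    timeout=10,  # assumed
                )
                res.raise_for_status()
            except requests.RequestException as e:
                LOG.warn(f"Error sending event {event.id} to webhook: {e}")

With something along these lines in place, the else branch of main() in event_listener.py could instantiate HttpEventSink() instead of a second ConsoleEventSink().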