diff --git a/app/config.py b/app/config.py index 2af562be..4513d60d 100644 --- a/app/config.py +++ b/app/config.py @@ -583,3 +583,6 @@ UPCLOUD_DB_ID = os.environ.get("UPCLOUD_DB_ID", None) STORE_TRANSACTIONAL_EMAILS = "STORE_TRANSACTIONAL_EMAILS" in os.environ EVENT_WEBHOOK = os.environ.get("EVENT_WEBHOOK", None) + +# We want it disabled by default, so only skip if defined +EVENT_WEBHOOK_SKIP_VERIFY_SSL = "EVENT_WEBHOOK_SKIP_VERIFY_SSL" in os.environ diff --git a/event_listener.py b/event_listener.py index 440f106d..93e23edf 100644 --- a/event_listener.py +++ b/event_listener.py @@ -3,9 +3,10 @@ from enum import Enum from sys import argv, exit from app.config import DB_URI +from app.log import LOG from events.runner import Runner from events.event_source import DeadLetterEventSource, PostgresEventSource -from events.event_sink import ConsoleEventSink +from events.event_sink import ConsoleEventSink, HttpEventSink class Mode(Enum): @@ -24,16 +25,20 @@ class Mode(Enum): def main(mode: Mode, dry_run: bool): if mode == Mode.DEAD_LETTER: + LOG.i("Using DeadLetterEventSource") source = DeadLetterEventSource() elif mode == Mode.LISTENER: + LOG.i("Using PostgresEventSource") source = PostgresEventSource(DB_URI) else: raise ValueError(f"Invalid mode: {mode}") if dry_run: + LOG.i("Starting with ConsoleEventSink") sink = ConsoleEventSink() else: - sink = ConsoleEventSink() + LOG.i("Starting with HttpEventSink") + sink = HttpEventSink() runner = Runner(source=source, sink=sink) runner.run() diff --git a/events/event_sink.py b/events/event_sink.py index ce5ae986..a51b9e8c 100644 --- a/events/event_sink.py +++ b/events/event_sink.py @@ -1,4 +1,7 @@ +import requests + from abc import ABC, abstractmethod +from app.config import EVENT_WEBHOOK, EVENT_WEBHOOK_SKIP_VERIFY_SSL from app.log import LOG from app.models import SyncEvent @@ -11,7 +14,23 @@ class EventSink(ABC): class HttpEventSink(EventSink): def process(self, event: SyncEvent): - pass + if not EVENT_WEBHOOK: + LOG.warning("Skipping sending event because there is no webhook configured") + return + LOG.info(f"Sending event {event.id} to {EVENT_WEBHOOK}") + + res = requests.post( + url=EVENT_WEBHOOK, + data=event.content, + headers={"Content-Type": "application/x-protobuf"}, + verify=not EVENT_WEBHOOK_SKIP_VERIFY_SSL, + ) + if res.status_code != 200: + LOG.warning( + f"Failed to send event to webhook: {res.status_code} {res.text}" + ) + else: + LOG.info(f"Event {event.id} sent successfully to webhook") class ConsoleEventSink(EventSink):