feat: implement HTTP event sink (#2116)
* feat: implement HTTP event sink * Update events/event_sink.py --------- Co-authored-by: Adrià Casajús <acasajus@users.noreply.github.com>
This commit is contained in:
parent
3e0b7bb369
commit
8eccb05e33
|
@ -583,3 +583,6 @@ UPCLOUD_DB_ID = os.environ.get("UPCLOUD_DB_ID", None)
|
|||
STORE_TRANSACTIONAL_EMAILS = "STORE_TRANSACTIONAL_EMAILS" in os.environ
|
||||
|
||||
EVENT_WEBHOOK = os.environ.get("EVENT_WEBHOOK", None)
|
||||
|
||||
# We want it disabled by default, so only skip if defined
|
||||
EVENT_WEBHOOK_SKIP_VERIFY_SSL = "EVENT_WEBHOOK_SKIP_VERIFY_SSL" in os.environ
|
||||
|
|
|
@ -3,9 +3,10 @@ from enum import Enum
|
|||
from sys import argv, exit
|
||||
|
||||
from app.config import DB_URI
|
||||
from app.log import LOG
|
||||
from events.runner import Runner
|
||||
from events.event_source import DeadLetterEventSource, PostgresEventSource
|
||||
from events.event_sink import ConsoleEventSink
|
||||
from events.event_sink import ConsoleEventSink, HttpEventSink
|
||||
|
||||
|
||||
class Mode(Enum):
|
||||
|
@ -24,16 +25,20 @@ class Mode(Enum):
|
|||
|
||||
def main(mode: Mode, dry_run: bool):
|
||||
if mode == Mode.DEAD_LETTER:
|
||||
LOG.i("Using DeadLetterEventSource")
|
||||
source = DeadLetterEventSource()
|
||||
elif mode == Mode.LISTENER:
|
||||
LOG.i("Using PostgresEventSource")
|
||||
source = PostgresEventSource(DB_URI)
|
||||
else:
|
||||
raise ValueError(f"Invalid mode: {mode}")
|
||||
|
||||
if dry_run:
|
||||
LOG.i("Starting with ConsoleEventSink")
|
||||
sink = ConsoleEventSink()
|
||||
else:
|
||||
sink = ConsoleEventSink()
|
||||
LOG.i("Starting with HttpEventSink")
|
||||
sink = HttpEventSink()
|
||||
|
||||
runner = Runner(source=source, sink=sink)
|
||||
runner.run()
|
||||
|
|
|
@ -1,4 +1,7 @@
|
|||
import requests
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from app.config import EVENT_WEBHOOK, EVENT_WEBHOOK_SKIP_VERIFY_SSL
|
||||
from app.log import LOG
|
||||
from app.models import SyncEvent
|
||||
|
||||
|
@ -11,7 +14,23 @@ class EventSink(ABC):
|
|||
|
||||
class HttpEventSink(EventSink):
|
||||
def process(self, event: SyncEvent):
|
||||
pass
|
||||
if not EVENT_WEBHOOK:
|
||||
LOG.warning("Skipping sending event because there is no webhook configured")
|
||||
return
|
||||
LOG.info(f"Sending event {event.id} to {EVENT_WEBHOOK}")
|
||||
|
||||
res = requests.post(
|
||||
url=EVENT_WEBHOOK,
|
||||
data=event.content,
|
||||
headers={"Content-Type": "application/x-protobuf"},
|
||||
verify=not EVENT_WEBHOOK_SKIP_VERIFY_SSL,
|
||||
)
|
||||
if res.status_code != 200:
|
||||
LOG.warning(
|
||||
f"Failed to send event to webhook: {res.status_code} {res.text}"
|
||||
)
|
||||
else:
|
||||
LOG.info(f"Event {event.id} sent successfully to webhook")
|
||||
|
||||
|
||||
class ConsoleEventSink(EventSink):
|
||||
|
|
Loading…
Reference in New Issue