mirror of
https://github.com/simple-login/app.git
synced 2024-11-13 23:51:13 +01:00
6862ed3602
* fix: commit transaction after taking event * feat: allow to reconnect to postgres for event listener * chore: log sync events pending to process to metrics * fix: make dead_letter runner able to process events without needing to have lock on the event * chore: close Session after reconnect * refactor: make EventSource emit only events that can be processed
100 lines
3.5 KiB
Python
100 lines
3.5 KiB
Python
import arrow
|
|
import newrelic.agent
|
|
import psycopg2
|
|
import select
|
|
|
|
from abc import ABC, abstractmethod
|
|
from app.log import LOG
|
|
from app.models import SyncEvent
|
|
from app.events.event_dispatcher import NOTIFICATION_CHANNEL
|
|
from time import sleep
|
|
from typing import Callable, NoReturn
|
|
|
|
_DEAD_LETTER_THRESHOLD_MINUTES = 10
|
|
_DEAD_LETTER_INTERVAL_SECONDS = 30
|
|
|
|
_POSTGRES_RECONNECT_INTERVAL_SECONDS = 5
|
|
|
|
|
|
class EventSource(ABC):
|
|
@abstractmethod
|
|
def run(self, on_event: Callable[[SyncEvent], NoReturn]):
|
|
pass
|
|
|
|
|
|
class PostgresEventSource(EventSource):
|
|
def __init__(self, connection_string: str):
|
|
self.__connection_string = connection_string
|
|
self.__connect()
|
|
|
|
def run(self, on_event: Callable[[SyncEvent], NoReturn]):
|
|
while True:
|
|
try:
|
|
self.__listen(on_event)
|
|
except Exception as e:
|
|
LOG.warn(f"Error listening to events: {e}")
|
|
sleep(_POSTGRES_RECONNECT_INTERVAL_SECONDS)
|
|
self.__connect()
|
|
|
|
def __listen(self, on_event: Callable[[SyncEvent], NoReturn]):
|
|
self.__connection.set_isolation_level(
|
|
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
|
|
)
|
|
|
|
cursor = self.__connection.cursor()
|
|
cursor.execute(f"LISTEN {NOTIFICATION_CHANNEL};")
|
|
|
|
while True:
|
|
if select.select([self.__connection], [], [], 5) != ([], [], []):
|
|
self.__connection.poll()
|
|
while self.__connection.notifies:
|
|
notify = self.__connection.notifies.pop(0)
|
|
LOG.debug(
|
|
f"Got NOTIFY: pid={notify.pid} channel={notify.channel} payload={notify.payload}"
|
|
)
|
|
try:
|
|
webhook_id = int(notify.payload)
|
|
event = SyncEvent.get_by(id=webhook_id)
|
|
if event is not None:
|
|
if event.mark_as_taken():
|
|
on_event(event)
|
|
else:
|
|
LOG.info(
|
|
f"Event {event.id} was handled by another runner"
|
|
)
|
|
else:
|
|
LOG.info(f"Could not find event with id={notify.payload}")
|
|
except Exception as e:
|
|
LOG.warn(f"Error getting event: {e}")
|
|
|
|
def __connect(self):
|
|
self.__connection = psycopg2.connect(self.__connection_string)
|
|
|
|
from app.db import Session
|
|
|
|
Session.close()
|
|
|
|
|
|
class DeadLetterEventSource(EventSource):
|
|
@newrelic.agent.background_task()
|
|
def run(self, on_event: Callable[[SyncEvent], NoReturn]):
|
|
while True:
|
|
try:
|
|
threshold = arrow.utcnow().shift(
|
|
minutes=-_DEAD_LETTER_THRESHOLD_MINUTES
|
|
)
|
|
events = SyncEvent.get_dead_letter(older_than=threshold)
|
|
if events:
|
|
LOG.info(f"Got {len(events)} dead letter events")
|
|
if events:
|
|
newrelic.agent.record_custom_metric(
|
|
"Custom/dead_letter_events_to_process", len(events)
|
|
)
|
|
for event in events:
|
|
on_event(event)
|
|
else:
|
|
LOG.debug("No dead letter events")
|
|
sleep(_DEAD_LETTER_INTERVAL_SECONDS)
|
|
except Exception as e:
|
|
LOG.warn(f"Error getting dead letter event: {e}")
|
|
sleep(_DEAD_LETTER_INTERVAL_SECONDS)
|