From 71ce0f6253e34ce26335543ece78f59b839c83a6 Mon Sep 17 00:00:00 2001 From: Carlos Quintana <74399022+cquintana92@users.noreply.github.com> Date: Tue, 23 Jul 2024 16:11:16 +0200 Subject: [PATCH] chore: add retry counter to event (#2159) --- app/models.py | 4 ++- event_listener.py | 17 +++++++++-- events/event_source.py | 7 ++++- events/runner.py | 4 +++ ...08955fcab_add_retry_count_to_sync_event.py | 28 +++++++++++++++++++ tests/test_models.py | 12 +++++++- 6 files changed, 67 insertions(+), 5 deletions(-) create mode 100644 migrations/versions/2024_071908_56d08955fcab_add_retry_count_to_sync_event.py diff --git a/app/models.py b/app/models.py index d0acc3a4..4701d483 100644 --- a/app/models.py +++ b/app/models.py @@ -3730,6 +3730,7 @@ class SyncEvent(Base, ModelMixin): taken_time = sa.Column( ArrowType, default=None, nullable=True, server_default=None, index=True ) + retry_count = sa.Column(sa.Integer, default=0, nullable=False, server_default="0") __table_args__ = ( sa.Index("ix_sync_event_created_at", "created_at"), @@ -3751,7 +3752,7 @@ class SyncEvent(Base, ModelMixin): return res.rowcount > 0 @classmethod - def get_dead_letter(cls, older_than: Arrow) -> [SyncEvent]: + def get_dead_letter(cls, older_than: Arrow, max_retries: int) -> [SyncEvent]: return ( SyncEvent.filter( ( @@ -3764,6 +3765,7 @@ class SyncEvent(Base, ModelMixin): & (SyncEvent.created_at < older_than) ) ) + & (SyncEvent.retry_count < max_retries) ) .order_by(SyncEvent.id) .limit(100) diff --git a/event_listener.py b/event_listener.py index 93e23edf..4261384e 100644 --- a/event_listener.py +++ b/event_listener.py @@ -8,6 +8,8 @@ from events.runner import Runner from events.event_source import DeadLetterEventSource, PostgresEventSource from events.event_sink import ConsoleEventSink, HttpEventSink +_DEFAULT_MAX_RETRIES = 100 + class Mode(Enum): DEAD_LETTER = "dead_letter" @@ -23,7 +25,7 @@ class Mode(Enum): raise ValueError(f"Invalid mode: {value}") -def main(mode: Mode, dry_run: bool): +def main(mode: Mode, dry_run: bool, max_retries: int): if mode == Mode.DEAD_LETTER: LOG.i("Using DeadLetterEventSource") source = DeadLetterEventSource() @@ -51,6 +53,13 @@ def args(): help="Mode to run", choices=[Mode.DEAD_LETTER.value, Mode.LISTENER.value], ) + parser.add_argument( + "max_retries", + help="Max retries to consider an event as error and not try to process it again", + type=int, + required=False, + default=_DEFAULT_MAX_RETRIES, + ) parser.add_argument("--dry-run", help="Dry run mode", action="store_true") return parser.parse_args() @@ -61,4 +70,8 @@ if __name__ == "__main__": exit(1) args = args() - main(Mode.from_str(args.mode), args.dry_run) + main( + mode=Mode.from_str(args.mode), + dry_run=args.dry_run, + max_retries=args.max_retries, + ) diff --git a/events/event_source.py b/events/event_source.py index f4f89372..4e812ec1 100644 --- a/events/event_source.py +++ b/events/event_source.py @@ -76,6 +76,9 @@ class PostgresEventSource(EventSource): class DeadLetterEventSource(EventSource): + def __init__(self, max_retries: int): + self.__max_retries = max_retries + @newrelic.agent.background_task() def run(self, on_event: Callable[[SyncEvent], NoReturn]): while True: @@ -83,7 +86,9 @@ class DeadLetterEventSource(EventSource): threshold = arrow.utcnow().shift( minutes=-_DEAD_LETTER_THRESHOLD_MINUTES ) - events = SyncEvent.get_dead_letter(older_than=threshold) + events = SyncEvent.get_dead_letter( + older_than=threshold, max_retries=self.__max_retries + ) if events: LOG.info(f"Got {len(events)} dead letter events") if events: diff --git a/events/runner.py b/events/runner.py index d6f9c2e0..0fe7bff9 100644 --- a/events/runner.py +++ b/events/runner.py @@ -2,6 +2,7 @@ import arrow import newrelic.agent from app.log import LOG +from app.db import Session from app.models import SyncEvent from events.event_sink import EventSink from events.event_source import EventSource @@ -37,6 +38,9 @@ class Runner: "Custom/sync_event_elapsed_time", time_between_taken_and_created.total_seconds(), ) + else: + event.retry_count = event.retry_count + 1 + Session.commit() except Exception as e: LOG.warn(f"Exception processing event [id={event.id}]: {e}") newrelic.agent.record_custom_metric("Custom/sync_event_failed", 1) diff --git a/migrations/versions/2024_071908_56d08955fcab_add_retry_count_to_sync_event.py b/migrations/versions/2024_071908_56d08955fcab_add_retry_count_to_sync_event.py new file mode 100644 index 00000000..a283b876 --- /dev/null +++ b/migrations/versions/2024_071908_56d08955fcab_add_retry_count_to_sync_event.py @@ -0,0 +1,28 @@ +"""add retry count to sync event + +Revision ID: 56d08955fcab +Revises: d608b8e48082 +Create Date: 2024-07-19 08:21:19.979973 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '56d08955fcab' +down_revision = 'd608b8e48082' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('sync_event', sa.Column('retry_count', sa.Integer(), server_default='0', nullable=False, default=0)) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('sync_event', 'retry_count') + # ### end Alembic commands ### diff --git a/tests/test_models.py b/tests/test_models.py index eaaacb10..b84488d0 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -365,12 +365,22 @@ def test_sync_event_dead_letter(): commit=True, ) + # create event with too many retries + max_retries = 5 + e5 = SyncEvent.create( + content=b"content", + retry_count=max_retries + 1, + created_at=arrow.now(), + commit=True, + ) + # get dead letter events dead_letter_events = SyncEvent.get_dead_letter( - older_than=arrow.now().shift(minutes=-10) + older_than=arrow.now().shift(minutes=-10), max_retries=max_retries ) assert len(dead_letter_events) == 2 assert e1 in dead_letter_events assert e2 in dead_letter_events assert e3 not in dead_letter_events assert e4 not in dead_letter_events + assert e5 not in dead_letter_events