From 57991f4d6bc61e536eb81e1c5a2a6261b46f5237 Mon Sep 17 00:00:00 2001 From: Carlos Quintana <74399022+cquintana92@users.noreply.github.com> Date: Wed, 21 Aug 2024 12:35:08 +0200 Subject: [PATCH] feat: add command to debug sync events (#2190) --- event_listener.py | 70 ++++++++++++++++++++++++++++++---------- events/event_debugger.py | 43 ++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 17 deletions(-) create mode 100644 events/event_debugger.py diff --git a/event_listener.py b/event_listener.py index 81b6767d..cc601d54 100644 --- a/event_listener.py +++ b/event_listener.py @@ -4,6 +4,7 @@ from sys import argv, exit from app.config import EVENT_LISTENER_DB_URI from app.log import LOG +from events import event_debugger from events.runner import Runner from events.event_source import DeadLetterEventSource, PostgresEventSource from events.event_sink import ConsoleEventSink, HttpEventSink @@ -46,32 +47,67 @@ def main(mode: Mode, dry_run: bool, max_retries: int): runner.run() +def debug_event(event_id: str): + LOG.i(f"Debugging event {event_id}") + try: + event_id_int = int(event_id) + except ValueError: + raise ValueError(f"Invalid event id: {event_id}") + event_debugger.debug_event(event_id_int) + + +def run_event(event_id: str, delete_on_success: bool): + LOG.i(f"Running event {event_id}") + try: + event_id_int = int(event_id) + except ValueError: + raise ValueError(f"Invalid event id: {event_id}") + event_debugger.run_event(event_id_int, delete_on_success) + + def args(): parser = argparse.ArgumentParser(description="Run event listener") - parser.add_argument( - "mode", - help="Mode to run", - choices=[Mode.DEAD_LETTER.value, Mode.LISTENER.value], + subparsers = parser.add_subparsers(dest="command") + + listener_parser = subparsers.add_parser(Mode.LISTENER.value) + listener_parser.add_argument( + "--max-retries", type=int, default=_DEFAULT_MAX_RETRIES ) - parser.add_argument( - "max_retries", - help="Max retries to consider an event as error and not try to process it again", - type=int, - nargs="?", - default=_DEFAULT_MAX_RETRIES, + listener_parser.add_argument("--dry-run", action="store_true") + + dead_letter_parser = subparsers.add_parser(Mode.DEAD_LETTER.value) + dead_letter_parser.add_argument( + "--max-retries", type=int, default=_DEFAULT_MAX_RETRIES ) - parser.add_argument("--dry-run", help="Dry run mode", action="store_true") + dead_letter_parser.add_argument("--dry-run", action="store_true") + + debug_parser = subparsers.add_parser("debug") + debug_parser.add_argument("event_id", help="ID of the event to debug") + + run_parser = subparsers.add_parser("run") + run_parser.add_argument("event_id", help="ID of the event to run") + run_parser.add_argument("--delete-on-success", action="store_true") + return parser.parse_args() if __name__ == "__main__": if len(argv) < 2: - print("Invalid usage. Pass 'listener' or 'dead_letter' as argument") + print("Invalid usage. Pass a valid subcommand as argument") exit(1) args = args() - main( - mode=Mode.from_str(args.mode), - dry_run=args.dry_run, - max_retries=args.max_retries, - ) + + if args.command in [Mode.LISTENER.value, Mode.DEAD_LETTER.value]: + main( + mode=Mode.from_str(args.command), + dry_run=args.dry_run, + max_retries=args.max_retries, + ) + elif args.command == "debug": + debug_event(args.event_id) + elif args.command == "run": + run_event(args.event_id, args.delete_on_success) + else: + print("Invalid command") + exit(1) diff --git a/events/event_debugger.py b/events/event_debugger.py new file mode 100644 index 00000000..5f0c5a23 --- /dev/null +++ b/events/event_debugger.py @@ -0,0 +1,43 @@ +from app.events.generated import event_pb2 +from app.models import SyncEvent +from events.event_sink import HttpEventSink + + +def debug_event(event_id: int): + event = SyncEvent.get_by(id=event_id) + if not event: + print("Event not found") + return + + print(f"Info for event {event_id}") + print(f"- Created at: {event.created_at}") + print(f"- Updated at: {event.updated_at}") + print(f"- Taken time: {event.taken_time}") + print(f"- Retry count: {event.retry_count}") + + print() + print("Event contents") + event_contents = event.content + parsed = event_pb2.Event.FromString(event_contents) + + print(f"- UserID: {parsed.user_id}") + print(f"- ExternalUserID: {parsed.external_user_id}") + print(f"- PartnerID: {parsed.partner_id}") + + content = parsed.content + print(f"Content: {content}") + + +def run_event(event_id: int, delete_on_success: bool = True): + event = SyncEvent.get_by(id=event_id) + if not event: + print("Event not found") + return + + print(f"Processing event {event_id}") + sink = HttpEventSink() + res = sink.process(event) + if res: + print(f"Processed event {event_id}") + if delete_on_success: + SyncEvent.delete(event_id, commit=True)