fix: event replay service #119

Conversation
📝 Walkthrough

EventReplayService replaces ReplayService across providers, API routes, the admin service, and tests; EventReplayService now accepts ReplayMetrics (not Settings) and exposes explicit session lifecycle APIs and new result/error types; the legacy ReplayService and the ReplayConfig progress callback were removed.
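For orientation, a hedged sketch of the lifecycle status and result/error types as they appear to be used throughout this thread; the enum values and field names are guesses inferred from usage, not the repo's actual definitions.

```python
# Hypothetical shapes inferred from usage in this thread; the repo's actual
# ReplayStatus values and ReplayOperationResult fields may differ.
from dataclasses import dataclass
from enum import Enum


class ReplayStatus(str, Enum):
    CREATED = "created"
    RUNNING = "running"
    PAUSED = "paused"
    CANCELLED = "cancelled"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class ReplayOperationResult:
    session_id: str
    status: ReplayStatus
    error: str | None = None


class ReplayOperationError(Exception):
    """Raised when a replay session operation cannot be completed."""
```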
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant API as "Replay API"
    participant Service as "EventReplayService"
    participant Repo as "ReplayRepository"
    participant Kafka as "KafkaProducer"
    participant Metrics as "ReplayMetrics"
    Client->>API: POST /replay (config)
    API->>Service: create_session_from_config(config)
    Service->>Repo: persist session (CREATED)
    Service->>Metrics: record session_created
    Service-->>API: ReplayOperationResult (session_id, CREATED)
    API-->>Client: 201 ReplayResponse
    Client->>API: POST /replay/{id}/start
    API->>Service: start_session(session_id)
    Service->>Repo: update session -> RUNNING
    Service->>Kafka: stream replay events
    Service->>Metrics: record replay_started
    Service-->>API: ReplayOperationResult (session_id, RUNNING)
    API-->>Client: 200 ReplayResponse
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (1 warning, 1 inconclusive)
No issues found across 2 files
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@backend/app/services/event_replay/replay_service.py`:
- Around line 333-349: The cleanup_old_sessions implementation uses
total_removed = max(removed_memory, removed_db) which under-reports when there
are in-memory-only sessions; update cleanup_old_sessions to compute the true
removed count by combining both sources (e.g., sum removed_memory and removed_db
or dedupe by collecting deleted IDs from self._sessions and from await
self._repository.delete_old_sessions) and then set total_removed accordingly,
update the info log extra and the returned
CleanupResult(removed_sessions=total_removed, ...) so removed_memory, removed_db
and total_removed reflect the real numbers (use symbols: cleanup_old_sessions,
self._sessions, removed_memory, removed_db,
self._repository.delete_old_sessions, CleanupResult).
- Around line 351-366: The create_session_from_config flow can leave an
in-memory session in self._sessions if _repository.save_session raises; update
create_session_from_config to ensure cleanup: declare session_id = None before
the try, call create_replay_session as you do, and in the except/finally block
remove the orphaned session from self._sessions (e.g., pop(session_id, None))
when persistence fails, then log and raise ReplayOperationError as before;
reference create_session_from_config, create_replay_session, self._sessions,
_repository.save_session, and ReplayOperationError when making the change.
- Around line 367-397: The
start_session/pause_session/resume_session/cancel_session methods call
start_replay/pause_replay/resume_replay/cancel_replay and then call
self._repository.update_session_status but do not check its return value;
update_session_status can return False when the DB record is missing, causing
in-memory vs persisted divergence. Modify each wrapper to verify the boolean
result from update_session_status and if it is False, raise an exception or
return a ReplayOperationResult indicating failure (e.g., status unchanged and an
error message) instead of returning success; use the existing
ReplayOperationResult type and include the session_id, the intended status on
success, and a clear error message on failure so callers observe the persisted
update failure immediately.
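Hedged sketches of the three requested fixes, in order, using the types sketched in the walkthrough above; the helper signatures (`delete_old_sessions`, `save_session`, `start_replay`) and surrounding details are assumptions, not the repo's actual code.

First, `cleanup_old_sessions` summing both sources instead of taking the max:

```python
from datetime import datetime, timedelta, timezone

# EventReplayService method; the cutoff computation and CleanupResult fields
# beyond removed_sessions are assumed.
async def cleanup_old_sessions(self, max_age_hours: int = 24) -> CleanupResult:
    cutoff = datetime.now(timezone.utc) - timedelta(hours=max_age_hours)
    stale_ids = [
        sid for sid, s in self._sessions.items()
        if s.completed_at is not None and s.completed_at < cutoff
    ]
    for sid in stale_ids:
        self._sessions.pop(sid, None)
    removed_memory = len(stale_ids)
    removed_db = await self._repository.delete_old_sessions(cutoff)
    # Sum so in-memory-only sessions are counted; deduping by collected IDs
    # would additionally avoid double-counting sessions present in both.
    total_removed = removed_memory + removed_db
    self.logger.info(
        "Cleaned up old replay sessions",
        extra={"removed_memory": removed_memory, "removed_db": removed_db,
               "total_removed": total_removed},
    )
    return CleanupResult(removed_sessions=total_removed)
```

Second, `create_session_from_config` dropping the orphaned in-memory session when persistence fails:

```python
async def create_session_from_config(self, config: ReplayConfig) -> ReplayOperationResult:
    session_id: str | None = None
    try:
        session_id = self.create_replay_session(config)  # registers in self._sessions
        await self._repository.save_session(self._sessions[session_id])
        return ReplayOperationResult(session_id=session_id, status=ReplayStatus.CREATED)
    except Exception as e:
        if session_id is not None:
            self._sessions.pop(session_id, None)  # don't leave an orphan behind
        self.logger.error("Failed to create replay session", extra={"error": str(e)})
        raise ReplayOperationError(f"Failed to create replay session: {e}") from e
```

Third, one lifecycle wrapper checking the boolean from `update_session_status`; the same shape would apply to pause/resume/cancel:

```python
async def start_session(self, session_id: str) -> ReplayOperationResult:
    await self.start_replay(session_id)  # flips in-memory state to RUNNING
    persisted = await self._repository.update_session_status(session_id, ReplayStatus.RUNNING)
    if not persisted:
        # DB record missing: report failure instead of claiming success.
        return ReplayOperationResult(
            session_id=session_id,
            status=ReplayStatus.CREATED,  # persisted status unchanged
            error="Failed to persist RUNNING status: session not found in database",
        )
    return ReplayOperationResult(session_id=session_id, status=ReplayStatus.RUNNING)
```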
1 issue found across 1 file (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/services/event_replay/replay_service.py">
<violation number="1" location="backend/app/services/event_replay/replay_service.py:270">
P2: Synchronous file writes inside this async method will block the event loop and can degrade replay throughput. Use a non-blocking approach (executor or async file I/O) for file writes.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/services/event_replay/replay_service.py (2)
218-234: Metrics not recorded for exception-handled failures. When an exception is caught at lines 220-224 and `skip_errors` is `True`, execution continues without reaching `record_event_replayed`. This means exception-based failures won't appear in replay metrics.

📊 Proposed fix to record metrics for exception failures

```diff
 except Exception as e:
     await self._handle_replay_error(session, event, e)
+    self._metrics.record_event_replayed(
+        session.config.replay_type, event.event_type, "failed"
+    )
     if not session.config.skip_errors:
         raise
     continue
```
264-271: Replace synchronous file I/O with async operations. The `open()` and `f.write()` calls block the event loop, stalling event processing. Use `setdefault()` for cleaner lock initialization and add `aiofiles` to handle file I/O asynchronously:

🔧 Proposed fix using aiofiles

```diff
+import aiofiles
+
 async def _write_event_to_file(self, event: DomainEvent, file_path: str) -> None:
-    if file_path not in self._file_locks:
-        self._file_locks[file_path] = asyncio.Lock()
+    lock = self._file_locks.setdefault(file_path, asyncio.Lock())
     line = json.dumps(event.model_dump(), default=str) + "\n"
-    async with self._file_locks[file_path]:
-        with open(file_path, "a") as f:
-            f.write(line)
+    async with lock:
+        async with aiofiles.open(file_path, "a") as f:
+            await f.write(line)
```

Also add `aiofiles` to the `backend/pyproject.toml` dependencies.
🧹 Nitpick comments (2)
backend/app/services/event_replay/replay_service.py (2)
355-355: Use structured logging for consistency. Other log statements in this file use `extra={}` for structured logging; this line uses f-string formatting.

📝 Suggested fix

```diff
- self.logger.error(f"Failed to create replay session: {e}")
+ self.logger.error("Failed to create replay session", extra={"error": str(e)})
```
390-403: Consider a warning-level log or metrics for DB update failures. Swallowing DB update exceptions prevents replay interruption, but silent failures can lead to DB/memory state divergence. A warning-level log or metric increment would improve observability without breaking the replay.

📊 Suggested improvement

```diff
 except Exception as e:
-    self.logger.error(f"Failed to update session in database: {e}")
+    self.logger.warning(
+        "Failed to update session in database, continuing replay",
+        extra={"session_id": session.session_id, "error": str(e)},
+    )
```
3 issues found across 3 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/services/event_replay/replay_service.py">
<violation number="1" location="backend/app/services/event_replay/replay_service.py:63">
P2: resume_session persists RUNNING even when the session isn’t PAUSED, which can desync in-memory state from the database and misreport session status. Guard invalid states before updating.</violation>
<violation number="2" location="backend/app/services/event_replay/replay_service.py:63">
P2: pause_session persists PAUSED even when the session isn’t RUNNING, which can desync in-memory state from the database and incorrectly report a paused session. Guard invalid states and only update when RUNNING.</violation>
</file>
<file name="backend/pyproject.toml">
<violation number="1" location="backend/pyproject.toml:127">
P2: Runtime dependencies are consistently pinned to exact versions; using ">=" here makes builds non-deterministic and can introduce unexpected upgrades. Pin aiofiles to a specific version to match the rest of the dependency policy.</violation>
</file>
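A minimal sketch of the guard the two replay_service violations call for, using the status enum sketched in the walkthrough; the error shape is an assumption. The mirror-image check (require PAUSED) applies to `resume_session`:

```python
async def pause_session(self, session_id: str) -> ReplayOperationResult:
    session = self._sessions.get(session_id)
    if session is None or session.status != ReplayStatus.RUNNING:
        # Never persist PAUSED for a session that isn't actually RUNNING.
        return ReplayOperationResult(
            session_id=session_id,
            status=session.status if session else ReplayStatus.FAILED,
            error="Cannot pause: session is not RUNNING",
        )
    await self.pause_replay(session_id)
    await self._repository.update_session_status(session_id, ReplayStatus.PAUSED)
    return ReplayOperationResult(session_id=session_id, status=ReplayStatus.PAUSED)
```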
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@backend/app/services/event_replay/replay_service.py`:
- Around line 99-110: The cancel_session flow can race with _run_replay which
may still mark the session COMPLETED in memory; update _run_replay to re-check
the session's persisted/in-memory status (via get_session or repository lookup)
right before you set COMPLETED and bail out if status == ReplayStatus.CANCELLED,
and also ensure you explicitly catch asyncio.CancelledError (or allow it to
propagate) instead of relying on except Exception so cancellation is handled
consistently; update references in _run_replay where it sets session.status =
ReplayStatus.COMPLETED and in the final exception handling to reflect this check
and avoid overwriting a CANCELLED status persisted by cancel_session or
repository.update_session_status.
- Around line 79-97: The pause logic currently sets session.status = PAUSED in
pause_session, but _run_replay and _process_batch treat any status != RUNNING as
a loop-break and later mark the session COMPLETED; change the replay loop in
_run_replay (and apply the same pattern in _process_batch) to explicitly handle
ReplayStatus.PAUSED by awaiting (e.g., loop/wait until status becomes RUNNING)
instead of breaking so the function does not fall through to the completion
path; ensure you check session.status (from get_session or the session object)
in the loop and only break/mark COMPLETED when status == CANCELLED or when
there's a true terminal condition, and keep repository updates
(update_session_status) consistent when entering/exiting PAUSED so
resume_session can succeed.
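Hedged sketches of the two fixes requested above; `_process_batches` and `_next_batch` are assumed helper names and the polling interval is illustrative.

For the cancel race, re-check status just before marking completion and handle `asyncio.CancelledError` explicitly:

```python
import asyncio

# Fragment inside _run_replay; surrounding structure is assumed.
try:
    await self._process_batches(session)
    # Re-check so a concurrent cancel_session (possibly already persisted
    # as CANCELLED) is not overwritten with COMPLETED.
    if session.status == ReplayStatus.CANCELLED:
        return
    session.status = ReplayStatus.COMPLETED
except asyncio.CancelledError:
    # Handled explicitly rather than falling into `except Exception`.
    session.status = ReplayStatus.CANCELLED
```

For pause, wait instead of breaking so the loop never falls through to the completion path while PAUSED (an `asyncio.Event` set by `resume_session` would avoid the polling):

```python
# Replay loop of _run_replay; the same pattern applies in _process_batch.
while True:
    if session.status == ReplayStatus.PAUSED:
        await asyncio.sleep(0.1)  # park until resume_session flips it back
        continue
    if session.status == ReplayStatus.CANCELLED:
        return  # true terminal state
    batch = await self._next_batch(session)
    if batch is None:
        break  # genuine completion: no more events to replay
    await self._process_batch(session, batch)
session.status = ReplayStatus.COMPLETED
```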
🧹 Nitpick comments (2)
backend/app/services/event_replay/replay_service.py (2)
257-257: Consider batching session updates for better performance. `_update_session_in_db` is called after every event, which could create significant database load during high-throughput replays. Consider updating periodically (e.g., every N events or every few seconds) instead of per-event.
301-312: DB update failures are silently logged. Exceptions in `_update_session_in_db` are caught and logged but not propagated. This prevents replay interruption but means in-memory state may diverge from persisted state without the caller knowing. Consider tracking update failures in session state or metrics.

🔧 Suggested enhancement: track update failures

```diff
 async def _update_session_in_db(self, session: ReplaySessionState) -> None:
     try:
         session_update = ReplaySessionUpdate(
             status=session.status,
             replayed_events=session.replayed_events,
             failed_events=session.failed_events,
             skipped_events=session.skipped_events,
             completed_at=session.completed_at,
         )
         await self._repository.update_replay_session(session_id=session.session_id, updates=session_update)
     except Exception as e:
         self.logger.error(f"Failed to update session in database: {e}")
+        self._metrics.record_db_update_failure()
```
…t DB, decrement metrics, pop dicts, log) was duplicated across 3 code paths: try success, except CancelledError, and except Exception. Each path had its own DB persist, timestamp, and log call.

After, each block does only its job:

- try: run the replay loop; set COMPLETED on success; return on non-RUNNING (no break + post-loop guard).
- except CancelledError: one line, set CANCELLED.
- except Exception: set FAILED, append the error, record the error metric, log the error.
- finally: ALL shared finalization: completed_at, duration metric (only for COMPLETED), DB persist, decrement active replays, pop from dicts, single log line with the final status.

_replay_event (match/case): replaced the if/elif/elif/else chain with match/case on config.target, with a single return True after the match block instead of a duplicated return True in each branch.

_process_batch: changed break to return on non-RUNNING status for consistency with _run_replay.
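A hedged skeleton of that structure; the helper and metric names (`_process_batches`, `record_replay_error`, `record_replay_duration`, `decrement_active_replays`, the `errors` list) stand in for the real ones:

```python
import asyncio
from datetime import datetime, timezone

# Skeleton only, not the repo's actual method.
async def _run_replay(self, session: ReplaySessionState) -> None:
    try:
        await self._process_batches(session)
        if session.status != ReplayStatus.RUNNING:
            return  # paused/cancelled elsewhere; finally still runs
        session.status = ReplayStatus.COMPLETED
    except asyncio.CancelledError:
        session.status = ReplayStatus.CANCELLED
    except Exception as e:
        session.status = ReplayStatus.FAILED
        session.errors.append(str(e))
        self._metrics.record_replay_error(type(e).__name__)
        self.logger.error(
            "Replay failed",
            extra={"session_id": session.session_id, "error": str(e)},
        )
    finally:
        # All shared finalization lives here, exactly once.
        session.completed_at = datetime.now(timezone.utc)
        if session.status == ReplayStatus.COMPLETED:
            self._metrics.record_replay_duration(session)
        await self._update_session_in_db(session)
        self._metrics.decrement_active_replays()
        self._tasks.pop(session.session_id, None)
        self.logger.info(
            "Replay finished",
            extra={"session_id": session.session_id, "status": session.status},
        )
```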
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/services/event_replay/replay_service.py (1)
326-336: Silent error handling masks DB persistence failures. `_update_session_in_db` catches exceptions and logs them but never re-raises or returns a failure indicator. Callers (like line 283 in `_process_batch`) have no way to detect or react to persistence failures, which could lead to unnoticed data loss. Additionally, line 336 uses an f-string while other error logs use `extra={}` for structured logging.

🛡️ Suggested improvement

```diff
-async def _update_session_in_db(self, session: ReplaySessionState) -> None:
+async def _update_session_in_db(self, session: ReplaySessionState) -> bool:
+    """Update session in DB. Returns True on success, False on failure."""
     try:
         session_update = ReplaySessionUpdate(
             status=session.status,
             replayed_events=session.replayed_events,
             failed_events=session.failed_events,
             skipped_events=session.skipped_events,
             completed_at=session.completed_at,
         )
         await self._repository.update_replay_session(session_id=session.session_id, updates=session_update)
+        return True
     except Exception as e:
-        self.logger.error(f"Failed to update session in database: {e}")
+        self.logger.error(
+            "Failed to update session in database",
+            extra={"session_id": session.session_id, "error": str(e)},
+        )
+        return False
```
🧹 Nitpick comments (1)
backend/app/services/event_replay/replay_service.py (1)
279-283: Consider batching DB updates for performance. Line 283 calls `_update_session_in_db` after every single event. For large replays, this creates significant DB write pressure. Consider updating the DB periodically (e.g., every N events or every few seconds) instead of after each event.

♻️ Suggested approach

```diff
+# In _process_batch, track events since last persist
+events_since_persist = 0
+PERSIST_INTERVAL = 10  # or make configurable
 for event in batch:
     # ... existing event processing ...
     session.last_event_at = event.timestamp
-    await self._update_session_in_db(session)
+    events_since_persist += 1
+    if events_since_persist >= PERSIST_INTERVAL:
+        await self._update_session_in_db(session)
+        events_since_persist = 0
+
+# Persist any remaining state after batch
+if events_since_persist > 0:
+    await self._update_session_in_db(session)
```
Summary by cubic
Consolidated replay logic by removing ReplayService and moving session operations into EventReplayService. Injected ReplayMetrics via DI to use the shared metrics instance and decouple setup.
Written for commit c78ae9a. Summary will update on new commits.
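One plausible wiring for the DI change described above, assuming a dependency-injector style container (the walkthrough mentions "providers"); the names here are illustrative, not the repo's actual container.

```python
from dependency_injector import containers, providers


class Container(containers.DeclarativeContainer):
    replay_repository = providers.Singleton(ReplayRepository)
    replay_metrics = providers.Singleton(ReplayMetrics)  # one shared instance
    event_replay_service = providers.Factory(
        EventReplayService,
        repository=replay_repository,
        metrics=replay_metrics,  # injected directly, no Settings required
    )
```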