
Conversation

@HardMax71 HardMax71 (Owner) commented Jan 29, 2026


Summary by cubic

Made IdempotencyManager stateless and DI-managed, and moved idempotency enforcement into an IdempotentEventDispatcher that wraps handlers. Removed async lifecycle, background stats, and consumer/decorator wrappers; metrics now use increment/decrement counters.

  • Refactors

    • Added EventDispatcher._wrap_handler hook; introduced IdempotentEventDispatcher that auto-wraps handlers; removed IdempotentConsumerWrapper and the idempotent_handler decorator.
    • Providers now supply IdempotentEventDispatcher to coordinator, k8s worker, and result processor (saga uses plain dispatcher); consumers use UnifiedConsumer directly.
    • Removed initialize/close and the create_idempotency_manager factory; dropped remove(), get_stats(), IdempotencyStats, and repository aggregate_status_counts; inlined key generation and switched DatabaseMetrics to increment/decrement.
  • Migration

    • Delete calls to manager.initialize()/manager.close() and remove create_idempotency_manager.
    • Replace IdempotentConsumerWrapper/@idempotent_handler with IdempotentEventDispatcher; pass EventDispatcher into services.
    • Swap update_idempotency_keys_active with increment_idempotency_keys/decrement_idempotency_keys; remove uses of remove(), get_stats(), IdempotencyStats, and aggregate_status_counts. A wiring sketch for these steps follows below.
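
A hypothetical wiring sketch for the migration steps above; the import paths follow the files touched in this PR, but the exact exports and constructor signatures are assumptions:

```python
import logging

# Import paths follow the files touched in this PR; exact exports and
# constructor signatures are assumptions for illustration.
from app.services.idempotency import IdempotencyManager
from app.services.idempotency.middleware import IdempotentEventDispatcher

# Before (all removed in this PR):
#   manager = create_idempotency_manager(...)      # factory: gone
#   await manager.initialize()                     # async lifecycle: gone
#   consumer = IdempotentConsumerWrapper(consumer, manager)
#   metrics.update_idempotency_keys_active(count, prefix="exec")
#   await manager.close()

# After: plain synchronous construction; the dispatcher wraps handlers itself.
def build_dispatcher(logger: logging.Logger, repository: object) -> IdempotentEventDispatcher:
    manager = IdempotencyManager(repository)  # stateless; no initialize()/close()
    return IdempotentEventDispatcher(logger=logger, idempotency_manager=manager)

# Metrics: update_idempotency_keys_active(count, prefix) becomes
#   metrics.increment_idempotency_keys(prefix)  # when a key is reserved
#   metrics.decrement_idempotency_keys(prefix)  # when a key is released
```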

Written for commit bb4aee1. Summary will update on new commits.

Summary by CodeRabbit

  • Refactor

    • Idempotency reworked to a dispatcher-based model; per-handler lifecycle wrappers removed
    • Active idempotency key tracking simplified to explicit increment/decrement; aggregate statistics removed
    • Idempotency manager lifecycle simplified and made synchronous
  • Documentation

    • Architecture docs updated to reflect the dispatcher-based idempotency design
  • Tests

    • Tests adapted for synchronous lifecycle and dispatcher usage; several idempotency-specific tests trimmed or removed

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai coderabbitai bot commented Jan 29, 2026

📝 Walkthrough

Reworks idempotency: replaces delta-based metrics with explicit increment/decrement methods, removes lifecycle-managed IdempotencyManager factory and stats aggregation, and introduces IdempotentEventDispatcher that wraps handlers for idempotency instead of per-consumer wrappers. Providers, services, tests, and docs updated accordingly.

Changes

| Cohort / File(s) | Summary |
|------------------|---------|
| **Metrics API Simplification**<br>`backend/app/core/metrics/database.py`, `backend/tests/unit/core/metrics/test_database_and_dlq_metrics.py` | Removed `update_idempotency_keys_active(count, prefix)`; added `increment_idempotency_keys(prefix)` / `decrement_idempotency_keys(prefix)`; tests updated to call the new methods (see the sketch after this table). |
| **Provider & Worker Wiring**<br>`backend/app/core/providers.py`, `backend/workers/run_k8s_worker.py`, `backend/workers/run_result_processor.py` | Replaced the async idempotency-manager factory with synchronous construction; DI now wires `IdempotentEventDispatcher` into services; consumers switched from `IdempotentConsumerWrapper` to `UnifiedConsumer`. |
| **Dispatcher / Middleware**<br>`backend/app/events/core/dispatcher.py`, `backend/app/services/idempotency/middleware.py` | Added a `_wrap_handler` hook to `EventDispatcher`; introduced `IdempotentEventDispatcher` and simplified `IdempotentEventHandler`; removed the decorator-based API and many handler-management utilities. |
| **Idempotency Manager & Repo**<br>`backend/app/services/idempotency/idempotency_manager.py`, `backend/app/services/idempotency/redis_repository.py`, `backend/app/services/idempotency/__init__.py` | Removed lifecycle methods, stats aggregation, and the `IdempotencyKeyStrategy` indirection; inlined key generation; removed `aggregate_status_counts()` and the `IdempotencyStats` export; adjusted public exports. |
| **Service Consumers / Coordinators**<br>`backend/app/services/coordinator/coordinator.py`, `backend/app/services/result_processor/processor.py`, `backend/app/services/saga/saga_orchestrator.py` | Removed the per-consumer idempotency wrapper and idempotency-manager lifecycle from start/stop flows; constructors now accept a dispatcher and use `UnifiedConsumer` directly; startup/shutdown simplified. |
| **Domain Model & Public API**<br>`backend/app/domain/idempotency/__init__.py`, `backend/app/domain/idempotency/models.py`, `backend/app/services/idempotency/__init__.py` | Removed the `IdempotencyStats` dataclass and the `EXPIRED` enum member; updated module exports to reflect the dispatcher-centered API. |
| **Tests & Documentation**<br>`backend/tests/...`, `docs/architecture/*` | Deleted lifecycle- and aggregation-dependent tests and some decorator tests; fixtures now return managers synchronously; tests adjusted to inspect internal handler storage; docs updated to describe `IdempotentEventDispatcher` and the new wiring. |
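
To illustrate the first row, a minimal sketch of a gauge-backed increment/decrement pair, assuming a prometheus_client-style gauge; the metric name and label here are invented for the example, not taken from the repository:

```python
from prometheus_client import Gauge

# Invented metric name/label for illustration; the real definitions live in
# backend/app/core/metrics/database.py.
IDEMPOTENCY_KEYS_ACTIVE = Gauge(
    "idempotency_keys_active",
    "Currently reserved idempotency keys",
    ["prefix"],
)

class DatabaseMetrics:
    # Replaces the removed update_idempotency_keys_active(count, prefix),
    # which required callers to compute an absolute count first.
    def increment_idempotency_keys(self, prefix: str) -> None:
        IDEMPOTENCY_KEYS_ACTIVE.labels(prefix=prefix).inc()

    def decrement_idempotency_keys(self, prefix: str) -> None:
        IDEMPOTENCY_KEYS_ACTIVE.labels(prefix=prefix).dec()
```

Explicit inc/dec at the call sites that reserve and release keys avoids the read-modify-write a delta-based absolute count requires.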

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Kafka as Kafka
    participant Consumer as UnifiedConsumer
    participant Dispatcher as IdempotentEventDispatcher
    participant IM as IdempotencyManager
    participant Handler as EventHandler

    Kafka->>Consumer: deliver message
    Consumer->>Dispatcher: dispatch(event)
    Dispatcher->>IM: check_and_reserve(key)
    alt reserved (first time)
        Dispatcher->>Handler: invoke(event)
        Handler-->>Dispatcher: success
        Dispatcher->>IM: mark_completed(key)
    else duplicate
        Dispatcher->>IM: skip/ack duplicate
        Dispatcher-->>Consumer: ack/skip
    end
```
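
A minimal Python sketch of the wrapped-handler flow the diagram describes; check_and_reserve and mark_completed are taken from the diagram's labels, while the release-on-failure branch and the key scheme are assumptions for illustration:

```python
from collections.abc import Awaitable, Callable
from typing import Protocol

class DomainEvent(Protocol):
    event_id: str
    event_type: str

class IdempotencyManagerLike(Protocol):
    async def check_and_reserve(self, key: str) -> bool: ...
    async def mark_completed(self, key: str) -> None: ...
    async def release(self, key: str) -> None: ...  # assumed failure path

Handler = Callable[[DomainEvent], Awaitable[None]]

def make_idempotent(handler: Handler, manager: IdempotencyManagerLike) -> Handler:
    async def wrapped(event: DomainEvent) -> None:
        key = f"{event.event_type}:{event.event_id}"  # event-based key, scheme assumed
        if not await manager.check_and_reserve(key):
            return  # duplicate delivery: skip and let the consumer ack
        try:
            await handler(event)
        except Exception:
            await manager.release(key)  # permit a retry after failure (assumed)
            raise
        await manager.mark_completed(key)
    return wrapped
```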

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Poem

🐰 I hopped through handlers, wrapped each one,
Swapped lifecycle loops for simpler fun.
Keys now climb and tumble, one by one,
Consumers lean, dispatchers on the run.
A carrot cheers the cleaner sun! 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|------------|--------|-------------|------------|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 50.00%, below the required 80.00% threshold. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|------------|--------|-------------|
| Description Check | ✅ Passed | Check skipped; CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The PR title accurately reflects the main architectural change: removing statefulness from IdempotencyManager and shifting to dependency injection, with related test updates. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@cubic-dev-ai cubic-dev-ai bot left a comment

No issues found across 13 files

  - EventDispatcher: template method _wrap_handler() (identity by default)
  - IdempotentEventDispatcher(EventDispatcher): overrides _wrap_handler() to create IdempotentEventHandler
  - DI providers create the right subclass, services just accept EventDispatcher (base type)
  - No factories, no closures, no reflection, no nullable (see the sketch below)
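
A self-contained sketch of that shape, with simplified types standing in for the project's EventType and DomainEvent; this is an illustration of the pattern, not the repository's actual code:

```python
import logging
from collections import defaultdict
from collections.abc import Awaitable, Callable
from typing import Any

Handler = Callable[[Any], Awaitable[None]]

class EventDispatcher:
    """Base dispatcher: _wrap_handler is an identity template method."""

    def __init__(self, logger: logging.Logger) -> None:
        self.logger = logger
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def _wrap_handler(self, handler: Handler) -> Handler:
        return handler  # identity by default; subclasses may decorate

    def register_handler(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type].append(self._wrap_handler(handler))

class IdempotentEventDispatcher(EventDispatcher):
    """Overrides the hook so every registered handler gets idempotency."""

    def __init__(self, logger: logging.Logger, idempotency_manager: Any) -> None:
        super().__init__(logger)
        self._manager = idempotency_manager

    def _wrap_handler(self, handler: Handler) -> Handler:
        manager = self._manager

        async def idempotent(event: Any) -> None:
            # Key scheme assumed; the real code builds an IdempotentEventHandler.
            key = f"{type(event).__name__}:{getattr(event, 'event_id', '')}"
            if not await manager.check_and_reserve(key):
                return  # duplicate: skip
            await handler(event)
            await manager.mark_completed(key)

        return idempotent
```

Because services annotate against the base EventDispatcher, DI can hand them either subclass without any service code changing.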
@cubic-dev-ai cubic-dev-ai bot left a comment

1 issue found across 20 files (changes from recent commits).

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/events/core/dispatcher.py">

<violation number="1" location="backend/app/events/core/dispatcher.py:62">
P2: Wrapping handlers on registration breaks `remove_handler` because it still compares the original handler object. If `_wrap_handler` returns a different callable, handlers become effectively non-removable. Consider storing a mapping from original to wrapped, or adjusting removal logic to account for wrapping.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
backend/app/events/core/dispatcher.py (1)

40-76: Ensure wrapped handlers remain removable.

Line 62/Line 76 now store wrapped handlers, but remove_handler still looks for the original handler and logs handler.__name__. That makes removal unreliable (and can throw if the wrapper lacks __name__). Consider tracking original→wrapped mapping and normalizing logging.

🔧 Suggested fix

```diff
 class EventDispatcher:
     def __init__(self, logger: logging.Logger) -> None:
         self.logger = logger
@@
         # Map event types to their handlers
         self._handlers: dict[EventType, list[Callable[[DomainEvent], Awaitable[None]]]] = defaultdict(list)
+        self._wrapped_handlers: dict[EventType, dict[EventHandler, EventHandler]] = defaultdict(dict)
@@
         def decorator(handler: Callable[[T], Awaitable[None]]) -> Callable[[T], Awaitable[None]]:
             self.logger.info(f"Registering handler '{handler.__name__}' for event type '{event_type}'")
-            self._handlers[event_type].append(self._wrap_handler(handler))  # type: ignore[arg-type]
+            wrapped = self._wrap_handler(handler)  # type: ignore[arg-type]
+            self._handlers[event_type].append(wrapped)
+            self._wrapped_handlers[event_type][handler] = wrapped
             return handler
@@
     def register_handler(self, event_type: EventType, handler: EventHandler) -> None:
@@
-        self._handlers[event_type].append(self._wrap_handler(handler))
+        wrapped = self._wrap_handler(handler)
+        self._handlers[event_type].append(wrapped)
+        self._wrapped_handlers[event_type][handler] = wrapped
@@
     def remove_handler(self, event_type: EventType, handler: EventHandler) -> bool:
@@
-        if event_type in self._handlers and handler in self._handlers[event_type]:
-            self._handlers[event_type].remove(handler)
-            self.logger.info(f"Removed handler '{handler.__name__}' for event type '{event_type}'")
+        target = self._wrapped_handlers.get(event_type, {}).get(handler, handler)
+        if event_type in self._handlers and target in self._handlers[event_type]:
+            self._handlers[event_type].remove(target)
+            self._wrapped_handlers.get(event_type, {}).pop(handler, None)
+            name = getattr(handler, "__name__", handler.__class__.__name__)
+            self.logger.info(f"Removed handler '{name}' for event type '{event_type}'")
             # Clean up empty lists
             if not self._handlers[event_type]:
                 del self._handlers[event_type]
             return True
```
docs/architecture/idempotency.md (1)

126-130: Fix markdown table alignment (MD060).

markdownlint flags a column alignment issue at Line 130 in the “Key Files” table. Please reformat the table so pipes align (or run markdownlint --fix).

🤖 Fix all issues with AI agents
In `@backend/app/services/idempotency/middleware.py`:
- Around line 62-88: IdempotentEventDispatcher's _wrap_handler returns new
IdempotentEventHandler instances so remove_handler fails to find the original
handler; fix by maintaining a mapping of original handlers to their wrapped
counterparts (e.g., self._handler_map keyed by event_type then original handler)
when _wrap_handler wraps a handler, store the mapping, and override
remove_handler to look up the wrapped instance (via
self._handler_map[event_type][original_handler]) and delegate removal to the
parent EventDispatcher (and clean up the mapping entry when removed).

In `@backend/app/services/saga/saga_orchestrator.py`:
- Around line 50-51: The constructor stores an unused idempotency_manager in
SagaOrchestrator and the class instantiates a plain EventDispatcher instead of
the idempotent variant; fix by either (A) switching the dispatcher creation to
IdempotentEventDispatcher and wiring the stored idempotency_manager into that
instantiation so saga events are processed idempotently (match ResultProcessor
behavior), or (B) remove the idempotency_manager parameter and its assignment
from the SagaOrchestrator constructor and corresponding factory if saga
idempotency is not needed; update any references in the class (e.g., where
EventDispatcher is created) accordingly.
🧹 Nitpick comments (2)
backend/app/services/saga/saga_orchestrator.py (1)

141-153: Consider using IdempotentEventDispatcher for consistency with other services.

The saga orchestrator creates a plain EventDispatcher while ResultProcessor and ExecutionCoordinator use IdempotentEventDispatcher. This means saga trigger events (e.g., EXECUTION_REQUESTED) could be processed multiple times if Kafka redelivers them.

If idempotency is intentionally handled elsewhere (e.g., the get_saga_by_execution_and_name check in _start_saga), consider documenting this. Otherwise, consider wiring an IdempotentEventDispatcher here as well.

♻️ Suggested refactor to use IdempotentEventDispatcher

```diff
+from app.domain.idempotency import KeyStrategy
+from app.services.idempotency.middleware import IdempotentEventDispatcher

 ...

-        dispatcher = EventDispatcher(logger=self.logger)
+        dispatcher = IdempotentEventDispatcher(
+            logger=self.logger,
+            idempotency_manager=self._idempotency_manager,
+            key_strategy=KeyStrategy.EVENT_BASED,
+            ttl_seconds=3600,
+        )
```
backend/app/services/coordinator/coordinator.py (1)

138-138: Stale comment references old wrapping pattern.

The comment "Register handlers with EventDispatcher BEFORE wrapping with idempotency" references the old IdempotentConsumerWrapper approach. With the new dispatcher-based model, idempotency wrapping happens inside IdempotentEventDispatcher._wrap_handler() when handlers are registered, not as a separate step.

📝 Suggested comment update

```diff
-        # Register handlers with EventDispatcher BEFORE wrapping with idempotency
+        # Register handlers with EventDispatcher (idempotency applied by dispatcher if using IdempotentEventDispatcher)
```

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@backend/app/events/core/dispatcher.py`:
- Around line 89-92: The dispatch currently calls asyncio.gather(*tasks,
return_exceptions=True) which silences handler failures; change it to preserve
concurrency but surface errors by either removing return_exceptions or by
collecting results and re-raising any exceptions: call asyncio.gather(*tasks) so
gather will raise on the first handler exception, or keep return_exceptions=True
and iterate over the gather results in dispatch (or the method using
self._execute_handler) to detect Exception instances and re-raise (or aggregate)
them so failures from _execute_handler are not silently swallowed.

@sonarqubecloud

@cubic-dev-ai cubic-dev-ai bot left a comment
1 issue found across 4 files (changes from recent commits).

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/events/core/dispatcher.py">

<violation number="1" location="backend/app/events/core/dispatcher.py:91">
P2: Removing `return_exceptions=True` means one handler exception will cancel the remaining handlers for the event, which is a behavior regression from running all handlers concurrently. If you still want all handlers to run and only log failures, keep `return_exceptions=True` or handle exceptions per-task.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
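
The two reviews above admit a middle path: keep return_exceptions=True so sibling handlers always run to completion, then inspect the results and re-raise so failures still surface. A sketch of that pattern (ExceptionGroup requires Python 3.11+; names here are illustrative, not the project's dispatch method):

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

Handler = Callable[[Any], Awaitable[None]]

async def dispatch(handlers: list[Handler], event: Any) -> None:
    # Run all handlers concurrently; no handler is abandoned because a
    # sibling failed, since gather collects exceptions instead of raising.
    results = await asyncio.gather(
        *(h(event) for h in handlers), return_exceptions=True
    )
    errors = [r for r in results if isinstance(r, Exception)]
    if errors:
        # Re-raise only after every handler has finished, so failures
        # are surfaced rather than silently swallowed.
        raise ExceptionGroup("event handler failures", errors)
```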

@coderabbitai coderabbitai bot left a comment
Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@backend/app/core/providers.py`:
- Around line 778-783: The current finally block awaits
worker.wait_for_active_creations() before calling await consumer.stop(), so if
wait_for_active_creations() raises the consumer.stop() call is skipped; change
the cleanup to ensure consumer.stop() always runs by invoking
worker.wait_for_active_creations() inside its own try/except (or
try/except/finally) and then calling await consumer.stop() unconditionally
(e.g., separate the two so consumer.stop() runs in the outer finally),
referencing the existing worker.wait_for_active_creations(), consumer.stop(),
and consumer variables to locate and update the cleanup logic.
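
A sketch of that cleanup ordering as a DI-style async provider; the function name and parameters are invented, and only wait_for_active_creations() and consumer.stop() come from the comment above:

```python
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

@asynccontextmanager
async def provide_k8s_worker(worker: Any, consumer: Any) -> AsyncIterator[Any]:
    try:
        yield worker
    finally:
        try:
            # May raise (e.g., a timeout waiting on in-flight creations)...
            await worker.wait_for_active_creations()
        finally:
            # ...but the consumer is stopped regardless, in its own finally.
            await consumer.stop()
```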

@HardMax71 HardMax71 merged commit 6deaa02 into main Jan 29, 2026
23 checks passed
@HardMax71 HardMax71 deleted the fix/idempotency branch January 29, 2026 10:51