
Conversation

@HardMax71
Owner

@HardMax71 HardMax71 commented Jan 29, 2026


Summary by cubic

CPU and memory are now enforced via Kubernetes pod requests/limits, and scheduling is simplified. The coordinator now owns the priority queue and publishes CreatePodCommand; there is no separate resource or queue manager.

  • Refactors

    • Removed ResourceManager and QueueManager; queue logic lives inside ExecutionCoordinator.
    • Reactive scheduling only: scheduling happens when an execution reaches the front of the queue or when a running execution completes, fails, or is cancelled.
    • Switched event/request priority from int to QueuePriority enum.
    • DI updates: added CoordinatorProvider; run_coordinator resolves UnifiedConsumer for lifecycle.
    • Metrics cleanup: dropped coordinator.processing.time; simplified coordinator metrics.
    • Tests updated: removed resource/queue manager tests; added coordinator queue tests.
  • Docs

    • Updated architecture and worker docs for K8s-enforced limits and a queue-only coordinator.
    • Revised metrics reference and examples; removed resource allocation content and clarified priority handling.

Written for commit 2fe458d. Summary will update on new commits.

Summary by CodeRabbit

  • Refactor

    • Coordinator redesigned to use an in-memory priority queue with per-user limits and event-driven scheduling; coordinator is now stateless and its consumer lifecycle is DI-managed; resource enforcement moved to Kubernetes.
  • New Features

    • Introduced typed queue priorities (enum) across APIs and events.
    • Queue-full and per-user limit handling surfaced via explicit errors/events.
  • Documentation

    • Architecture, components, lifecycle, and metrics docs updated for queue-based scheduling.
  • Tests

    • Added coordinator queue unit tests; removed legacy resource/queue-manager tests.


@coderabbitai

coderabbitai bot commented Jan 29, 2026

📝 Walkthrough

Walkthrough

Coordinator rewired from a resource-centric, polling service to an event-driven, in-memory priority queue: ResourceManager and QueueManager removed; ExecutionCoordinator simplified and accepts injected producer/dispatcher/queue collaborators; DI providers now expose per-service @provide factories and manage the coordinator consumer lifecycle; docs and tests updated to match.

Changes

| Cohort / File(s) | Summary |
|---|---|
| **Provider Wiring & DI**<br>`backend/app/core/providers.py`, `backend/app/core/container.py` | Replaced constructor registration with explicit `@provide` methods: `get_coordinator_dispatcher`, `get_execution_coordinator`, `get_coordinator_consumer`; added `CoordinatorProvider()` to the container. |
| **Coordinator Implementation**<br>`backend/app/services/coordinator/coordinator.py`, `backend/app/services/coordinator/__init__.py` | Major rewrite: `ExecutionCoordinator` moved to an in-process priority queue (per-user limits, stale sweeping, on-demand scheduling); removed external queue/resource/Kafka dependencies; added `QueueRejectError` and new queue APIs. |
| **Removed Queue & Resource Modules**<br>`backend/app/services/coordinator/queue_manager.py` (deleted), `backend/app/services/coordinator/resource_manager.py` (deleted) | Deleted the `QueueManager` and `ResourceManager` subsystems and their public types/APIs (`QueuedExecution`, `ResourceAllocation`, etc.). |
| **Domain Events & Enums**<br>`backend/app/domain/enums/execution.py`, `backend/app/domain/events/typed.py`, `backend/app/domain/enums/__init__.py` | Added the `QueuePriority` enum, exported it, and changed event priority fields from `int` to `QueuePriority` (default `NORMAL`); see the enum sketch after this table. |
| **Metrics & Instrumentation**<br>`backend/app/core/metrics/coordinator.py` | Pruned many coordinator metrics; renamed/reshaped the API (e.g., `record_coordinator_scheduling_duration`, `update_coordinator_active_executions`), added `record_coordinator_execution_scheduled`, removed the processing-time metric. |
| **Runtime / Worker Startup**<br>`backend/workers/run_coordinator.py` | Replaced manual coordinator instantiation and the polling loop with DI-triggered `UnifiedConsumer` startup and a simplified shutdown wait. |
| **Tests: add/remove/adjust**<br>`backend/tests/unit/services/coordinator/test_coordinator_queue.py`, `backend/tests/*`, `backend/tests/unit/services/coordinator/test_resource_manager.py` (removed), `backend/tests/unit/services/coordinator/test_queue_manager.py` (removed) | Added unit tests for `ExecutionCoordinator` queue behavior; removed resource- and queue_manager-focused tests; updated tests/fixtures to use `QueuePriority`. |
| **Docs & Architecture**<br>`docs/architecture/*`, `docs/components/*`, `docs/operations/metrics-reference.md` | Docs updated to reflect reactive, queue-backed scheduling, removal of `ResourceManager`, simplified config/metrics, and the DI-managed consumer lifecycle. |
| **Misc (execution API / fixtures)**<br>`backend/app/services/execution_service.py`, `backend/tests/conftest.py`, `backend/tests/e2e/*` | `execute_script` and test fixtures updated to accept/use the `QueuePriority` enum (default `NORMAL`); call sites adjusted accordingly. |
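As a rough illustration of the enum change above, a minimal sketch of a typed priority and its use as an event field might look like the following; the member names and values are assumptions based on this summary, not the actual contents of `backend/app/domain/enums/execution.py`:

```python
from enum import IntEnum

from pydantic import BaseModel


class QueuePriority(IntEnum):
    """Lower value = higher priority (values here are assumed, not from the PR)."""
    HIGH = 1
    NORMAL = 5
    LOW = 10


class ExecutionRequestedEvent(BaseModel):
    """Illustrative event shape only; the real event carries more fields."""
    execution_id: str
    priority: QueuePriority = QueuePriority.NORMAL
```

An `IntEnum` keeps ordering comparisons and serialization close to the previous int-based priorities while giving call sites a typed API.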

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant App as Application/API
    participant Coord as ExecutionCoordinator
    participant Queue as InProcessQueue
    participant Disp as EventDispatcher
    participant Prod as Producer
    participant Worker as Kubernetes Worker

    Note right of Queue: rgba(200,200,255,0.5)

    App->>Coord: ExecutionRequested(event)
    Coord->>Queue: enqueue(event, priority)
    Queue-->>Coord: enqueued(position)
    alt front_of_queue && not active
        Coord->>Disp: Emit CreatePodCommandEvent
        Disp->>Prod: publish(command)
        Prod-->>Worker: Pod created / worker starts execution
    end
    Worker->>Coord: ExecutionCompleted / Failed / Cancelled
    Coord->>Queue: attempt schedule next
```
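In code, the flow above might reduce to roughly the following shape; the class, handler names, and concurrency limit are illustrative stand-ins, not the real `ExecutionCoordinator` API:

```python
import heapq
from dataclasses import dataclass, field
from typing import Any


@dataclass(order=True)
class _QueuedItem:
    priority: int            # lower value = higher priority
    enqueued_at: float
    event: Any = field(compare=False)


class ReactiveCoordinatorSketch:
    """Enqueue on ExecutionRequested; schedule on queue-front or on terminal events."""

    def __init__(self, max_concurrent: int = 10) -> None:
        self._queue: list[_QueuedItem] = []
        self._active: set[str] = set()
        self._max_concurrent = max_concurrent

    async def on_execution_requested(self, event: Any) -> None:
        heapq.heappush(self._queue, _QueuedItem(int(event.priority), event.timestamp, event))
        await self._maybe_schedule()

    async def on_execution_finished(self, event: Any) -> None:
        # Completed, failed, and cancelled all free a slot and trigger scheduling.
        self._active.discard(event.execution_id)
        await self._maybe_schedule()

    async def _maybe_schedule(self) -> None:
        while self._queue and len(self._active) < self._max_concurrent:
            item = heapq.heappop(self._queue)
            self._active.add(item.event.execution_id)
            await self._publish_create_pod_command(item.event)

    async def _publish_create_pod_command(self, event: Any) -> None:
        ...  # would emit CreatePodCommandEvent via the dispatcher/producer
```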
```mermaid
sequenceDiagram
    participant DI as DependencyInjector
    participant CP as CoordinatorProvider
    participant Disp as EventDispatcher
    participant Coord as ExecutionCoordinator
    participant UC as UnifiedConsumer

    Note right of CP: rgba(200,255,200,0.5)

    DI->>CP: get_coordinator_consumer(...)
    CP->>CP: get_coordinator_dispatcher(...)
    CP-->>Disp: EventDispatcher
    CP->>CP: get_execution_coordinator(prod, disp, ...)
    CP-->>Coord: ExecutionCoordinator
    CP->>UC: construct UnifiedConsumer(coord, disp, ...)
    UC->>UC: start consumer
    UC-->>DI: yield running consumer
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Poem

🐇 I hopped from pools to queues today,
Events now guide where jobs should play.
No more counting cores or spinning loops—
Just priorities, hops, and tidy hoops.
A joyful nibble for cleaner ways!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 61.11% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title mentions 'k8s limits enforce stuff' and 'no need for separate overhead', which accurately reflects the main architectural change: removing separate resource management and relying on Kubernetes pod limits instead. |





@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@backend/app/core/providers.py`:
- Around line 743-752: The generator currently starts coordinator.queue_manager
with coordinator.queue_manager.start() then calls consumer.start() but if
consumer.start() raises, the finally block after yield never runs and
queue_manager remains started; wrap the consumer.start() call in a try/except
that on exception calls await coordinator.queue_manager.stop() (and optionally
logs) and then re-raises the exception so the queue manager is cleaned up,
keeping the existing finally block (which calls consumer.stop() and
coordinator.queue_manager.stop()) for normal teardown after yield.
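A sketch of the startup guard described above, using the hypothetical names from the comment rather than the actual provider code:

```python
from typing import AsyncIterator


async def provide_coordinator_consumer(coordinator, consumer) -> AsyncIterator[object]:
    """Generator-style provider: stop the queue manager if consumer startup fails."""
    await coordinator.queue_manager.start()
    try:
        await consumer.start()
    except Exception:
        await coordinator.queue_manager.stop()   # avoid leaving an orphaned queue task
        raise
    try:
        yield consumer
    finally:                                     # normal teardown path after yield
        await consumer.stop()
        await coordinator.queue_manager.stop()
```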

In `@backend/app/services/coordinator/coordinator.py`:
- Around line 60-74: The wrapper methods (_handle_requested_wrapper,
_handle_completed_wrapper, _handle_failed_wrapper, _handle_cancelled_wrapper)
currently use assert for runtime type validation which is stripped under Python
-O; replace each assert isinstance(...) with an explicit isinstance check and
raise a clear exception (e.g., TypeError or ValueError) when the event is not
the expected type (ExecutionRequestedEvent, ExecutionCompletedEvent,
ExecutionFailedEvent, ExecutionCancelledEvent), then proceed to call the
existing handler (_handle_execution_requested, _handle_execution_completed,
_handle_execution_failed, _handle_execution_cancelled).
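One way to express the suggested check is a small reusable helper; `ensure_event_type` is a hypothetical name, not part of the codebase:

```python
from typing import Type, TypeVar

E = TypeVar("E")


def ensure_event_type(event: object, expected: Type[E]) -> E:
    """Runtime type check that survives `python -O`, unlike a bare assert."""
    if not isinstance(event, expected):
        raise TypeError(f"Expected {expected.__name__}, got {type(event).__name__}")
    return event


# Each wrapper would then do something like (handler names as described above):
#     event = ensure_event_type(event, ExecutionRequestedEvent)
#     await self._handle_execution_requested(event)
```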

In `@docs/components/workers/coordinator.md`:
- Line 34: The doc statement in coordinator.md claiming "When resources are
unavailable, executions are requeued with reduced priority" is inaccurate
because the coordinator no longer tracks resources; update that sentence to
reflect the actual requeue trigger (for example: when pod creation fails or the
scheduler rejects a pod) or remove the assertion entirely; specifically edit the
phrase "requeued with reduced priority" in the coordinator documentation to say
something like "requeued with reduced priority when pod creation fails or when
the scheduler rejects the pod" or delete the line if the coordinator no longer
performs requeueing for those cases, and ensure the coordinator is referenced
consistently in the updated text.
🧹 Nitpick comments (1)
backend/app/services/coordinator/coordinator.py (1)

179-186: Consider retry or requeue for transient scheduling failures.

When _publish_execution_started fails, the execution is removed from _active_executions and marked as failed, but it was already popped from the queue. If the failure is transient (e.g., temporary Kafka unavailability), the execution is permanently lost rather than retried.

This fail-fast approach is valid for simplicity, but consider whether transient failures should trigger a requeue with backoff rather than immediate failure, especially for user-initiated executions.
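If retrying were preferred over fail-fast, a minimal backoff sketch could look like the following; `publish_with_retry` and its parameters are hypothetical, not existing coordinator code:

```python
import asyncio
from typing import Any, Awaitable, Callable


async def publish_with_retry(
    publish: Callable[[Any], Awaitable[None]],
    event: Any,
    attempts: int = 3,
    base_delay: float = 0.5,
) -> bool:
    """Retry transient publish failures with exponential backoff before giving up."""
    for attempt in range(attempts):
        try:
            await publish(event)
            return True
        except Exception:
            if attempt == attempts - 1:
                return False              # caller can then requeue or mark the execution failed
            await asyncio.sleep(base_delay * 2 ** attempt)
    return False
```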


@cubic-dev-ai cubic-dev-ai bot left a comment


1 issue found across 15 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/core/providers.py">

<violation number="1" location="backend/app/core/providers.py:743">
P2: If consumer.start fails, the queue manager cleanup task stays running because it’s started before any error handling. Ensure queue_manager.stop() is called when consumer startup fails to avoid orphaned tasks.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.


@cubic-dev-ai cubic-dev-ai bot left a comment


2 issues found across 16 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/core/providers.py">

<violation number="1" location="backend/app/core/providers.py:743">
P2: Queue manager is started before the `try/finally`, so a failure in `consumer.start(...)` leaves it running without cleanup. Wrap startup in a try/except to stop the queue manager on startup failures.</violation>
</file>

<file name="docs/architecture/kafka-topic-architecture.md">

<violation number="1" location="docs/architecture/kafka-topic-architecture.md:27">
P3: This new queue-based validation conflicts with the following sentence about executions "waiting for resources," which is now outdated and confusing after removing resource management. Update the later sentence to align with queue-based scheduling.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.


@cubic-dev-ai cubic-dev-ai bot left a comment


4 issues found across 29 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/services/coordinator/coordinator.py">

<violation number="1" location="backend/app/services/coordinator/coordinator.py:282">
P2: `_find_position` returns the heap array index, not the actual priority-sorted queue position. Since heapq only guarantees that index 0 is the minimum, positions > 0 will be incorrect. Users receive misleading queue position information via `ExecutionAcceptedEvent`.</violation>
</file>

<file name="docs/architecture/execution-queue.md">

<violation number="1" location="docs/architecture/execution-queue.md:32">
P3: The priority-levels snippet points to coordinator.py lines 32–37, which define _QueuedExecution rather than the QueuePriority enum. Update the include to the enum so the docs show the actual priority values.</violation>
</file>

<file name="backend/app/services/execution_service.py">

<violation number="1" location="backend/app/services/execution_service.py:134">
P3: The `priority` parameter is now a `QueuePriority` enum, but the docstring still describes it as a 1–10 integer. This makes the API contract unclear for callers; update the docstring to reflect enum usage or valid values.</violation>
</file>

<file name="docs/architecture/kafka-topic-architecture.md">

<violation number="1" location="docs/architecture/kafka-topic-architecture.md:27">
P3: The new queue-capacity check conflicts with the nearby text that still says executions wait for resources. Update the wording to reflect queue capacity/scheduling rather than resource availability.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/services/execution_service.py (1)

134-146: Update the docstring to reflect QueuePriority, not a numeric 1–10 range.

The signature now expects QueuePriority, but Line 145 still describes a numeric range. This is misleading for callers.

✏️ Suggested docstring tweak
```diff
-            priority: Execution priority (1-10, lower is higher priority).
+            priority: Execution priority (QueuePriority; lower value is higher priority).
```
🤖 Fix all issues with AI agents
In `@backend/app/core/metrics/coordinator.py`:
- Around line 41-57: Guard against negative inputs by clamping the incoming
count/size to zero before computing deltas: in
update_coordinator_active_executions, replace usage of the raw count with count
= max(0, count) (then compute delta vs _active_current and update
coordinator_active_executions, set _active_current); do the same in
update_execution_request_queue_size (clamp size = max(0, size), compute delta vs
_queue_depth_current, update execution_request_queue_depth, set
_queue_depth_current). This prevents negative deltas from decreasing the
counters below zero while preserving the rest of the logic.
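A sketch of the clamping pattern described above; `CoordinatorGaugeSketch` and its `counter` argument are illustrative, not the real `CoordinatorMetrics` class:

```python
class CoordinatorGaugeSketch:
    """Track an UpDownCounter-style gauge via deltas, clamping negative inputs."""

    def __init__(self, counter) -> None:
        self._counter = counter          # e.g. an OpenTelemetry UpDownCounter
        self._current = 0

    def update_active_executions(self, count: int) -> None:
        count = max(0, count)            # clamp so bad input cannot push the gauge below zero
        delta = count - self._current
        if delta:
            self._counter.add(delta)
        self._current = count
```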

In `@backend/app/domain/events/typed.py`:
- Around line 11-12: BaseEvent's ConfigDict is missing use_enum_values=True
which causes the QueuePriority IntEnum (priority field) to be emitted as an Avro
enum instead of a numeric value; update BaseEvent's model_config to include
use_enum_values=True (mirror EventMetadata's configuration) so the priority
field uses its underlying int value in generated schemas, ensuring schema
compatibility with existing int-based representations.
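Roughly what the suggested configuration change could look like, assuming a pydantic `BaseEvent`; the enum values shown are placeholders:

```python
from enum import IntEnum

from pydantic import BaseModel, ConfigDict


class QueuePriority(IntEnum):
    HIGH = 1      # values are placeholders, not the real enum members
    NORMAL = 5
    LOW = 10


class BaseEvent(BaseModel):
    # Serialize enums by value so `priority` stays numeric in generated schemas.
    model_config = ConfigDict(use_enum_values=True)

    priority: QueuePriority = QueuePriority.NORMAL
```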

In `@backend/app/services/coordinator/coordinator.py`:
- Around line 194-196: The code currently records queue wait time twice—once in
_pop_next() using the queue enqueue timestamp and again in _schedule_execution()
using event.timestamp—causing double-counting; pick the enqueue timestamp as the
single source of truth and remove the duplicate metric emission from
_schedule_execution(), ensuring only the dequeue path (_pop_next()) calls
metrics.record_coordinator_queue_time with the enqueue timestamp and the
event.priority.name.
- Around line 228-233: Finding the item by scanning the raw heap array
(self._queue) via _find_position yields wrong queue positions because heap order
is not a full sorted order; instead, after pushing the new _QueuedExecution (via
heapq.heappush), compute the queue position by creating a sorted view of
self._queue (sort by priority then timestamp or use the same comparison used by
_QueuedExecution), then find the index of the entry with event.execution_id and
publish that index+1 (or 0-based as intended) in the ExecutionAcceptedEvent;
apply the same change to the other occurrence around where position is computed
(the block at ~282-287) so both places derive queue_position from the sorted
order rather than walking the raw heap.
- Around line 244-256: Currently _pop_next calls _untrack_user as soon as an
item is dequeued which removes the user from tracking and makes
max_executions_per_user only apply to queued items; stop untracking on dequeue
and instead transition the item from "queued" to "active" by incrementing an
active-execution counter for the user (or mark the user as active) when popping
in _pop_next and only call _untrack_user when the execution actually completes;
update any other place that currently calls _untrack_user on dequeue (the
similar code around the other dequeue block) and ensure the per-user check that
enforces max_executions_per_user considers queued + active counts (use the same
user-tracking data structure used by _track_user/_untrack_user or add an active
map) and update metrics accordingly.
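Both coordinator suggestions above (report a priority-sorted position instead of a raw heap index, and count queued plus active executions against the per-user limit) could be sketched as follows; the class, limits, and field names are illustrative only:

```python
import heapq
import time
from collections import Counter
from dataclasses import dataclass, field


@dataclass(order=True)
class _QueuedExecution:
    priority: int
    enqueued_at: float
    execution_id: str = field(compare=False)
    user_id: str | None = field(compare=False, default=None)


class CoordinatorQueueSketch:
    def __init__(self, max_per_user: int = 2) -> None:
        self._heap: list[_QueuedExecution] = []
        self._per_user: Counter[str] = Counter()   # queued + active, keyed by user
        self._max_per_user = max_per_user

    def enqueue(self, execution_id: str, priority: int, user_id: str | None) -> int:
        if user_id is not None and self._per_user[user_id] >= self._max_per_user:
            raise RuntimeError("User execution limit exceeded")
        item = _QueuedExecution(priority, time.monotonic(), execution_id, user_id)
        heapq.heappush(self._heap, item)
        if user_id is not None:
            self._per_user[user_id] += 1
        return self.position(execution_id)

    def position(self, execution_id: str) -> int:
        # A heap is only partially ordered; sort a copy to report the real queue position.
        ordered = sorted(self._heap)
        return next(i + 1 for i, q in enumerate(ordered) if q.execution_id == execution_id)

    def pop_next(self) -> _QueuedExecution:
        # Do NOT decrement the per-user count here: the execution is now active.
        return heapq.heappop(self._heap)

    def mark_finished(self, user_id: str | None) -> None:
        # Release the per-user slot only when the execution actually completes.
        if user_id is not None and self._per_user[user_id] > 0:
            self._per_user[user_id] -= 1
```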

In `@docs/operations/metrics-reference.md`:
- Around line 36-44: The table shows a duplicate metric entry for
execution.queue.depth in both Execution Metrics and Coordinator Metrics; remove
the redundant row (the `execution.queue.depth` UpDownCounter entry) from the
Coordinator Metrics section or, if duplication is intentional, add a one-line
note next to `execution.queue.depth` explaining why it appears in both places;
update the Coordinator table so the duplicate row is deleted or annotated and
ensure the remaining metrics and labels (e.g., `coordinator.queue.wait_time`,
`coordinator.executions.scheduled.total`) keep their original order and
descriptions.
🧹 Nitpick comments (2)
backend/tests/unit/services/coordinator/test_coordinator_queue.py (1)

34-35: Make per-user limit test explicit about user identity.
The test currently uses the default user_id=None, which can blur whether the per-user limit is actually being exercised. Passing an explicit user ID clarifies intent.

♻️ Suggested tweak
```diff
-def _ev(execution_id: str, priority: QueuePriority = QueuePriority.NORMAL) -> ExecutionRequestedEvent:
-    return make_execution_requested_event(execution_id=execution_id, priority=priority)
+def _ev(
+    execution_id: str,
+    priority: QueuePriority = QueuePriority.NORMAL,
+    user_id: str | None = None,
+) -> ExecutionRequestedEvent:
+    return make_execution_requested_event(
+        execution_id=execution_id,
+        priority=priority,
+        user_id=user_id,
+    )

-    await coord._add_to_queue(_ev("a", priority=QueuePriority.NORMAL))  # noqa: SLF001
+    await coord._add_to_queue(_ev("a", priority=QueuePriority.NORMAL, user_id="u1"))  # noqa: SLF001
     with pytest.raises(QueueRejectError, match="User execution limit exceeded"):
-        await coord._add_to_queue(_ev("b", priority=QueuePriority.NORMAL))  # noqa: SLF001
+        await coord._add_to_queue(_ev("b", priority=QueuePriority.NORMAL, user_id="u1"))  # noqa: SLF001
```

Also applies to: 53-58

backend/app/core/metrics/coordinator.py (1)

59-60: Constrain status to a bounded set to prevent high-cardinality metrics.
Free-form status strings can explode cardinality and degrade observability backends. Consider validating against a known set (or using an enum).

🔧 Example pattern (replace with your actual statuses)
```diff
 class CoordinatorMetrics(BaseMetrics):
     """Metrics for coordinator and scheduling operations."""
+    _SCHEDULE_STATUSES = {"scheduled", "failed", "cancelled", "skipped"}

     def record_coordinator_execution_scheduled(self, status: str) -> None:
-        self.coordinator_executions_scheduled.add(1, attributes={"status": status})
+        if status not in self._SCHEDULE_STATUSES:
+            status = "unknown"
+        self.coordinator_executions_scheduled.add(1, attributes={"status": status})
```

@HardMax71 HardMax71 merged commit 6e45c2f into main Jan 29, 2026
23 checks passed
@HardMax71 HardMax71 deleted the fix/coordinator branch January 29, 2026 21:03