refactor: Make the Runtime and DistributedRuntime fields private #4193
Walkthrough
This PR migrates error handling from crate-local
Changes
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~75 minutes
Areas requiring extra attention:
Poem
Pre-merge checks
❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
lib/runtime/src/component/endpoint.rs (1)
131-148: Use the fully qualified subject when registering health targets

We’re registering the health check target under `endpoint_name`, but `SystemHealth` keys (and all downstream lookups in `HealthCheckManager`) expect the fully qualified subject. Using only the bare endpoint name collapses endpoints across namespaces/components and even multiple instances of the same endpoint, so the second registration silently overwrites the first and the health-check task map treats them as duplicates. Result: only one of the endpoints ever gets monitored and the other loses its health notifier/status updates, a correctness blocker for multi-component deployments.

Store the target under the fully qualified subject instead (and fetch the notifier by that same subject).

-        guard.register_health_check_target(
-            &endpoint_name,
+        guard.register_health_check_target(
+            &subject,
             instance,
             health_check_payload.clone(),
         );
-        if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint_name) {
+        if let Some(notifier) = guard.get_endpoint_health_check_notifier(&subject) {
             handler.set_endpoint_health_check_notifier(notifier)?;
         }

lib/runtime/src/runtime.rs (1)
148-230: spawn_blocking never reaches the runtime worker threads

`tokio::task::spawn_blocking` executes closures on the dedicated blocking thread pool, not on the runtime’s worker threads. That means the barrier here synchronizes only those blocking threads, and every call to `initialize_context` runs on the wrong pool. As a result, none of the actual runtime worker threads ever receive the compute thread-local setup, and any logic that depends on those TLS entries will still see them unset when code later runs on the real executor threads. The detection helper has the same flaw: because it probes via `spawn_blocking`, it counts blocking threads rather than worker threads, so the barrier can even hang if the blocking pool doesn’t spin up `num_workers` threads immediately. Please rework this to execute on the actual worker threads (e.g., by installing the initializer with the runtime builder’s `on_thread_start` hook or by spawning lightweight async tasks on the primary handle that stay on worker threads). (docs.rs)
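The core of the comment above is that thread-local state belongs to the thread that set it. The following is a minimal, dependency-free sketch of that point using plain `std::thread` rather than Tokio; `CONTEXT_READY`, `initialize_context`, and the two helper functions are hypothetical stand-ins, not the runtime's actual code. One thread plays the role of a blocking-pool thread, the other plays a worker thread.

```rust
use std::cell::Cell;
use std::thread;

thread_local! {
    // Hypothetical stand-in for the compute thread-local context.
    static CONTEXT_READY: Cell<bool> = Cell::new(false);
}

/// Marks the calling thread as initialized (stand-in for the real initializer).
fn initialize_context() {
    CONTEXT_READY.with(|ready| ready.set(true));
}

/// Reports whether the calling thread has been initialized.
fn context_ready() -> bool {
    CONTEXT_READY.with(|ready| ready.get())
}

/// Runs the initializer on one thread (the "blocking pool") and then probes
/// a different thread (the "worker"). Returns (blocking_ready, worker_ready).
fn init_on_other_pool() -> (bool, bool) {
    let blocking = thread::spawn(|| {
        initialize_context();
        context_ready()
    })
    .join()
    .unwrap();
    let worker = thread::spawn(context_ready).join().unwrap();
    (blocking, worker)
}

/// Runs the initializer as the first step on the worker thread itself,
/// which is what a thread-start hook would do.
fn init_on_thread_start() -> bool {
    thread::spawn(|| {
        initialize_context();
        context_ready()
    })
    .join()
    .unwrap()
}

fn main() {
    let (blocking, worker) = init_on_other_pool();
    println!("blocking thread ready: {blocking}, worker thread ready: {worker}");
    println!("worker with start hook ready: {}", init_on_thread_start());
}
```

In this sketch the worker only sees the initialized state when the initializer runs on the worker thread itself, which is the behavior a thread-start hook on the runtime builder is meant to provide.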
🧹 Nitpick comments (1)
lib/llm/src/mocker/engine.rs (1)
113-113: Consider bounded channels to prevent memory issues.

The use of `mpsc::unbounded_channel` at lines 113, 264, and 274 could lead to unbounded memory growth if producers outpace consumers. For the mock engine, this risk may be low due to controlled token generation rates, but bounded channels with appropriate capacities would provide better backpressure and resource guarantees.

Consider replacing unbounded channels with bounded alternatives:

-let (output_tx, mut output_rx) = mpsc::unbounded_channel::<OutputSignal>();
+let (output_tx, mut output_rx) = mpsc::channel::<OutputSignal>(256);

-let (request_tx, mut request_rx) = mpsc::unbounded_channel::<OutputSignal>();
+let (request_tx, mut request_rx) = mpsc::channel::<OutputSignal>(128);

-let (stream_tx, stream_rx) = mpsc::unbounded_channel::<LLMEngineOutput>();
+let (stream_tx, stream_rx) = mpsc::channel::<LLMEngineOutput>(128);

Also applies to: 264-264, 274-274
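To make the backpressure argument concrete, here is a small sketch using the standard library's bounded `sync_channel` as a dependency-free analog of Tokio's bounded `mpsc::channel`. The helper `fill_bounded` and the capacities are illustrative assumptions, not code from the mock engine.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to push `attempts` items into a bounded channel of the given
/// capacity while no consumer is draining it, and returns how many
/// items the channel accepted.
fn fill_bounded(capacity: usize, attempts: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The queue is full: a bounded channel pushes back on the
            // producer here instead of growing without limit.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

fn main() {
    println!("accepted {} of 10", fill_bounded(4, 10));
}
```

One design consequence worth noting if the suggestion is adopted: Tokio's bounded `Sender::send` is async and waits for capacity, whereas `UnboundedSender::send` is synchronous, so call sites would need an `.await` (or `try_send`) after the switch.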
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (40)
lib/bindings/python/rust/engine.rs (3 hunks)
lib/bindings/python/rust/http.rs (1 hunks)
lib/bindings/python/rust/lib.rs (1 hunks)
lib/llm/src/kv_router/publisher.rs (3 hunks)
lib/llm/src/mocker/engine.rs (1 hunks)
lib/runtime/src/component.rs (10 hunks)
lib/runtime/src/component/client.rs (2 hunks)
lib/runtime/src/component/component.rs (2 hunks)
lib/runtime/src/component/endpoint.rs (4 hunks)
lib/runtime/src/component/namespace.rs (2 hunks)
lib/runtime/src/component/registry.rs (1 hunks)
lib/runtime/src/config.rs (1 hunks)
lib/runtime/src/discovery/mock.rs (1 hunks)
lib/runtime/src/discovery/mod.rs (3 hunks)
lib/runtime/src/distributed.rs (4 hunks)
lib/runtime/src/health_check.rs (14 hunks)
lib/runtime/src/lib.rs (2 hunks)
lib/runtime/src/pipeline/network/egress/addressed_router.rs (7 hunks)
lib/runtime/src/pipeline/network/tcp/client.rs (1 hunks)
lib/runtime/src/pipeline/network/tcp/server.rs (1 hunks)
lib/runtime/src/pipeline/nodes/sinks/base.rs (1 hunks)
lib/runtime/src/pipeline/nodes/sinks/pipeline.rs (1 hunks)
lib/runtime/src/pipeline/nodes/sinks/segment.rs (1 hunks)
lib/runtime/src/protocols/annotated.rs (1 hunks)
lib/runtime/src/runnable.rs (1 hunks)
lib/runtime/src/runtime.rs (6 hunks)
lib/runtime/src/service.rs (1 hunks)
lib/runtime/src/system_status_server.rs (9 hunks)
lib/runtime/src/traits.rs (1 hunks)
lib/runtime/src/traits/events.rs (1 hunks)
lib/runtime/src/transports/etcd.rs (5 hunks)
lib/runtime/src/transports/etcd/connector.rs (2 hunks)
lib/runtime/src/transports/etcd/lock.rs (1 hunks)
lib/runtime/src/transports/nats.rs (1 hunks)
lib/runtime/src/transports/zmq.rs (6 hunks)
lib/runtime/src/utils/typed_prefix_watcher.rs (1 hunks)
lib/runtime/src/worker.rs (8 hunks)
lib/runtime/tests/lifecycle.rs (1 hunks)
lib/runtime/tests/pipeline.rs (2 hunks)
lib/runtime/tests/soak.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (28)
📓 Common learnings
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:07:24.843Z
Learning: The codebase uses async-nats version 0.40, not the older nats crate. Error handling should use async_nats::error::Error variants, not nats::Error variants.
📚 Learning: 2025-06-16T20:02:54.935Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 1236
File: lib/llm/src/mocker/protocols.rs:85-112
Timestamp: 2025-06-16T20:02:54.935Z
Learning: When using derive_builder::Builder macro, the macro generates the builder struct and its methods, but does NOT generate a `builder()` method on the original struct. A manual `impl StructName { pub fn builder() -> StructNameBuilder { StructNameBuilder::default() } }` is required to provide the convenient `StructName::builder()` API pattern.
Applied to files:
lib/runtime/src/config.rs
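The `builder()` learning above can be illustrated without the macro. In this sketch `MockArgs` is a hypothetical struct, and `MockArgsBuilder` is a hand-written, simplified stand-in for the builder type that `derive_builder` would generate (the real generated setters and error type differ). The part the learning is about is the final manual `impl` that adds `MockArgs::builder()`.

```rust
/// Hypothetical struct; in the real pattern it would carry `#[derive(Builder)]`.
#[derive(Debug, PartialEq)]
pub struct MockArgs {
    pub block_size: usize,
    pub num_blocks: usize,
}

/// Hand-written stand-in for the builder type the macro would generate.
#[derive(Default)]
pub struct MockArgsBuilder {
    block_size: Option<usize>,
    num_blocks: Option<usize>,
}

impl MockArgsBuilder {
    pub fn block_size(mut self, value: usize) -> Self {
        self.block_size = Some(value);
        self
    }

    pub fn num_blocks(mut self, value: usize) -> Self {
        self.num_blocks = Some(value);
        self
    }

    /// Fails if a required field was never set.
    pub fn build(self) -> Result<MockArgs, String> {
        Ok(MockArgs {
            block_size: self.block_size.ok_or("block_size is required")?,
            num_blocks: self.num_blocks.ok_or("num_blocks is required")?,
        })
    }
}

impl MockArgs {
    /// The manual convenience entry point: without this impl, callers
    /// would have to write `MockArgsBuilder::default()` themselves.
    pub fn builder() -> MockArgsBuilder {
        MockArgsBuilder::default()
    }
}

fn main() {
    let args = MockArgs::builder().block_size(16).num_blocks(1024).build();
    println!("{args:?}");
}
```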
📚 Learning: 2025-07-14T21:25:56.930Z
Learnt from: ryanolson
Repo: ai-dynamo/dynamo PR: 1919
File: lib/runtime/src/engine.rs:168-168
Timestamp: 2025-07-14T21:25:56.930Z
Learning: The AsyncEngineContextProvider trait in lib/runtime/src/engine.rs was intentionally changed from `Send + Sync + Debug` to `Send + Debug` because the Sync bound was overly constraining. The trait should only require Send + Debug as designed.
Applied to files:
lib/runtime/src/traits.rs
lib/runtime/src/component/client.rs
lib/runtime/tests/pipeline.rs
lib/runtime/src/component/namespace.rs
lib/runtime/src/pipeline/network/egress/addressed_router.rs
lib/runtime/src/traits/events.rs
lib/runtime/src/transports/zmq.rs
lib/runtime/src/system_status_server.rs
lib/llm/src/mocker/engine.rs
lib/bindings/python/rust/engine.rs
📚 Learning: 2025-08-18T20:51:51.324Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 2465
File: lib/runtime/src/pipeline/network/egress/push_router.rs:0-0
Timestamp: 2025-08-18T20:51:51.324Z
Learning: The runtime crate cannot depend on the llm crate due to architectural dependency constraints, preventing imports from lib/llm into lib/runtime.
Applied to files:
lib/runtime/src/traits.rs
lib/runtime/src/runnable.rs
lib/runtime/src/component/component.rs
lib/runtime/src/component/namespace.rs
lib/runtime/src/distributed.rs
lib/runtime/src/runtime.rs
lib/runtime/src/lib.rs
📚 Learning: 2025-06-13T22:07:24.843Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:07:24.843Z
Learning: The codebase uses async-nats version 0.40, not the older nats crate. Error handling should use async_nats::error::Error variants, not nats::Error variants.
Applied to files:
lib/runtime/src/runnable.rs
lib/runtime/src/transports/nats.rs
lib/runtime/src/pipeline/nodes/sinks/pipeline.rs
lib/runtime/src/service.rs
lib/runtime/src/transports/etcd.rs
lib/runtime/tests/pipeline.rs
lib/runtime/src/transports/etcd/lock.rs
lib/runtime/src/pipeline/network/egress/addressed_router.rs
lib/runtime/src/traits/events.rs
lib/runtime/src/transports/zmq.rs
lib/runtime/src/pipeline/nodes/sinks/base.rs
lib/bindings/python/rust/http.rs
lib/runtime/src/protocols/annotated.rs
lib/runtime/src/pipeline/network/tcp/client.rs
lib/runtime/src/component.rs
📚 Learning: 2025-08-15T23:51:04.958Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 2465
File: lib/runtime/src/utils/typed_prefix_watcher.rs:94-101
Timestamp: 2025-08-15T23:51:04.958Z
Learning: In the dynamo codebase's etcd client implementation, `PrefixWatcher` uses `#[derive(Dissolve)]` to generate a `dissolve()` method. The pattern `let (_, _watcher, mut events_rx) = prefix_watcher.dissolve();` is the standard and intended usage throughout the codebase. The `mpsc::Receiver<WatchEvent>` maintains the etcd watch stream independently, so the `Watcher` handle can be safely dropped. This pattern is used consistently in critical infrastructure modules like component/client.rs, utils/leader_worker_barrier.rs, and entrypoint/input/http.rs.
Applied to files:
lib/runtime/src/utils/typed_prefix_watcher.rs
lib/runtime/src/component/client.rs
📚 Learning: 2025-08-15T23:51:04.958Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 2465
File: lib/runtime/src/utils/typed_prefix_watcher.rs:94-101
Timestamp: 2025-08-15T23:51:04.958Z
Learning: In the dynamo codebase's etcd client implementation, when using `kv_get_and_watch_prefix()` and `dissolve()`, the returned `events_rx` receiver maintains the etcd watch stream independently. The watcher handle can be safely dropped (using `_watcher`) without terminating the stream, as the receiver keeps the connection alive internally. This is a consistent pattern used throughout the codebase in multiple critical modules.
Applied to files:
lib/runtime/src/utils/typed_prefix_watcher.rs
📚 Learning: 2025-08-15T23:51:04.958Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 2465
File: lib/runtime/src/utils/typed_prefix_watcher.rs:94-101
Timestamp: 2025-08-15T23:51:04.958Z
Learning: In the dynamo codebase's etcd client implementation, when using `kv_get_and_watch_prefix()` and `dissolve()`, the returned `events_rx` receiver maintains the etcd watch stream independently. The watcher handle can be safely dropped (using `_watcher`) without terminating the stream, as the receiver keeps the connection alive internally.
Applied to files:
lib/runtime/src/utils/typed_prefix_watcher.rs
📚 Learning: 2025-09-11T03:24:47.820Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 3004
File: lib/runtime/src/pipeline/network/ingress/push_handler.rs:271-277
Timestamp: 2025-09-11T03:24:47.820Z
Learning: In lib/runtime/src/pipeline/network/ingress/push_handler.rs, the maintainer prefers to keep the existing error comparison logic using format!("{:?}", err) == STREAM_ERR_MSG unchanged until proper error types are implemented, even though it has technical debt. Avoid suggesting changes to working legacy code that will be refactored later.
Applied to files:
lib/runtime/src/pipeline/nodes/sinks/pipeline.rs
lib/runtime/src/transports/etcd.rs
lib/runtime/tests/pipeline.rs
lib/runtime/src/pipeline/network/tcp/server.rs
lib/runtime/src/pipeline/network/egress/addressed_router.rs
lib/runtime/src/transports/zmq.rs
lib/runtime/src/pipeline/nodes/sinks/base.rs
lib/runtime/src/transports/etcd/connector.rs
lib/runtime/src/pipeline/nodes/sinks/segment.rs
lib/runtime/src/pipeline/network/tcp/client.rs
📚 Learning: 2025-07-16T12:41:12.543Z
Learnt from: grahamking
Repo: ai-dynamo/dynamo PR: 1962
File: lib/runtime/src/component/client.rs:270-273
Timestamp: 2025-07-16T12:41:12.543Z
Learning: In lib/runtime/src/component/client.rs, the current mutex usage in get_or_create_dynamic_instance_source is temporary while evaluating whether the mutex can be dropped entirely. The code currently has a race condition between try_lock and lock().await, but this is acknowledged as an interim state during the performance optimization process.
Applied to files:
lib/runtime/src/component/client.rs
lib/runtime/src/health_check.rs
lib/runtime/src/component/registry.rs
lib/runtime/src/transports/etcd/lock.rs
lib/runtime/src/worker.rs
lib/runtime/src/distributed.rs
lib/bindings/python/rust/lib.rs
lib/runtime/src/system_status_server.rs
lib/runtime/src/runtime.rs
lib/runtime/src/component.rs
lib/runtime/src/lib.rs
📚 Learning: 2025-11-05T08:41:06.483Z
Learnt from: ryanolson
Repo: ai-dynamo/dynamo PR: 4070
File: lib/discovery/src/systems/etcd/peer.rs:151-188
Timestamp: 2025-11-05T08:41:06.483Z
Learning: In lib/discovery/src/systems/etcd/peer.rs, the register_instance method intentionally captures the lease_id before entering the OperationExecutor closure. If the lease is revoked or fails, the operation should hard-fail rather than retry with a new lease, because the system does not track which entries were registered under which lease. Retrying with a fresh lease would create inconsistent state.
Applied to files:
lib/runtime/src/component/client.rs
lib/runtime/src/component/endpoint.rs
lib/runtime/src/transports/etcd/connector.rs
📚 Learning: 2025-09-17T01:00:50.937Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane identified that reordering tokio::select! arms in the indexer (moving dump_rx.recv() to be after event_rx.recv()) creates a natural barrier that ensures RouterEvents are always processed before dump requests, solving the ack-before-commit race condition. This leverages the existing biased directive and requires minimal code changes, aligning with their preference for contained solutions.
Applied to files:
lib/runtime/src/component/client.rs
lib/runtime/src/pipeline/network/egress/addressed_router.rs
lib/llm/src/kv_router/publisher.rs
📚 Learning: 2025-05-29T06:20:12.901Z
Learnt from: ryanolson
Repo: ai-dynamo/dynamo PR: 1093
File: lib/llm/src/block_manager/block/registry.rs:98-122
Timestamp: 2025-05-29T06:20:12.901Z
Learning: In lib/llm/src/block_manager/block/registry.rs, the background task spawned for handling unregister notifications uses detached concurrency by design. The JoinHandle is intentionally not stored as this represents a reasonable architectural tradeoff for a long-running cleanup task.
Applied to files:
lib/runtime/src/component/client.rs
lib/runtime/src/transports/zmq.rs
lib/runtime/src/worker.rs
📚 Learning: 2025-09-21T01:40:52.456Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3155
File: components/backends/vllm/src/dynamo/vllm/main.py:228-233
Timestamp: 2025-09-21T01:40:52.456Z
Learning: In the dynamo codebase, error handling for distributed runtime client initialization (like runtime.namespace().component().endpoint().client()) is handled at the Rust level in the distributed runtime bindings, so Python-level try/catch blocks are not needed and would be redundant.
Applied to files:
lib/runtime/tests/soak.rs
lib/bindings/python/rust/http.rs
lib/runtime/src/distributed.rs
lib/runtime/src/component.rs
lib/runtime/src/lib.rs
📚 Learning: 2025-07-01T13:55:03.940Z
Learnt from: nnshah1
Repo: ai-dynamo/dynamo PR: 1444
File: tests/fault_tolerance/utils/metrics.py:30-32
Timestamp: 2025-07-01T13:55:03.940Z
Learning: The `dynamo_worker()` decorator in the dynamo codebase returns a wrapper that automatically injects the `runtime` parameter before calling the wrapped function. This means callers only need to provide the non-runtime parameters, while the decorator handles injecting the runtime argument automatically. For example, a function with signature `async def get_metrics(runtime, log_dir)` decorated with `dynamo_worker()` can be called as `get_metrics(log_dir)` because the decorator wrapper injects the runtime parameter.
Applied to files:
lib/runtime/tests/soak.rs
📚 Learning: 2025-09-02T16:46:54.015Z
Learnt from: GuanLuo
Repo: ai-dynamo/dynamo PR: 2714
File: lib/llm/src/discovery/model_entry.rs:38-42
Timestamp: 2025-09-02T16:46:54.015Z
Learning: In lib/llm/src/discovery/model_entry.rs, GuanLuo prefers not to add serde defaults for model_type and model_input fields to keep the specification explicit and avoid user errors, relying on atomic deployment strategy to avoid backward compatibility issues.
Applied to files:
lib/runtime/src/discovery/mod.rs
lib/runtime/src/discovery/mock.rs
📚 Learning: 2025-06-13T22:32:05.022Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:32:05.022Z
Learning: In async-nats, the "no responders" error is represented as async_nats::client::RequestErrorKind::NoResponders, not async_nats::Error::NoResponders. Use err.downcast_ref::<async_nats::client::RequestError>() and then check request_err.kind() against RequestErrorKind::NoResponders.
Applied to files:
lib/runtime/src/pipeline/network/egress/addressed_router.rs
📚 Learning: 2025-08-27T17:56:14.690Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 2500
File: lib/llm/src/migration.rs:58-77
Timestamp: 2025-08-27T17:56:14.690Z
Learning: In lib/llm/src/migration.rs, the cancellation visibility in the Migration operator is intentionally one-way - it checks engine_ctx.is_stopped()/is_killed() to stop pulling from streams but doesn't link newly created streams as child contexts to the parent. This is a conscious architectural decision with plans for future enhancement.
Applied to files:
lib/runtime/src/pipeline/network/egress/addressed_router.rs
📚 Learning: 2025-06-02T19:37:27.666Z
Learnt from: oandreeva-nv
Repo: ai-dynamo/dynamo PR: 1195
File: lib/llm/tests/block_manager.rs:150-152
Timestamp: 2025-06-02T19:37:27.666Z
Learning: In Rust/Tokio applications, when background tasks use channels for communication, dropping the sender automatically signals task termination when the receiver gets `None`. The `start_batching_publisher` function in `lib/llm/tests/block_manager.rs` demonstrates this pattern: when the `KVBMDynamoRuntimeComponent` is dropped, its `batch_tx` sender is dropped, causing `rx.recv()` to return `None`, which triggers cleanup and task termination.
Applied to files:
lib/runtime/src/traits/events.rs
lib/llm/src/kv_router/publisher.rs
lib/runtime/src/worker.rs
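The drop-to-terminate pattern in the learning above can be sketched with the standard library's channel, used here as a dependency-free analog of the Tokio channel. The function name is hypothetical. Note one difference: `std::sync::mpsc::Receiver::recv` reports closure as `Err`, where Tokio's receiver returns `None`.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns a consumer that sums incoming values until every sender is
/// dropped, then returns the sum the consumer computed.
fn run_until_sender_dropped(values: Vec<u32>) -> u32 {
    let (tx, rx) = mpsc::channel::<u32>();
    let consumer = thread::spawn(move || {
        let mut total = 0;
        // `recv` returns Err once all senders are gone and the queue is
        // drained, which ends the loop without any explicit stop signal.
        while let Ok(value) = rx.recv() {
            total += value;
        }
        total
    });
    for value in values {
        tx.send(value).expect("consumer is still running");
    }
    // Dropping the only sender is the termination signal.
    drop(tx);
    consumer.join().expect("consumer thread panicked")
}

fn main() {
    println!("sum = {}", run_until_sender_dropped(vec![1, 2, 3]));
}
```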
📚 Learning: 2025-05-29T00:02:35.018Z
Learnt from: alec-flowers
Repo: ai-dynamo/dynamo PR: 1181
File: lib/llm/src/kv_router/publisher.rs:379-425
Timestamp: 2025-05-29T00:02:35.018Z
Learning: In lib/llm/src/kv_router/publisher.rs, the functions `create_stored_blocks` and `create_stored_block_from_parts` are correctly implemented and not problematic duplications of existing functionality elsewhere in the codebase.
Applied to files:
lib/llm/src/kv_router/publisher.rs
📚 Learning: 2025-09-17T01:00:50.937Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane suggested using tokio::select! arm ordering with the existing biased directive in the indexer to create a natural barrier for dump requests, ensuring KV events are drained before snapshotting. This approach leverages existing architecture (biased select) to solve race conditions with minimal code changes, which aligns with their preference for contained solutions.
Applied to files:
lib/llm/src/kv_router/publisher.rs
📚 Learning: 2025-10-14T00:58:05.744Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3597
File: lib/llm/src/kv_router/indexer.rs:437-441
Timestamp: 2025-10-14T00:58:05.744Z
Learning: In lib/llm/src/kv_router/indexer.rs, when a KvCacheEventData::Cleared event is received, the system intentionally clears all dp_ranks for the given worker_id by calling clear_all_blocks(worker.worker_id). This is the desired behavior and should not be scoped to individual dp_ranks.
Applied to files:
lib/llm/src/kv_router/publisher.rs
📚 Learning: 2025-05-30T06:38:09.630Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 1285
File: lib/llm/src/kv_router/scoring.rs:58-63
Timestamp: 2025-05-30T06:38:09.630Z
Learning: In lib/llm/src/kv_router/scoring.rs, the user prefers to keep the panic behavior when calculating load_avg and variance with empty endpoints rather than adding guards for division by zero. They want the code to fail fast on this error condition.
Applied to files:
lib/llm/src/kv_router/publisher.rs
📚 Learning: 2025-05-30T06:34:12.785Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 1285
File: lib/llm/src/kv_router/scheduler.rs:260-266
Timestamp: 2025-05-30T06:34:12.785Z
Learning: In the KV router scheduler code, PeaBrane prefers fail-fast behavior over silent failure handling. When accessing worker metrics data that could be out-of-bounds (like dp_rank indexing), explicit panics are preferred over graceful degradation with continue statements to ensure data integrity issues are caught early.
Applied to files:
lib/llm/src/kv_router/publisher.rs
📚 Learning: 2025-06-05T01:02:15.318Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 1392
File: lib/llm/src/kv_router/scoring.rs:35-46
Timestamp: 2025-06-05T01:02:15.318Z
Learning: In lib/llm/src/kv_router/scoring.rs, PeaBrane prefers panic-based early failure over Result-based error handling for the worker_id() method to catch invalid data early during development.
Applied to files:
lib/llm/src/kv_router/publisher.rs
📚 Learning: 2025-08-21T17:23:02.836Z
Learnt from: michaelfeil
Repo: ai-dynamo/dynamo PR: 2591
File: lib/bindings/python/rust/http.rs:0-0
Timestamp: 2025-08-21T17:23:02.836Z
Learning: In lib/bindings/python/rust/http.rs, the enable_endpoint method uses EndpointType::all() to dynamically support all available endpoint types with case-insensitive matching, which is more maintainable than hardcoded match statements for endpoint type mappings.
Applied to files:
lib/runtime/src/component/endpoint.rs
lib/bindings/python/rust/http.rs
lib/runtime/src/system_status_server.rs
📚 Learning: 2025-08-25T23:24:42.076Z
Learnt from: tzulingk
Repo: ai-dynamo/dynamo PR: 2666
File: components/backends/trtllm/src/dynamo/trtllm/publisher.py:0-0
Timestamp: 2025-08-25T23:24:42.076Z
Learning: WorkerMetricsPublisher.create_endpoint method signature has been updated in _core.pyi to include metrics_labels parameter: `def create_endpoint(self, component: str, metrics_labels: Optional[List[Tuple[str, str]]] = None)`, making the metrics_labels parameter optional with default value of None.
Applied to files:
lib/runtime/src/component/endpoint.rs
📚 Learning: 2025-06-17T00:50:44.845Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 1236
File: lib/llm/src/mocker/engine.rs:140-161
Timestamp: 2025-06-17T00:50:44.845Z
Learning: In Rust async code, when an Arc<Mutex<_>> is used solely to transfer ownership of a resource (like a channel receiver) into a spawned task rather than for sharing between multiple tasks, holding the mutex lock across an await is not problematic since there's no actual contention.
Applied to files:
lib/runtime/src/worker.rs
lib/runtime/src/lib.rs
🧬 Code graph analysis (20)
lib/runtime/src/component/client.rs (3)
  lib/runtime/src/component.rs (3): component(510-512), component(668-674), endpoint(270-278)
  lib/runtime/src/storage/key_value_store.rs (1): etcd(261-263)
  lib/runtime/src/runtime.rs (1): secondary(270-272)
lib/runtime/src/transports/etcd.rs (3)
  lib/runtime/src/distributed.rs (1): runtime(265-267)
  lib/runtime/src/worker.rs (1): runtime(95-97)
  lib/runtime/src/utils/leader_worker_barrier.rs (2): sync(148-175), sync(242-263)
lib/runtime/src/component/component.rs (3)
  lib/bindings/python/src/dynamo/_core.pyi (4): Context(235-320), component(88-92), Component(104-131), DistributedRuntime(35-65)
  lib/bindings/python/rust/lib.rs (1): component(824-830)
  lib/runtime/src/component.rs (2): component(510-512), component(668-674)
lib/runtime/src/health_check.rs (2)
  lib/runtime/src/distributed.rs (2): system_health(280-282), new(93-252)
  lib/runtime/src/system_health.rs (2): get_health_check_target(218-221), new(61-88)
lib/runtime/src/discovery/mod.rs (1)
  lib/runtime/src/component.rs (2): component(510-512), component(668-674)
lib/runtime/src/component/registry.rs (1)
  lib/runtime/src/component.rs (2): component(510-512), component(668-674)
lib/runtime/src/pipeline/network/tcp/server.rs (1)
  lib/bindings/python/rust/lib.rs (1): local_ip(685-693)
lib/runtime/src/component/namespace.rs (2)
  lib/bindings/python/rust/lib.rs (1): component(824-830)
  lib/runtime/src/component.rs (2): component(510-512), component(668-674)
lib/runtime/src/pipeline/network/egress/addressed_router.rs (2)
  lib/runtime/src/logging.rs (2): get_distributed_tracing_context(694-706), inject_otel_context_into_nats_headers(395-412)
  lib/runtime/src/protocols/maybe_error.rs (2): err(11-11), err(37-39)
lib/llm/src/kv_router/publisher.rs (1)
  lib/llm/src/kv_router/indexer.rs (1): compute_block_hash_for_seq(122-134)
lib/runtime/src/component/endpoint.rs (2)
  lib/runtime/src/component.rs (4): endpoint(270-278), component(510-512), component(668-674), service_name(244-247)
  lib/runtime/src/distributed.rs (1): system_health(280-282)
lib/runtime/src/worker.rs (1)
  lib/runtime/src/runtime.rs (4): new(50-84), from_settings(244-250), from_handle(236-240), primary(265-267)
lib/runtime/src/distributed.rs (4)
  lib/runtime/src/component.rs (2): component(510-512), component(668-674)
  lib/runtime/src/storage/key_value_store.rs (1): etcd(261-263)
  lib/runtime/src/worker.rs (1): runtime(95-97)
  lib/bindings/python/tests/conftest.py (1): runtime(407-441)
lib/bindings/python/rust/lib.rs (2)
  lib/runtime/src/runtime.rs (2): from_settings(244-250), primary(265-267)
  lib/runtime/src/worker.rs (2): from_settings(57-60), tokio(149-149)
lib/runtime/src/system_status_server.rs (2)
  lib/runtime/src/system_health.rs (2): live_path(275-277), uptime(258-260)
  lib/runtime/src/distributed.rs (1): system_health(280-282)
lib/llm/src/mocker/engine.rs (3)
  lib/llm/src/mocker/scheduler.rs (5): tokio(263-263), mpsc(259-259), mpsc(639-639), mpsc(760-760), mpsc(852-852)
  lib/llm/src/kv_router/publisher.rs (3): mpsc(107-107), mpsc(1072-1072), mpsc(1096-1096)
  components/src/dynamo/sglang/protocol.py (1): PreprocessedRequest(42-48)
lib/bindings/python/rust/engine.rs (2)
  lib/runtime/src/logging.rs (1): get_distributed_tracing_context(694-706)
  lib/bindings/python/rust/context.rs (1): callable_accepts_kwarg(115-126)
lib/runtime/src/runtime.rs (3)
  lib/runtime/src/worker.rs (3): runtime(95-97), from_current(199-206), from_settings(57-60)
  lib/runtime/src/distributed.rs (4): runtime(265-267), new(93-252), from_settings(254-257), from_settings(365-371)
  lib/runtime/src/compute/pool.rs (2): new(99-108), new(294-301)
lib/runtime/src/component.rs (2)
  lib/bindings/python/rust/lib.rs (5): client(794-808), new(431-483), new(1121-1125), component(824-830), namespace(500-505)
  lib/runtime/src/distributed.rs (3): new(93-252), runtime(265-267), namespace(294-296)
lib/runtime/src/lib.rs (3)
  lib/bindings/python/src/dynamo/_core.pyi (1): DistributedRuntime(35-65)
  lib/runtime/src/distributed.rs (1): runtime(265-267)
  lib/runtime/src/worker.rs (1): runtime(95-97)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: vllm (arm64)
- GitHub Check: sglang (arm64)
- GitHub Check: operator (amd64)
- GitHub Check: operator (arm64)
- GitHub Check: tests (launch/dynamo-run)
- GitHub Check: tests (lib/runtime/examples)
- GitHub Check: tests (.)
- GitHub Check: tests (lib/bindings/python)
- GitHub Check: Build and Test - dynamo
🔇 Additional comments (55)
lib/runtime/src/transports/nats.rs (1)
925-966: Metrics accessor usage looks greatSwitching to
drt.metrics()keeps the NATS Prometheus bridge aligned with the new privateDistributedRuntimefields while preserving existing gauge wiring. Thanks for threading the accessor through without touching call sites elsewhere.lib/runtime/src/utils/typed_prefix_watcher.rs (1)
9-204: Consistent anyhow migrationAdopting
anyhow::Resulthere keeps the prefix watcher aligned with the crate-wide error strategy and avoids juggling crate-local aliases. Looks good.lib/runtime/src/pipeline/network/tcp/client.rs (1)
64-194:anyhowcontext integration checks outThe new
Contextusage on the connection info conversion and theanyhowwrappers on handshake send/join failures keep diagnostics sharp while matching the private-field refactor. No issues spotted.lib/runtime/src/service.rs (1)
140-200: Service metrics wiring remains solid
ServiceClientandComponentNatsServerPrometheusMetricsnow lean on accessor methods (nats_client,component.metrics(), etc.), so the private field change doesn’t leak through. Implementation stays tidy.lib/runtime/src/runnable.rs (1)
14-26: Runnable re-export looks goodRe-exporting
anyhow::{Error, Result}keeps the runnable API uniform after the error-type consolidation, so callers continue to import from this module without churn.lib/runtime/src/protocols/annotated.rs (1)
4-6: LGTM!The import reorganization is clean and makes dependencies explicit, aligning with the broader migration to
anyhow-based error handling.lib/runtime/src/pipeline/network/tcp/server.rs (1)
42-59: LGTM! Good dependency injection pattern.The
IpResolvertrait andDefaultIpResolverimplementation provide excellent testability while maintaining clean separation of concerns. The tests demonstrate this works well for both success and fallback scenarios.lib/runtime/src/pipeline/network/egress/addressed_router.rs (2)
87-87: LGTM!The migration to explicit
#[async_trait::async_trait]macro path is consistent with the broader PR pattern and maintains the same functionality.
140-145: LGTM!The logging migration from
logtotracingis complete and consistent throughout the file. All log levels (trace, debug, warn) have been properly updated.Also applies to: 157-158, 185-186, 223-223, 235-235, 239-239
lib/runtime/src/config.rs (1)
4-4: LGTM!The migration from crate-local
Resulttoanyhow::Resultis clean and consistent with the broader error handling refactor across the runtime.Also applies to: 306-306, 385-385
lib/runtime/src/pipeline/nodes/sinks/pipeline.rs (1)
5-5: LGTM!The migration from
crate::Errortoanyhow::Errorin theServiceBackendimplementation is consistent with the broader error handling refactor.Also applies to: 18-18, 26-26
lib/bindings/python/rust/http.rs (1)
6-6: LGTM!The migration from re-exported
dynamo_runtimeerror types to localanyhowimports aligns with the broader refactor to make Runtime fields private and consolidate error handling.lib/runtime/src/discovery/mock.rs (1)
8-8: LGTM!The migration to
anyhow::Resultin the mock discovery implementation is clean and maintains all existing functionality while aligning with the broader error handling refactor.Also applies to: 131-131, 143-143, 152-152
lib/runtime/src/transports/etcd/lock.rs (1)
10-10: LGTM: Error type migration to anyhow.The migration from
crate::Resulttoanyhow::Resultis consistent with the PR's objective to standardize error handling across the runtime.lib/runtime/src/traits.rs (1)
17-21: LGTM: Field access replaced with accessor method.
The change from direct field access (`&self.runtime`) to the accessor method (`self.runtime()`) aligns with the PR's objective to make the Runtime field private.
lib/runtime/src/component/component.rs (3)
4-12: LGTM: Import additions for event publishing functionality.
The added imports support event publishing/subscribing with proper error handling via anyhow and serialization via serde. The DistributedRuntimeProvider import enables access to the distributed runtime context.
14-41: LGTM: EventPublisher implementation uses proper error handling.
The EventPublisher implementation correctly uses `anyhow::Result` and accesses NATS client through the `DistributedRuntimeProvider` trait, consistent with the PR's accessor pattern changes.
43-70: LGTM: EventSubscriber implementation with proper deserialization.
The subscriber implementation correctly handles deserialization errors with context and follows the same accessor pattern as the publisher.
lib/runtime/tests/pipeline.rs (3)
6-6: LGTM: Added anyhow::Error for test error handling.
The import supports error handling in test scaffolding, consistent with the PR's migration to anyhow-based error handling.
12-23: LGTM: More explicit imports with acknowledged TODO.
Replacing the wildcard import with explicit imports improves code clarity. The remaining star import on line 22 is marked with a TODO for future cleanup.
55-55: LGTM: Updated to explicit async_trait path.
The change from `#[async_trait]` to `#[async_trait::async_trait]` uses an explicit path, improving clarity and consistency with other modules in the PR.
lib/runtime/src/traits/events.rs (3)
6-8: LGTM: Updated imports for anyhow-based error handling.
The imports have been updated to use `anyhow::Result` instead of `crate::Result`, consistent with the PR-wide migration to standardized error handling.
18-46: LGTM: EventPublisher trait migrated to anyhow::Result.
The trait methods now return `anyhow::Result<()>` instead of `crate::Result<()>`, aligning with the error handling standardization across the runtime.
48-67: LGTM: EventSubscriber trait updated consistently.
The trait methods properly use `anyhow::Result` for error handling. Based on learnings, the codebase uses async-nats version 0.40, and the return type `Result<async_nats::Subscriber>` is correct.
lib/runtime/src/pipeline/nodes/sinks/segment.rs (2)
5-5: LGTM: Error type migration to anyhow.
The import has been updated to use `anyhow::Error` instead of `crate::Error`, consistent with the PR's standardization of error handling.
28-39: LGTM: Sink implementation updated with anyhow::Error.
The `on_data` method now returns `Result<(), Error>` where `Error` is `anyhow::Error`, consistent with the error type migration across the runtime.
lib/runtime/src/discovery/mod.rs (3)
4-14: LGTM: Updated imports for standardized error handling.
The imports have been updated to use `anyhow::Result` and explicitly import `TransportType`, consistent with the PR's migration to anyhow-based error handling.
76-94: LGTM: Public API migrated to anyhow::Result.
The `from_model_card` method now returns `anyhow::Result<Self>` instead of `crate::Result<Self>`, maintaining the same functionality while using standardized error handling.
154-167: LGTM: Consistent error handling in deserialization.
The method properly uses `anyhow::Result<T>` for the return type and `anyhow::bail!` for error cases, consistent with the PR's error handling migration.
lib/runtime/src/component/namespace.rs (4)
4-15: LGTM: Import additions for event publishing and distributed runtime access.
The added imports support event publishing/subscribing functionality with proper error handling via anyhow, serialization via serde, and distributed runtime access via `DistributedRuntimeProvider`.
17-44: LGTM: EventPublisher implementation follows established pattern.
The Namespace EventPublisher implementation is consistent with the Component implementation, using `anyhow::Result` and accessing NATS client through the `DistributedRuntimeProvider` trait.
46-73: LGTM: EventSubscriber implementation with proper error context.
The subscriber implementation correctly handles deserialization and follows the same accessor pattern, consistent with other implementations in the PR.
103-145: LGTM: Test scaffolding updated for integration tests.
The test imports have been updated to include `DistributedRuntime` and `Runtime`, enabling proper integration testing of the namespace event publishing functionality.
lib/runtime/src/pipeline/nodes/sinks/base.rs (1)
5-5: LGTM! Clean migration to anyhow error handling.
The error type migration from `crate::Error` to `anyhow::Error` is straightforward and consistent with the broader refactor across the codebase.
Also applies to: 17-17
lib/runtime/src/component/client.rs (1)
214-214: LGTM! Correctly uses the new runtime accessor method.
The change from direct field access to `endpoint.component.drt.runtime().secondary()` properly uses the new accessor pattern introduced by this PR to encapsulate the Runtime field.
lib/runtime/tests/lifecycle.rs (1)
4-6: LGTM! Clean test migration to anyhow.
The import refactor aligns with the codebase-wide migration to `anyhow::Result`. The test logic remains unchanged.
lib/runtime/src/component/registry.rs (1)
20-26: LGTM! Well-structured constructor pattern.
The new `Registry::new()` constructor properly initializes the thread-safe shared state with `Arc<Mutex<RegistryInner>>`, following standard Rust patterns for shared mutable state.
lib/runtime/src/transports/etcd.rs (1)
144-144: LGTM! Consistent anyhow error handling migration.
The migration from custom error macros to `anyhow::bail!` and `anyhow::anyhow!` is clean and preserves the original error messages while standardizing on the anyhow error handling pattern used throughout the codebase.
Also applies to: 185-193, 375-375
lib/runtime/tests/soak.rs (1)
20-30: LGTM! Clean integration test migration to anyhow.
The import refactor successfully migrates the soak test from `dynamo_runtime::Result` to `anyhow::Result`, aligning with the broader error handling standardization across the codebase.
lib/runtime/src/transports/etcd/connector.rs (1)
79-79: LGTM! Consistent error handling in reconnection logic.
The migration to `anyhow::bail!` maintains the original error semantics for deadline-exceeded scenarios while aligning with the codebase-wide anyhow adoption.
lib/bindings/python/rust/lib.rs (1)
437-452: LGTM! Proper error handling and accessor usage in Python bindings.
The changes improve the fallback initialization path:
- Explicitly typed `anyhow::Result<rs::Runtime>` for clearer error propagation
- INIT closure returns `Result<()>` for proper error handling
- Line 450 correctly uses `worker.runtime()` accessor method instead of direct field access, aligning with the PR's objective of making Runtime fields private
lib/runtime/src/component.rs (4)
383-400: LGTM: Correct usage of component_registry() accessor.
The migration from direct field access to the `component_registry()` accessor method aligns with the PR's objective to encapsulate DistributedRuntime's internal state.
302-302: LGTM: Consistent migration to anyhow::Result.
The return type updates across `scrape_stats`, `start_scraping_nats_service_component_metrics`, `stats_stream`, and `client` provide a unified error-handling surface using `anyhow::Result`.
Also applies to: 320-320, 373-373, 590-590
655-665: LGTM: Appropriate signature and visibility changes.
The updated `Namespace::new` signature taking `DistributedRuntime` by value (which is then wrapped in `Arc`) is correct. The visibility change to `pub(crate)` is also appropriate, as external callers should use `DistributedRuntime::namespace()` instead.
668-674: LGTM: Consistent error handling in namespace methods.
Both `component()` and `namespace()` methods now return `anyhow::Result`, providing consistent error propagation for builder failures.
Also applies to: 677-684
lib/bindings/python/rust/engine.rs (2)
4-23: LGTM: Import reorganization aligns with error handling migration.
The import restructuring removes re-exported types from `dynamo_runtime` in favor of direct `anyhow` imports and explicit `tokio_util::sync::CancellationToken` usage, improving clarity and consistency.
90-90: LGTM: Consistent async_trait macro path.
The updated macro path `#[async_trait::async_trait]` is consistent with changes across the codebase and improves explicitness.
Also applies to: 144-144
lib/llm/src/mocker/engine.rs (2)
131-150: LGTM: Detached task spawning follows codebase patterns.
The spawned background tasks use `CancellationToken` for lifecycle management rather than storing `JoinHandle`s. This is an intentional design pattern in the codebase for long-running services. Based on learnings.
Also applies to: 194-215, 280-340, 362-398
226-345: LGTM: Well-structured generate implementation.
The `generate` method properly validates `dp_rank`, handles prefill worker specifics (max_tokens override and `disaggregated_params`), and implements cancellation handling via `async_context.stopped()`.
lib/runtime/src/distributed.rs (4)
38-70: LGTM: Private fields enforce encapsulation.
The `DistributedRuntime` struct now has private fields with public accessor methods, achieving the PR's goal of hiding implementation details and enabling future refactoring without breaking external APIs.
273-282: LGTM: Public accessors with acknowledged technical debt.
The `component_registry()` and `system_health()` accessors provide controlled access to internal state. The TODO comments appropriately flag that returning raw pointers/locks isn't ideal, signaling intent for future API improvements.
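For readers following along, the private-fields-plus-accessors pattern discussed in the two comments above can be sketched roughly as follows. This is a std-only illustration, not the actual dynamo code: `Runtime`, `Registry`, and `DistributedRuntimeSketch` are hypothetical stand-ins, and the accessor signatures are assumptions.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the inner runtime handle.
#[derive(Clone)]
pub struct Runtime {
    id: u64,
}

impl Runtime {
    pub fn id(&self) -> u64 {
        self.id
    }
}

/// Hypothetical stand-in for shared registry state.
#[derive(Default)]
pub struct Registry {
    services: HashMap<String, u32>,
}

/// Illustrative only: not the real `DistributedRuntime` definition.
#[derive(Clone)]
pub struct DistributedRuntimeSketch {
    // Private fields: code outside this module has to use the accessors below.
    runtime: Runtime,
    component_registry: Arc<Mutex<Registry>>,
}

impl DistributedRuntimeSketch {
    pub fn new(runtime: Runtime) -> Self {
        Self {
            runtime,
            component_registry: Arc::new(Mutex::new(Registry::default())),
        }
    }

    /// Borrowing accessor that replaces direct `drt.runtime` field access.
    pub fn runtime(&self) -> &Runtime {
        &self.runtime
    }

    /// Hands out the shared lock itself. This still exposes the locking
    /// strategy to callers, which is the kind of leak the TODOs point at.
    pub fn component_registry(&self) -> Arc<Mutex<Registry>> {
        Arc::clone(&self.component_registry)
    }
}

fn main() {
    let drt = DistributedRuntimeSketch::new(Runtime { id: 7 });
    let registry = drt.component_registry();
    registry
        .lock()
        .unwrap()
        .services
        .insert("backend".to_string(), 1);
    let count = drt.component_registry().lock().unwrap().services.len();
    println!("runtime id = {}, services = {}", drt.runtime().id(), count);
}
```

Because callers only see `runtime()` and `component_registry()`, the field layout can change later without touching call sites; the lock-returning accessor is the part that would need a narrower API to fully hide the implementation.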
93-252: LGTM: Comprehensive async constructor.
The new `async fn new()` constructor properly initializes all DistributedRuntime components including health tracking, system status server, discovery client, and metrics, consolidating previously scattered initialization logic.
300-300: LGTM: Consistent error handling improvements.
Line 300 uses `anyhow::bail!` for consistent error handling, and line 313 adds a type annotation to help the compiler infer the closure's error type. Both changes align with the PR's error handling migration.
Also applies to: 313-313
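As an aside on the closure type annotation mentioned for line 313: when a closure uses `?` and nothing else pins down its error type, the compiler needs a hint. The sketch below shows one common way to give it, a turbofish on the final `Ok`. It uses `std::num::ParseIntError` rather than `anyhow::Error` to stay dependency-free, and `sum_or_zero` is a hypothetical helper, not code from this PR.

```rust
use std::num::ParseIntError;

/// Hypothetical helper: sums two decimal strings, falling back to 0 on bad input.
fn sum_or_zero(a: &str, b: &str) -> i32 {
    let add = |a: &str, b: &str| {
        let x: i32 = a.parse()?;
        let y: i32 = b.parse()?;
        // The turbofish names the error type. `unwrap_or` below does not
        // constrain it, so the closure's return type needs this hint.
        Ok::<i32, ParseIntError>(x + y)
    };
    add(a, b).unwrap_or(0)
}

fn main() {
    println!("{}", sum_or_zero("40", "2")); // prints 42
    println!("{}", sum_or_zero("40", "two")); // prints 0
}
```

The same idea applies to async blocks, where annotating the final `Ok` is often the only place to state the error type.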
lib/runtime/src/worker.rs (2)
33-33: LGTM: Comprehensive migration to anyhow-based error handling.
The Worker API has been consistently updated to use `anyhow::Result` across all public methods (`from_settings`, `from_config`, `runtime_from_existing`, `tokio_runtime`, `from_current`, `execute`, `execute_async`, `execute_internal`, and `signal_handler`). Error construction now uses `anyhow::anyhow!` throughout.
Also applies to: 57-60, 63-78, 80-88, 90-93, 99-108, 110-120, 124-128, 199-206, 210-239
146-146: LGTM: Correct usage of primary_token() accessor.
Using `runtime.primary_token()` instead of direct field access aligns with the PR's encapsulation objectives and matches the accessor pattern introduced for `DistributedRuntime`.
683df18 to
b7c91c9
Compare
rmccorm4
left a comment
Nice cleanup 🧹
LGTM but one comment on possible missed spot in KVBM code in case it wasn't compiled by default or something: https://github.com/ai-dynamo/dynamo/pull/4193/files#r2505832304
Thanks for the review! It said:
so I was thinking darn this is going to be tricky to find a reviewer for :-)
b7c91c9 to
2846050
Compare
2846050 to
83b2a38
Compare
They were public seemingly by accident, because they were declared in `lib.rs` which allowed anything in the crate to read/write them directly. Lots of places did. Now access to private fields is via the accessor methods. Later we should hide more of the implementation details of DistributedRuntime. Also tidy up some weird imports, for example re-exporting `anyhow` types as our own in runtime. Signed-off-by: Graham King <[email protected]>
83b2a38 to
d14334f
Compare
Signed-off-by: Graham King <[email protected]> Signed-off-by: Daiyaan <[email protected]>
They were public seemingly by accident, because they were declared in `lib.rs` which allowed anything in the crate to read/write them directly. Lots of places did.
Now access to private fields is via the accessor methods. Later we should hide more of the implementation details of `DistributedRuntime`.
Also tidy up some weird imports, for example re-exporting `anyhow` types as our own in runtime.
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes & Improvements