Skip to content

Commit cf630bf

Browse files
authored
refactor: Make the Runtime and DistributedRuntime fields private (#4193)
Signed-off-by: Graham King <[email protected]>
1 parent 0e62314 commit cf630bf

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

54 files changed

+454
-503
lines changed

lib/bindings/python/rust/engine.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,26 @@
11
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use super::context::{Context, callable_accepts_kwarg};
5-
use dynamo_runtime::logging::get_distributed_tracing_context;
4+
use std::sync::Arc;
5+
6+
use anyhow::{Error, Result};
67
use pyo3::prelude::*;
78
use pyo3::types::{PyDict, PyModule};
89
use pyo3::{PyAny, PyErr};
910
use pyo3_async_runtimes::TaskLocals;
1011
use pythonize::{depythonize, pythonize};
11-
use std::sync::Arc;
12+
pub use serde::{Deserialize, Serialize};
1213
use tokio::sync::mpsc;
1314
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
15+
use tokio_util::sync::CancellationToken;
1416

17+
use dynamo_runtime::logging::get_distributed_tracing_context;
1518
pub use dynamo_runtime::{
16-
CancellationToken, Error, Result,
17-
pipeline::{
18-
AsyncEngine, AsyncEngineContextProvider, Data, ManyOut, ResponseStream, SingleIn,
19-
async_trait,
20-
},
19+
pipeline::{AsyncEngine, AsyncEngineContextProvider, Data, ManyOut, ResponseStream, SingleIn},
2120
protocols::annotated::Annotated,
2221
};
23-
pub use serde::{Deserialize, Serialize};
22+
23+
use super::context::{Context, callable_accepts_kwarg};
2424

2525
/// Add bindings from this crate to the provided module
2626
pub fn add_to_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
@@ -87,7 +87,7 @@ impl PythonAsyncEngine {
8787
}
8888
}
8989

90-
#[async_trait]
90+
#[async_trait::async_trait]
9191
impl<Req, Resp> AsyncEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>, Error> for PythonAsyncEngine
9292
where
9393
Req: Data + Serialize,
@@ -141,7 +141,7 @@ enum ResponseProcessingError {
141141
OffloadError(String),
142142
}
143143

144-
#[async_trait]
144+
#[async_trait::async_trait]
145145
impl<Req, Resp> AsyncEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>, Error>
146146
for PythonServerStreamingEngine
147147
where

lib/bindings/python/rust/http.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33

44
use std::sync::Arc;
55

6+
use anyhow::{Error, Result, anyhow as error};
67
use pyo3::prelude::*;
78

89
use crate::{CancellationToken, engine::*, to_pyerr};
910

1011
pub use dynamo_llm::endpoint_type::EndpointType;
1112
pub use dynamo_llm::http::service::{error as http_error, service_v2};
1213
pub use dynamo_runtime::{
13-
Error, Result, error,
1414
pipeline::{AsyncEngine, Data, ManyOut, SingleIn, async_trait},
1515
protocols::annotated::Annotated,
1616
};

lib/bindings/python/rust/lib.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -441,20 +441,20 @@ impl DistributedRuntime {
441441
// Try to get existing runtime first, create new Worker only if needed
442442
// This allows multiple DistributedRuntime instances to share the same tokio runtime
443443
let runtime = rs::Worker::runtime_from_existing()
444-
.or_else(|_| {
444+
.or_else(|_| -> anyhow::Result<rs::Runtime> {
445445
// No existing Worker, create new one
446446
let worker = rs::Worker::from_settings()?;
447447

448448
// Initialize pyo3 bridge (only happens once per process)
449-
INIT.get_or_try_init(|| {
449+
INIT.get_or_try_init(|| -> anyhow::Result<()> {
450450
let primary = worker.tokio_runtime()?;
451451
pyo3_async_runtimes::tokio::init_with_runtime(primary).map_err(|e| {
452-
rs::error!("failed to initialize pyo3 static runtime: {:?}", e)
452+
anyhow::anyhow!("failed to initialize pyo3 static runtime: {:?}", e)
453453
})?;
454-
rs::OK(())
454+
Ok(())
455455
})?;
456456

457-
rs::OK(worker.runtime().clone())
457+
Ok(worker.runtime().clone())
458458
})
459459
.map_err(to_pyerr)?;
460460

lib/llm/src/block_manager/pool.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
pub mod managed;
55
pub use managed::ManagedBlockPool;
66

7+
use anyhow::Result;
78
use derive_builder::Builder;
89
use derive_getters::Dissolve;
910
use serde::{Deserialize, Serialize};
@@ -31,8 +32,6 @@ use tokio::runtime::Handle;
3132
use tokio::sync::oneshot;
3233
use tokio_util::sync::CancellationToken;
3334

34-
use dynamo_runtime::Result;
35-
3635
// Type aliases to reduce complexity across the module
3736
type BlockPoolResult<T> = Result<T, BlockPoolError>;
3837
type AsyncResponse<T> = Result<oneshot::Receiver<T>, BlockPoolError>;

lib/llm/src/kv_router/publisher.rs

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,34 @@
11
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use crate::kv_router::{
5-
KV_EVENT_SUBJECT, KV_METRICS_SUBJECT,
6-
indexer::{RouterEvent, compute_block_hash_for_seq},
7-
protocols::*,
8-
scoring::LoadEvent,
9-
};
10-
use dynamo_runtime::metrics::{MetricsHierarchy, prometheus_names::kvstats};
11-
use dynamo_runtime::traits::{DistributedRuntimeProvider, events::EventPublisher};
12-
use dynamo_runtime::{
13-
Result,
14-
component::{Component, Namespace},
15-
transports::nats::{NatsQueue, QUEUE_NAME, Slug},
16-
};
4+
use std::fmt;
5+
use std::sync::atomic::{AtomicU32, Ordering};
176
use std::sync::{Arc, OnceLock};
18-
use tokio::sync::mpsc;
19-
use tokio_util::sync::CancellationToken;
7+
use std::time::Duration;
208

9+
use anyhow::Result;
2110
use rmp_serde as rmps;
2211
use serde::Deserialize;
2312
use serde::Serialize;
2413
use serde::de::{self, Deserializer, IgnoredAny, MapAccess, SeqAccess, Visitor};
25-
use std::fmt;
26-
use std::sync::atomic::{AtomicU32, Ordering};
27-
use std::time::Duration;
14+
use tokio::sync::mpsc;
15+
use tokio_util::sync::CancellationToken;
2816
use zeromq::{Socket, SocketRecv, SubSocket};
2917

18+
use dynamo_runtime::metrics::{MetricsHierarchy, prometheus_names::kvstats};
19+
use dynamo_runtime::traits::{DistributedRuntimeProvider, events::EventPublisher};
20+
use dynamo_runtime::{
21+
component::{Component, Namespace},
22+
transports::nats::{NatsQueue, QUEUE_NAME, Slug},
23+
};
24+
25+
use crate::kv_router::{
26+
KV_EVENT_SUBJECT, KV_METRICS_SUBJECT,
27+
indexer::{RouterEvent, compute_block_hash_for_seq},
28+
protocols::*,
29+
scoring::LoadEvent,
30+
};
31+
3032
// -------------------------------------------------------------------------
3133
// KV Event Publishers -----------------------------------------------------
3234
// -------------------------------------------------------------------------
@@ -1025,7 +1027,7 @@ mod tests_startup_helpers {
10251027
&self,
10261028
event_name: impl AsRef<str> + Send + Sync,
10271029
event: &(impl serde::Serialize + Send + Sync),
1028-
) -> dynamo_runtime::Result<()> {
1030+
) -> anyhow::Result<()> {
10291031
let bytes = rmp_serde::to_vec(event).unwrap();
10301032
self.published
10311033
.lock()
@@ -1038,7 +1040,7 @@ mod tests_startup_helpers {
10381040
&self,
10391041
event_name: impl AsRef<str> + Send + Sync,
10401042
bytes: Vec<u8>,
1041-
) -> dynamo_runtime::Result<()> {
1043+
) -> anyhow::Result<()> {
10421044
self.published
10431045
.lock()
10441046
.unwrap()

lib/llm/src/mocker/engine.rs

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,31 +6,33 @@
66
//! This module provides an AsyncEngine implementation that wraps the Scheduler
77
//! to provide streaming token generation with realistic timing simulation.
88
9-
use crate::kv_router::publisher::WorkerMetricsPublisher;
10-
use crate::mocker::protocols::DirectRequest;
11-
use crate::mocker::protocols::{MockEngineArgs, OutputSignal, WorkerType};
12-
use crate::mocker::scheduler::Scheduler;
13-
use crate::protocols::TokenIdType;
14-
use crate::protocols::common::llm_backend::{LLMEngineOutput, PreprocessedRequest};
15-
use dynamo_runtime::DistributedRuntime;
16-
use dynamo_runtime::protocols::annotated::Annotated;
9+
use std::collections::HashMap;
10+
use std::sync::Arc;
11+
use std::time::Duration;
12+
13+
use anyhow::Result;
14+
use futures::StreamExt;
15+
use rand::Rng;
16+
use tokio::sync::{Mutex, OnceCell, mpsc};
17+
use tokio_stream::wrappers::UnboundedReceiverStream;
1718
use tokio_util::sync::CancellationToken;
19+
use uuid::Uuid;
1820

21+
use dynamo_runtime::DistributedRuntime;
22+
use dynamo_runtime::protocols::annotated::Annotated;
1923
use dynamo_runtime::{
20-
Result,
2124
component::Component,
2225
engine::AsyncEngineContextProvider,
2326
pipeline::{AsyncEngine, Error, ManyOut, ResponseStream, SingleIn, async_trait},
2427
traits::DistributedRuntimeProvider,
2528
};
26-
use futures::StreamExt;
27-
use rand::Rng;
28-
use std::collections::HashMap;
29-
use std::sync::Arc;
30-
use std::time::Duration;
31-
use tokio::sync::{Mutex, OnceCell, mpsc};
32-
use tokio_stream::wrappers::UnboundedReceiverStream;
33-
use uuid::Uuid;
29+
30+
use crate::kv_router::publisher::WorkerMetricsPublisher;
31+
use crate::mocker::protocols::DirectRequest;
32+
use crate::mocker::protocols::{MockEngineArgs, OutputSignal, WorkerType};
33+
use crate::mocker::scheduler::Scheduler;
34+
use crate::protocols::TokenIdType;
35+
use crate::protocols::common::llm_backend::{LLMEngineOutput, PreprocessedRequest};
3436

3537
pub const MOCKER_COMPONENT: &str = "mocker";
3638

lib/llm/tests/http_metrics.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,6 @@ mod integration_tests {
300300
use dynamo_runtime::DistributedRuntime;
301301
use dynamo_runtime::discovery::DiscoveryQuery;
302302
use dynamo_runtime::pipeline::RouterMode;
303-
use dynamo_runtime::traits::DistributedRuntimeProvider;
304303
use std::sync::Arc;
305304

306305
#[tokio::test]

lib/runtime/examples/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/runtime/examples/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,6 @@ repository = "https://github.com/ai-dynamo/dynamo.git"
2020

2121
[workspace.dependencies]
2222
# local or crates.io
23+
anyhow = "1"
2324
dynamo-runtime = { path = "../" }
2425
prometheus = { version = "0.14" }

lib/runtime/examples/hello_world/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ homepage.workspace = true
1313
dynamo-runtime = { workspace = true }
1414

1515
# third-party
16+
anyhow = { workspace = true }

0 commit comments

Comments
 (0)