From 86f6d6bbe173df1bd8ef2b0ea5451e55277ea8ba Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Fri, 10 Oct 2025 09:43:40 +0530 Subject: [PATCH 01/15] Add support uccl-backend for NIXL Signed-off-by: Pravein Govindan Kannan --- meson_options.txt | 1 + src/api/python/_api.py | 8 + src/plugins/meson.build | 5 + src/plugins/uccl/README.md | 38 ++ src/plugins/uccl/meson.build | 48 ++ src/plugins/uccl/uccl_backend.cpp | 679 ++++++++++++++++++++++++++ src/plugins/uccl/uccl_backend.h | 164 +++++++ src/plugins/uccl/uccl_plugin.cpp | 94 ++++ test/gtest/plugin_manager.cpp | 11 +- test/gtest/plugins/meson.build | 2 + test/gtest/plugins/uccl/meson.build | 67 +++ test/gtest/plugins/uccl/uccl_test.cpp | 430 ++++++++++++++++ 12 files changed, 1538 insertions(+), 9 deletions(-) create mode 100644 src/plugins/uccl/README.md create mode 100644 src/plugins/uccl/meson.build create mode 100644 src/plugins/uccl/uccl_backend.cpp create mode 100644 src/plugins/uccl/uccl_backend.h create mode 100644 src/plugins/uccl/uccl_plugin.cpp create mode 100644 test/gtest/plugins/uccl/meson.build create mode 100644 test/gtest/plugins/uccl/uccl_test.cpp diff --git a/meson_options.txt b/meson_options.txt index a316184f8..64b22164e 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -19,6 +19,7 @@ option('etcd_inc_path', type: 'string', value: '', description: 'Path to ETCD He option('etcd_lib_path', type: 'string', value: '', description: 'Path to ETCD Libraries') option('disable_gds_backend', type : 'boolean', value : false, description : 'disable gds backend') option('disable_mooncake_backend', type : 'boolean', value : false, description : 'disable mooncake backend') +option('disable_uccl_backend', type : 'boolean', value : false, description : 'disable uccl backend') option('install_headers', type : 'boolean', value : true, description : 'install headers') option('gds_path', type: 'string', value: '/usr/local/cuda/', description: 'Path to GDS CuFile install') option('cudapath_inc', type: 'string', value: '', description: 'Include path for CUDA') diff --git a/src/api/python/_api.py b/src/api/python/_api.py index 8923cbf1b..c37acdc3b 100644 --- a/src/api/python/_api.py +++ b/src/api/python/_api.py @@ -126,6 +126,7 @@ def __del__(self): @param listen_port Specify the port for the listener thread to listen on. @param capture_telemetry Whether to enable telemetry capture. @param num_threads Specify number of threads for the supported multi-threaded backends. +@param device_idx Specify the GPU index to be initialized for the backend. @param backends List of backend names for agent to initialize. Default is UCX, other backends can be added to the list, or after agent creation, can be initialized with create_backend. @@ -140,6 +141,7 @@ def __init__( listen_port: int = 0, capture_telemetry: bool = False, num_threads: int = 0, + device_idx: int = 0, backends: list[str] = ["UCX"], ): # TODO: add backend init parameters @@ -149,6 +151,7 @@ def __init__( self.port = listen_port self.capture_telemetry = capture_telemetry self.num_threads = num_threads + self.device_idx = device_idx """ @@ -232,6 +235,11 @@ def __init__( init["num_threads"] = str(nixl_conf.num_threads) elif bknd == "GDS_MT": init["thread_count"] = str(nixl_conf.num_threads) + elif bknd == "Uccl": + init["num_cpus"] = str(nixl_conf.num_cpus) + if nixl_conf.device_idx > 0: + if bknd == "Uccl": + init["device_idx"] = str(nixl_conf.device_idx) self.create_backend(bknd, init) self.nixl_mems = { diff --git a/src/plugins/meson.build b/src/plugins/meson.build index 9521b7f12..fe030c8b7 100644 --- a/src/plugins/meson.build +++ b/src/plugins/meson.build @@ -42,6 +42,11 @@ if libtransfer_engine.found() and not get_option('disable_mooncake_backend') subdir('mooncake') endif +libuccl_engine = cc.find_library('uccl_engine', required: false) +if libuccl_engine.found() and not get_option('disable_uccl_backend') + subdir('uccl') +endif + hf3fs_lib_path = '/usr/lib/' hf3fs_lib_file = 'hf3fs_api_shared' hf3fs_lib_found = cc.find_library(hf3fs_lib_file, dirs: [hf3fs_lib_path], required: false) diff --git a/src/plugins/uccl/README.md b/src/plugins/uccl/README.md new file mode 100644 index 000000000..74e585f4d --- /dev/null +++ b/src/plugins/uccl/README.md @@ -0,0 +1,38 @@ +## UCCL Backend Plugin [Preview] + +[UCCL](https://github.com/uccl-project/uccl) is an efficient communication library to perform GPU memory transfers, with a focus on flexibility (evolving ML workloads) and portability (heteregenous GPUs). +UCCL supports collectives, p2p communication and gpu-driven communication for expert parallelism. + +## Capabilities + +Currently, the UCCL backend supports internode communication. Intranode communication will be added soon. + +## Installation Guide + +1. Install UCCL's p2p engine manually. You can refer to the [installation guide here](https://https://github.com/uccl-project/uccl). + + ```cpp + git clone https://github.com/uccl-project/uccl.git + cd uccl/p2p + make -j + sudo make install + ``` + +2. Build NIXL using regular method as in [README](https://github.com/ai-dynamo/nixl/blob/main/README.md). + +## Usage Guide + +### Additional Parameters +1. `device_idx` : Specifies which GPU the UCCL engine will be affined to. +Example Usage to create a NIXL agent with uccl engine on GPU 0: + ```python + config = nixl_agent_config(device_idx=0, backends=["Uccl"]) + agent = nixl_agent("agent-name", config) + ``` +UCCL engine would auto discover the right NIC to be used for the GPU based on the PCIe distance + +### Environment Variables +1. `UCCL_IB_GID_INDEX` : GID Index of the device to be used. Usually, its auto-detected. +2. `UCCL_SOCKET_IFNAME` : The ethernet interface to be used for control socket communication. +3. `UCCL_IB_HCA` : HCAs to be used for UCCL connection. +4. `UCCL_RCMODE` : Set to either 0 or 1. To enable RDMA RC (Reliable Connection), set to 1. For `NIXL_READ` operations, set `UCCL_RCMODE` to 1. \ No newline at end of file diff --git a/src/plugins/uccl/meson.build b/src/plugins/uccl/meson.build new file mode 100644 index 000000000..358540b8f --- /dev/null +++ b/src/plugins/uccl/meson.build @@ -0,0 +1,48 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cpp = meson.get_compiler('cpp') +uccl_lib = cpp.find_library('uccl_engine') + +compile_flags = [] + + +if 'Uccl' in static_plugins + uccl_backend_lib = static_library('Uccl', + 'uccl_backend.cpp', 'uccl_backend.h', 'uccl_plugin.cpp', + dependencies: [nixl_infra, serdes_interface, cuda_dep, uccl_lib, nixl_common_dep], + include_directories: nixl_inc_dirs, + install: false, + cpp_args : compile_flags, + name_prefix: 'libplugin_') # Custom prefix for plugin libraries +else + uccl_backend_lib = shared_library('Uccl', + 'uccl_backend.cpp', 'uccl_backend.h', 'uccl_plugin.cpp', + dependencies: [nixl_infra, serdes_interface, cuda_dep, uccl_lib, nixl_common_dep], + include_directories: [nixl_inc_dirs, utils_inc_dirs], + install: true, + cpp_args : compile_flags + ['-fPIC'], + name_prefix: 'libplugin_', # Custom prefix for plugin libraries + install_dir: plugin_install_dir) + + if get_option('buildtype') == 'debug' + run_command('sh', '-c', + 'echo "Uccl=' + uccl_backend_lib.full_path() + '" >> ' + plugin_build_dir + '/pluginlist', + check: true + ) + endif +endif + +uccl_backend_interface = declare_dependency(link_with: uccl_backend_lib) diff --git a/src/plugins/uccl/uccl_backend.cpp b/src/plugins/uccl/uccl_backend.cpp new file mode 100644 index 000000000..70b92bcda --- /dev/null +++ b/src/plugins/uccl/uccl_backend.cpp @@ -0,0 +1,679 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "uccl_backend.h" +#include +#include +#include +#include +#include +#include +#include +#include + +// Parse connection string in format: ip_addr:port?gpu_index +bool +parseConnectionString(const std::string &conn_str, char *&ip_addr, int &port, int &gpu_index) { + // Exit with errror if neither : or ? is found in conn_str + size_t colon_pos = conn_str.find(':'); + if (colon_pos == std::string::npos) { + NIXL_ERROR << "Invalid connection string format: missing colon separator"; + return false; + } + size_t question_pos = conn_str.find('?', colon_pos); + if (question_pos == std::string::npos) { + NIXL_ERROR << "Invalid connection string format: missing question mark separator"; + return false; + } + + std::string ip_str = conn_str.substr(0, colon_pos); + ip_addr = new char[ip_str.length() + 1]; + strcpy(ip_addr, ip_str.c_str()); + + std::string port_str = conn_str.substr(colon_pos + 1, question_pos - colon_pos - 1); + try { + port = std::stoi(port_str); + } + catch (const std::exception &e) { + NIXL_ERROR << "Invalid port number: " << port_str; + delete[] ip_addr; + return false; + } + + std::string gpu_str = conn_str.substr(question_pos + 1); + try { + gpu_index = std::stoi(gpu_str); + } + catch (const std::exception &e) { + NIXL_ERROR << "Invalid GPU index: " << gpu_str; + delete[] ip_addr; + return false; + } + + return true; +} + +int +getNixlParam(const nixl_b_params_t *custom_params, const std::string &key, int default_value) { + if (!custom_params) { + return default_value; + } + + auto it = custom_params->find(key); + if (it == custom_params->end()) { + return default_value; + } + + try { + return std::stoi(it->second); + } + catch (const std::exception &) { + return default_value; + } +} + +nixlUcclEngine::nixlUcclEngine(const nixlBackendInitParams *init_params) + : nixlBackendEngine(init_params) { + local_agent_name_ = init_params->localAgent; + nixl_b_params_t *custom_params = init_params->customParams; + + size_t dev_idx = getNixlParam(custom_params, "device_idx", 0); + size_t num_cpus = getNixlParam(custom_params, "num_cpus", 4); + int in_python = getNixlParam(custom_params, "in_python", 1); + NIXL_DEBUG << "Creating UCCL Engine for dev:" << dev_idx << " num_cpus:" << num_cpus; + engine_ = uccl_engine_create(dev_idx, num_cpus, (in_python == 1)); + NIXL_DEBUG << "UCCL engine created"; + + listener_thread_ = std::thread(&nixlUcclEngine::startListener, this); +} + +nixlUcclEngine::~nixlUcclEngine() { + { + std::lock_guard lock(mutex_); + for (auto &[addr, priv] : mem_reg_info_) { + if (priv && priv->mr_id != 0) { + uccl_mr_t *mr = reinterpret_cast(priv->mr_id); + if (mr) { + uccl_engine_mr_destroy(mr); + NIXL_DEBUG << "Deregistered memory during cleanup: " << addr + << " mr_id: " << priv->mr_id; + } + } + delete priv; + } + mem_reg_info_.clear(); + } + std::set destroyed_agents; + for (auto &[agent_name, conn_id] : connected_agents_) { + if (destroyed_agents.find(agent_name) == destroyed_agents.end()) { + uccl_conn_t *conn = reinterpret_cast(conn_id); + if (conn) { + NIXL_DEBUG << "Disconnecting from agent: " << agent_name; + uccl_engine_conn_destroy(conn); + destroyed_agents.insert(agent_name); + } + } + } + + connected_agents_.clear(); + if (engine_) { + // Add a small delay to allow UCCL internal cleanup to complete + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + uccl_engine_destroy(engine_); + engine_ = nullptr; + } + + if (listener_thread_.joinable()) { + listener_thread_.detach(); + } +} + +void +nixlUcclEngine::startListener() { + NIXL_DEBUG << "Accepting UCCL connections"; + while (true) { + char ip_buf[256]; + int remote_gpu_idx; + uccl_conn_t *conn = uccl_engine_accept(engine_, ip_buf, sizeof(ip_buf), &remote_gpu_idx); + if (!conn) { + NIXL_ERROR << "Failed to accept connection from remote agent"; + continue; + } + uccl_engine_start_listener(conn); + NIXL_DEBUG << "Connected to remote agent: " << ip_buf; + connected_agents_[ip_buf] = reinterpret_cast(conn); + } +} + +nixl_mem_list_t +nixlUcclEngine::getSupportedMems() const { + nixl_mem_list_t mems; + mems.push_back(DRAM_SEG); + mems.push_back(VRAM_SEG); + return mems; +} + +nixl_status_t +nixlUcclEngine::getPublicData(const nixlBackendMD *meta, std::string &str) const { + nixlUcclBackendMD *priv = (nixlUcclBackendMD *)meta; + NIXL_DEBUG << "Getting Public data for : " << priv->addr; + str = std::to_string(priv->mr_id); + return NIXL_SUCCESS; +} + +nixl_status_t +nixlUcclEngine::getConnInfo(std::string &str) const { + if (!engine_) { + return NIXL_ERR_BACKEND; + } + + char *metadata = nullptr; + int result = uccl_engine_get_metadata(engine_, &metadata); + if (result != 0 || !metadata) { + return NIXL_ERR_BACKEND; + } + + str = std::string(metadata); + delete[] metadata; + NIXL_DEBUG << "UCCL engine metadata: " << str; + return NIXL_SUCCESS; +} + +nixl_status_t +nixlUcclEngine::loadRemoteConnInfo(const std::string &remote_agent, + const std::string &remote_conn_info) { + // Parse remote_conn_info and establish connection using UCCL engine + NIXL_DEBUG << "UCCL engine remote_agent: " << remote_agent + << " loadRemoteConnInfo: " << remote_conn_info; + std::lock_guard lock(mutex_); + + char *ip_addr = nullptr; + int port = 0; + int gpu_index = 0; + + if (!parseConnectionString(remote_conn_info, ip_addr, port, gpu_index)) { + return NIXL_ERR_BACKEND; + } + + // Simple role coordination: agent with smaller name acts as client + is_client_ = local_agent_name_ < remote_agent; + uccl_conn_t *conn = nullptr; + + NIXL_DEBUG << "Acting as CLIENT, connecting to " << ip_addr << ":" << port + << "?gpu=" << gpu_index << std::endl; + conn = uccl_engine_connect(engine_, ip_addr, gpu_index, port); + if (!conn) { + NIXL_ERROR << "Failed to connect to remote agent " << remote_agent; + delete[] ip_addr; + return NIXL_ERR_BACKEND; + } + + NIXL_DEBUG << "Successfully connected to remote agent " << remote_agent; + // Start the listener thread for notifications + uccl_engine_start_listener(conn); + + connected_agents_[remote_agent] = reinterpret_cast(conn); + + delete[] ip_addr; + return NIXL_SUCCESS; +} + +nixl_status_t +nixlUcclEngine::connect(const std::string &remote_agent) { + // Unused + return NIXL_SUCCESS; +} + +nixl_status_t +nixlUcclEngine::disconnect(const std::string &remote_agent) { + auto conn_iter = connected_agents_.find(remote_agent); + if (conn_iter == connected_agents_.end()) { + NIXL_ERROR << "No connection found for remote agent: " << remote_agent; + return NIXL_ERR_BACKEND; + } + uccl_conn_t *conn = reinterpret_cast(conn_iter->second); + if (!conn) { + NIXL_ERROR << "Invalid connection for remote agent: " << remote_agent; + return NIXL_ERR_BACKEND; + } + + if (conn) { + NIXL_DEBUG << "Disconnecting from agent: " << remote_agent; + uccl_engine_conn_destroy(conn); + connected_agents_.erase(remote_agent); + } + return NIXL_SUCCESS; +} + +nixl_status_t +nixlUcclEngine::registerMem(const nixlBlobDesc &mem, + const nixl_mem_t &nixl_mem, + nixlBackendMD *&out) { + std::lock_guard lock(mutex_); + + if (mem_reg_info_.count(mem.addr)) { + auto priv = mem_reg_info_[mem.addr]; + NIXL_DEBUG << "Registering memory: " << mem.addr << ", len: " << mem.len; + priv->ref_cnt++; + out = priv; + return NIXL_SUCCESS; + } + + // Register memory with UCCL engine + uccl_mr_t *mr = uccl_engine_reg(engine_, mem.addr, mem.len); + if (!mr) { + NIXL_ERROR << "Failed to register memory with UCCL engine"; + return NIXL_ERR_BACKEND; + } + + auto priv = new nixlUcclBackendMD(true); + priv->addr = (void *)mem.addr; + priv->length = mem.len; + priv->ref_cnt = 1; + priv->mr_id = reinterpret_cast(mr); // Store the memory region handle + out = priv; + mem_reg_info_[mem.addr] = priv; + NIXL_DEBUG << "Registering memory: " << mem.addr << "Device: " << mem.devId + << " ref_cnt: " << priv->ref_cnt << " mr_id: " << priv->mr_id; + + return NIXL_SUCCESS; +} + +nixl_status_t +nixlUcclEngine::deregisterMem(nixlBackendMD *meta) { + std::lock_guard lock(mutex_); + auto priv = static_cast(meta); + priv->ref_cnt--; + if (priv->ref_cnt > 0) return NIXL_SUCCESS; + + // Deregister memory from UCCL engine + if (priv->mr_id != 0) { + uccl_mr_t *mr = reinterpret_cast(priv->mr_id); + if (mr) { + uccl_engine_mr_destroy(mr); + NIXL_DEBUG << "Deregistered memory: " << priv->addr << " mr_id: " << priv->mr_id; + } + priv->mr_id = 0; + } + + mem_reg_info_.erase((uint64_t)priv->addr); + delete priv; + return NIXL_SUCCESS; +} + +nixl_status_t +nixlUcclEngine::loadLocalMD(nixlBackendMD *input, nixlBackendMD *&output) { + nixlUcclBackendMD *input_md = (nixlUcclBackendMD *)input; + NIXL_DEBUG << "UCCL Load Local MD: " << input_md->addr << "Meta Info:" << input_md->mr_id; + + nixlUcclBackendMD *output_md = (nixlUcclBackendMD *)output; + output_md->addr = (void *)input_md->addr; + output_md->length = input_md->length; + output_md->ref_cnt = 1; + output_md->mr_id = reinterpret_cast(input_md->mr_id); + return NIXL_SUCCESS; +} + +nixl_status_t +nixlUcclEngine::loadRemoteMD(const nixlBlobDesc &input, + const nixl_mem_t &nixl_mem, + const std::string &remote_agent, + nixlBackendMD *&output) { + NIXL_DEBUG << "UCCL Load Remote MD: " << input.addr << "Meta Info:" << input.metaInfo + << " remote_agent: " << remote_agent; + + output = new nixlUcclBackendMD(true); + nixlUcclBackendMD *output_md = static_cast(output); + output_md->addr = (void *)input.addr; + output_md->length = input.len; + output_md->ref_cnt = 1; + output_md->mr_id = strtoul(input.metaInfo.c_str(), NULL, 10); + return NIXL_SUCCESS; +} + +nixl_status_t +nixlUcclEngine::unloadMD(nixlBackendMD *input) { + nixlUcclBackendMD *md = (nixlUcclBackendMD *)input; + delete md; + return NIXL_SUCCESS; +} + +nixl_status_t +nixlUcclEngine::prepXfer(const nixl_xfer_op_t &operation, + const nixl_meta_dlist_t &local, + const nixl_meta_dlist_t &remote, + const std::string &remote_agent, + nixlBackendReqH *&handle, + const nixl_opt_b_args_t *opt_args) const { + int result = 0; + nixlUcclBackendMD *lmd; + nixlUcclBackendMD *rmd; + handle = nullptr; + NIXL_DEBUG << "UCCL PrepXfer: " << operation << " remote_agent: " << remote_agent; + // Get the connection for this remote agent + auto conn_iter = connected_agents_.find(remote_agent); + if (conn_iter == connected_agents_.end()) { + NIXL_ERROR << "No connection found for remote agent: " << remote_agent; + return NIXL_ERR_BACKEND; + } + uccl_conn_t *conn = reinterpret_cast(conn_iter->second); + if (!conn) { + NIXL_ERROR << "Invalid connection for remote agent: " << remote_agent; + return NIXL_ERR_BACKEND; + } + + size_t lcnt = local.descCount(); + size_t rcnt = remote.descCount(); + + if (lcnt != rcnt) { + NIXL_ERROR << "Local and remote descriptor counts don't match: " << lcnt << " != " << rcnt; + return NIXL_ERR_INVALID_PARAM; + } + + // Collect all tx_data into vectors for batch sending + std::vector md_vector; + std::vector local_priv_vector; + + for (size_t i = 0; i < lcnt; i++) { + lmd = (nixlUcclBackendMD *)local[i].metadataP; + rmd = (nixlUcclBackendMD *)remote[i].metadataP; + size_t rsize = remote[i].len; + + NIXL_DEBUG << "lmd: " << lmd->addr << ", " << lmd->mr_id << " rmd: " << rmd->addr << ", " + << rmd->mr_id; + + // Validate the local address is registered + auto local_mem_iter = mem_reg_info_.find((uint64_t)lmd->addr); + if (local_mem_iter == mem_reg_info_.end()) { + NIXL_ERROR << "Local memory not registered for address: " << lmd->addr; + return NIXL_ERR_BACKEND; + } + + auto local_priv = local_mem_iter->second; + if (local_priv->mr_id == 0) { + NIXL_ERROR << "Local memory region not properly registered"; + return NIXL_ERR_BACKEND; + } + + // Prepare the memory region metadata for batch sending + md_t md; + tx_msg_t tx_data; + tx_data.data_ptr = (uint64_t)rmd->addr; + tx_data.data_size = rsize; + + switch (operation) { + case NIXL_READ: + md.op = UCCL_READ; + break; + case NIXL_WRITE: + md.op = UCCL_WRITE; + break; + } + md.data.tx_data = tx_data; + + // Add to vectors for batch processing + md_vector.push_back(md); + local_priv_vector.push_back(local_priv); + } + + // Send all tx_data as a vector + result = uccl_engine_send_tx_md_vector(conn, md_vector.data(), md_vector.size()); + if (result < 0) { + NIXL_ERROR << "Failed to send transfer metadata vector"; + return NIXL_ERR_BACKEND; + } + + // Get FIFO items one by one for READ operations + if (operation == NIXL_READ) { + for (size_t i = 0; i < local_priv_vector.size(); i++) { + char fifo_item[FIFO_ITEM_SIZE]; + int retry_count = 0; + const int max_retries = 5; + do { + result = uccl_engine_get_fifo_item(conn, i, &fifo_item); + if (result == 0) { + // Successfully got fifo_item + NIXL_DEBUG << "Got the FIFO item to perform read operation for item " << i; + memcpy(local_priv_vector[i]->fifo_item_data, fifo_item, FIFO_ITEM_SIZE); + break; + } + retry_count++; + if (retry_count < max_retries) { + NIXL_DEBUG << "Failed to get FIFO item, retry " << retry_count << "/" + << max_retries << " for item " << i; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } while (retry_count < max_retries); + + if (result != 0) { + NIXL_ERROR << "Failed to get FIFO item after " << max_retries + << " retries for item " << i; + return NIXL_ERR_BACKEND; + } + } + } + return NIXL_SUCCESS; +} + +nixl_status_t +nixlUcclEngine::postXfer(const nixl_xfer_op_t &operation, + const nixl_meta_dlist_t &local, + const nixl_meta_dlist_t &remote, + const std::string &remote_agent, + nixlBackendReqH *&handle, + const nixl_opt_b_args_t *opt_args) const { + nixlUcclReqH *uccl_handle; + nixlUcclBackendMD *lmd; + nixlUcclBackendMD *rmd; + + NIXL_DEBUG << "UCCL PostXfer: " << operation << " remote_agent: " << remote_agent; + + // Get the connection for this remote agent + auto conn_iter = connected_agents_.find(remote_agent); + if (conn_iter == connected_agents_.end()) { + NIXL_ERROR << "No connection found for remote agent: " << remote_agent; + return NIXL_ERR_BACKEND; + } + + uccl_conn_t *conn = reinterpret_cast(conn_iter->second); + if (!conn) { + NIXL_ERROR << "Invalid connection for remote agent: " << remote_agent; + return NIXL_ERR_BACKEND; + } + + size_t lcnt = local.descCount(); + size_t rcnt = remote.descCount(); + + if (lcnt != rcnt) { + NIXL_ERROR << "Local and remote descriptor counts don't match: " << lcnt << " != " << rcnt; + return NIXL_ERR_INVALID_PARAM; + } + + // Process each descriptor pair + for (size_t i = 0; i < lcnt; i++) { + lmd = (nixlUcclBackendMD *)local[i].metadataP; + rmd = (nixlUcclBackendMD *)remote[i].metadataP; + size_t lsize = local[i].len; + size_t rsize = remote[i].len; + + NIXL_DEBUG << "lmd: " << lmd->addr << ", " << lmd->mr_id << " rmd: " << rmd->addr << ", " + << rmd->mr_id; + + if (lsize != rsize) { + NIXL_ERROR << "Local and remote sizes don't match: " << lsize << " != " << rsize; + return NIXL_ERR_INVALID_PARAM; + } + + // Validate the local address is registered + auto local_mem_iter = mem_reg_info_.find((uint64_t)lmd->addr); + if (local_mem_iter == mem_reg_info_.end()) { + NIXL_ERROR << "Local memory not registered for address: " << lmd->addr; + return NIXL_ERR_BACKEND; + } + + auto local_priv = local_mem_iter->second; + if (local_priv->mr_id == 0) { + NIXL_ERROR << "Local memory region not properly registered"; + return NIXL_ERR_BACKEND; + } + + uccl_mr_t *local_mr = reinterpret_cast(local_priv->mr_id); + + int result = 0; + uint64_t transfer_id = 0; + switch (operation) { + case NIXL_READ: { + NIXL_DEBUG << "Performing READ operation: receiving " << lsize << " bytes"; + result = uccl_engine_read( + conn, local_mr, lmd->addr, lsize, local_priv->fifo_item_data, &transfer_id); + break; + } + case NIXL_WRITE: + NIXL_DEBUG << "Performing WRITE operation: sending " << lsize << " bytes"; + result = uccl_engine_write(conn, local_mr, lmd->addr, lsize, &transfer_id); + break; + + default: + NIXL_ERROR << "Unsupported operation type: " << operation; + return NIXL_ERR_INVALID_PARAM; + } + + if (result != 0) { + NIXL_ERROR << "UCCL operation failed with result: " << result; + return NIXL_ERR_BACKEND; + } + + if (!handle) { + handle = new nixlUcclReqH(conn); + } + uccl_handle = static_cast(handle); + uccl_handle->transfer_ids.push_back(transfer_id); + + NIXL_DEBUG << "Successfully posted " << (operation == NIXL_READ ? "READ" : "WRITE") + << " operation: " << lsize << " bytes with transfer_id: " << transfer_id; + } + if (opt_args && opt_args->hasNotif) { + uccl_handle->notif_msg = opt_args->notifMsg; + } + + return NIXL_IN_PROG; +} + +nixl_status_t +nixlUcclEngine::checkXfer(nixlBackendReqH *handle) const { + // TODO: Check transfer status if async + if (!handle) { + NIXL_ERROR << "Invalid handle provided to checkXfer"; + return NIXL_ERR_INVALID_PARAM; + } + + // Cast to our custom handle type + nixlUcclReqH *uccl_handle = dynamic_cast(handle); + if (!uccl_handle) { + NIXL_ERROR << "Invalid handle type for UCCL backend"; + return NIXL_ERR_INVALID_PARAM; + } + + uccl_conn_t *conn = uccl_handle->conn; + if (!conn) { + NIXL_ERROR << "No connection found in handle"; + return NIXL_ERR_BACKEND; + } + + bool all_done = true; + for (uint64_t transfer_id : uccl_handle->transfer_ids) { + if (std::find(uccl_handle->completed_transfer_ids.begin(), + uccl_handle->completed_transfer_ids.end(), + transfer_id) != uccl_handle->completed_transfer_ids.end()) { + continue; + } + + int is_done = uccl_engine_xfer_status(conn, transfer_id); + if (is_done) { + uccl_handle->completed_transfer_ids.push_back(transfer_id); + } else { + all_done = false; + break; + } + } + if (all_done && !uccl_handle->notif_msg.empty()) { + notify_msg_t notify_msg = {}; + strncpy(notify_msg.name, local_agent_name_.c_str(), sizeof(notify_msg.name) - 1); + strncpy(notify_msg.msg, uccl_handle->notif_msg.c_str(), sizeof(notify_msg.msg) - 1); + uccl_engine_send_notif(conn, ¬ify_msg); + NIXL_DEBUG << "All transfers in handle completed, sent notification: " + << uccl_handle->notif_msg; + } + NIXL_DEBUG << "Transfer status: " << (all_done ? "COMPLETED" : "IN_PROGRESS"); + return (all_done) ? NIXL_SUCCESS : NIXL_IN_PROG; +} + +nixl_status_t +nixlUcclEngine::releaseReqH(nixlBackendReqH *handle) const { + // TODO: Release any resources associated with the transfer handle + if (!handle) { + return NIXL_SUCCESS; // Nothing to release + } + + // Cast to our custom handle type and delete it + nixlUcclReqH *uccl_handle = dynamic_cast(handle); + if (uccl_handle) { + delete uccl_handle; + } + + return NIXL_SUCCESS; +} + +nixl_status_t +nixlUcclEngine::getNotifs(notif_list_t ¬if_list) { + if (notif_list.size() != 0) return NIXL_ERR_INVALID_PARAM; + + std::vector notify_msgs = uccl_engine_get_notifs(); + for (size_t i = 0; i < notify_msgs.size(); i++) { + notif_list.push_back(std::make_pair(notify_msgs[i].name, notify_msgs[i].msg)); + } + + return NIXL_SUCCESS; +} + +nixl_status_t +nixlUcclEngine::genNotif(const std::string &remote_agent, const std::string &msg) const { + NIXL_DEBUG << "UCCL Gen Notify: " << remote_agent << " msg: " << msg; + + // Get the connection for this remote agent + auto conn_iter = connected_agents_.find(remote_agent); + if (conn_iter == connected_agents_.end()) { + NIXL_ERROR << "No connection found for remote agent: " << remote_agent; + return NIXL_ERR_BACKEND; + } + + uccl_conn_t *conn = reinterpret_cast(conn_iter->second); + if (!conn) { + NIXL_ERROR << "Invalid connection for remote agent: " << remote_agent; + return NIXL_ERR_BACKEND; + } + + notify_msg_t notify_msg; + memset(¬ify_msg, 0, sizeof(notify_msg)); + strncpy(notify_msg.name, local_agent_name_.c_str(), sizeof(notify_msg.name) - 1); + strncpy(notify_msg.msg, msg.c_str(), sizeof(notify_msg.msg) - 1); + int result = uccl_engine_send_notif(conn, ¬ify_msg); + if (result < 0) { + NIXL_ERROR << "Failed to send notify message"; + return NIXL_ERR_BACKEND; + } + return NIXL_SUCCESS; +} diff --git a/src/plugins/uccl/uccl_backend.h b/src/plugins/uccl/uccl_backend.h new file mode 100644 index 000000000..34635975f --- /dev/null +++ b/src/plugins/uccl/uccl_backend.h @@ -0,0 +1,164 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __UCCL_BACKEND_H +#define __UCCL_BACKEND_H + +#include +#include +#include +#include +#include +#include +#include + +#include "nixl.h" +#include +#include "backend/backend_engine.h" +#include "common/nixl_log.h" + +#include "uccl_engine.h" + +#define FIFO_ITEM_SIZE 64 + +class nixlUcclBackendMD; +class nixlUcclReqH; + +class nixlUcclEngine : public nixlBackendEngine { +public: + nixlUcclEngine(const nixlBackendInitParams *init_params); + ~nixlUcclEngine(); + + bool + supportsRemote() const { + return true; + } + + bool + supportsLocal() const { + return false; + } + + bool + supportsNotif() const { + return true; + } + + bool + supportsProgTh() const { + return false; + } + + void + startListener(); + + nixl_mem_list_t + getSupportedMems() const; + + nixl_status_t + getPublicData(const nixlBackendMD *meta, std::string &str) const; + nixl_status_t + getConnInfo(std::string &str) const; + nixl_status_t + loadRemoteConnInfo(const std::string &remote_agent, const std::string &remote_conn_info); + + nixl_status_t + connect(const std::string &remote_agent) override; + nixl_status_t + disconnect(const std::string &remote_agent); + + nixl_status_t + registerMem(const nixlBlobDesc &mem, const nixl_mem_t &nixl_mem, nixlBackendMD *&out); + nixl_status_t + deregisterMem(nixlBackendMD *meta); + + nixl_status_t + loadLocalMD(nixlBackendMD *input, nixlBackendMD *&output); + + nixl_status_t + loadRemoteMD(const nixlBlobDesc &input, + const nixl_mem_t &nixl_mem, + const std::string &remote_agent, + nixlBackendMD *&output); + + nixl_status_t + unloadMD(nixlBackendMD *input); + + nixl_status_t + prepXfer(const nixl_xfer_op_t &operation, + const nixl_meta_dlist_t &local, + const nixl_meta_dlist_t &remote, + const std::string &remote_agent, + nixlBackendReqH *&handle, + const nixl_opt_b_args_t *opt_args = nullptr) const; + + nixl_status_t + postXfer(const nixl_xfer_op_t &operation, + const nixl_meta_dlist_t &local, + const nixl_meta_dlist_t &remote, + const std::string &remote_agent, + nixlBackendReqH *&handle, + const nixl_opt_b_args_t *opt_args = nullptr) const; + + nixl_status_t + checkXfer(nixlBackendReqH *handle) const; + nixl_status_t + releaseReqH(nixlBackendReqH *handle) const; + + nixl_status_t + getNotifs(notif_list_t ¬if_list); + nixl_status_t + genNotif(const std::string &remote_agent, const std::string &msg) const override; + +private: + mutable std::mutex mutex_; + uccl_engine_t *engine_; + bool is_client_; + std::string local_agent_name_; + std::unordered_map mem_reg_info_; + std::unordered_map connected_agents_; // agent name -> conn_id + std::thread listener_thread_; +}; + +// Minimal metadata struct for UCCL memory registration +class nixlUcclBackendMD : public nixlBackendMD { +public: + nixlUcclBackendMD(bool isPrivate) : nixlBackendMD(isPrivate) {} + + virtual ~nixlUcclBackendMD() {} + + void *addr; + size_t length; + int ref_cnt; + uint64_t mr_id; // UCCL memory region id + char fifo_item_data[FIFO_ITEM_SIZE]; +}; + +// Custom request handle for UCCL transfers +class nixlUcclReqH : public nixlBackendReqH { +public: + nixlUcclReqH(uccl_conn_t *conn) : conn(conn) {} + + virtual ~nixlUcclReqH() {} + + uccl_conn_t *conn; + std::vector transfer_ids; + std::vector + completed_transfer_ids; // Track completed transfers to avoid double polling + nixl_blob_t notif_msg; +}; + +#endif \ No newline at end of file diff --git a/src/plugins/uccl/uccl_plugin.cpp b/src/plugins/uccl/uccl_plugin.cpp new file mode 100644 index 000000000..b7d3e50bc --- /dev/null +++ b/src/plugins/uccl/uccl_plugin.cpp @@ -0,0 +1,94 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "backend/backend_plugin.h" +#include "uccl_backend.h" + +// Plugin version information +static const char *PLUGIN_NAME = "Uccl"; +static const char *PLUGIN_VERSION = "0.0.1"; + +// Function to create a new UCCL backend engine instance +static nixlBackendEngine * +create_uccl_engine(const nixlBackendInitParams *init_params) { + return new nixlUcclEngine(init_params); +} + +static void +destroy_uccl_engine(nixlBackendEngine *engine) { + delete engine; +} + +// Function to get the plugin name +static const char * +get_plugin_name() { + return PLUGIN_NAME; +} + +// Function to get the plugin version +static const char * +get_plugin_version() { + return PLUGIN_VERSION; +} + +// Function to get backend options +static nixl_b_params_t +get_backend_options() { + nixl_b_params_t params; + params["device_idx"] = ""; + return params; +} + +// Function to get supported backend mem types +static nixl_mem_list_t +get_backend_mems() { + nixl_mem_list_t mems; + mems.push_back(DRAM_SEG); + mems.push_back(VRAM_SEG); + return mems; +} + +// Static plugin structure +static nixlBackendPlugin plugin = {NIXL_PLUGIN_API_VERSION, + create_uccl_engine, + destroy_uccl_engine, + get_plugin_name, + get_plugin_version, + get_backend_options, + get_backend_mems}; + +#ifdef STATIC_PLUGIN_UCCL + +nixlBackendPlugin * +createStaticUcclPlugin() { + return &plugin; // Return the static plugin instance +} + +#else + +// Plugin initialization function +extern "C" NIXL_PLUGIN_EXPORT nixlBackendPlugin * +nixl_plugin_init() { + return &plugin; +} + +// Plugin cleanup function +extern "C" NIXL_PLUGIN_EXPORT void +nixl_plugin_fini() { + // Cleanup any resources if needed +} +#endif \ No newline at end of file diff --git a/test/gtest/plugin_manager.cpp b/test/gtest/plugin_manager.cpp index 8423154c5..6159a8310 100644 --- a/test/gtest/plugin_manager.cpp +++ b/test/gtest/plugin_manager.cpp @@ -194,7 +194,6 @@ INSTANTIATE_TEST_SUITE_P(GdsLoadPluginInstantiation, INSTANTIATE_TEST_SUITE_P(UcxMoLoadPluginInstantiation, LoadSinglePluginTestFixture, testing::Values(ucx_mo_plugin_desc)); - /* Load multiple plugins tests instantiations. */ INSTANTIATE_TEST_SUITE_P(UcxGdsLoadMultiplePluginInstantiation, LoadMultiplePluginsTestFixture, @@ -207,14 +206,8 @@ INSTANTIATE_TEST_SUITE_P(UcxUcxMoLoadMultiplePluginInstantiation, ucx_mo_plugin_desc})); INSTANTIATE_TEST_SUITE_P(GdsUcxMoLoadMultiplePluginInstantiation, LoadMultiplePluginsTestFixture, - testing::Values(std::vector{ - gds_plugin_desc, - ucx_mo_plugin_desc})); -INSTANTIATE_TEST_SUITE_P(UcxGdsUcxMoLoadMultiplePluginInstantiation, - LoadMultiplePluginsTestFixture, - testing::Values(std::vector{ - ucx_plugin_desc, gds_plugin_desc, - ucx_mo_plugin_desc})); + testing::Values(std::vector{gds_plugin_desc, + ucx_mo_plugin_desc})); } // namespace plugin_manager } // namespace gtest diff --git a/test/gtest/plugins/meson.build b/test/gtest/plugins/meson.build index e9e11b4c4..2ada71e3b 100644 --- a/test/gtest/plugins/meson.build +++ b/test/gtest/plugins/meson.build @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +subdir('uccl') + aws_s3 = dependency('aws-cpp-sdk-s3', static: false, required: false) if not aws_s3.found() message('aws-cpp-sdk-s3 not found, skipping plugins_gtest build') diff --git a/test/gtest/plugins/uccl/meson.build b/test/gtest/plugins/uccl/meson.build new file mode 100644 index 000000000..fd26d48ec --- /dev/null +++ b/test/gtest/plugins/uccl/meson.build @@ -0,0 +1,67 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cpp = meson.get_compiler('cpp') + +uccl_engine_lib = cpp.find_library('uccl_engine', dirs: ['/usr/local/lib'], required: false) +uccl_rdma_lib = cpp.find_library('rdma_plugin', dirs: ['/usr/local/lib'], required: false) + +if not uccl_engine_lib.found() or not uccl_rdma_lib.found() + message('UCCL libraries not found, skipping uccl_gtest build') + subdir_done() +endif + +python_dep = dependency('python3', required: false) +pybind11_dep = dependency('pybind11', required: false) + +if not python_dep.found() or not pybind11_dep.found() + message('Python dependencies not found, skipping uccl_gtest build') + subdir_done() +endif + +cpp_flags = [] +cpp_flags += '-DBUILD_DIR="' + meson.project_build_root() + '"' + +if cuda_dep.found() + cuda_dependencies = [cuda_dep] + cpp_flags += '-DHAVE_CUDA' +else + cuda_dependencies = [] + cpp_flags += '-UHAVE_CUDA' +endif + +uccl_test_exe = executable('uccl_gtest', + sources : ['uccl_test.cpp', '../../main.cpp', '../../common.cpp'], + include_directories: [nixl_inc_dirs, utils_inc_dirs, gtest_inc_dirs, '.'], + cpp_args : cpp_flags, + dependencies : [ + nixl_dep, + nixl_infra, + nixl_common_deps, + thread_dep, + python_dep, + pybind11_dep, + uccl_rdma_lib, + uccl_engine_lib, + gtest_dep, + gmock_dep, + absl_strings_dep, + absl_time_dep + ] + cuda_dependencies, + link_with: [nixl_build_lib], + install : true +) + +test('uccl_gtest', uccl_test_exe) diff --git a/test/gtest/plugins/uccl/uccl_test.cpp b/test/gtest/plugins/uccl/uccl_test.cpp new file mode 100644 index 000000000..c3d8f3041 --- /dev/null +++ b/test/gtest/plugins/uccl/uccl_test.cpp @@ -0,0 +1,430 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include "common.h" +#include "nixl.h" + +namespace gtest { +namespace nixl { + constexpr const char *uccl_backend_name = "Uccl"; + + static nixlBackendH * + createUcclBackend(nixlAgent &agent) { + std::vector plugins; + nixl_status_t status = agent.getAvailPlugins(plugins); + EXPECT_EQ(status, NIXL_SUCCESS); + auto it = std::find(plugins.begin(), plugins.end(), uccl_backend_name); + EXPECT_NE(it, plugins.end()) << "UCCL plugin not found"; + + nixl_b_params_t params; + nixl_mem_list_t mems; + status = agent.getPluginParams(*it, mems, params); + EXPECT_EQ(NIXL_SUCCESS, status); + params["in_python"] = "0"; + nixlBackendH *backend_handle = nullptr; + status = agent.createBackend(*it, params, backend_handle); + EXPECT_EQ(NIXL_SUCCESS, status); + EXPECT_NE(nullptr, backend_handle); + return backend_handle; + } + + template + void + fillRegList(DListT &dlist, DescT &desc, const std::vector &data) { + desc.addr = reinterpret_cast(data.data()); + desc.len = data.size(); + desc.devId = 0; + dlist.addDesc(desc); + } +} // namespace nixl + +class TestUcclBackend : public testing::Test { + class Agent { + struct MemDesc { + MemDesc() : m_dlist(DRAM_SEG), m_desc() {} + + void + init(nixlBackendH *backend) { + m_params = {.backends = {backend}}; + nixl::fillRegList(m_dlist, m_desc, m_data); + } + + void + fillData() { + std::fill(m_data.begin(), m_data.end(), std::byte(std::rand())); + } + + static constexpr size_t m_data_size = 256; + std::vector m_data = std::vector(m_data_size); + nixl_opt_args_t m_params; + nixl_reg_dlist_t m_dlist; + nixlBlobDesc m_desc; + }; + + public: + void + init(const std::string &name); + + void + destroy(); + void + fillRegList(nixl_xfer_dlist_t &dlist, nixlBasicDesc &desc) const; + std::string + getLocalMD() const; + void + loadRemoteMD(const std::string &remote_name); + nixl_status_t + createXferReq(const nixl_xfer_op_t &op, + nixl_xfer_dlist_t &sReq_descs, + nixl_xfer_dlist_t &rReq_descs, + nixlXferReqH *&req_handle) const; + nixl_status_t + postXferReq(nixlXferReqH *req_handle) const; + nixl_status_t + waitForCompletion(nixlXferReqH *req_handle); + nixl_status_t + waitForNotif(const std::string &expectedNotif); + void + fillData(); + bool + dataCmp(const Agent &other) const; + + private: + std::string m_name; + nixlBackendH *m_backend = nullptr; + std::unique_ptr m_priv = nullptr; + std::string m_MetaRemote; + MemDesc m_mem; + }; + +protected: + enum class TestType { + BASIC_XFER, + LOAD_REMOTE_THEN_FAIL, + XFER_THEN_FAIL, + XFER_FAIL_RESTORE, + FAIL_AFTER_POST, + }; + + TestUcclBackend(); + template + void + testXfer(); + +private: + template + bool + failBeforePost(size_t iter); + template + bool + failAfterPost(size_t iter); + template + bool + isFailure(size_t iter); + template + size_t + numIter(); + void + exchangeMetaData(); + template + std::variant + postXfer(enum nixl_xfer_op_t op, size_t iter); + + ScopedEnv m_env; + Agent m_Initiator; + Agent m_Target; + std::string m_backend_name; +}; + +void +TestUcclBackend::Agent::init(const std::string &name) { + m_priv = std::make_unique(name, nixlAgentConfig(true)); + // Create UCCL backend for testing + m_backend = nixl::createUcclBackend(*m_priv); + m_mem.init(m_backend); + m_mem.fillData(); + + EXPECT_EQ(NIXL_SUCCESS, m_priv->registerMem(m_mem.m_dlist, &m_mem.m_params)); +} + +void +TestUcclBackend::Agent::destroy() { + m_priv->deregisterMem(m_mem.m_dlist, &m_mem.m_params); + m_priv->invalidateRemoteMD(m_MetaRemote); + m_priv.reset(); + m_backend = nullptr; +} + +void +TestUcclBackend::Agent::fillRegList(nixl_xfer_dlist_t &dlist, nixlBasicDesc &desc) const { + nixl::fillRegList(dlist, desc, m_mem.m_data); +} + +std::string +TestUcclBackend::Agent::getLocalMD() const { + std::string meta; + EXPECT_EQ(NIXL_SUCCESS, m_priv->getLocalMD(meta)); + return meta; +} + +void +TestUcclBackend::Agent::loadRemoteMD(const std::string &remote_name) { + EXPECT_EQ(NIXL_SUCCESS, m_priv->loadRemoteMD(remote_name, m_MetaRemote)) + << "Agent " << m_name << " failed to load remote metadata"; +} + +nixl_status_t +TestUcclBackend::Agent::createXferReq(const nixl_xfer_op_t &op, + nixl_xfer_dlist_t &sReq_descs, + nixl_xfer_dlist_t &rReq_descs, + nixlXferReqH *&req_handle) const { + nixl_opt_args_t extra_params = {.backends = {m_backend}}; + extra_params.notifMsg = "notification"; + extra_params.hasNotif = true; + return m_priv->createXferReq( + op, sReq_descs, rReq_descs, m_MetaRemote, req_handle, &extra_params); +} + +nixl_status_t +TestUcclBackend::Agent::postXferReq(nixlXferReqH *req_handle) const { + return m_priv->postXferReq(req_handle); +} + +nixl_status_t +TestUcclBackend::Agent::waitForCompletion(nixlXferReqH *req_handle) { + nixl_status_t status; + + do { + status = m_priv->getXferStatus(req_handle); + EXPECT_NE(NIXL_ERR_NOT_POSTED, status); + } while (status == NIXL_IN_PROG); + + m_priv->releaseXferReq(req_handle); + + return status; +} + +nixl_status_t +TestUcclBackend::Agent::waitForNotif(const std::string &expectedNotif) { + nixl_notifs_t notif_map; + + do { + EXPECT_EQ(NIXL_SUCCESS, m_priv->getNotifs(notif_map)); + } while (notif_map.empty()); + + std::vector notifs = notif_map[m_MetaRemote]; + EXPECT_EQ(1u, notifs.size()); + EXPECT_EQ(expectedNotif, notifs.front()); + return NIXL_SUCCESS; +} + +void +TestUcclBackend::Agent::fillData() { + m_mem.fillData(); +} + +bool +TestUcclBackend::Agent::dataCmp(const TestUcclBackend::Agent &other) const { + return m_mem.m_data == other.m_mem.m_data; +} + +TestUcclBackend::TestUcclBackend() { + m_backend_name = "Uccl"; + m_env.addVar("NIXL_PLUGIN_DIR", std::string(BUILD_DIR) + "/src/plugins/uccl"); +} + +template +void +TestUcclBackend::testXfer() { + if (op == NIXL_READ) { + m_env.addVar("UCCL_RCMODE", "1"); + } + const std::string initiator_name = "initiator"; + const std::string target_name = "target"; + + m_Initiator.init(initiator_name); + m_Target.init(target_name); + + exchangeMetaData(); + + for (size_t i = 0; i < numIter(); ++i) { + nixl_status_t status; + auto result = postXfer(op, i); + if (std::holds_alternative(result)) { + // Transfer completed immediately + status = std::get(result); + } else { + // Transfer was posted, wait for completion + nixlXferReqH *req_handle = std::get(result); + status = m_Initiator.waitForCompletion(req_handle); + } + + if (isFailure(i)) { + if (failBeforePost(i)) { + EXPECT_EQ(status, NIXL_ERR_REMOTE_DISCONNECT); + } else { + EXPECT_TRUE((status == NIXL_ERR_REMOTE_DISCONNECT) || (status == NIXL_SUCCESS)); + } + + if (test_type == TestType::XFER_FAIL_RESTORE) { + m_Target.init(target_name); + exchangeMetaData(); + } + } else { + EXPECT_EQ(NIXL_SUCCESS, status); + EXPECT_EQ(NIXL_SUCCESS, m_Target.waitForNotif("notification")); + EXPECT_TRUE(m_Target.dataCmp(m_Initiator)); + + // Update the data for the next iteration + m_Initiator.fillData(); + m_Target.fillData(); + } + } + + switch (test_type) { + case TestType::BASIC_XFER: + case TestType::XFER_FAIL_RESTORE: + m_Target.destroy(); + m_Initiator.destroy(); + return; + case TestType::LOAD_REMOTE_THEN_FAIL: + case TestType::XFER_THEN_FAIL: + case TestType::FAIL_AFTER_POST: + m_Initiator.destroy(); + return; + } +} + +template +bool +TestUcclBackend::failBeforePost(size_t iter) { + switch (test_type) { + case TestType::BASIC_XFER: + return false; + case TestType::LOAD_REMOTE_THEN_FAIL: + return iter == 0; + case TestType::XFER_THEN_FAIL: + case TestType::XFER_FAIL_RESTORE: + return iter == 1; + case TestType::FAIL_AFTER_POST: + return false; + } +} + +template +bool +TestUcclBackend::failAfterPost(size_t iter) { + return (test_type == TestType::FAIL_AFTER_POST) && (iter == 1); +} + +template +bool +TestUcclBackend::isFailure(size_t iter) { + return failBeforePost(iter) || failAfterPost(iter); +} + +template +size_t +TestUcclBackend::numIter() { + switch (test_type) { + case TestType::BASIC_XFER: + case TestType::LOAD_REMOTE_THEN_FAIL: + return 1; + case TestType::XFER_THEN_FAIL: + case TestType::FAIL_AFTER_POST: + return 2; + case TestType::XFER_FAIL_RESTORE: + return 3; + } +} + +void +TestUcclBackend::exchangeMetaData() { + m_Initiator.loadRemoteMD(m_Target.getLocalMD()); + m_Target.loadRemoteMD(m_Initiator.getLocalMD()); +} + +template +std::variant +TestUcclBackend::postXfer(enum nixl_xfer_op_t op, size_t iter) { + EXPECT_TRUE(op == NIXL_WRITE || op == NIXL_READ); + + nixlBasicDesc sReq_src; + nixl_xfer_dlist_t sReq_descs(DRAM_SEG); + m_Initiator.fillRegList(sReq_descs, sReq_src); + + nixlBasicDesc rReq_dst; + nixl_xfer_dlist_t rReq_descs(DRAM_SEG); + m_Target.fillRegList(rReq_descs, rReq_dst); + + nixlXferReqH *req_handle; + nixl_status_t status = m_Initiator.createXferReq(op, sReq_descs, rReq_descs, req_handle); + EXPECT_EQ(NIXL_SUCCESS, status) + << "createXferReq failed with unexpected error: " << nixlEnumStrings::statusStr(status); + + if (failBeforePost(iter)) { + m_Target.destroy(); + } + + status = m_Initiator.postXferReq(req_handle); + + if (failAfterPost(iter)) { + m_Target.destroy(); + } + + if (isFailure(iter) && (status == NIXL_ERR_REMOTE_DISCONNECT)) { + // failed handle destroyed on post + return status; + } + + EXPECT_LE(0, status) << "status: " << nixlEnumStrings::statusStr(status); + return req_handle; +} + +TEST_F(TestUcclBackend, BasicXfer) { + testXfer(); + std::this_thread::sleep_for(std::chrono::seconds(1)); + testXfer(); +} + +// TODO: Enable failure tests after hardening corner cases +// TEST_F(TestUcclBackend, LoadRemoteThenFail) { +// testXfer(); +// testXfer(); +// } + +// TEST_F(TestUcclBackend, XferThenFail) { +// testXfer(); +// testXfer(); +// } + +// TEST_F(TestUcclBackend, XferFailRestore) { +// testXfer(); +// testXfer(); +// } + +// TEST_F(TestUcclBackend, XferPostThenFail) { +// testXfer(); +// testXfer(); +// } + +} // namespace gtest From cd2bbea4b7b18f5c3efd21e8967fa0b775080c0f Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Fri, 10 Oct 2025 12:16:26 +0530 Subject: [PATCH 02/15] Cleanup and minor changes to readme Signed-off-by: Pravein Govindan Kannan --- src/plugins/uccl/README.md | 22 +++++++-- src/plugins/uccl/uccl_backend.cpp | 29 ++++------- src/plugins/uccl/uccl_backend.h | 7 ++- src/plugins/uccl/uccl_plugin.cpp | 80 ++++++++----------------------- 4 files changed, 48 insertions(+), 90 deletions(-) diff --git a/src/plugins/uccl/README.md b/src/plugins/uccl/README.md index 74e585f4d..3cad551f2 100644 --- a/src/plugins/uccl/README.md +++ b/src/plugins/uccl/README.md @@ -5,7 +5,7 @@ UCCL supports collectives, p2p communication and gpu-driven communication for ex ## Capabilities -Currently, the UCCL backend supports internode communication. Intranode communication will be added soon. +Currently, the UCCL backend supports internode communication over RDMA. Intranode communication will be added soon. ## Installation Guide @@ -18,7 +18,7 @@ Currently, the UCCL backend supports internode communication. Intranode communic sudo make install ``` -2. Build NIXL using regular method as in [README](https://github.com/ai-dynamo/nixl/blob/main/README.md). +2. Build NIXL using regular method as in [README](https://github.com/ai-dynamo/nixl/blob/main/README.md) ensuring `disable_uccl_backend` is set to `false`. ## Usage Guide @@ -32,7 +32,21 @@ Example Usage to create a NIXL agent with uccl engine on GPU 0: UCCL engine would auto discover the right NIC to be used for the GPU based on the PCIe distance ### Environment Variables -1. `UCCL_IB_GID_INDEX` : GID Index of the device to be used. Usually, its auto-detected. +1. `NCCL_IB_GID_INDEX` : GID Index of the device to be used. Usually, its auto-detected. 2. `UCCL_SOCKET_IFNAME` : The ethernet interface to be used for control socket communication. 3. `UCCL_IB_HCA` : HCAs to be used for UCCL connection. -4. `UCCL_RCMODE` : Set to either 0 or 1. To enable RDMA RC (Reliable Connection), set to 1. For `NIXL_READ` operations, set `UCCL_RCMODE` to 1. \ No newline at end of file +4. `UCCL_RCMODE` : Set to either 0 or 1. To enable RDMA RC (Reliable Connection), set to 1. For `NIXL_READ` operations, set `UCCL_RCMODE` to 1. + +### Usage References + +1) [NIXL Benchmark](https://github.com/uccl-project/uccl/blob/main/p2p/benchmarks/benchmark_nixl.py) in UCCL: Refer to this [README](https://github.com/uccl-project/uccl/tree/main/p2p) on how to run the script. + +2) [NIXL connector](https://github.com/praveingk/vllm/commit/fa67cd7edff076fee4914cc316a9833c2311a65d) in vLLM. + +### Road Map + +- [ ] Add Intra-node communication support + +- [ ] Add asynchronous posting of reads over multiple workers to mitigate latency increase upon fragmentation + +- [ ] Add support for other transport (TCP, TCP-X, etc.) \ No newline at end of file diff --git a/src/plugins/uccl/uccl_backend.cpp b/src/plugins/uccl/uccl_backend.cpp index 70b92bcda..8c8a92676 100644 --- a/src/plugins/uccl/uccl_backend.cpp +++ b/src/plugins/uccl/uccl_backend.cpp @@ -93,7 +93,7 @@ nixlUcclEngine::nixlUcclEngine(const nixlBackendInitParams *init_params) size_t dev_idx = getNixlParam(custom_params, "device_idx", 0); size_t num_cpus = getNixlParam(custom_params, "num_cpus", 4); int in_python = getNixlParam(custom_params, "in_python", 1); - NIXL_DEBUG << "Creating UCCL Engine for dev:" << dev_idx << " num_cpus:" << num_cpus; + NIXL_DEBUG << "Creating UCCL Engine for dev: " << dev_idx << ", num_cpus: " << num_cpus; engine_ = uccl_engine_create(dev_idx, num_cpus, (in_python == 1)); NIXL_DEBUG << "UCCL engine created"; @@ -108,8 +108,6 @@ nixlUcclEngine::~nixlUcclEngine() { uccl_mr_t *mr = reinterpret_cast(priv->mr_id); if (mr) { uccl_engine_mr_destroy(mr); - NIXL_DEBUG << "Deregistered memory during cleanup: " << addr - << " mr_id: " << priv->mr_id; } } delete priv; @@ -121,7 +119,6 @@ nixlUcclEngine::~nixlUcclEngine() { if (destroyed_agents.find(agent_name) == destroyed_agents.end()) { uccl_conn_t *conn = reinterpret_cast(conn_id); if (conn) { - NIXL_DEBUG << "Disconnecting from agent: " << agent_name; uccl_engine_conn_destroy(conn); destroyed_agents.insert(agent_name); } @@ -131,7 +128,7 @@ nixlUcclEngine::~nixlUcclEngine() { connected_agents_.clear(); if (engine_) { // Add a small delay to allow UCCL internal cleanup to complete - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); uccl_engine_destroy(engine_); engine_ = nullptr; } @@ -143,7 +140,8 @@ nixlUcclEngine::~nixlUcclEngine() { void nixlUcclEngine::startListener() { - NIXL_DEBUG << "Accepting UCCL connections"; + // The listener waits for connections from remote agents + NIXL_DEBUG << "UCCL accepting connections"; while (true) { char ip_buf[256]; int remote_gpu_idx; @@ -152,6 +150,7 @@ nixlUcclEngine::startListener() { NIXL_ERROR << "Failed to accept connection from remote agent"; continue; } + // Start the listener thread to send/get notifications from the remote agent uccl_engine_start_listener(conn); NIXL_DEBUG << "Connected to remote agent: " << ip_buf; connected_agents_[ip_buf] = reinterpret_cast(conn); @@ -169,7 +168,6 @@ nixlUcclEngine::getSupportedMems() const { nixl_status_t nixlUcclEngine::getPublicData(const nixlBackendMD *meta, std::string &str) const { nixlUcclBackendMD *priv = (nixlUcclBackendMD *)meta; - NIXL_DEBUG << "Getting Public data for : " << priv->addr; str = std::to_string(priv->mr_id); return NIXL_SUCCESS; } @@ -208,11 +206,9 @@ nixlUcclEngine::loadRemoteConnInfo(const std::string &remote_agent, return NIXL_ERR_BACKEND; } - // Simple role coordination: agent with smaller name acts as client - is_client_ = local_agent_name_ < remote_agent; uccl_conn_t *conn = nullptr; - NIXL_DEBUG << "Acting as CLIENT, connecting to " << ip_addr << ":" << port + NIXL_DEBUG << "Connecting to " << ip_addr << ":" << port << "?gpu=" << gpu_index << std::endl; conn = uccl_engine_connect(engine_, ip_addr, gpu_index, port); if (!conn) { @@ -503,6 +499,7 @@ nixlUcclEngine::postXfer(const nixl_xfer_op_t &operation, } // Process each descriptor pair + // TODO: Use a vector send async API to send all the transfers at once for (size_t i = 0; i < lcnt; i++) { lmd = (nixlUcclBackendMD *)local[i].metadataP; rmd = (nixlUcclBackendMD *)remote[i].metadataP; @@ -536,13 +533,11 @@ nixlUcclEngine::postXfer(const nixl_xfer_op_t &operation, uint64_t transfer_id = 0; switch (operation) { case NIXL_READ: { - NIXL_DEBUG << "Performing READ operation: receiving " << lsize << " bytes"; result = uccl_engine_read( conn, local_mr, lmd->addr, lsize, local_priv->fifo_item_data, &transfer_id); break; } case NIXL_WRITE: - NIXL_DEBUG << "Performing WRITE operation: sending " << lsize << " bytes"; result = uccl_engine_write(conn, local_mr, lmd->addr, lsize, &transfer_id); break; @@ -574,13 +569,11 @@ nixlUcclEngine::postXfer(const nixl_xfer_op_t &operation, nixl_status_t nixlUcclEngine::checkXfer(nixlBackendReqH *handle) const { - // TODO: Check transfer status if async if (!handle) { NIXL_ERROR << "Invalid handle provided to checkXfer"; return NIXL_ERR_INVALID_PARAM; } - // Cast to our custom handle type nixlUcclReqH *uccl_handle = dynamic_cast(handle); if (!uccl_handle) { NIXL_ERROR << "Invalid handle type for UCCL backend"; @@ -617,18 +610,15 @@ nixlUcclEngine::checkXfer(nixlBackendReqH *handle) const { NIXL_DEBUG << "All transfers in handle completed, sent notification: " << uccl_handle->notif_msg; } - NIXL_DEBUG << "Transfer status: " << (all_done ? "COMPLETED" : "IN_PROGRESS"); return (all_done) ? NIXL_SUCCESS : NIXL_IN_PROG; } nixl_status_t nixlUcclEngine::releaseReqH(nixlBackendReqH *handle) const { - // TODO: Release any resources associated with the transfer handle if (!handle) { - return NIXL_SUCCESS; // Nothing to release + return NIXL_SUCCESS; } - // Cast to our custom handle type and delete it nixlUcclReqH *uccl_handle = dynamic_cast(handle); if (uccl_handle) { delete uccl_handle; @@ -651,9 +641,6 @@ nixlUcclEngine::getNotifs(notif_list_t ¬if_list) { nixl_status_t nixlUcclEngine::genNotif(const std::string &remote_agent, const std::string &msg) const { - NIXL_DEBUG << "UCCL Gen Notify: " << remote_agent << " msg: " << msg; - - // Get the connection for this remote agent auto conn_iter = connected_agents_.find(remote_agent); if (conn_iter == connected_agents_.end()) { NIXL_ERROR << "No connection found for remote agent: " << remote_agent; diff --git a/src/plugins/uccl/uccl_backend.h b/src/plugins/uccl/uccl_backend.h index 34635975f..4301a6921 100644 --- a/src/plugins/uccl/uccl_backend.h +++ b/src/plugins/uccl/uccl_backend.h @@ -126,14 +126,13 @@ class nixlUcclEngine : public nixlBackendEngine { private: mutable std::mutex mutex_; uccl_engine_t *engine_; - bool is_client_; std::string local_agent_name_; std::unordered_map mem_reg_info_; std::unordered_map connected_agents_; // agent name -> conn_id std::thread listener_thread_; }; -// Minimal metadata struct for UCCL memory registration +// UCCL Backend Memory Descriptor class nixlUcclBackendMD : public nixlBackendMD { public: nixlUcclBackendMD(bool isPrivate) : nixlBackendMD(isPrivate) {} @@ -147,7 +146,7 @@ class nixlUcclBackendMD : public nixlBackendMD { char fifo_item_data[FIFO_ITEM_SIZE]; }; -// Custom request handle for UCCL transfers +// UCCL Backend Request Handle class nixlUcclReqH : public nixlBackendReqH { public: nixlUcclReqH(uccl_conn_t *conn) : conn(conn) {} @@ -157,7 +156,7 @@ class nixlUcclReqH : public nixlBackendReqH { uccl_conn_t *conn; std::vector transfer_ids; std::vector - completed_transfer_ids; // Track completed transfers to avoid double polling + completed_transfer_ids; nixl_blob_t notif_msg; }; diff --git a/src/plugins/uccl/uccl_plugin.cpp b/src/plugins/uccl/uccl_plugin.cpp index b7d3e50bc..be6ad8da1 100644 --- a/src/plugins/uccl/uccl_plugin.cpp +++ b/src/plugins/uccl/uccl_plugin.cpp @@ -18,77 +18,35 @@ #include "backend/backend_plugin.h" #include "uccl_backend.h" -// Plugin version information -static const char *PLUGIN_NAME = "Uccl"; -static const char *PLUGIN_VERSION = "0.0.1"; -// Function to create a new UCCL backend engine instance -static nixlBackendEngine * -create_uccl_engine(const nixlBackendInitParams *init_params) { - return new nixlUcclEngine(init_params); -} - -static void -destroy_uccl_engine(nixlBackendEngine *engine) { - delete engine; -} - -// Function to get the plugin name -static const char * -get_plugin_name() { - return PLUGIN_NAME; -} - -// Function to get the plugin version -static const char * -get_plugin_version() { - return PLUGIN_VERSION; -} +namespace { + nixl_b_params_t + get_uccl_options() { + nixl_b_params_t params; + params["device_idx"] = ""; + params["in_python"] = ""; + params["num_cpus"] = ""; + return params; + } +} // namespace -// Function to get backend options -static nixl_b_params_t -get_backend_options() { - nixl_b_params_t params; - params["device_idx"] = ""; - return params; -} - -// Function to get supported backend mem types -static nixl_mem_list_t -get_backend_mems() { - nixl_mem_list_t mems; - mems.push_back(DRAM_SEG); - mems.push_back(VRAM_SEG); - return mems; -} -// Static plugin structure -static nixlBackendPlugin plugin = {NIXL_PLUGIN_API_VERSION, - create_uccl_engine, - destroy_uccl_engine, - get_plugin_name, - get_plugin_version, - get_backend_options, - get_backend_mems}; +// Plugin type alias for convenience +using uccl_plugin_t = nixlBackendPluginCreator; #ifdef STATIC_PLUGIN_UCCL - nixlBackendPlugin * -createStaticUcclPlugin() { - return &plugin; // Return the static plugin instance +createStaticUCCLPlugin() { + return uccl_plugin_t::create( + NIXL_PLUGIN_API_VERSION, "UCCL", "0.1.0", get_uccl_options(), {DRAM_SEG, VRAM_SEG}); } - #else - -// Plugin initialization function extern "C" NIXL_PLUGIN_EXPORT nixlBackendPlugin * nixl_plugin_init() { - return &plugin; + return uccl_plugin_t::create( + NIXL_PLUGIN_API_VERSION, "UCCL", "0.1.0", get_uccl_options(), {DRAM_SEG, VRAM_SEG}); } -// Plugin cleanup function extern "C" NIXL_PLUGIN_EXPORT void -nixl_plugin_fini() { - // Cleanup any resources if needed -} -#endif \ No newline at end of file +nixl_plugin_fini() {} +#endif From 7e211cbd29ebcb645c2b95d8dc6f7395a602283b Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Fri, 10 Oct 2025 16:51:06 +0530 Subject: [PATCH 03/15] Further cleanup including moving startListener to private Signed-off-by: Pravein Govindan Kannan --- src/plugins/uccl/README.md | 2 +- src/plugins/uccl/uccl_backend.cpp | 11 +++++++++++ src/plugins/uccl/uccl_backend.h | 7 ++++--- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/plugins/uccl/README.md b/src/plugins/uccl/README.md index 3cad551f2..b6af48599 100644 --- a/src/plugins/uccl/README.md +++ b/src/plugins/uccl/README.md @@ -41,7 +41,7 @@ UCCL engine would auto discover the right NIC to be used for the GPU based on th 1) [NIXL Benchmark](https://github.com/uccl-project/uccl/blob/main/p2p/benchmarks/benchmark_nixl.py) in UCCL: Refer to this [README](https://github.com/uccl-project/uccl/tree/main/p2p) on how to run the script. -2) [NIXL connector](https://github.com/praveingk/vllm/commit/fa67cd7edff076fee4914cc316a9833c2311a65d) in vLLM. +2) [NIXL connector](https://github.com/praveingk/vllm/commit/fa67cd7edff076fee4914cc316a9833c2311a65d) in vLLM. vLLM's NIXL connector uses `NIXL_READ`, hence set env `UCCL_RCMODE` to 1. ### Road Map diff --git a/src/plugins/uccl/uccl_backend.cpp b/src/plugins/uccl/uccl_backend.cpp index 8c8a92676..5dd575276 100644 --- a/src/plugins/uccl/uccl_backend.cpp +++ b/src/plugins/uccl/uccl_backend.cpp @@ -162,6 +162,7 @@ nixlUcclEngine::getSupportedMems() const { nixl_mem_list_t mems; mems.push_back(DRAM_SEG); mems.push_back(VRAM_SEG); + return mems; } @@ -169,6 +170,7 @@ nixl_status_t nixlUcclEngine::getPublicData(const nixlBackendMD *meta, std::string &str) const { nixlUcclBackendMD *priv = (nixlUcclBackendMD *)meta; str = std::to_string(priv->mr_id); + return NIXL_SUCCESS; } @@ -187,6 +189,7 @@ nixlUcclEngine::getConnInfo(std::string &str) const { str = std::string(metadata); delete[] metadata; NIXL_DEBUG << "UCCL engine metadata: " << str; + return NIXL_SUCCESS; } @@ -224,6 +227,7 @@ nixlUcclEngine::loadRemoteConnInfo(const std::string &remote_agent, connected_agents_[remote_agent] = reinterpret_cast(conn); delete[] ip_addr; + return NIXL_SUCCESS; } @@ -251,6 +255,7 @@ nixlUcclEngine::disconnect(const std::string &remote_agent) { uccl_engine_conn_destroy(conn); connected_agents_.erase(remote_agent); } + return NIXL_SUCCESS; } @@ -320,6 +325,7 @@ nixlUcclEngine::loadLocalMD(nixlBackendMD *input, nixlBackendMD *&output) { output_md->length = input_md->length; output_md->ref_cnt = 1; output_md->mr_id = reinterpret_cast(input_md->mr_id); + return NIXL_SUCCESS; } @@ -337,6 +343,7 @@ nixlUcclEngine::loadRemoteMD(const nixlBlobDesc &input, output_md->length = input.len; output_md->ref_cnt = 1; output_md->mr_id = strtoul(input.metaInfo.c_str(), NULL, 10); + return NIXL_SUCCESS; } @@ -344,6 +351,7 @@ nixl_status_t nixlUcclEngine::unloadMD(nixlBackendMD *input) { nixlUcclBackendMD *md = (nixlUcclBackendMD *)input; delete md; + return NIXL_SUCCESS; } @@ -461,6 +469,7 @@ nixlUcclEngine::prepXfer(const nixl_xfer_op_t &operation, } } } + return NIXL_SUCCESS; } @@ -610,6 +619,7 @@ nixlUcclEngine::checkXfer(nixlBackendReqH *handle) const { NIXL_DEBUG << "All transfers in handle completed, sent notification: " << uccl_handle->notif_msg; } + return (all_done) ? NIXL_SUCCESS : NIXL_IN_PROG; } @@ -662,5 +672,6 @@ nixlUcclEngine::genNotif(const std::string &remote_agent, const std::string &msg NIXL_ERROR << "Failed to send notify message"; return NIXL_ERR_BACKEND; } + return NIXL_SUCCESS; } diff --git a/src/plugins/uccl/uccl_backend.h b/src/plugins/uccl/uccl_backend.h index 4301a6921..b7b641552 100644 --- a/src/plugins/uccl/uccl_backend.h +++ b/src/plugins/uccl/uccl_backend.h @@ -49,6 +49,7 @@ class nixlUcclEngine : public nixlBackendEngine { bool supportsLocal() const { + // TODO: Enable this when local transfers are supported return false; } @@ -62,9 +63,6 @@ class nixlUcclEngine : public nixlBackendEngine { return false; } - void - startListener(); - nixl_mem_list_t getSupportedMems() const; @@ -124,6 +122,9 @@ class nixlUcclEngine : public nixlBackendEngine { genNotif(const std::string &remote_agent, const std::string &msg) const override; private: + void + startListener(); + mutable std::mutex mutex_; uccl_engine_t *engine_; std::string local_agent_name_; From b5e3c5b831d9bde1eb4c2a54f0ef775dfda78f96 Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Fri, 10 Oct 2025 17:06:21 +0530 Subject: [PATCH 04/15] Revert changes to plugin manager test Signed-off-by: Pravein Govindan Kannan --- test/gtest/plugin_manager.cpp | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/test/gtest/plugin_manager.cpp b/test/gtest/plugin_manager.cpp index 6159a8310..8423154c5 100644 --- a/test/gtest/plugin_manager.cpp +++ b/test/gtest/plugin_manager.cpp @@ -194,6 +194,7 @@ INSTANTIATE_TEST_SUITE_P(GdsLoadPluginInstantiation, INSTANTIATE_TEST_SUITE_P(UcxMoLoadPluginInstantiation, LoadSinglePluginTestFixture, testing::Values(ucx_mo_plugin_desc)); + /* Load multiple plugins tests instantiations. */ INSTANTIATE_TEST_SUITE_P(UcxGdsLoadMultiplePluginInstantiation, LoadMultiplePluginsTestFixture, @@ -206,8 +207,14 @@ INSTANTIATE_TEST_SUITE_P(UcxUcxMoLoadMultiplePluginInstantiation, ucx_mo_plugin_desc})); INSTANTIATE_TEST_SUITE_P(GdsUcxMoLoadMultiplePluginInstantiation, LoadMultiplePluginsTestFixture, - testing::Values(std::vector{gds_plugin_desc, - ucx_mo_plugin_desc})); + testing::Values(std::vector{ + gds_plugin_desc, + ucx_mo_plugin_desc})); +INSTANTIATE_TEST_SUITE_P(UcxGdsUcxMoLoadMultiplePluginInstantiation, + LoadMultiplePluginsTestFixture, + testing::Values(std::vector{ + ucx_plugin_desc, gds_plugin_desc, + ucx_mo_plugin_desc})); } // namespace plugin_manager } // namespace gtest From 9648e6e6fae9387e6cb43b0d340745e7d21b7ecb Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Mon, 13 Oct 2025 12:16:57 +0530 Subject: [PATCH 05/15] Fix formatting Signed-off-by: Pravein Govindan Kannan --- src/plugins/uccl/uccl_backend.cpp | 3 +-- src/plugins/uccl/uccl_backend.h | 3 +-- src/plugins/uccl/uccl_plugin.cpp | 18 ++++++++---------- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/src/plugins/uccl/uccl_backend.cpp b/src/plugins/uccl/uccl_backend.cpp index 5dd575276..2eab5aaf7 100644 --- a/src/plugins/uccl/uccl_backend.cpp +++ b/src/plugins/uccl/uccl_backend.cpp @@ -211,8 +211,7 @@ nixlUcclEngine::loadRemoteConnInfo(const std::string &remote_agent, uccl_conn_t *conn = nullptr; - NIXL_DEBUG << "Connecting to " << ip_addr << ":" << port - << "?gpu=" << gpu_index << std::endl; + NIXL_DEBUG << "Connecting to " << ip_addr << ":" << port << "?gpu=" << gpu_index << std::endl; conn = uccl_engine_connect(engine_, ip_addr, gpu_index, port); if (!conn) { NIXL_ERROR << "Failed to connect to remote agent " << remote_agent; diff --git a/src/plugins/uccl/uccl_backend.h b/src/plugins/uccl/uccl_backend.h index b7b641552..bc80956ee 100644 --- a/src/plugins/uccl/uccl_backend.h +++ b/src/plugins/uccl/uccl_backend.h @@ -156,8 +156,7 @@ class nixlUcclReqH : public nixlBackendReqH { uccl_conn_t *conn; std::vector transfer_ids; - std::vector - completed_transfer_ids; + std::vector completed_transfer_ids; nixl_blob_t notif_msg; }; diff --git a/src/plugins/uccl/uccl_plugin.cpp b/src/plugins/uccl/uccl_plugin.cpp index be6ad8da1..7c7eb1a14 100644 --- a/src/plugins/uccl/uccl_plugin.cpp +++ b/src/plugins/uccl/uccl_plugin.cpp @@ -18,19 +18,17 @@ #include "backend/backend_plugin.h" #include "uccl_backend.h" - namespace { - nixl_b_params_t - get_uccl_options() { - nixl_b_params_t params; - params["device_idx"] = ""; - params["in_python"] = ""; - params["num_cpus"] = ""; - return params; - } +nixl_b_params_t +get_uccl_options() { + nixl_b_params_t params; + params["device_idx"] = ""; + params["in_python"] = ""; + params["num_cpus"] = ""; + return params; +} } // namespace - // Plugin type alias for convenience using uccl_plugin_t = nixlBackendPluginCreator; From 489ecd0f926c8170927a10cf08fcf5aa778ad1e2 Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Mon, 13 Oct 2025 14:08:39 +0530 Subject: [PATCH 06/15] Add more details to readme Signed-off-by: Pravein Govindan Kannan --- src/plugins/uccl/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/plugins/uccl/README.md b/src/plugins/uccl/README.md index b6af48599..60f970971 100644 --- a/src/plugins/uccl/README.md +++ b/src/plugins/uccl/README.md @@ -1,6 +1,6 @@ ## UCCL Backend Plugin [Preview] -[UCCL](https://github.com/uccl-project/uccl) is an efficient communication library to perform GPU memory transfers, with a focus on flexibility (evolving ML workloads) and portability (heteregenous GPUs). +[UCCL](https://github.com/uccl-project/uccl) is an efficient communication library to perform GPU memory transfers, with a focus on flexibility (evolving ML workloads) and portability (heteregenous GPUs). UCCL provides a software transport stack which runs on the CPUs and are easily extensible to support different techniques like congestion control, multipathing, efficient loss recovery, etc. UCCL supports collectives, p2p communication and gpu-driven communication for expert parallelism. ## Capabilities @@ -29,7 +29,7 @@ Example Usage to create a NIXL agent with uccl engine on GPU 0: config = nixl_agent_config(device_idx=0, backends=["Uccl"]) agent = nixl_agent("agent-name", config) ``` -UCCL engine would auto discover the right NIC to be used for the GPU based on the PCIe distance +UCCL engine would auto discover the right NIC to be used for the GPU based on the PCIe distance, and intialize the engine which manages communication for that NIC. This is the reason why its necessary to provide the gpu device index during agent creation. ### Environment Variables 1. `NCCL_IB_GID_INDEX` : GID Index of the device to be used. Usually, its auto-detected. From 25da2118572ffd3898ed587d5e54cfe6f9c27594 Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Mon, 13 Oct 2025 14:11:05 +0530 Subject: [PATCH 07/15] Remove trailing whitespaces in readme Signed-off-by: Pravein Govindan Kannan --- src/plugins/uccl/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/plugins/uccl/README.md b/src/plugins/uccl/README.md index 60f970971..583f16d9e 100644 --- a/src/plugins/uccl/README.md +++ b/src/plugins/uccl/README.md @@ -24,7 +24,7 @@ Currently, the UCCL backend supports internode communication over RDMA. Intranod ### Additional Parameters 1. `device_idx` : Specifies which GPU the UCCL engine will be affined to. -Example Usage to create a NIXL agent with uccl engine on GPU 0: +Example Usage to create a NIXL agent with uccl engine on GPU 0: ```python config = nixl_agent_config(device_idx=0, backends=["Uccl"]) agent = nixl_agent("agent-name", config) @@ -32,7 +32,7 @@ Example Usage to create a NIXL agent with uccl engine on GPU 0: UCCL engine would auto discover the right NIC to be used for the GPU based on the PCIe distance, and intialize the engine which manages communication for that NIC. This is the reason why its necessary to provide the gpu device index during agent creation. ### Environment Variables -1. `NCCL_IB_GID_INDEX` : GID Index of the device to be used. Usually, its auto-detected. +1. `NCCL_IB_GID_INDEX` : GID Index of the device to be used. Usually, its auto-detected. 2. `UCCL_SOCKET_IFNAME` : The ethernet interface to be used for control socket communication. 3. `UCCL_IB_HCA` : HCAs to be used for UCCL connection. 4. `UCCL_RCMODE` : Set to either 0 or 1. To enable RDMA RC (Reliable Connection), set to 1. For `NIXL_READ` operations, set `UCCL_RCMODE` to 1. From baa7c855c1df2cdbe1549311eafde1cb0df432ca Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Mon, 13 Oct 2025 14:20:09 +0530 Subject: [PATCH 08/15] Remove trailing whitespaces in test Signed-off-by: Pravein Govindan Kannan --- src/plugins/uccl/uccl_backend.h | 2 +- test/gtest/plugins/uccl/meson.build | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/plugins/uccl/uccl_backend.h b/src/plugins/uccl/uccl_backend.h index bc80956ee..b75490f01 100644 --- a/src/plugins/uccl/uccl_backend.h +++ b/src/plugins/uccl/uccl_backend.h @@ -160,4 +160,4 @@ class nixlUcclReqH : public nixlBackendReqH { nixl_blob_t notif_msg; }; -#endif \ No newline at end of file +#endif diff --git a/test/gtest/plugins/uccl/meson.build b/test/gtest/plugins/uccl/meson.build index fd26d48ec..6f458aff7 100644 --- a/test/gtest/plugins/uccl/meson.build +++ b/test/gtest/plugins/uccl/meson.build @@ -47,17 +47,17 @@ uccl_test_exe = executable('uccl_gtest', include_directories: [nixl_inc_dirs, utils_inc_dirs, gtest_inc_dirs, '.'], cpp_args : cpp_flags, dependencies : [ - nixl_dep, - nixl_infra, - nixl_common_deps, - thread_dep, - python_dep, - pybind11_dep, - uccl_rdma_lib, + nixl_dep, + nixl_infra, + nixl_common_deps, + thread_dep, + python_dep, + pybind11_dep, + uccl_rdma_lib, uccl_engine_lib, - gtest_dep, - gmock_dep, - absl_strings_dep, + gtest_dep, + gmock_dep, + absl_strings_dep, absl_time_dep ] + cuda_dependencies, link_with: [nixl_build_lib], From db047c03193ebae858fad93d529c1fe0538739b7 Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Mon, 13 Oct 2025 14:25:26 +0530 Subject: [PATCH 09/15] Remove num_cpus from agent config Signed-off-by: Pravein Govindan Kannan --- src/api/python/_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/python/_api.py b/src/api/python/_api.py index c37acdc3b..ec2203eb8 100644 --- a/src/api/python/_api.py +++ b/src/api/python/_api.py @@ -236,7 +236,7 @@ def __init__( elif bknd == "GDS_MT": init["thread_count"] = str(nixl_conf.num_threads) elif bknd == "Uccl": - init["num_cpus"] = str(nixl_conf.num_cpus) + init["num_cpus"] = str(nixl_conf.num_threads) if nixl_conf.device_idx > 0: if bknd == "Uccl": init["device_idx"] = str(nixl_conf.device_idx) From 524e078c27e80a03284f2330415b759147434a93 Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Mon, 13 Oct 2025 14:27:38 +0530 Subject: [PATCH 10/15] Fix typo in uccl backend Signed-off-by: Pravein Govindan Kannan --- src/plugins/uccl/uccl_backend.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plugins/uccl/uccl_backend.cpp b/src/plugins/uccl/uccl_backend.cpp index 2eab5aaf7..60bf013d4 100644 --- a/src/plugins/uccl/uccl_backend.cpp +++ b/src/plugins/uccl/uccl_backend.cpp @@ -27,7 +27,7 @@ // Parse connection string in format: ip_addr:port?gpu_index bool parseConnectionString(const std::string &conn_str, char *&ip_addr, int &port, int &gpu_index) { - // Exit with errror if neither : or ? is found in conn_str + // Exit with error if neither : or ? is found in conn_str size_t colon_pos = conn_str.find(':'); if (colon_pos == std::string::npos) { NIXL_ERROR << "Invalid connection string format: missing colon separator"; From dde50c57afba8a56ec1a0978b28c1c856d9c79da Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Mon, 27 Oct 2025 11:54:02 +0530 Subject: [PATCH 11/15] Add UCCL to test_plugin Signed-off-by: Pravein Govindan Kannan --- src/plugins/uccl/meson.build | 8 ++++---- test/nixl/test_plugin.cpp | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/plugins/uccl/meson.build b/src/plugins/uccl/meson.build index 358540b8f..9a6e4a7ee 100644 --- a/src/plugins/uccl/meson.build +++ b/src/plugins/uccl/meson.build @@ -19,8 +19,8 @@ uccl_lib = cpp.find_library('uccl_engine') compile_flags = [] -if 'Uccl' in static_plugins - uccl_backend_lib = static_library('Uccl', +if 'UCCL' in static_plugins + uccl_backend_lib = static_library('UCCL', 'uccl_backend.cpp', 'uccl_backend.h', 'uccl_plugin.cpp', dependencies: [nixl_infra, serdes_interface, cuda_dep, uccl_lib, nixl_common_dep], include_directories: nixl_inc_dirs, @@ -28,7 +28,7 @@ if 'Uccl' in static_plugins cpp_args : compile_flags, name_prefix: 'libplugin_') # Custom prefix for plugin libraries else - uccl_backend_lib = shared_library('Uccl', + uccl_backend_lib = shared_library('UCCL', 'uccl_backend.cpp', 'uccl_backend.h', 'uccl_plugin.cpp', dependencies: [nixl_infra, serdes_interface, cuda_dep, uccl_lib, nixl_common_dep], include_directories: [nixl_inc_dirs, utils_inc_dirs], @@ -39,7 +39,7 @@ else if get_option('buildtype') == 'debug' run_command('sh', '-c', - 'echo "Uccl=' + uccl_backend_lib.full_path() + '" >> ' + plugin_build_dir + '/pluginlist', + 'echo "UCCL=' + uccl_backend_lib.full_path() + '" >> ' + plugin_build_dir + '/pluginlist', check: true ) endif diff --git a/test/nixl/test_plugin.cpp b/test/nixl/test_plugin.cpp index dcbf1bd96..ed09707fb 100644 --- a/test/nixl/test_plugin.cpp +++ b/test/nixl/test_plugin.cpp @@ -66,7 +66,7 @@ int main(int argc, char** argv) { std::set staticPlugs; std::set plugins = { - "UCX", "GDS", "POSIX", "UCX_MO", "MOCK_BACKEND", "GPUNETIO", "OBJ", "GDS_MT", "LIBFABRIC"}; + "UCX", "GDS", "POSIX", "UCX_MO", "MOCK_BACKEND", "GPUNETIO", "OBJ", "GDS_MT", "LIBFABRIC", "UCCL"}; if (argc > 1 && (std::string(argv[1]) == "-h" || std::string(argv[1]) == "--help")) { print_usage(argv[0]); From 7eb674c6c00cb9ceb5fe2d83adb31753c3e8a67d Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Mon, 27 Oct 2025 11:54:38 +0530 Subject: [PATCH 12/15] Modify env vars descriptions and road map to README Signed-off-by: Pravein Govindan Kannan --- src/plugins/uccl/README.md | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/plugins/uccl/README.md b/src/plugins/uccl/README.md index 583f16d9e..025e98ea1 100644 --- a/src/plugins/uccl/README.md +++ b/src/plugins/uccl/README.md @@ -32,21 +32,23 @@ Example Usage to create a NIXL agent with uccl engine on GPU 0: UCCL engine would auto discover the right NIC to be used for the GPU based on the PCIe distance, and intialize the engine which manages communication for that NIC. This is the reason why its necessary to provide the gpu device index during agent creation. ### Environment Variables -1. `NCCL_IB_GID_INDEX` : GID Index of the device to be used. Usually, its auto-detected. -2. `UCCL_SOCKET_IFNAME` : The ethernet interface to be used for control socket communication. -3. `UCCL_IB_HCA` : HCAs to be used for UCCL connection. -4. `UCCL_RCMODE` : Set to either 0 or 1. To enable RDMA RC (Reliable Connection), set to 1. For `NIXL_READ` operations, set `UCCL_RCMODE` to 1. + +Refer to [README](https://github.com/uccl-project/uccl/tree/main/collective/rdma#environment-variables-in-uccl) for the complete list of environment variables that can be set to customize UCCL. + +**Important**: For `NIXL_READ` operations set `UCCL_RCMODE=1`. By default, UCCL uses RDMA UC (Unreliable Connection). However, `READ` operations need to operate on RDMA RC (Reliable Connection). ### Usage References 1) [NIXL Benchmark](https://github.com/uccl-project/uccl/blob/main/p2p/benchmarks/benchmark_nixl.py) in UCCL: Refer to this [README](https://github.com/uccl-project/uccl/tree/main/p2p) on how to run the script. -2) [NIXL connector](https://github.com/praveingk/vllm/commit/fa67cd7edff076fee4914cc316a9833c2311a65d) in vLLM. vLLM's NIXL connector uses `NIXL_READ`, hence set env `UCCL_RCMODE` to 1. +2) [NIXL connector](https://github.com/praveingk/vllm/commit/fa67cd7edff076fee4914cc316a9833c2311a65d) in vLLM. vLLM's NIXL connector uses `NIXL_READ` operations, hence set env `UCCL_RCMODE` to 1. ### Road Map - [ ] Add Intra-node communication support +- [ ] Add Progress Thread support + - [ ] Add asynchronous posting of reads over multiple workers to mitigate latency increase upon fragmentation - [ ] Add support for other transport (TCP, TCP-X, etc.) \ No newline at end of file From 96866cda65120377be2d3df66e6bb1415682a892 Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Thu, 30 Oct 2025 10:41:53 +0530 Subject: [PATCH 13/15] Remove custom parameter device_idx Signed-off-by: Pravein Govindan Kannan --- src/api/python/_api.py | 6 ------ src/plugins/uccl/README.md | 9 ++++----- src/plugins/uccl/uccl_backend.cpp | 4 +--- src/plugins/uccl/uccl_plugin.cpp | 1 - 4 files changed, 5 insertions(+), 15 deletions(-) diff --git a/src/api/python/_api.py b/src/api/python/_api.py index ec2203eb8..28d9fb50d 100644 --- a/src/api/python/_api.py +++ b/src/api/python/_api.py @@ -126,7 +126,6 @@ def __del__(self): @param listen_port Specify the port for the listener thread to listen on. @param capture_telemetry Whether to enable telemetry capture. @param num_threads Specify number of threads for the supported multi-threaded backends. -@param device_idx Specify the GPU index to be initialized for the backend. @param backends List of backend names for agent to initialize. Default is UCX, other backends can be added to the list, or after agent creation, can be initialized with create_backend. @@ -141,7 +140,6 @@ def __init__( listen_port: int = 0, capture_telemetry: bool = False, num_threads: int = 0, - device_idx: int = 0, backends: list[str] = ["UCX"], ): # TODO: add backend init parameters @@ -151,7 +149,6 @@ def __init__( self.port = listen_port self.capture_telemetry = capture_telemetry self.num_threads = num_threads - self.device_idx = device_idx """ @@ -237,9 +234,6 @@ def __init__( init["thread_count"] = str(nixl_conf.num_threads) elif bknd == "Uccl": init["num_cpus"] = str(nixl_conf.num_threads) - if nixl_conf.device_idx > 0: - if bknd == "Uccl": - init["device_idx"] = str(nixl_conf.device_idx) self.create_backend(bknd, init) self.nixl_mems = { diff --git a/src/plugins/uccl/README.md b/src/plugins/uccl/README.md index 025e98ea1..5f547418a 100644 --- a/src/plugins/uccl/README.md +++ b/src/plugins/uccl/README.md @@ -22,14 +22,13 @@ Currently, the UCCL backend supports internode communication over RDMA. Intranod ## Usage Guide -### Additional Parameters -1. `device_idx` : Specifies which GPU the UCCL engine will be affined to. -Example Usage to create a NIXL agent with uccl engine on GPU 0: +Example Usage to create a NIXL agent with uccl engine: + ```python - config = nixl_agent_config(device_idx=0, backends=["Uccl"]) + config = nixl_agent_config(backends=["UCCL"]) agent = nixl_agent("agent-name", config) ``` -UCCL engine would auto discover the right NIC to be used for the GPU based on the PCIe distance, and intialize the engine which manages communication for that NIC. This is the reason why its necessary to provide the gpu device index during agent creation. +UCCL engine would auto discover the right NIC to be used for the GPU based on the PCIe distance during memory registration based on the data locality. ### Environment Variables diff --git a/src/plugins/uccl/uccl_backend.cpp b/src/plugins/uccl/uccl_backend.cpp index 60bf013d4..97ab8e61e 100644 --- a/src/plugins/uccl/uccl_backend.cpp +++ b/src/plugins/uccl/uccl_backend.cpp @@ -90,11 +90,9 @@ nixlUcclEngine::nixlUcclEngine(const nixlBackendInitParams *init_params) local_agent_name_ = init_params->localAgent; nixl_b_params_t *custom_params = init_params->customParams; - size_t dev_idx = getNixlParam(custom_params, "device_idx", 0); size_t num_cpus = getNixlParam(custom_params, "num_cpus", 4); int in_python = getNixlParam(custom_params, "in_python", 1); - NIXL_DEBUG << "Creating UCCL Engine for dev: " << dev_idx << ", num_cpus: " << num_cpus; - engine_ = uccl_engine_create(dev_idx, num_cpus, (in_python == 1)); + engine_ = uccl_engine_create(num_cpus, (in_python == 1)); NIXL_DEBUG << "UCCL engine created"; listener_thread_ = std::thread(&nixlUcclEngine::startListener, this); diff --git a/src/plugins/uccl/uccl_plugin.cpp b/src/plugins/uccl/uccl_plugin.cpp index 7c7eb1a14..dc7f7e3cf 100644 --- a/src/plugins/uccl/uccl_plugin.cpp +++ b/src/plugins/uccl/uccl_plugin.cpp @@ -22,7 +22,6 @@ namespace { nixl_b_params_t get_uccl_options() { nixl_b_params_t params; - params["device_idx"] = ""; params["in_python"] = ""; params["num_cpus"] = ""; return params; From bc6b1914b310b47f90963d727c5866141ddec4fa Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Fri, 21 Nov 2025 12:08:31 +0530 Subject: [PATCH 14/15] Rename to UCCL P2P Signed-off-by: Pravein Govindan Kannan --- meson_options.txt | 2 +- src/api/python/_api.py | 2 +- src/plugins/meson.build | 6 +++--- src/plugins/{uccl => uccl_p2p}/README.md | 18 ++++++++--------- src/plugins/{uccl => uccl_p2p}/meson.build | 20 +++++++++---------- .../uccl_p2p_backend.cpp} | 4 ++-- .../uccl_p2p_backend.h} | 0 .../uccl_p2p_plugin.cpp} | 14 ++++++------- test/gtest/plugins/meson.build | 2 +- .../plugins/{uccl => uccl_p2p}/meson.build | 16 +++++++-------- .../uccl_p2p_test.cpp} | 10 +++++----- test/nixl/test_plugin.cpp | 2 +- 12 files changed, 47 insertions(+), 49 deletions(-) rename src/plugins/{uccl => uccl_p2p}/README.md (65%) rename src/plugins/{uccl => uccl_p2p}/meson.build (72%) rename src/plugins/{uccl/uccl_backend.cpp => uccl_p2p/uccl_p2p_backend.cpp} (99%) rename src/plugins/{uccl/uccl_backend.h => uccl_p2p/uccl_p2p_backend.h} (100%) rename src/plugins/{uccl/uccl_plugin.cpp => uccl_p2p/uccl_p2p_plugin.cpp} (74%) rename test/gtest/plugins/{uccl => uccl_p2p}/meson.build (77%) rename test/gtest/plugins/{uccl/uccl_test.cpp => uccl_p2p/uccl_p2p_test.cpp} (98%) diff --git a/meson_options.txt b/meson_options.txt index e8d428194..92fea7df2 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -19,7 +19,7 @@ option('etcd_inc_path', type: 'string', value: '', description: 'Path to ETCD He option('etcd_lib_path', type: 'string', value: '', description: 'Path to ETCD Libraries') option('disable_gds_backend', type : 'boolean', value : false, description : 'disable gds backend') option('disable_mooncake_backend', type : 'boolean', value : false, description : 'disable mooncake backend') -option('disable_uccl_backend', type : 'boolean', value : false, description : 'disable uccl backend') +option('disable_uccl_p2p_backend', type : 'boolean', value : false, description : 'disable UCCL P2P backend') option('install_headers', type : 'boolean', value : true, description : 'install headers') option('gds_path', type: 'string', value: '/usr/local/cuda/', description: 'Path to GDS CuFile install') option('cudapath_inc', type: 'string', value: '', description: 'Include path for CUDA') diff --git a/src/api/python/_api.py b/src/api/python/_api.py index 28d9fb50d..2ea066427 100644 --- a/src/api/python/_api.py +++ b/src/api/python/_api.py @@ -232,7 +232,7 @@ def __init__( init["num_threads"] = str(nixl_conf.num_threads) elif bknd == "GDS_MT": init["thread_count"] = str(nixl_conf.num_threads) - elif bknd == "Uccl": + elif bknd == "UCCL_P2P": init["num_cpus"] = str(nixl_conf.num_threads) self.create_backend(bknd, init) diff --git a/src/plugins/meson.build b/src/plugins/meson.build index b1fe14e67..55e945e37 100644 --- a/src/plugins/meson.build +++ b/src/plugins/meson.build @@ -42,9 +42,9 @@ if libtransfer_engine.found() and not get_option('disable_mooncake_backend') subdir('mooncake') endif -libuccl_engine = cc.find_library('uccl_engine', required: false) -if libuccl_engine.found() and not get_option('disable_uccl_backend') - subdir('uccl') +libuccl_p2p = cc.find_library('uccl_p2p', required: false) +if libuccl_p2p.found() and not get_option('disable_uccl_p2p_backend') + subdir('uccl_p2p') endif hf3fs_lib_path = '/usr/lib/' diff --git a/src/plugins/uccl/README.md b/src/plugins/uccl_p2p/README.md similarity index 65% rename from src/plugins/uccl/README.md rename to src/plugins/uccl_p2p/README.md index 5f547418a..db8879322 100644 --- a/src/plugins/uccl/README.md +++ b/src/plugins/uccl_p2p/README.md @@ -1,15 +1,15 @@ -## UCCL Backend Plugin [Preview] +## UCCL P2P Backend Plugin [Preview] -[UCCL](https://github.com/uccl-project/uccl) is an efficient communication library to perform GPU memory transfers, with a focus on flexibility (evolving ML workloads) and portability (heteregenous GPUs). UCCL provides a software transport stack which runs on the CPUs and are easily extensible to support different techniques like congestion control, multipathing, efficient loss recovery, etc. -UCCL supports collectives, p2p communication and gpu-driven communication for expert parallelism. +[UCCL](https://github.com/uccl-project/uccl) is an efficient communication library to perform GPU memory transfers, with a focus on flexibility (evolving ML workloads) and portability (heteregenous GPUs/NICs). UCCL provides a software transport stack which runs on the CPUs and are easily extensible to support different techniques like congestion control, multipathing, efficient loss recovery, etc. +UCCL supports collectives for training, P2P communication for PD disaggregation and gpu-driven communication for expert parallelism. This backend plugin adds support for UCCL P2P. ## Capabilities -Currently, the UCCL backend supports internode communication over RDMA. Intranode communication will be added soon. +Currently, the UCCL P2P backend supports internode communication over RDMA. Intranode communication will be added soon. ## Installation Guide -1. Install UCCL's p2p engine manually. You can refer to the [installation guide here](https://https://github.com/uccl-project/uccl). +1. Install UCCL's P2P engine manually. You can refer to the [installation guide here](https://https://github.com/uccl-project/uccl). ```cpp git clone https://github.com/uccl-project/uccl.git @@ -18,14 +18,14 @@ Currently, the UCCL backend supports internode communication over RDMA. Intranod sudo make install ``` -2. Build NIXL using regular method as in [README](https://github.com/ai-dynamo/nixl/blob/main/README.md) ensuring `disable_uccl_backend` is set to `false`. +2. Build NIXL using regular method as in [README](https://github.com/ai-dynamo/nixl/blob/main/README.md) ensuring `disable_uccl_p2p_backend` is set to `false`. ## Usage Guide -Example Usage to create a NIXL agent with uccl engine: +Example Usage to create a NIXL agent with UCCL P2P engine: ```python - config = nixl_agent_config(backends=["UCCL"]) + config = nixl_agent_config(backends=["UCCL_P2P"]) agent = nixl_agent("agent-name", config) ``` UCCL engine would auto discover the right NIC to be used for the GPU based on the PCIe distance during memory registration based on the data locality. @@ -38,7 +38,7 @@ Refer to [README](https://github.com/uccl-project/uccl/tree/main/collective/rdma ### Usage References -1) [NIXL Benchmark](https://github.com/uccl-project/uccl/blob/main/p2p/benchmarks/benchmark_nixl.py) in UCCL: Refer to this [README](https://github.com/uccl-project/uccl/tree/main/p2p) on how to run the script. +1) [NIXL Benchmark](https://github.com/uccl-project/uccl/blob/main/p2p/benchmarks/benchmark_nixl.py) in UCCL P2P: Refer to this [README](https://github.com/uccl-project/uccl/tree/main/p2p) on how to run the script. 2) [NIXL connector](https://github.com/praveingk/vllm/commit/fa67cd7edff076fee4914cc316a9833c2311a65d) in vLLM. vLLM's NIXL connector uses `NIXL_READ` operations, hence set env `UCCL_RCMODE` to 1. diff --git a/src/plugins/uccl/meson.build b/src/plugins/uccl_p2p/meson.build similarity index 72% rename from src/plugins/uccl/meson.build rename to src/plugins/uccl_p2p/meson.build index 9a6e4a7ee..2dac24056 100644 --- a/src/plugins/uccl/meson.build +++ b/src/plugins/uccl_p2p/meson.build @@ -14,23 +14,21 @@ # limitations under the License. cpp = meson.get_compiler('cpp') -uccl_lib = cpp.find_library('uccl_engine') - compile_flags = [] -if 'UCCL' in static_plugins - uccl_backend_lib = static_library('UCCL', - 'uccl_backend.cpp', 'uccl_backend.h', 'uccl_plugin.cpp', - dependencies: [nixl_infra, serdes_interface, cuda_dep, uccl_lib, nixl_common_dep], +if 'UCCL_P2P' in static_plugins + uccl_p2p_backend_lib = static_library('UCCL_P2P', + 'uccl_p2p_backend.cpp', 'uccl_p2p_backend.h', 'uccl_p2p_plugin.cpp', + dependencies: [nixl_infra, serdes_interface, cuda_dep, libuccl_p2p, nixl_common_dep], include_directories: nixl_inc_dirs, install: false, cpp_args : compile_flags, name_prefix: 'libplugin_') # Custom prefix for plugin libraries else - uccl_backend_lib = shared_library('UCCL', - 'uccl_backend.cpp', 'uccl_backend.h', 'uccl_plugin.cpp', - dependencies: [nixl_infra, serdes_interface, cuda_dep, uccl_lib, nixl_common_dep], + uccl_p2p_backend_lib = shared_library('UCCL_P2P', + 'uccl_p2p_backend.cpp', 'uccl_p2p_backend.h', 'uccl_p2p_plugin.cpp', + dependencies: [nixl_infra, serdes_interface, cuda_dep, libuccl_p2p, nixl_common_dep], include_directories: [nixl_inc_dirs, utils_inc_dirs], install: true, cpp_args : compile_flags + ['-fPIC'], @@ -39,10 +37,10 @@ else if get_option('buildtype') == 'debug' run_command('sh', '-c', - 'echo "UCCL=' + uccl_backend_lib.full_path() + '" >> ' + plugin_build_dir + '/pluginlist', + 'echo "UCCL_P2P=' + uccl_p2p_backend_lib.full_path() + '" >> ' + plugin_build_dir + '/pluginlist', check: true ) endif endif -uccl_backend_interface = declare_dependency(link_with: uccl_backend_lib) +uccl_backend_interface = declare_dependency(link_with: uccl_p2p_backend_lib) diff --git a/src/plugins/uccl/uccl_backend.cpp b/src/plugins/uccl_p2p/uccl_p2p_backend.cpp similarity index 99% rename from src/plugins/uccl/uccl_backend.cpp rename to src/plugins/uccl_p2p/uccl_p2p_backend.cpp index 97ab8e61e..3520e6618 100644 --- a/src/plugins/uccl/uccl_backend.cpp +++ b/src/plugins/uccl_p2p/uccl_p2p_backend.cpp @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "uccl_backend.h" +#include "uccl_p2p_backend.h" #include #include #include @@ -285,7 +285,7 @@ nixlUcclEngine::registerMem(const nixlBlobDesc &mem, out = priv; mem_reg_info_[mem.addr] = priv; NIXL_DEBUG << "Registering memory: " << mem.addr << "Device: " << mem.devId - << " ref_cnt: " << priv->ref_cnt << " mr_id: " << priv->mr_id; + << " ref_cnt: " << priv->ref_cnt << " mr_id: " << priv->mr_id; return NIXL_SUCCESS; } diff --git a/src/plugins/uccl/uccl_backend.h b/src/plugins/uccl_p2p/uccl_p2p_backend.h similarity index 100% rename from src/plugins/uccl/uccl_backend.h rename to src/plugins/uccl_p2p/uccl_p2p_backend.h diff --git a/src/plugins/uccl/uccl_plugin.cpp b/src/plugins/uccl_p2p/uccl_p2p_plugin.cpp similarity index 74% rename from src/plugins/uccl/uccl_plugin.cpp rename to src/plugins/uccl_p2p/uccl_p2p_plugin.cpp index dc7f7e3cf..14b8e9a9a 100644 --- a/src/plugins/uccl/uccl_plugin.cpp +++ b/src/plugins/uccl_p2p/uccl_p2p_plugin.cpp @@ -16,11 +16,11 @@ */ #include "backend/backend_plugin.h" -#include "uccl_backend.h" +#include "uccl_p2p_backend.h" namespace { nixl_b_params_t -get_uccl_options() { +get_uccl_p2p_options() { nixl_b_params_t params; params["in_python"] = ""; params["num_cpus"] = ""; @@ -29,19 +29,19 @@ get_uccl_options() { } // namespace // Plugin type alias for convenience -using uccl_plugin_t = nixlBackendPluginCreator; +using uccl_p2p_plugin_t = nixlBackendPluginCreator; #ifdef STATIC_PLUGIN_UCCL nixlBackendPlugin * createStaticUCCLPlugin() { - return uccl_plugin_t::create( - NIXL_PLUGIN_API_VERSION, "UCCL", "0.1.0", get_uccl_options(), {DRAM_SEG, VRAM_SEG}); + return uccl_p2p_plugin_t::create( + NIXL_PLUGIN_API_VERSION, "UCCL_P2P", "0.1.0", get_uccl_p2p_options(), {DRAM_SEG, VRAM_SEG}); } #else extern "C" NIXL_PLUGIN_EXPORT nixlBackendPlugin * nixl_plugin_init() { - return uccl_plugin_t::create( - NIXL_PLUGIN_API_VERSION, "UCCL", "0.1.0", get_uccl_options(), {DRAM_SEG, VRAM_SEG}); + return uccl_p2p_plugin_t::create( + NIXL_PLUGIN_API_VERSION, "UCCL_P2P", "0.1.0", get_uccl_p2p_options(), {DRAM_SEG, VRAM_SEG}); } extern "C" NIXL_PLUGIN_EXPORT void diff --git a/test/gtest/plugins/meson.build b/test/gtest/plugins/meson.build index 2ada71e3b..22dbb50ad 100644 --- a/test/gtest/plugins/meson.build +++ b/test/gtest/plugins/meson.build @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -subdir('uccl') +subdir('uccl_p2p') aws_s3 = dependency('aws-cpp-sdk-s3', static: false, required: false) if not aws_s3.found() diff --git a/test/gtest/plugins/uccl/meson.build b/test/gtest/plugins/uccl_p2p/meson.build similarity index 77% rename from test/gtest/plugins/uccl/meson.build rename to test/gtest/plugins/uccl_p2p/meson.build index 6f458aff7..b7320b32e 100644 --- a/test/gtest/plugins/uccl/meson.build +++ b/test/gtest/plugins/uccl_p2p/meson.build @@ -15,11 +15,11 @@ cpp = meson.get_compiler('cpp') -uccl_engine_lib = cpp.find_library('uccl_engine', dirs: ['/usr/local/lib'], required: false) +uccl_p2p_lib = cpp.find_library('uccl_p2p', dirs: ['/usr/local/lib'], required: false) uccl_rdma_lib = cpp.find_library('rdma_plugin', dirs: ['/usr/local/lib'], required: false) -if not uccl_engine_lib.found() or not uccl_rdma_lib.found() - message('UCCL libraries not found, skipping uccl_gtest build') +if not uccl_p2p_lib.found() or not uccl_rdma_lib.found() + message('UCCL P2P libraries not found, skipping uccl_p2p_gtest build') subdir_done() endif @@ -27,7 +27,7 @@ python_dep = dependency('python3', required: false) pybind11_dep = dependency('pybind11', required: false) if not python_dep.found() or not pybind11_dep.found() - message('Python dependencies not found, skipping uccl_gtest build') + message('Python dependencies not found, skipping uccl_p2p_gtest build') subdir_done() endif @@ -42,8 +42,8 @@ else cpp_flags += '-UHAVE_CUDA' endif -uccl_test_exe = executable('uccl_gtest', - sources : ['uccl_test.cpp', '../../main.cpp', '../../common.cpp'], +uccl_p2p_test_exe = executable('uccl_p2p_gtest', + sources : ['uccl_p2p_test.cpp', '../../main.cpp', '../../common.cpp'], include_directories: [nixl_inc_dirs, utils_inc_dirs, gtest_inc_dirs, '.'], cpp_args : cpp_flags, dependencies : [ @@ -54,7 +54,7 @@ uccl_test_exe = executable('uccl_gtest', python_dep, pybind11_dep, uccl_rdma_lib, - uccl_engine_lib, + uccl_p2p_lib, gtest_dep, gmock_dep, absl_strings_dep, @@ -64,4 +64,4 @@ uccl_test_exe = executable('uccl_gtest', install : true ) -test('uccl_gtest', uccl_test_exe) +test('uccl_p2p_gtest', uccl_p2p_test_exe) diff --git a/test/gtest/plugins/uccl/uccl_test.cpp b/test/gtest/plugins/uccl_p2p/uccl_p2p_test.cpp similarity index 98% rename from test/gtest/plugins/uccl/uccl_test.cpp rename to test/gtest/plugins/uccl_p2p/uccl_p2p_test.cpp index c3d8f3041..6db998bc4 100644 --- a/test/gtest/plugins/uccl/uccl_test.cpp +++ b/test/gtest/plugins/uccl_p2p/uccl_p2p_test.cpp @@ -25,7 +25,7 @@ namespace gtest { namespace nixl { - constexpr const char *uccl_backend_name = "Uccl"; + constexpr const char *uccl_backend_name = "UCCL_P2P"; static nixlBackendH * createUcclBackend(nixlAgent &agent) { @@ -33,7 +33,7 @@ namespace nixl { nixl_status_t status = agent.getAvailPlugins(plugins); EXPECT_EQ(status, NIXL_SUCCESS); auto it = std::find(plugins.begin(), plugins.end(), uccl_backend_name); - EXPECT_NE(it, plugins.end()) << "UCCL plugin not found"; + EXPECT_NE(it, plugins.end()) << "UCCL_P2P plugin not found"; nixl_b_params_t params; nixl_mem_list_t mems; @@ -158,7 +158,7 @@ class TestUcclBackend : public testing::Test { void TestUcclBackend::Agent::init(const std::string &name) { m_priv = std::make_unique(name, nixlAgentConfig(true)); - // Create UCCL backend for testing + // Create UCCL P2P backend for testing m_backend = nixl::createUcclBackend(*m_priv); m_mem.init(m_backend); m_mem.fillData(); @@ -248,8 +248,8 @@ TestUcclBackend::Agent::dataCmp(const TestUcclBackend::Agent &other) const { } TestUcclBackend::TestUcclBackend() { - m_backend_name = "Uccl"; - m_env.addVar("NIXL_PLUGIN_DIR", std::string(BUILD_DIR) + "/src/plugins/uccl"); + m_backend_name = "UCCL_P2P"; + m_env.addVar("NIXL_PLUGIN_DIR", std::string(BUILD_DIR) + "/src/plugins/uccl_p2p"); } template diff --git a/test/nixl/test_plugin.cpp b/test/nixl/test_plugin.cpp index 6d098de39..003270f87 100644 --- a/test/nixl/test_plugin.cpp +++ b/test/nixl/test_plugin.cpp @@ -75,7 +75,7 @@ int main(int argc, char** argv) { "GDS_MT", "LIBFABRIC", "GUSLI", - "UCCL"}; + "UCCL_P2P"}; if (argc > 1 && (std::string(argv[1]) == "-h" || std::string(argv[1]) == "--help")) { print_usage(argv[0]); From 835c6367f01c767542ccb03fed82cd9439492486 Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Fri, 21 Nov 2025 12:13:35 +0530 Subject: [PATCH 15/15] Fix formatting Signed-off-by: Pravein Govindan Kannan --- src/plugins/uccl_p2p/uccl_p2p_backend.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plugins/uccl_p2p/uccl_p2p_backend.cpp b/src/plugins/uccl_p2p/uccl_p2p_backend.cpp index 3520e6618..e7794bec1 100644 --- a/src/plugins/uccl_p2p/uccl_p2p_backend.cpp +++ b/src/plugins/uccl_p2p/uccl_p2p_backend.cpp @@ -285,7 +285,7 @@ nixlUcclEngine::registerMem(const nixlBlobDesc &mem, out = priv; mem_reg_info_[mem.addr] = priv; NIXL_DEBUG << "Registering memory: " << mem.addr << "Device: " << mem.devId - << " ref_cnt: " << priv->ref_cnt << " mr_id: " << priv->mr_id; + << " ref_cnt: " << priv->ref_cnt << " mr_id: " << priv->mr_id; return NIXL_SUCCESS; }