diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index 522f2554..ac40977a 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -758,14 +758,15 @@ /// - "init": SHM subsystem internals will be initialized upon Session opening. This setting sacrifices /// startup time, but guarantees no latency impact when first SHM buffer is processed. mode: "lazy", + /// ROS setting: the section below controls SHM parameters used for large message passing on ROS transport_optimization: { /// Enables transport optimization for large messages (default `true`). /// Implicitly puts large messages into shared memory for transports with SHM-compatible connection. enabled: true, - /// SHM memory size in bytes used for transport optimization (default `16 * 1024 * 1024`). - pool_size: 16777216, + /// SHM memory size in bytes used for transport optimization (default `48 * 1024 * 1024`). + pool_size: 50331648, /// Allow optimization for messages equal or larger than this threshold in bytes (default `3072`). - message_size_threshold: 3072, + message_size_threshold: 512, }, }, auth: { diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 index 5df19b0d..7f37cbdd 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 @@ -765,14 +765,15 @@ /// - "init": SHM subsystem internals will be initialized upon Session opening. This setting sacrifices /// startup time, but guarantees no latency impact when first SHM buffer is processed. mode: "lazy", + /// ROS setting: the section below controls SHM parameters used for large message passing on ROS transport_optimization: { /// Enables transport optimization for large messages (default `true`). /// Implicitly puts large messages into shared memory for transports with SHM-compatible connection. enabled: true, - /// SHM memory size in bytes used for transport optimization (default `16 * 1024 * 1024`). - pool_size: 16777216, + /// SHM memory size in bytes used for transport optimization (default `48 * 1024 * 1024`). + pool_size: 50331648, /// Allow optimization for messages equal or larger than this threshold in bytes (default `3072`). - message_size_threshold: 3072, + message_size_threshold: 512, }, }, auth: { diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index efc07e4a..4b8960d0 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -70,8 +70,9 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this zenoh::ZResult result; - // Check if shm is enabled. + // Get shm configuration. bool shm_enabled = false; + std::size_t msgsize_threshold = 512; { std::string shm_enabled_val = config.value().get(Z_CONFIG_SHARED_MEMORY_KEY, &result); if (result == Z_OK) { @@ -82,6 +83,36 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this "Not able to get %s from the config file", Z_CONFIG_SHARED_MEMORY_KEY); } + + const auto threshold_key = + "transport/shared_memory/transport_optimization/message_size_threshold"; + std::string shm_size_threshold_val = config.value().get( + threshold_key, + &result); + if (result == Z_OK) { + try { + msgsize_threshold = std::stoull(shm_size_threshold_val); + } catch (const std::invalid_argument & e) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Not able to get %s from the config file: invalid argument: %s", + threshold_key, + e.what() + ); + } catch (const std::out_of_range & e) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Not able to get %s from the config file: out of range: %s", + threshold_key, + e.what() + ); + } + } else { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Not able to get %s from the config file", + threshold_key); + } } // Initialize the zenoh session. @@ -181,13 +212,11 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this // Initialize the shm subsystem if shared_memory is enabled in the config if (shm_enabled) { RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", - "SHM is enabled - allocated size: %d - msg size threshold: %d", - rmw_zenoh_cpp::zenoh_shm_alloc_size(), - rmw_zenoh_cpp::zenoh_shm_message_size_threshold()); + "SHM is enabled - msg size threshold: %d", + msgsize_threshold); shm_ = std::make_shared( - rmw_zenoh_cpp::zenoh_shm_alloc_size(), - rmw_zenoh_cpp::zenoh_shm_message_size_threshold() + msgsize_threshold ); } else { RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is disabled"); diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 953e6e3d..a7262bbd 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -44,9 +44,6 @@ namespace rmw_zenoh_cpp { -// TODO(yuyuan): SHM, make this configurable -#define SHM_BUF_OK_SIZE 2621440 - // Period (ms) of heartbeats sent for detection of lost samples // by a RELIABLE + TRANSIENT_LOCAL Publisher #define SAMPLE_MISS_DETECTION_HEARTBEAT_PERIOD 500 @@ -213,7 +210,7 @@ PublisherData::PublisherData( ///============================================================================= rmw_ret_t PublisherData::publish( const void * ros_message, - const std::shared_ptr shm) + ShmContext * shm) { std::lock_guard lock(mutex_); if (is_shutdown_) { @@ -252,18 +249,24 @@ rmw_ret_t PublisherData::publish( // Get memory from SHM buffer if available. if (shm && max_data_length >= shm->msgsize_threshold) { - RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled."); - - auto alloc_result = shm->shm_provider.alloc_gc_defrag(max_data_length); - - if (std::holds_alternative(alloc_result)) { - auto && buf = std::get(std::move(alloc_result)); - msg_bytes = reinterpret_cast(buf.data()); - shm_buf = std::make_optional(std::move(buf)); + if (auto shm_provider = shm->get_shm_provider(*sess_)) { + RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled."); + + auto alloc_result = shm_provider.value().shm_provider().alloc_gc_defrag(max_data_length); + + if (std::holds_alternative(alloc_result)) { + auto && buf = std::get(std::move(alloc_result)); + msg_bytes = reinterpret_cast(buf.data()); + shm_buf = std::make_optional(std::move(buf)); + } else { + // Print a warning and revert to regular allocation + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", "Failed to allocate a SHM buffer, fallback to non-SHM"); + } } else { // Print a warning and revert to regular allocation RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", "Failed to allocate a SHM buffer, fallback to non-SHM"); + "rmw_zenoh_cpp", "SHM provider is not yet available, fallback to non-SHM"); } } @@ -283,7 +286,6 @@ rmw_ret_t PublisherData::publish( RMW_CHECK_FOR_NULL_WITH_MSG( msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); - // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(msg_bytes), max_data_length); @@ -314,7 +316,7 @@ rmw_ret_t PublisherData::publish( payload = zenoh::Bytes(std::move(*shm_buf)); } else if (pool_buf.has_value() && pool_buf.value().data) { auto deleter = [buffer_pool = context_impl->serialization_buffer_pool(), - buffer = pool_buf](uint8_t *){ + buffer = pool_buf](uint8_t *) { buffer_pool->deallocate(buffer.value()); }; payload = zenoh::Bytes(msg_bytes, data_length, deleter); @@ -347,7 +349,7 @@ rmw_ret_t PublisherData::publish( ///============================================================================= rmw_ret_t PublisherData::publish_serialized_message( const rmw_serialized_message_t * serialized_message, - const std::shared_ptr shm) + ShmContext * shm) { eprosima::fastcdr::FastBuffer buffer( reinterpret_cast(serialized_message->buffer), serialized_message->buffer_length); @@ -357,6 +359,9 @@ rmw_ret_t PublisherData::publish_serialized_message( return RMW_RET_ERROR; } + // Optional shared memory buffer + std::optional shm_buf = std::nullopt; + std::lock_guard lock(mutex_); const size_t data_length = ser.get_serialized_data_length(); @@ -371,46 +376,45 @@ rmw_ret_t PublisherData::publish_serialized_message( // Get memory from SHM buffer if available. if (shm && data_length >= shm->msgsize_threshold) { - RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled."); - - auto alloc_result = shm->shm_provider.alloc_gc_defrag(data_length); - - if (std::holds_alternative(alloc_result)) { - auto && buf = std::get(std::move(alloc_result)); - auto msg_bytes = reinterpret_cast(buf.data()); - memcpy(msg_bytes, serialized_message->buffer, data_length); - zenoh::Bytes payload(std::move(buf)); - - TRACETOOLS_TRACEPOINT( - rmw_publish, static_cast(rmw_publisher_), serialized_message, - source_timestamp); - - pub_.put(std::move(payload), std::move(opts), &result); + if (auto shm_provider = shm->get_shm_provider(*sess_)) { + RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled."); + + auto alloc_result = shm_provider.value().shm_provider().alloc_gc_defrag(data_length); + + if (std::holds_alternative(alloc_result)) { + auto && buf = std::get(std::move(alloc_result)); + shm_buf = std::make_optional(std::move(buf)); + } else { + // Print a warning and revert to regular allocation + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", "Failed to allocate a SHM buffer, fallback to non-SHM"); + } } else { // Print a warning and revert to regular allocation RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", "Failed to allocate a SHM buffer, fallback to non-SHM"); + "rmw_zenoh_cpp", "SHM provider is not yet available, fallback to non-SHM"); + } + } - // TODO(yellowhatter): split the whole publish method onto shm and non-shm versions - std::vector raw_image( - serialized_message->buffer, - serialized_message->buffer + data_length); - zenoh::Bytes payload(raw_image); + if (shm_buf.has_value()) { + auto msg_bytes = reinterpret_cast(shm_buf.value().data()); + memcpy(msg_bytes, serialized_message->buffer, data_length); + zenoh::Bytes payload(std::move(*shm_buf)); - TRACETOOLS_TRACEPOINT( - rmw_publish, static_cast(rmw_publisher_), serialized_message, - source_timestamp); + TRACETOOLS_TRACEPOINT( + rmw_publish, static_cast(rmw_publisher_), serialized_message, + source_timestamp); - pub_.put(std::move(payload), std::move(opts), &result); - } + pub_.put(std::move(payload), std::move(opts), &result); } else { std::vector raw_image( serialized_message->buffer, serialized_message->buffer + data_length); - zenoh::Bytes payload(raw_image); + zenoh::Bytes payload(std::move(raw_image)); TRACETOOLS_TRACEPOINT( - rmw_publish, static_cast(rmw_publisher_), serialized_message, source_timestamp); + rmw_publish, static_cast(rmw_publisher_), serialized_message, + source_timestamp); pub_.put(std::move(payload), std::move(opts), &result); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index c9968e52..5dfe0098 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -56,14 +56,14 @@ class PublisherData final // Publish a ROS message. rmw_ret_t publish( - const void * ros_message, - const std::shared_ptr shm + const void *ros_message, + ShmContext *shm ); // Publish a serialized ROS message. rmw_ret_t publish_serialized_message( - const rmw_serialized_message_t * serialized_message, - const std::shared_ptr shm + const rmw_serialized_message_t *serialized_message, + ShmContext *shm ); // Get a copy of the gid_hash of this PublisherData's liveliness::Entity. diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp index dd161694..d179a9ba 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp @@ -44,10 +44,6 @@ static const std::unordered_map zenoh_router_check_attempts() // If unset, use the default. return default_value; } - -///============================================================================= -size_t zenoh_shm_alloc_size() -{ - const char * envar_value; - - if (NULL != rcutils_get_env(zenoh_shm_alloc_size_envar, &envar_value)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Envar %s cannot be read. Report this bug.", - zenoh_shm_alloc_size_envar); - return zenoh_shm_alloc_size_default; - } - - // If the environment variable contains a value, handle it accordingly. - if (envar_value[0] != '\0') { - const auto read_value = std::strtoull(envar_value, nullptr, 10); - if (read_value > 0) { - if (read_value > std::numeric_limits::max()) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Envar %s: value is too large!", - zenoh_shm_alloc_size_envar); - } else { - return read_value; - } - } - } - - return zenoh_shm_alloc_size_default; -} -///============================================================================= -size_t zenoh_shm_message_size_threshold() -{ - const char * envar_value; - - if (NULL != rcutils_get_env(zenoh_shm_message_size_threshold_envar, &envar_value)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Envar %s cannot be read. Report this bug.", - zenoh_shm_message_size_threshold_envar); - return zenoh_shm_message_size_threshold_default; - } - - // If the environment variable contains a value, handle it accordingly. - if (envar_value[0] != '\0') { - const auto read_value = std::strtoull(envar_value, nullptr, 10); - if (read_value > 0) { - if (read_value > std::numeric_limits::max()) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Envar %s: value is too large!", - zenoh_shm_message_size_threshold_envar); - } else if ((read_value & (read_value - 1)) != 0) { // power of 2 check - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Envar %s: value must be power of 2!", - zenoh_shm_message_size_threshold_envar); - - } else { - return read_value; - } - } - } - - return zenoh_shm_message_size_threshold_default; -} } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.hpp b/rmw_zenoh_cpp/src/detail/zenoh_config.hpp index 662d323f..958ce1e7 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.hpp @@ -61,25 +61,6 @@ std::optional get_z_config(const ConfigurableEntity & entity); /// @return The number of times to try connecting to a zenoh router and /// std::nullopt if establishing a connection to a router is not required. std::optional zenoh_router_check_attempts(); - -///============================================================================= -/// Get the amount of shared memory to be pre-allocated for Zenoh SHM operation -/// based on the environment variable ZENOH_SHM_ALLOC_SIZE. -/// @details The behavior is as follows: -/// - If not set or <= 0, the default value of 16MB is returned. -/// - Else value of environemnt variable is returned. -/// @return The amount of shared memory to be pre-allocated for Zenoh SHM operation -size_t zenoh_shm_alloc_size(); - -///============================================================================= -/// Message size threshold for implicit SHM optimization based on the environment -/// variable ZENOH_SHM_MESSAGE_SIZE_THRESHOLD. -/// Messages smaller than this threshold will not be forwarded through Zenoh SHM -/// @details The behavior is as follows: -/// - If not set or <= 0, the default value of 512B is returned. -/// - Else value of environemnt variable is returned. -/// @return The amount of shared memory to be pre-allocated for Zenoh SHM operation -size_t zenoh_shm_message_size_threshold(); } // namespace rmw_zenoh_cpp #endif // DETAIL__ZENOH_CONFIG_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index b7991d9f..c54b8a85 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -129,11 +129,20 @@ bool Payload::empty() const // Create Layout for provider's memory // Provider's alignment is 1 (=2^0) bytes as we are going to make only 1-byte aligned allocations // TODO(yellowhatter): use zenoh_shm_message_size_threshold as base for alignment -ShmContext::ShmContext(size_t alloc_size, size_t msgsize_threshold) -: shm_provider(zenoh::PosixShmProvider(alloc_size)), - msgsize_threshold(msgsize_threshold) +ShmContext::ShmContext(size_t msgsize_threshold) +: msgsize_threshold(msgsize_threshold) {} +std::optional ShmContext::get_shm_provider(zenoh::Session & session) +{ + auto maybe_provider = session.obtain_shm_provider(); + if (std::holds_alternative(maybe_provider)) { + return std::get(std::move(maybe_provider)); + } + return std::nullopt; +} + + ///============================================================================= BufferPool::BufferPool() : buffers_(), mutex_() diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index 0127b61b..216421fb 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -104,10 +104,11 @@ class Payload ///============================================================================= struct ShmContext { - zenoh::PosixShmProvider shm_provider; size_t msgsize_threshold; - ShmContext(size_t alloc_size, size_t msgsize_threshold); + explicit ShmContext(size_t msgsize_threshold); + + std::optional get_shm_provider(zenoh::Session & session); }; ///============================================================================= diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index a7cc4c51..f9bdef43 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -619,7 +619,7 @@ rmw_publish( return pub_data->publish( ros_message, - context_impl->shm() + context_impl->shm().get() ); } @@ -727,7 +727,7 @@ rmw_publish_serialized_message( return publisher_data->publish_serialized_message( serialized_message, - context_impl->shm() + context_impl->shm().get() ); } diff --git a/zenoh_cpp_vendor/CMakeLists.txt b/zenoh_cpp_vendor/CMakeLists.txt index 5ce9aa30..c8006d21 100644 --- a/zenoh_cpp_vendor/CMakeLists.txt +++ b/zenoh_cpp_vendor/CMakeLists.txt @@ -44,7 +44,7 @@ set(ZENOHC_CARGO_FLAGS "--features=shared-memory zenoh/transport_serial") # - https://github.com/eclipse-zenoh/zenoh/pull/2175 ament_vendor(zenoh_c_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git - VCS_VERSION f376456ccf75ed837a21a186bdf5191cba50eb3b + VCS_VERSION 8a163bf0a7e3819faa37d78f690e07603eb0388e CMAKE_ARGS "-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}" "-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE" @@ -56,7 +56,7 @@ ament_export_dependencies(zenohc) ament_vendor(zenoh_cpp_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-cpp - VCS_VERSION 0cd54f291039a65b96921a5951a66aeef088e67c + VCS_VERSION ab437a6fd25629ab9a2029c3e79b2880deeef262 CMAKE_ARGS -DZENOHCXX_ZENOHC=OFF )