Skip to content
7 changes: 4 additions & 3 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -758,14 +758,15 @@
/// - "init": SHM subsystem internals will be initialized upon Session opening. This setting sacrifices
/// startup time, but guarantees no latency impact when first SHM buffer is processed.
mode: "lazy",
/// ROS setting: the section below controls SHM parameters used for large message passing on ROS
transport_optimization: {
/// Enables transport optimization for large messages (default `true`).
/// Implicitly puts large messages into shared memory for transports with SHM-compatible connection.
enabled: true,
/// SHM memory size in bytes used for transport optimization (default `16 * 1024 * 1024`).
pool_size: 16777216,
/// SHM memory size in bytes used for transport optimization (default `48 * 1024 * 1024`).
pool_size: 50331648,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better if we could add ROS setting: in the comment like this:

/// ROS setting: disable multicast discovery by default

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

/// Allow optimization for messages equal or larger than this threshold in bytes (default `3072`).
message_size_threshold: 3072,
message_size_threshold: 512,
},
},
auth: {
Expand Down
7 changes: 4 additions & 3 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -765,14 +765,15 @@
/// - "init": SHM subsystem internals will be initialized upon Session opening. This setting sacrifices
/// startup time, but guarantees no latency impact when first SHM buffer is processed.
mode: "lazy",
/// ROS setting: the section below controls SHM parameters used for large message passing on ROS
transport_optimization: {
/// Enables transport optimization for large messages (default `true`).
/// Implicitly puts large messages into shared memory for transports with SHM-compatible connection.
enabled: true,
/// SHM memory size in bytes used for transport optimization (default `16 * 1024 * 1024`).
pool_size: 16777216,
/// SHM memory size in bytes used for transport optimization (default `48 * 1024 * 1024`).
pool_size: 50331648,
/// Allow optimization for messages equal or larger than this threshold in bytes (default `3072`).
message_size_threshold: 3072,
message_size_threshold: 512,
},
},
auth: {
Expand Down
41 changes: 35 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>

zenoh::ZResult result;

// Check if shm is enabled.
// Get shm configuration.
bool shm_enabled = false;
std::size_t msgsize_threshold = 512;
{
std::string shm_enabled_val = config.value().get(Z_CONFIG_SHARED_MEMORY_KEY, &result);
if (result == Z_OK) {
Expand All @@ -82,6 +83,36 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
"Not able to get %s from the config file",
Z_CONFIG_SHARED_MEMORY_KEY);
}

const auto threshold_key =
"transport/shared_memory/transport_optimization/message_size_threshold";
std::string shm_size_threshold_val = config.value().get(
threshold_key,
&result);
if (result == Z_OK) {
try {
msgsize_threshold = std::stoull(shm_size_threshold_val);
} catch (const std::invalid_argument & e) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Not able to get %s from the config file: invalid argument: %s",
threshold_key,
e.what()
);
} catch (const std::out_of_range & e) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Not able to get %s from the config file: out of range: %s",
threshold_key,
e.what()
);
}
} else {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Not able to get %s from the config file",
threshold_key);
}
}

// Initialize the zenoh session.
Expand Down Expand Up @@ -181,13 +212,11 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
// Initialize the shm subsystem if shared_memory is enabled in the config
if (shm_enabled) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp",
"SHM is enabled - allocated size: %d - msg size threshold: %d",
rmw_zenoh_cpp::zenoh_shm_alloc_size(),
rmw_zenoh_cpp::zenoh_shm_message_size_threshold());
"SHM is enabled - msg size threshold: %d",
msgsize_threshold);

shm_ = std::make_shared<rmw_zenoh_cpp::ShmContext>(
rmw_zenoh_cpp::zenoh_shm_alloc_size(),
rmw_zenoh_cpp::zenoh_shm_message_size_threshold()
msgsize_threshold
);
} else {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is disabled");
Expand Down
92 changes: 48 additions & 44 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@

namespace rmw_zenoh_cpp
{
// TODO(yuyuan): SHM, make this configurable
#define SHM_BUF_OK_SIZE 2621440

// Period (ms) of heartbeats sent for detection of lost samples
// by a RELIABLE + TRANSIENT_LOCAL Publisher
#define SAMPLE_MISS_DETECTION_HEARTBEAT_PERIOD 500
Expand Down Expand Up @@ -213,7 +210,7 @@ PublisherData::PublisherData(
///=============================================================================
rmw_ret_t PublisherData::publish(
const void * ros_message,
const std::shared_ptr<ShmContext> shm)
ShmContext * shm)
{
std::lock_guard<std::mutex> lock(mutex_);
if (is_shutdown_) {
Expand Down Expand Up @@ -252,18 +249,24 @@ rmw_ret_t PublisherData::publish(

// Get memory from SHM buffer if available.
if (shm && max_data_length >= shm->msgsize_threshold) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto alloc_result = shm->shm_provider.alloc_gc_defrag(max_data_length);

if (std::holds_alternative<zenoh::ZShmMut>(alloc_result)) {
auto && buf = std::get<zenoh::ZShmMut>(std::move(alloc_result));
msg_bytes = reinterpret_cast<uint8_t *>(buf.data());
shm_buf = std::make_optional(std::move(buf));
if (auto shm_provider = shm->get_shm_provider(*sess_)) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto alloc_result = shm_provider.value().shm_provider().alloc_gc_defrag(max_data_length);

if (std::holds_alternative<zenoh::ZShmMut>(alloc_result)) {
auto && buf = std::get<zenoh::ZShmMut>(std::move(alloc_result));
msg_bytes = reinterpret_cast<uint8_t *>(buf.data());
shm_buf = std::make_optional(std::move(buf));
} else {
// Print a warning and revert to regular allocation
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp", "Failed to allocate a SHM buffer, fallback to non-SHM");
}
} else {
// Print a warning and revert to regular allocation
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp", "Failed to allocate a SHM buffer, fallback to non-SHM");
"rmw_zenoh_cpp", "SHM provider is not yet available, fallback to non-SHM");
}
}

Expand All @@ -283,7 +286,6 @@ rmw_ret_t PublisherData::publish(
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);


// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast<char *>(msg_bytes), max_data_length);

Expand Down Expand Up @@ -314,7 +316,7 @@ rmw_ret_t PublisherData::publish(
payload = zenoh::Bytes(std::move(*shm_buf));
} else if (pool_buf.has_value() && pool_buf.value().data) {
auto deleter = [buffer_pool = context_impl->serialization_buffer_pool(),
buffer = pool_buf](uint8_t *){
buffer = pool_buf](uint8_t *) {
buffer_pool->deallocate(buffer.value());
};
payload = zenoh::Bytes(msg_bytes, data_length, deleter);
Expand Down Expand Up @@ -347,7 +349,7 @@ rmw_ret_t PublisherData::publish(
///=============================================================================
rmw_ret_t PublisherData::publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
const std::shared_ptr<ShmContext> shm)
ShmContext * shm)
{
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
Expand All @@ -357,6 +359,9 @@ rmw_ret_t PublisherData::publish_serialized_message(
return RMW_RET_ERROR;
}

// Optional shared memory buffer
std::optional<zenoh::ZShmMut> shm_buf = std::nullopt;

std::lock_guard<std::mutex> lock(mutex_);

const size_t data_length = ser.get_serialized_data_length();
Expand All @@ -371,46 +376,45 @@ rmw_ret_t PublisherData::publish_serialized_message(

// Get memory from SHM buffer if available.
if (shm && data_length >= shm->msgsize_threshold) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto alloc_result = shm->shm_provider.alloc_gc_defrag(data_length);

if (std::holds_alternative<zenoh::ZShmMut>(alloc_result)) {
auto && buf = std::get<zenoh::ZShmMut>(std::move(alloc_result));
auto msg_bytes = reinterpret_cast<char *>(buf.data());
memcpy(msg_bytes, serialized_message->buffer, data_length);
zenoh::Bytes payload(std::move(buf));

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message,
source_timestamp);

pub_.put(std::move(payload), std::move(opts), &result);
if (auto shm_provider = shm->get_shm_provider(*sess_)) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto alloc_result = shm_provider.value().shm_provider().alloc_gc_defrag(data_length);

if (std::holds_alternative<zenoh::ZShmMut>(alloc_result)) {
auto && buf = std::get<zenoh::ZShmMut>(std::move(alloc_result));
shm_buf = std::make_optional(std::move(buf));
} else {
// Print a warning and revert to regular allocation
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp", "Failed to allocate a SHM buffer, fallback to non-SHM");
}
} else {
// Print a warning and revert to regular allocation
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp", "Failed to allocate a SHM buffer, fallback to non-SHM");
"rmw_zenoh_cpp", "SHM provider is not yet available, fallback to non-SHM");
}
}

// TODO(yellowhatter): split the whole publish method onto shm and non-shm versions
std::vector<uint8_t> raw_image(
serialized_message->buffer,
serialized_message->buffer + data_length);
zenoh::Bytes payload(raw_image);
if (shm_buf.has_value()) {
auto msg_bytes = reinterpret_cast<char *>(shm_buf.value().data());
memcpy(msg_bytes, serialized_message->buffer, data_length);
zenoh::Bytes payload(std::move(*shm_buf));

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message,
source_timestamp);
TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message,
source_timestamp);

pub_.put(std::move(payload), std::move(opts), &result);
}
pub_.put(std::move(payload), std::move(opts), &result);
} else {
std::vector<uint8_t> raw_image(
serialized_message->buffer,
serialized_message->buffer + data_length);
zenoh::Bytes payload(raw_image);
zenoh::Bytes payload(std::move(raw_image));

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message, source_timestamp);
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message,
source_timestamp);

pub_.put(std::move(payload), std::move(opts), &result);
}
Expand Down
8 changes: 4 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ class PublisherData final

// Publish a ROS message.
rmw_ret_t publish(
const void * ros_message,
const std::shared_ptr<ShmContext> shm
const void *ros_message,
ShmContext *shm
);

// Publish a serialized ROS message.
rmw_ret_t publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
const std::shared_ptr<ShmContext> shm
const rmw_serialized_message_t *serialized_message,
ShmContext *shm
);

// Get a copy of the gid_hash of this PublisherData's liveliness::Entity.
Expand Down
66 changes: 0 additions & 66 deletions rmw_zenoh_cpp/src/detail/zenoh_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ static const std::unordered_map<ConfigurableEntity,
};

static const char * router_check_attempts_envar = "ZENOH_ROUTER_CHECK_ATTEMPTS";
static const char * zenoh_shm_alloc_size_envar = "ZENOH_SHM_ALLOC_SIZE";
static const size_t zenoh_shm_alloc_size_default = 48 * 1024 * 1024;
static const char * zenoh_shm_message_size_threshold_envar = "ZENOH_SHM_MESSAGE_SIZE_THRESHOLD";
static const size_t zenoh_shm_message_size_threshold_default = 512;
/// Allow users to override the configuration using key-value pairs.
/// The supporting syntax is "key1=value2;key2=value2;...". For instance,
/// ZENOH_CONFIG_OVERRIDE='listen/endpoints=["tcp/127.0.0.1:7448"];scouting/multicast/enabled=true'
Expand Down Expand Up @@ -158,66 +154,4 @@ std::optional<uint64_t> zenoh_router_check_attempts()
// If unset, use the default.
return default_value;
}

///=============================================================================
size_t zenoh_shm_alloc_size()
{
const char * envar_value;

if (NULL != rcutils_get_env(zenoh_shm_alloc_size_envar, &envar_value)) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Envar %s cannot be read. Report this bug.",
zenoh_shm_alloc_size_envar);
return zenoh_shm_alloc_size_default;
}

// If the environment variable contains a value, handle it accordingly.
if (envar_value[0] != '\0') {
const auto read_value = std::strtoull(envar_value, nullptr, 10);
if (read_value > 0) {
if (read_value > std::numeric_limits<size_t>::max()) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Envar %s: value is too large!",
zenoh_shm_alloc_size_envar);
} else {
return read_value;
}
}
}

return zenoh_shm_alloc_size_default;
}
///=============================================================================
size_t zenoh_shm_message_size_threshold()
{
const char * envar_value;

if (NULL != rcutils_get_env(zenoh_shm_message_size_threshold_envar, &envar_value)) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Envar %s cannot be read. Report this bug.",
zenoh_shm_message_size_threshold_envar);
return zenoh_shm_message_size_threshold_default;
}

// If the environment variable contains a value, handle it accordingly.
if (envar_value[0] != '\0') {
const auto read_value = std::strtoull(envar_value, nullptr, 10);
if (read_value > 0) {
if (read_value > std::numeric_limits<size_t>::max()) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Envar %s: value is too large!",
zenoh_shm_message_size_threshold_envar);
} else if ((read_value & (read_value - 1)) != 0) { // power of 2 check
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Envar %s: value must be power of 2!",
zenoh_shm_message_size_threshold_envar);

} else {
return read_value;
}
}
}

return zenoh_shm_message_size_threshold_default;
}
} // namespace rmw_zenoh_cpp
19 changes: 0 additions & 19 deletions rmw_zenoh_cpp/src/detail/zenoh_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,6 @@ std::optional<zenoh::Config> get_z_config(const ConfigurableEntity & entity);
/// @return The number of times to try connecting to a zenoh router and
/// std::nullopt if establishing a connection to a router is not required.
std::optional<uint64_t> zenoh_router_check_attempts();

///=============================================================================
/// Get the amount of shared memory to be pre-allocated for Zenoh SHM operation
/// based on the environment variable ZENOH_SHM_ALLOC_SIZE.
/// @details The behavior is as follows:
/// - If not set or <= 0, the default value of 16MB is returned.
/// - Else value of environemnt variable is returned.
/// @return The amount of shared memory to be pre-allocated for Zenoh SHM operation
size_t zenoh_shm_alloc_size();

///=============================================================================
/// Message size threshold for implicit SHM optimization based on the environment
/// variable ZENOH_SHM_MESSAGE_SIZE_THRESHOLD.
/// Messages smaller than this threshold will not be forwarded through Zenoh SHM
/// @details The behavior is as follows:
/// - If not set or <= 0, the default value of 512B is returned.
/// - Else value of environemnt variable is returned.
/// @return The amount of shared memory to be pre-allocated for Zenoh SHM operation
size_t zenoh_shm_message_size_threshold();
} // namespace rmw_zenoh_cpp

#endif // DETAIL__ZENOH_CONFIG_HPP_
Loading
Loading