diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 3abd999f3d..c1e49dc3a1 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -110,6 +110,16 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/waitable.cpp ) +if(${CMAKE_SYSTEM_NAME} MATCHES "Linux") + list(APPEND ${PROJECT_NAME}_SRCS + src/rclcpp/threads/posix_thread.cpp + ) +elseif(${CMAKE_SYSTEM_NAME} MATCHES "Windows") + list(APPEND ${PROJECT_NAME}_SRCS + src/rclcpp/threads/windows_thread.cpp + ) +endif() + find_package(Python3 REQUIRED COMPONENTS Interpreter) # "watch" template for changes diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 119013ebfb..f1b92b5fa7 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -26,6 +27,7 @@ #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" #include "rclcpp/visibility_control.hpp" +#include "rclcpp/threads.hpp" namespace rclcpp { @@ -79,12 +81,23 @@ class MultiThreadedExecutor : public rclcpp::Executor run(size_t this_thread_number); private: + void run_on_rclcpp_thread( + rclcpp::detail::ThreadAttribute & thread_attr, + std::vector & threads, + size_t thread_id); + + void run_on_this_thread( + rclcpp::detail::ThreadAttribute & thread_attr, + size_t thread_id); + RCLCPP_DISABLE_COPY(MultiThreadedExecutor) std::mutex wait_mutex_; size_t number_of_threads_; bool yield_before_execute_; std::chrono::nanoseconds next_exec_timeout_; + rcl_thread_attrs_t * thread_attributes_; + bool use_thread_attrs_; }; } // namespace executors diff --git a/rclcpp/include/rclcpp/threads.hpp b/rclcpp/include/rclcpp/threads.hpp new file mode 100644 index 0000000000..ac628de3e1 --- /dev/null +++ b/rclcpp/include/rclcpp/threads.hpp @@ -0,0 +1,26 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS_HPP_ +#define RCLCPP__THREADS_HPP_ + +#if defined(__linux__) +#include "rclcpp/threads/posix/thread.hpp" +#elif defined(_WIN32) +#include "rclcpp/threads/win32/thread.hpp" +#else +#include "rclcpp/threads/std/thread.hpp" +#endif + +#endif // RCLCPP__THREADS_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp b/rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp new file mode 100644 index 0000000000..45bc2b090e --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp @@ -0,0 +1,119 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__LINUX__CPU_SET_HPP_ +#define RCLCPP__THREADS__POSIX__LINUX__CPU_SET_HPP_ + +#include +#include + +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +namespace detail +{ + +struct CpuSet +{ + using NativeCpuSetType = cpu_set_t; + CpuSet() + { + int processor_num = static_cast(sysconf(_SC_NPROCESSORS_ONLN)); + cpu_size_ = CPU_ALLOC_SIZE(processor_num); + cpu_set_ = CPU_ALLOC(cpu_size_); + CPU_ZERO_S(cpu_size_, cpu_set_); + } + explicit CpuSet(int num_cpu) + { + int processor_num = static_cast(sysconf(_SC_NPROCESSORS_ONLN)); + cpu_size_ = CPU_ALLOC_SIZE(processor_num); + cpu_set_ = CPU_ALLOC(cpu_size_); + CPU_ZERO_S(cpu_size_, cpu_set_); + CPU_SET_S(num_cpu, cpu_size_, cpu_set_); + } + CpuSet(const CpuSet & cpuset) + { + int processor_num = static_cast(sysconf(_SC_NPROCESSORS_ONLN)); + cpu_size_ = cpuset.get_cpu_size(); + cpu_set_ = CPU_ALLOC(cpu_size_); + CPU_ZERO_S(cpu_size_, cpu_set_); + for (int i = 0; i < processor_num; i++) { + if (CPU_ISSET_S(i, cpu_size_, cpuset.native_cpu_set())) { + CPU_SET_S(i, cpu_size_, cpu_set_); + } + } + } + CpuSet & operator=(CpuSet const & cpuset) + { + int processor_num = static_cast(sysconf(_SC_NPROCESSORS_ONLN)); + cpu_size_ = cpuset.get_cpu_size(); + cpu_set_ = CPU_ALLOC(cpu_size_); + CPU_ZERO_S(cpu_size_, cpu_set_); + for (int i = 0; i < processor_num; i++) { + if (CPU_ISSET_S(i, cpu_size_, cpuset.native_cpu_set())) { + CPU_SET_S(i, cpu_size_, cpu_set_); + } + } + return *this; + } + CpuSet(CpuSet &&) = delete; + CpuSet & operator=(CpuSet &&) = delete; + ~CpuSet() + { + CPU_FREE(cpu_set_); + cpu_size_ = 0; + } + void set(int cpu) + { + int processor_num = static_cast(sysconf(_SC_NPROCESSORS_ONLN)); + if (0 > cpu || processor_num <= cpu) { + auto ec = std::make_error_code(std::errc::invalid_argument); + throw std::system_error{ec, "cpu number is invaild"}; + } + CPU_SET_S(cpu, cpu_size_, cpu_set_); + } + void unset(int cpu) + { + int processor_num = static_cast(sysconf(_SC_NPROCESSORS_ONLN)); + if (0 > cpu || processor_num <= cpu) { + auto ec = std::make_error_code(std::errc::invalid_argument); + throw std::system_error{ec, "cpu number is invaild"}; + } + CPU_CLR_S(cpu, cpu_size_, cpu_set_); + } + void clear() + { + CPU_ZERO_S(cpu_size_, cpu_set_); + } + bool is_set(int cpu) + { + return CPU_ISSET_S(cpu, cpu_size_, cpu_set_); + } + size_t get_cpu_size() const + { + return cpu_size_; + } + NativeCpuSetType * native_cpu_set() const {return cpu_set_;} + +private: + NativeCpuSetType * cpu_set_; + size_t cpu_size_; +}; +} // namespace detail + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__POSIX__LINUX__CPU_SET_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread.hpp b/rclcpp/include/rclcpp/threads/posix/thread.hpp new file mode 100644 index 0000000000..fc786859d0 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/thread.hpp @@ -0,0 +1,141 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__THREAD_HPP_ +#define RCLCPP__THREADS__POSIX__THREAD_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/threads/posix/thread_attribute.hpp" +#include "rclcpp/threads/posix/thread_func.hpp" +#include "rclcpp/threads/posix/thread_id.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +struct Thread +{ + using NativeHandleType = pthread_t; + using Attribute = detail::ThreadAttribute; + using Id = detail::ThreadId; + + // Assume pthread_t is an invalid handle if it's 0 + Thread() noexcept + : handle_{} {} + Thread(Thread && other) + : handle_(other.handle_) {other.handle_ = NativeHandleType{};} + template, Attribute>::value>> + explicit Thread(F && f, Args && ... args) + : Thread(static_cast(nullptr), + make_thread_func(nullptr, std::forward(f), std::forward(args)...)) + {} + template + Thread(Attribute & attr, F && f, Args && ... args) + : Thread(&attr, make_thread_func(&attr, std::forward(f), std::forward(args)...)) + {} + Thread(Thread const &) = delete; + ~Thread() + { + if (handle_) {std::terminate();} + } + + Thread & operator=(Thread && other) noexcept + { + if (handle_) {std::terminate();} + swap(other); + return *this; + } + + Thread & operator=(Thread const &) = delete; + + void swap(Thread & other) + { + using std::swap; + swap(handle_, other.handle_); + } + + void join() + { + void * p; + int r = pthread_join(handle_, &p); + if (r != 0) {throw std::system_error(r, std::system_category(), "Error in pthread_join ");} + handle_ = NativeHandleType{}; + } + + bool joinable() const noexcept + { + return 0 == pthread_equal(handle_, NativeHandleType{}); + } + + void detach() + { + int r = pthread_detach(handle_); + if (r != 0) {throw std::system_error(r, std::system_category(), "Error in detach ");} + handle_ = NativeHandleType{}; + } + + NativeHandleType native_handle() const + { + return handle_; + } + + Id get_id() const noexcept + { + return Id{handle_}; + } + + static int hardware_concurrency() noexcept + { + return static_cast(sysconf(_SC_NPROCESSORS_ONLN)); + } + +private: + using ThreadFuncUniquePtr = detail::ThreadFuncUniquePtr; + + Thread(Attribute * attr, ThreadFuncUniquePtr func); + + template + static ThreadFuncUniquePtr make_thread_func(Attribute * attr, F && f, Args && ... args) + { + using detail::ThreadFunc; + using detail::ThreadFuncImplementation; + ThreadFunc * func = new ThreadFuncImplementation, std::decay_t...>( + attr, std::forward(f), std::forward(args)... + ); + + return ThreadFuncUniquePtr{func}; + } + + NativeHandleType handle_; +}; + +inline void swap(Thread & t1, Thread & t2) +{ + t1.swap(t2); +} + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__POSIX__THREAD_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp b/rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp new file mode 100644 index 0000000000..bf812577e2 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp @@ -0,0 +1,200 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ +#define RCLCPP__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ + +#include + +#include "rcl_yaml_param_parser/types.h" +#include "rclcpp/visibility_control.hpp" + +#ifdef __linux__ +#include "rclcpp/threads/posix/linux/cpu_set.hpp" +#endif + +namespace rclcpp +{ + +namespace detail +{ + +struct ThreadAttribute +{ + ThreadAttribute() + { + NativeAttributeType attr_; + + int r = pthread_attr_init(&attr_); + if (r != 0) {throw std::system_error(r, std::system_category(), "Error in pthread_attr_init ");} + + size_t cpu_size_ = CPU_ALLOC_SIZE(static_cast(sysconf(_SC_NPROCESSORS_ONLN))); + r = pthread_attr_getaffinity_np(&attr_, cpu_size_, cpu_set_.native_cpu_set()); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_getaffinity_np "); + } + + r = pthread_attr_getschedpolicy(&attr_, &sched_policy_); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_getschedpolicy "); + } + + r = pthread_attr_getstacksize(&attr_, &stack_size_); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_getstacksize "); + } + + sched_param param; + r = pthread_attr_getschedparam(&attr_, ¶m); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_getschedparam "); + } + priority_ = param.sched_priority; + + int flag; + r = pthread_attr_getdetachstate(&attr_, &flag); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_getdetachstate "); + } + detached_flag_ = (flag == PTHREAD_CREATE_DETACHED); + pthread_attr_destroy(&attr_); + } + ~ThreadAttribute() + { + } + + ThreadAttribute(ThreadAttribute const &) = delete; + ThreadAttribute(ThreadAttribute &&) = delete; + ThreadAttribute & operator=(ThreadAttribute const &) = delete; + ThreadAttribute & operator=(ThreadAttribute &&) = delete; + + using NativeAttributeType = pthread_attr_t; + + ThreadAttribute & set_affinity(CpuSet & rclcpp_cs) + { + cpu_set_ = rclcpp_cs; + return *this; + } + CpuSet get_affinity() + { + return cpu_set_; + } + + ThreadAttribute & set_sched_policy(rcl_thread_scheduling_policy_type_t sp) + { + sched_policy_ = rcl_scheduling_policy_to_sched_policy(sp); + return *this; + } + int get_sched_policy() const + { + return sched_policy_; + } + + ThreadAttribute & set_stack_size(std::size_t sz) + { + stack_size_ = sz; + return *this; + } + std::size_t get_stack_size() const + { + return stack_size_; + } + + ThreadAttribute & set_priority(int prio) + { + priority_ = prio; + return *this; + } + int get_priority() const + { + return priority_; + } + + ThreadAttribute & set_run_as_detached(bool detach) + { + detached_flag_ = detach; + return *this; + } + + bool get_run_as_detached() const + { + return detached_flag_; + } + + void + set_thread_attributes( + rcl_thread_attr_t attributes) + { + rclcpp::detail::CpuSet cpu_set; + cpu_set.clear(); + cpu_set.set(attributes.core_affinity); + set_affinity(cpu_set); + set_sched_policy(attributes.scheduling_policy); + set_priority(attributes.priority); + } + +private: + CpuSet cpu_set_; + int sched_policy_; + std::size_t stack_size_; + int priority_; + bool detached_flag_; + + int rcl_scheduling_policy_to_sched_policy( + rcl_thread_scheduling_policy_type_t sched_policy) + { + switch (sched_policy) { +#ifdef SCHED_FIFO + case RCL_THREAD_SCHEDULING_POLICY_FIFO: + return SCHED_FIFO; +#endif +#ifdef SCHED_RR + case RCL_THREAD_SCHEDULING_POLICY_RR: + return SCHED_RR; +#endif +#ifdef SCHED_OTHER + case RCL_THREAD_SCHEDULING_POLICY_OTHER: + return SCHED_OTHER; +#endif +#ifdef SCHED_IDLE + case RCL_THREAD_SCHEDULING_POLICY_IDLE: + return SCHED_IDLE; +#endif +#ifdef SCHED_BATCH + case RCL_THREAD_SCHEDULING_POLICY_BATCH: + return SCHED_BATCH; +#endif +#ifdef SCHED_SPORADIC + case RCL_THREAD_SCHEDULING_POLICY_SPORADIC: + return SCHED_SPORADIC; +#endif + /* Todo: Necessity and setting method need to be considered + #ifdef SCHED_DEADLINE + case RCL_THREAD_SCHEDULING_POLICY_DEADLINE: + return SCHED_DEADLINE; + break; + #endif + */ + default: + throw std::runtime_error("Invalid scheduling policy"); + } + return -1; + } +}; + +} // namespace detail + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread_func.hpp b/rclcpp/include/rclcpp/threads/posix/thread_func.hpp new file mode 100644 index 0000000000..5620c48532 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/thread_func.hpp @@ -0,0 +1,102 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__THREAD_FUNC_HPP_ +#define RCLCPP__THREADS__POSIX__THREAD_FUNC_HPP_ + +#include +#include +#include +#include +#include + +#include "rclcpp/threads/posix/thread_attribute.hpp" + +namespace rclcpp::detail +{ + +struct ThreadFunc +{ + virtual ~ThreadFunc() = default; + virtual void run() = 0; +}; + +template +struct ThreadFuncImplementation : ThreadFunc +{ + template + ThreadFuncImplementation(rclcpp::detail::ThreadAttribute * attr, G && g, As && ... as) + : func_(std::forward(g)), args_(std::forward(as)...) + { + if (nullptr != attr) { + sched_policy_ = attr->get_sched_policy(); + priority_ = attr->get_priority(); + } else { + sched_policy_ = -1; + priority_ = -1; + } + } + +private: + static constexpr bool is_memfun_ = std::is_member_function_pointer_v; + + void run() override + { + if (sched_policy_ != -1 && sched_policy_ != SCHED_FIFO && sched_policy_ != SCHED_RR) { + sched_param param; + param.sched_priority = priority_; + int r = pthread_setschedparam( + pthread_self(), + sched_policy_, + ¶m); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_setschedparam "); + } + } + run_impl(std::index_sequence_for{}); + } + + template> + void run_impl(std::index_sequence<0, Is...>) + { + (arg<0>().*std::move(func_))(arg()...); + } + template, + typename = void> + void run_impl(std::index_sequence) + { + std::move(func_)(arg()...); + } + + template + auto && arg() + { + return std::move(std::get(args_)); + } + + F func_; + std::tuple args_; + int sched_policy_; + int priority_; +}; + +using ThreadFuncUniquePtr = std::unique_ptr; + +} // namespace rclcpp::detail + +#endif // RCLCPP__THREADS__POSIX__THREAD_FUNC_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread_id.hpp b/rclcpp/include/rclcpp/threads/posix/thread_id.hpp new file mode 100644 index 0000000000..4333930903 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/thread_id.hpp @@ -0,0 +1,141 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__THREAD_ID_HPP_ +#define RCLCPP__THREADS__POSIX__THREAD_ID_HPP_ + +#include + +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +struct Thread; + +namespace detail +{ + +namespace thread_id_ns +{ + +struct ThreadId; + +inline ThreadId get_id() noexcept; +inline bool operator==(ThreadId id1, ThreadId id2); +inline bool operator!=(ThreadId id1, ThreadId id2); +inline bool operator<(ThreadId id1, ThreadId id2); +inline bool operator>(ThreadId id1, ThreadId id2); +inline bool operator<=(ThreadId id1, ThreadId id2); +inline bool operator>=(ThreadId id1, ThreadId id2); +template +inline std::basic_ostream & operator<<( + std::basic_ostream &, + ThreadId); + +struct ThreadId +{ + ThreadId() = default; + ThreadId(ThreadId const &) = default; + ThreadId(ThreadId &&) = default; + ThreadId & operator=(ThreadId const &) = default; + ThreadId & operator=(ThreadId &&) = default; + + friend bool operator==(ThreadId id1, ThreadId id2) + { + return pthread_equal(id1.h, id2.h); + } + friend bool operator<(ThreadId id1, ThreadId id2) + { + return id1.h < id2.h; + } + template + friend std::basic_ostream & operator<<( + std::basic_ostream & ost, + ThreadId id) + { + return ost << id.h; + } + +private: + friend class rclcpp::Thread; + friend ThreadId get_id() noexcept; + friend struct std::hash; + explicit ThreadId(pthread_t h) + : h(h) {} + pthread_t h; +}; + +ThreadId get_id() noexcept +{ + return ThreadId{pthread_self()}; +} + +bool operator!=(ThreadId id1, ThreadId id2) +{ + return !(id1 == id2); +} + +bool operator>(ThreadId id1, ThreadId id2) +{ + return id2 < id1; +} + +bool operator<=(ThreadId id1, ThreadId id2) +{ + return !(id1 > id2); +} + +bool operator>=(ThreadId id1, ThreadId id2) +{ + return !(id1 < id2); +} + +} // namespace thread_id_ns + +using thread_id_ns::ThreadId; +using thread_id_ns::operator==; +using thread_id_ns::operator!=; +using thread_id_ns::operator<; // NOLINT +using thread_id_ns::operator>; // NOLINT +using thread_id_ns::operator<=; +using thread_id_ns::operator>=; +using thread_id_ns::operator<<; + +} // namespace detail + +namespace this_thread +{ + +using detail::thread_id_ns::get_id; + +} // namespace this_thread + +} // namespace rclcpp + +namespace std +{ + +template<> +struct hash +{ + std::size_t operator()(rclcpp::detail::thread_id_ns::ThreadId id) + { + return id.h; + } +}; + +} // namespace std + +#endif // RCLCPP__THREADS__POSIX__THREAD_ID_HPP_ diff --git a/rclcpp/include/rclcpp/threads/std/thread.hpp b/rclcpp/include/rclcpp/threads/std/thread.hpp new file mode 100644 index 0000000000..5b783be40a --- /dev/null +++ b/rclcpp/include/rclcpp/threads/std/thread.hpp @@ -0,0 +1,117 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__STD__THREAD_HPP_ +#define RCLCPP__THREADS__STD__THREAD_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/threads/std/thread_attribute.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +struct Thread +{ + using NativeHandleType = std::thread::native_handle_type; + using Attribute = detail::ThreadAttribute; + using Id = std::thread::id; + + Thread() noexcept + : thread_{} {} + Thread(Thread &&) = default; + template, Attribute>::value>> + explicit Thread(F && f, Args && ... args) + : thread_(std::forward(f), std::forward(args)...) + {} + template + Thread(Attribute & attr, F && f, Args && ... args) + { + if (attr.set_value) { + throw std::runtime_error("std::thread can't set thread attribute"); + } + thread_ = std::thread(std::forward(f), std::forward(args)...); + } + Thread(Thread const &) = delete; + ~Thread() {} + + Thread & operator=(Thread && other) noexcept = default; + Thread & operator=(Thread const &) = delete; + + void swap(Thread & other) + { + using std::swap; + swap(thread_, other.thread_); + } + + void join() + { + thread_.join(); + } + + bool joinable() const noexcept + { + return thread_.joinable(); + } + + void detach() + { + thread_.detach(); + } + + NativeHandleType native_handle() + { + return thread_.native_handle(); + } + + Id get_id() const noexcept + { + return thread_.get_id(); + } + + static int hardware_concurrency() noexcept + { + return std::thread::hardware_concurrency(); + } + +private: + std::thread thread_; +}; + +inline void swap(Thread & t1, Thread & t2) +{ + t1.swap(t2); +} + +namespace this_thread +{ + +using std::this_thread::get_id; + +} // namespace this_thread + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__STD__THREAD_HPP_ diff --git a/rclcpp/include/rclcpp/threads/windows/thread.hpp b/rclcpp/include/rclcpp/threads/windows/thread.hpp new file mode 100644 index 0000000000..49e6134fcf --- /dev/null +++ b/rclcpp/include/rclcpp/threads/windows/thread.hpp @@ -0,0 +1,22 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__WINDOWS__THREAD_HPP_ +#define RCLCPP__THREADS__WINDOWS__THREAD_HPP_ + +// Not implemented so far. +// The windows specific code will be implemented +// while discussing the scheduling parameter passing feature at Real-time WG. + +#endif // RCLCPP__THREADS__WINDOWS__THREAD_HPP_ diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 507d47f913..82c32702a7 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -33,7 +33,8 @@ MultiThreadedExecutor::MultiThreadedExecutor( std::chrono::nanoseconds next_exec_timeout) : rclcpp::Executor(options), yield_before_execute_(yield_before_execute), - next_exec_timeout_(next_exec_timeout) + next_exec_timeout_(next_exec_timeout), + use_thread_attrs_(false) { number_of_threads_ = number_of_threads > 0 ? number_of_threads : @@ -45,6 +46,41 @@ MultiThreadedExecutor::MultiThreadedExecutor( "MultiThreadedExecutor is used with a single thread.\n" "Use the SingleThreadedExecutor instead."); } + + thread_attributes_ = nullptr; + if (RCL_RET_OK == + rcl_arguments_get_thread_attrs( + &options.context->get_rcl_context()->global_arguments, + &thread_attributes_)) + { + if (nullptr != thread_attributes_) { + if (0 != thread_attributes_->num_attributes) { + use_thread_attrs_ = true; + } else { + thread_attributes_ = nullptr; + if (RCL_RET_OK == + rcl_context_get_thread_attrs( + options.context->get_rcl_context().get(), + &thread_attributes_)) + { + if (nullptr != thread_attributes_) { + if (0 != thread_attributes_->num_attributes) { + use_thread_attrs_ = true; + } + } + } + } + } + } + if (true == use_thread_attrs_ && + 0 != number_of_threads && + thread_attributes_->num_attributes != number_of_threads) + { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "The number of thread attribute and number_of_threads is different.\n" + "MultiThreadedExecutor is used with the number of threads specified by thread attribute."); + } } MultiThreadedExecutor::~MultiThreadedExecutor() {} @@ -56,17 +92,35 @@ MultiThreadedExecutor::spin() throw std::runtime_error("spin() called while already spinning"); } RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); - std::vector threads; + std::vector threads; size_t thread_id = 0; - { - std::lock_guard wait_lock{wait_mutex_}; - for (; thread_id < number_of_threads_ - 1; ++thread_id) { - auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); - threads.emplace_back(func); + + if (use_thread_attrs_) { + rclcpp::detail::ThreadAttribute thread_attr; + { + std::lock_guard wait_lock{wait_mutex_}; + for (; thread_id < thread_attributes_->num_attributes - 1; ++thread_id) { + thread_attr.set_thread_attributes( + thread_attributes_->attributes[thread_id]); + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); + threads.emplace_back(rclcpp::Thread(thread_attr, func)); + } + thread_attr.set_thread_attributes( + thread_attributes_->attributes[thread_id]); + run_on_rclcpp_thread(thread_attr, threads, thread_id); + } + run_on_this_thread(thread_attr, thread_id); + } else { + { + std::lock_guard wait_lock{wait_mutex_}; + for (; thread_id < number_of_threads_ - 1; ++thread_id) { + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); + threads.emplace_back(func); + } } + run(thread_id); } - run(thread_id); for (auto & thread : threads) { thread.join(); } @@ -104,3 +158,81 @@ MultiThreadedExecutor::run(size_t this_thread_number) any_exec.callback_group.reset(); } } + +#if defined(__linux__) +#include +void +MultiThreadedExecutor::run_on_rclcpp_thread( + rclcpp::detail::ThreadAttribute & thread_attr, + std::vector & threads, + size_t thread_id) +{ + (void)thread_attr; + (void)threads; + (void)thread_id; +} + +void +MultiThreadedExecutor::run_on_this_thread( + rclcpp::detail::ThreadAttribute & thread_attr, + size_t thread_id) +{ + int r; + r = sched_setaffinity(0, sizeof(cpu_set_t), thread_attr.get_affinity().native_cpu_set()); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in sched_setaffinity "); + } + sched_param param; + param.sched_priority = thread_attr.get_priority(); + int policy = thread_attr.get_sched_policy(); + r = sched_setscheduler( + 0, policy, + ¶m); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in sched_setscheduler "); + } + run(thread_id); +} + +#elif defined(_WIN32) +#include +void +MultiThreadedExecutor::run_on_rclcpp_thread( + rclcpp::detail::ThreadAttribute & thread_attr, + std::vector & threads, + size_t thread_id) +{ + (void)thread_attr; + (void)threads; + (void)thread_id; +} + +void +MultiThreadedExecutor::run_on_this_thread( + rclcpp::detail::ThreadAttribute & thread_attr, + size_t thread_id) +{ + // TO DO: Setting parent thread attribute on Windows + run(thread_id); +} +#else +void +MultiThreadedExecutor::run_on_rclcpp_thread( + rclcpp::detail::ThreadAttribute & thread_attr, + std::vector & threads, + size_t thread_id) +{ + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); + threads.emplace_back(rclcpp::Thread(thread_attr, func)); +} + +void +MultiThreadedExecutor::run_on_this_thread( + rclcpp::detail::ThreadAttribute & thread_attr, + size_t thread_id) +{ + (void)thread_attr; + (void)thread_id; +} + +#endif diff --git a/rclcpp/src/rclcpp/threads/posix_thread.cpp b/rclcpp/src/rclcpp/threads/posix_thread.cpp new file mode 100644 index 0000000000..a52fda5f0a --- /dev/null +++ b/rclcpp/src/rclcpp/threads/posix_thread.cpp @@ -0,0 +1,101 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include "rclcpp/threads/posix/thread.hpp" + +static void * thread_main(void * p); + +namespace rclcpp +{ + +Thread::Thread(Attribute * attr, ThreadFuncUniquePtr func) +: handle_(NativeHandleType{}) +{ + Attribute::NativeAttributeType native_attr; + int r = pthread_attr_init(&native_attr); + if (r != 0) {throw std::system_error(r, std::system_category(), "Error in pthread_attr_init ");} + + if (attr != nullptr) { + r = pthread_attr_setinheritsched(&native_attr, PTHREAD_EXPLICIT_SCHED); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_setinheritsched "); + } + + rclcpp::detail::CpuSet affinity = attr->get_affinity(); + size_t cpu_size_ = CPU_ALLOC_SIZE(static_cast(sysconf(_SC_NPROCESSORS_ONLN))); + r = pthread_attr_setaffinity_np(&native_attr, cpu_size_, affinity.native_cpu_set()); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_setaffinity_np "); + } + + std::size_t stack_size = attr->get_stack_size(); + r = pthread_attr_setstacksize(&native_attr, stack_size); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_setstacksize "); + } + + int flag = attr->get_run_as_detached() ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE; + r = pthread_attr_setdetachstate(&native_attr, flag); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_setdetachstate "); + } + + int sched_policy = attr->get_sched_policy(); + if (sched_policy == SCHED_FIFO || sched_policy == SCHED_RR) { + r = pthread_attr_setschedpolicy(&native_attr, sched_policy); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_setschedpolicy "); + } + + sched_param param; + param.sched_priority = attr->get_priority(); + r = pthread_attr_setschedparam(&native_attr, ¶m); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_setschedparam "); + } + } + } + + NativeHandleType h; + + r = pthread_create(&h, &native_attr, thread_main, func.get()); + if (r != 0) {throw std::system_error(r, std::system_category(), "Error in pthread_create ");} + + if (attr == nullptr || !attr->get_run_as_detached()) { + this->handle_ = h; + } + + pthread_attr_destroy(&native_attr); + + func.release(); +} + +} // namespace rclcpp + +static void * thread_main(void * p) +{ + using rclcpp::detail::ThreadFuncUniquePtr; + using rclcpp::detail::ThreadFunc; + + ThreadFuncUniquePtr func{reinterpret_cast(p)}; + try { + func->run(); + } catch (...) { + std::cerr << "failed to run thread" << std::endl; + std::terminate(); + } + + return nullptr; +} diff --git a/rclcpp/src/rclcpp/threads/windows_thread.cpp b/rclcpp/src/rclcpp/threads/windows_thread.cpp new file mode 100644 index 0000000000..2f6ca50ec7 --- /dev/null +++ b/rclcpp/src/rclcpp/threads/windows_thread.cpp @@ -0,0 +1,19 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/threads/windows/thread.hpp" + +// Not implemented so far. +// The windows specific code will be implemented +// while discussing the scheduling parameter passing feature at Real-time WG.