diff --git a/CMakeLists.txt b/CMakeLists.txt index 6f28583..ff7e188 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -77,12 +77,20 @@ daq_add_application(datahandlinglibs_test_bufferedfilereader test_bufferedfilere daq_add_application(datahandlinglibs_test_skiplist test_skiplist_app.cxx TEST LINK_LIBRARIES datahandlinglibs ${BOOST_LIBS}) daq_add_application(datahandlinglibs_test_composite_key test_composite_key_app.cxx TEST LINK_LIBRARIES datahandlinglibs ${BOOST_LIBS}) +daq_add_application(datahandlinglibs_test_to_include_to_lcov_app test_to_include_to_lcov_app.cxx TEST LINK_LIBRARIES datahandlinglibs ${BOOST_LIBS}) + ############################################################################## # Unit Tests -#daq_add_unit_test(datahandlinglibs_BufferedReadWrite_test LINK_LIBRARIES datahandlinglibs ${BOOST_LIBS}) +daq_add_unit_test(datahandlinglibs_BufferedReadWrite_test LINK_LIBRARIES datahandlinglibs ${BOOST_LIBS}) #daq_add_unit_test(datahandlinglibs_VariableSizeElementQueue_test LINK_LIBRARIES datahandlinglibs ${BOOST_LIBS}) daq_add_unit_test(datahandlinglibs_DataMoveCallbackRegistry_test LINK_LIBRARIES datahandlinglibs ${BOOST_LIBS}) +daq_add_unit_test(datahandlinglibs_ReadoutTypes_test LINK_LIBRARIES datahandlinglibs ${BOOST_LIBS}) +daq_add_unit_test(datahandlinglibs_IterableQueueModel_test LINK_LIBRARIES datahandlinglibs ${BOOST_LIBS}) +daq_add_unit_test(datahandlinglibs_SkiplistLatencyBufferModel_test LINK_LIBRARIES datahandlinglibs ${BOOST_LIBS}) +daq_add_unit_test(datahandlinglibs_TaskRawDataProcessorModel_test LINK_LIBRARIES datahandlinglibs ${BOOST_LIBS}) +daq_add_unit_test(datahandlinglibs_DefaultRequestHandlerModel_test LINK_LIBRARIES datahandlinglibs ${BOOST_LIBS}) +daq_add_unit_test(datahandlinglibs_DataHandlingModel_test LINK_LIBRARIES datahandlinglibs ${BOOST_LIBS}) ############################################################################## # Installation diff --git a/include/datahandlinglibs/ReadoutTypes.hpp b/include/datahandlinglibs/ReadoutTypes.hpp index 1845f9c..8e17b5e 100644 --- a/include/datahandlinglibs/ReadoutTypes.hpp +++ b/include/datahandlinglibs/ReadoutTypes.hpp @@ -1,5 +1,5 @@ /** - * @file ReadoutTypes.hpp Common types in datahandlinglibs + * @file ReadoutTypes.hpp Common types in datahandlinglibs, used for tests * * This is part of the DUNE DAQ , copyright 2020. 
* Licensing/copyright details are in the COPYING file that you should have diff --git a/include/datahandlinglibs/concepts/RecorderConcept.hpp b/include/datahandlinglibs/concepts/RecorderConcept.hpp index d4d3272..11cfc7c 100644 --- a/include/datahandlinglibs/concepts/RecorderConcept.hpp +++ b/include/datahandlinglibs/concepts/RecorderConcept.hpp @@ -26,7 +26,7 @@ namespace dunedaq { namespace datahandlinglibs { -class RecorderConcept +class RecorderConcept : public opmonlib::MonitorableObject { public: RecorderConcept() {} diff --git a/include/datahandlinglibs/models/IterableQueueModel.hpp b/include/datahandlinglibs/models/IterableQueueModel.hpp index 3049491..dbe15b7 100644 --- a/include/datahandlinglibs/models/IterableQueueModel.hpp +++ b/include/datahandlinglibs/models/IterableQueueModel.hpp @@ -91,38 +91,6 @@ struct IterableQueueModel : public LatencyBufferConcept , writeIndex_(0) {} - // Explicit constructor with size - explicit IterableQueueModel(std::size_t size) // size must be >= 2 - : LatencyBufferConcept() // NOLINT(build/unsigned) - , numa_aware_(false) - , numa_node_(0) - , intrinsic_allocator_(false) - , alignment_size_(0) - , invalid_configuration_requested_(false) - , prefill_ready_(false) - , prefill_done_(false) - , size_(size) - , records_(static_cast(std::malloc(sizeof(T) * size))) - , readIndex_(0) - , writeIndex_(0) - { - assert(size >= 2); - if (!records_) { - throw std::bad_alloc(); - } -#if 0 - ptrlogger = std::thread([&](){ - while(true) { - auto const currentRead = readIndex_.load(std::memory_order_relaxed); - auto const currentWrite = writeIndex_.load(std::memory_order_relaxed); - TLOG() << "BEG:" << std::hex << &records_[0] << " END:" << &records_[size] << std::dec - << " R:" << currentRead << " - W:" << currentWrite - << " OFLOW:" << overflow_ctr; - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - }); -#endif - } // Constructor with alignment strategies IterableQueueModel(std::size_t size, // size must be >= 2 @@ -218,7 +186,7 @@ struct IterableQueueModel : public LatencyBufferConcept // Gives a pointer to the current read index const T* front() override; - // Gives a pointer to the current write index + // Gives a pointer to the last written element const T* back() override; // Gives a pointer to the first available slot of the queue diff --git a/include/datahandlinglibs/models/detail/DefaultRequestHandlerModel.hxx b/include/datahandlinglibs/models/detail/DefaultRequestHandlerModel.hxx index d0fb317..fe8398b 100644 --- a/include/datahandlinglibs/models/detail/DefaultRequestHandlerModel.hxx +++ b/include/datahandlinglibs/models/detail/DefaultRequestHandlerModel.hxx @@ -511,7 +511,7 @@ DefaultRequestHandlerModel::get_fragment_pieces(uint64_t start_win_ts, } } else { - //TLOG() << "Add element " << element->get_timestamp(); + TLOG() << "Add element " << element->get_timestamp(); // We are somewhere in the middle -> the whole aggregated object (e.g.: superchunk) can be copied frag_pieces.emplace_back( std::make_pair(static_cast((*start_iter).begin()), element->get_payload_size())); diff --git a/include/datahandlinglibs/models/detail/EmptyFragmentRequestHandlerModel.hxx b/include/datahandlinglibs/models/detail/EmptyFragmentRequestHandlerModel.hxx index 5889941..cf02b65 100644 --- a/include/datahandlinglibs/models/detail/EmptyFragmentRequestHandlerModel.hxx +++ b/include/datahandlinglibs/models/detail/EmptyFragmentRequestHandlerModel.hxx @@ -24,7 +24,7 @@ EmptyFragmentRequestHandlerModel::issue_request( << fragment->get_element_id(); //auto frag = 
std::make_pair(std::move(fragment), datarequest.data_destination); get_iom_sender>(datarequest.data_destination) - ->send(std::move(fragment), inherited::m_fragment_send_timeout_ms); + ->send(std::move(fragment), std::chrono::milliseconds(inherited::m_fragment_send_timeout_ms)); } catch (const ers::Issue& excpt) { ers::warning(CannotWriteToQueue( ERS_HERE, DefaultRequestHandlerModel::m_sourceid, "fragment queue", excpt)); diff --git a/include/datahandlinglibs/models/detail/IterableQueueModel.hxx b/include/datahandlinglibs/models/detail/IterableQueueModel.hxx index 27afd82..ac2d775 100644 --- a/include/datahandlinglibs/models/detail/IterableQueueModel.hxx +++ b/include/datahandlinglibs/models/detail/IterableQueueModel.hxx @@ -273,7 +273,7 @@ IterableQueueModel::front() return &records_[currentRead]; } -// Gives a pointer to the current write index +// Gives a pointer to the last written element template const T* IterableQueueModel::back() diff --git a/include/datahandlinglibs/testutils/UnitTestUtilities.hpp b/include/datahandlinglibs/testutils/UnitTestUtilities.hpp new file mode 100644 index 0000000..6a91c2d --- /dev/null +++ b/include/datahandlinglibs/testutils/UnitTestUtilities.hpp @@ -0,0 +1,392 @@ +#ifndef DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_TESTUTILS_UNITTESTUTILITIES_HPP +#define DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_TESTUTILS_UNITTESTUTILITIES_HPP + +#include "datahandlinglibs/models/DefaultRequestHandlerModel.hpp" +#include "datahandlinglibs/models/TaskRawDataProcessorModel.hpp" +#include "datahandlinglibs/models/EmptyFragmentRequestHandlerModel.hpp" +#include "datahandlinglibs/models/DataHandlingModel.hpp" + +#include + +namespace dunedaq { +namespace datahandlinglibs { +namespace unittest { + + +struct FakeReadoutTypeBase +{ + uint64_t get_timestamp() const { return timestamp; } + void set_timestamp(uint64_t ts) { timestamp = ts; } + size_t get_payload_size() const { return fixed_num_frames * fixed_frame_size; } + size_t get_frame_size() const { return fixed_frame_size; } + virtual size_t get_num_frames() const { return fixed_num_frames; } + virtual FakeReadoutTypeBase* begin() = 0; + virtual FakeReadoutTypeBase* end() = 0; + + uint64_t timestamp; // NOLINT(build/unsigned) + static constexpr size_t fixed_frame_size = 16; + static constexpr size_t fixed_num_frames = 1; + + static const constexpr dunedaq::daqdataformats::SourceID::Subsystem subsystem = + dunedaq::daqdataformats::SourceID::Subsystem::kUnknown; + static const constexpr dunedaq::daqdataformats::FragmentType fragment_type = + dunedaq::daqdataformats::FragmentType::kUnknown; + static const constexpr uint64_t expected_tick_difference = 1; + static constexpr size_t fixed_payload_size = fixed_frame_size * fixed_num_frames; + + char data[fixed_payload_size]; + + virtual ~FakeReadoutTypeBase() = default; +}; + +struct FakeReadoutType : public FakeReadoutTypeBase +{ + size_t get_num_frames() const override { return fixed_num_frames; } + FakeReadoutType* begin() override { return reinterpret_cast(data); } + + FakeReadoutType* end() override { return begin() + fixed_num_frames; } + + static constexpr size_t fixed_num_frames = 1; + static constexpr size_t fixed_payload_size = fixed_frame_size * fixed_num_frames; + char data[fixed_payload_size]; + + FakeReadoutType operator*(const FakeReadoutType& other) const + { + FakeReadoutType result; + result.timestamp = this->timestamp * other.timestamp; + return result; + } + FakeReadoutType operator+(int value) const + { + FakeReadoutType result; + result.timestamp = 
this->timestamp + value; + return result; + } + bool operator<(const FakeReadoutType& rhs) const { return this->timestamp < rhs.get_timestamp(); } + bool operator==(const FakeReadoutType& rhs) const { return this->timestamp == rhs.get_timestamp(); } + + FakeReadoutType() {} +}; +inline std::ostream& +operator<<(std::ostream& os, const FakeReadoutType& obj) +{ + os << "FakeReadoutType(timestamp=" << obj.timestamp << ")"; + return os; +} + +struct FakeSuperChunkReadoutType : public FakeReadoutTypeBase +{ + size_t get_num_frames() const override { return fixed_num_frames; } + FakeSuperChunkReadoutType* begin() override { return reinterpret_cast(data); } + FakeSuperChunkReadoutType* end() override { return begin() + fixed_num_frames; } + + static constexpr size_t fixed_num_frames = 4; + static constexpr size_t fixed_payload_size = fixed_frame_size * fixed_num_frames; + char data[fixed_payload_size]; + + FakeSuperChunkReadoutType operator*(const FakeSuperChunkReadoutType& other) const + { + FakeSuperChunkReadoutType result; + result.timestamp = this->timestamp * other.timestamp; + return result; + } + FakeSuperChunkReadoutType operator+(int value) const + { + FakeSuperChunkReadoutType result; + result.timestamp = this->timestamp + value; + return result; + } + bool operator<(const FakeSuperChunkReadoutType& rhs) const { return this->timestamp < rhs.get_timestamp(); } + bool operator==(const FakeSuperChunkReadoutType& rhs) const { return this->timestamp == rhs.get_timestamp(); } + + FakeSuperChunkReadoutType() {} +}; +inline std::ostream& +operator<<(std::ostream& os, const FakeSuperChunkReadoutType& obj) +{ + os << "FakeSuperChunkReadoutType(timestamp=" << obj.timestamp << ")"; + return os; +} + +struct FakeIterator +{ + using value_type = FakeReadoutType; + + FakeIterator(FakeReadoutType* ptr = nullptr) + : ptr_(ptr) + { + } + + FakeReadoutType& operator*() const { return *ptr_; } + FakeReadoutType* operator->() const { return ptr_; } + + FakeIterator& operator++() + { + ++ptr_; + return *this; + } + + friend bool operator!=(const FakeIterator& a, const FakeIterator& b) { return a.ptr_ != b.ptr_; } + + friend bool operator==(const FakeIterator& a, const FakeIterator& b) { return a.ptr_ == b.ptr_; } + + bool good() const { return ptr_ != nullptr; } + +private: + FakeReadoutType* ptr_; +}; + +template +class FakeLatencyBufferType : public LatencyBufferConcept +{ +public: + void conf([[maybe_unused]] const dunedaq::appmodel::LatencyBuffer* conf) override {} + void scrap([[maybe_unused]] const appfwk::DAQModule::CommandData_t& cfg) override {} + std::size_t occupancy() const override { return buffer_.size(); } + void flush() override { buffer_.clear(); } + bool write([[maybe_unused]] T&& element) override + { + buffer_.push_back(std::move(element)); + return true; + } + const T* back() override { return buffer_.empty() ? nullptr : &buffer_.back(); } + const T* front() override { return buffer_.empty() ? nullptr : &buffer_.front(); } + + bool read(T& element) override + { + if (buffer_.empty()) + return false; + element = buffer_.front(); + return true; + } + void pop(std::size_t amount) override + { + while (amount-- && !buffer_.empty()) { + buffer_.pop_front(); + } + } + void allocate_memory([[maybe_unused]] size_t /*size*/) override{}; + + FakeIterator lower_bound([[maybe_unused]] T& element, [[maybe_unused]] bool with_errors = false) { return begin(); } + FakeIterator end() { return buffer_.empty() ? 
FakeIterator{ nullptr } : FakeIterator{ &buffer_.back() + 1 }; } + FakeIterator begin() { return buffer_.empty() ? FakeIterator{ nullptr } : FakeIterator{ &buffer_.front() }; } + + size_t get_alignment_size() const { return alignof(T); } + size_t size() const { return buffer_.size(); } + const T* start_of_buffer() const { return buffer_.empty() ? nullptr : &buffer_.front(); } + const T* end_of_buffer() const { return buffer_.empty() ? nullptr : &buffer_.back() + 1; } + + std::deque buffer_; +}; + +template +class FakeRequestHandlerType : public DefaultRequestHandlerModel +{ +public: + using DefaultRequestHandlerModel::DefaultRequestHandlerModel; + + int get_handled_requests() const { return (this->m_handled_requests).load(); } + int get_response_time_acc() const { return (this->m_response_time_acc).load(); } + int get_response_time_min() const { return (this->m_response_time_min).load(); } + int get_response_time_max() const { return (this->m_response_time_max).load(); } + int get_num_buffer_cleanups() const { return (this->m_num_buffer_cleanups).load(); } + int get_waiting_requests() const { return (this->m_waiting_requests).size(); } + int get_pops_count() const { return (this->m_pops_count).load(); } + int get_occupancy() const { return (this->m_occupancy).load(); } + void change_run_marker(bool change) { (this->m_run_marker) = change; } + bool get_run_marker() { return (this->m_run_marker).load(); } + void public_dump_to_buffer(const void* data, + std::size_t size, + void* buffer, + uint32_t buffer_pos, // NOLINT(build/unsigned) + const std::size_t& buffer_size) + { + this->dump_to_buffer(data, size, buffer, buffer_pos, buffer_size); + } + + void public_check_waiting_requests() { this->check_waiting_requests(); } + std::unique_ptr public_create_empty_fragment( + const dunedaq::dfmessages::DataRequest& dr) + { + return this->create_empty_fragment(dr); + } + + void public_periodic_cleanups() { this->periodic_cleanups(); } + void test_start() + { + this->m_request_handler_thread_pool = std::make_unique(1); + this->m_fragment_send_timeout_ms = 100; + + this->m_pop_limit_size = 5; + this->m_pop_size_pct = 0.5; + reset_opmon_variables(); + } + + dunedaq::daqdataformats::timestamp_t cutoff; + dunedaq::daqdataformats::timestamp_t get_cutoff_timestamp() override { return cutoff; } + void set_cutoff_timestamp(dunedaq::daqdataformats::timestamp_t ncutoff) { cutoff = ncutoff; } + void reset_opmon_variables() + { + this->m_num_requests_found = 0; + this->m_num_requests_bad = 0; + this->m_num_requests_old_window = 0; + this->m_num_requests_delayed = 0; + this->m_num_requests_uncategorized = 0; + this->m_num_buffer_cleanups = 0; + this->m_num_requests_timed_out = 0; + this->m_handled_requests = 0; + this->m_response_time_acc = 0; + this->m_pop_reqs = 0; + this->m_pops_count = 0; + this->m_payloads_written = 0; + this->m_bytes_written = 0; + } + + int get_num_requests_found() { return this->m_num_requests_found.load(); } + int get_num_requests_delayed() { return this->m_num_requests_delayed.load(); } + int get_num_requests_old_window() { return this->m_num_requests_old_window.load(); } + int get_num_requests_bad() { return this->m_num_requests_bad.load(); } +}; + +class FakeRawDataProcessorType : public dunedaq::datahandlinglibs::TaskRawDataProcessorModel +{ +public: + using Base = dunedaq::datahandlinglibs::TaskRawDataProcessorModel; + using Base::Base; + + void public_post_processing_thread(std::function& func, + folly::ProducerConsumerQueue& queue) + { + this->run_post_processing_thread(func, 
queue); + } + + void public_post_processing_threads() + { + for (size_t i = 0; i < this->m_post_process_threads.size(); ++i) { + this->m_post_process_threads[i]->set_work(&FakeRawDataProcessorType::public_post_processing_thread, + this, + std::ref(this->m_post_process_functions[i]), + std::ref(*this->m_items_to_postprocess_queues[i])); + } + } + + void make_queues(int size) + { + for (size_t i = 0; i < this->m_post_process_functions.size(); ++i) { + this->m_items_to_postprocess_queues.push_back( + std::make_unique>(size)); // size can be anything + this->m_post_process_threads[i]->set_name(std::to_string(i), i); + } + } + + int get_post_queues_size() { return (this->m_items_to_postprocess_queues).size(); } + + const std::vector>& get_post_threads() const + { + return this->m_post_process_threads; + } + std::vector>& get_post_functions() + { + return this->m_post_process_functions; + } + const std::vector>>& get_post_queues() + const + { + return this->m_items_to_postprocess_queues; + } + void run_marker_set(bool m_set) { this->m_run_marker = m_set; } +}; + +template +class FakeDataHandlingModel + : public datahandlinglibs::DataHandlingModel + +{ + using Base = DataHandlingModel; + +public: + // Inherit all constructors from the base class + using Base::Base; + + void initialize(bool post_pro_enabled) + { + this->m_error_registry.reset(new FrameErrorRegistry()); + this->m_error_registry->set_ers_metadata("DLH of SourceID[" + std::to_string(1) + "] "); + this->m_latency_buffer_impl.reset(new LatencyBufferType()); + this->m_raw_processor_impl.reset(new RawDataProcessorType(this->m_error_registry, post_pro_enabled)); + this->m_request_handler_impl.reset(new RequestHandlerType(this->m_latency_buffer_impl, this->m_error_registry)); + } + + void initialize_iterable_queue(bool post_pro_enabled, int size) + { + this->m_error_registry.reset(new FrameErrorRegistry()); + this->m_error_registry->set_ers_metadata("DLH of SourceID[" + std::to_string(1) + "] "); + this->m_latency_buffer_impl.reset(new LatencyBufferType(size)); + this->m_raw_processor_impl.reset(new RawDataProcessorType(this->m_error_registry, post_pro_enabled)); + this->m_request_handler_impl.reset(new RequestHandlerType(this->m_latency_buffer_impl, this->m_error_registry)); + } + + void raw_processor(bool& pre_func_one_called, bool& post_func_one_called, int cutoff_timestamp) + { + this->m_raw_processor_impl->add_preprocess_task( + [&pre_func_one_called](ReadoutType* /*elem*/) { pre_func_one_called = true; }); + this->m_raw_processor_impl->add_postprocess_task( + [&post_func_one_called](const ReadoutType* /*elem*/) { post_func_one_called = true; }); + this->m_request_handler_supports_cutoff_timestamp = true; + this->m_processing_delay_ticks = 0; + this->m_request_handler_impl->set_cutoff_timestamp(cutoff_timestamp); + this->m_raw_processor_impl->make_queues(32); + } + + void post_schedule_init(bool& pre_func_one_called, bool& post_func_one_called, int cutoff_timestamp, int min_wait) + { + raw_processor(pre_func_one_called, post_func_one_called, cutoff_timestamp); + + this->m_postprocess_scheduler_thread.set_name("pprocsched", 1); + this->m_timekeeper = std::make_unique(); + + this->m_processing_delay_ticks = 4; + this->m_post_processing_delay_max_wait = 100; + this->m_num_post_processing_delay_max_waits = 0; + this->m_post_processing_delay_min_wait = min_wait; + } + + void public_process_item(ReadoutType&& payload) { this->process_item(std::move(payload)); } + + template + std::vector public_transform_payload(IDT& payload) + { + return 
this->transform_payload(payload); + } + + template + void public_transform_and_process(IDT&& payload) + { + this->transform_and_process(std::forward(payload)); + } + template + void public_consume_callback(IDT&& payload) + { + this->consume_callback(std::move(payload)); + } + void public_run_postprocess_scheduler() { this->run_postprocess_scheduler(); } + + int get_m_num_payloads() { return (this->m_num_payloads).load(); } + int get_m_sum_payloads() { return (this->m_sum_payloads).load(); } + int get_m_stats_packet_count() { return (this->m_stats_packet_count).load(); } + int get_raw_processor_queue_size() { return this->m_raw_processor_impl->get_post_queues_size(); } + int get_m_num_post_processing_delay_max_waits() { return this->m_num_post_processing_delay_max_waits.load(); } + int get_m_num_lb_insert_failures() { return static_cast(this->m_num_lb_insert_failures.load()); } + + void change_m_run_marker(bool change) { this->m_run_marker = change; } +}; + +} +} +} + +#endif // DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_TESTUTILS_UNITTESTUTILITIES_HPP \ No newline at end of file diff --git a/test/apps/test_skiplist_app.cxx b/test/apps/test_skiplist_app.cxx index d1d867f..4b4f598 100644 --- a/test/apps/test_skiplist_app.cxx +++ b/test/apps/test_skiplist_app.cxx @@ -22,12 +22,14 @@ #include #include #include +#include + using namespace dunedaq::datahandlinglibs; using namespace folly; int -main(int /*argc*/, char** /*argv[]*/) +main(int argc, char** argv) { // ConcurrentSkipList from Folly @@ -74,23 +76,45 @@ main(int /*argc*/, char** /*argv[]*/) }); // Producer thread - auto producer = std::thread([&]() { - TLOG() << "SkipList Producer spawned... Creating accessor."; - uint64_t ts = 0; // NOLINT(build/unsigned) - while (marker) { - types::DUMMY_FRAME_STRUCT pl; - auto plptr = - const_cast(reinterpret_cast(&pl)); // NOLINT - plptr->timestamp = ts; - { - SkipListTAcc prodacc(skl); - prodacc.insert(std::move(pl)); - } - ts += 25; - rl.limit(); + + //vector of threads + std::vector< std::thread >producers; + int producer_num = 1; + if (argc > 1) { + try { + producer_num = std::stoi(argv[1]); + if (producer_num <= 0) { + std::cerr << "Error: producer_num must be positive.\n"; + return 1; + } + } catch (const std::exception& e) { + std::cerr << "Error: Invalid input for producer_num: " << argv[1] << "\n"; + return 1; } - TLOG() << "Producer joins..."; - }); +} + + for(int i = 0; i (reinterpret_cast(&pl)); // NOLINT + plptr->timestamp = ts; + { + SkipListTAcc prodacc(skl); + prodacc.insert(std::move(pl)); + } + ts +=25; + rl.limit(); + } + }); + + producers.push_back(std::move(producer)); + } + // Cleanup thread auto cleaner = std::thread([&]() { @@ -107,20 +131,20 @@ main(int /*argc*/, char** /*argv[]*/) auto headptr = reinterpret_cast(head); // NOLINT auto tailts = tailptr->get_timestamp(); auto headts = headptr->get_timestamp(); - if (headts - tailts > max_time_diff) { // ts differnce exceeds maximum + if (tailts - headts > max_time_diff) { uint64_t timediff = max_time_diff; // NOLINT(build/unsigned) auto removed_ctr = 0; while (timediff >= max_time_diff) { - bool removed = cleanacc.remove(*tail); + bool removed = cleanacc.remove(*head); if (!removed) { TLOG() << tname << ": Unsuccessfull remove: " << removed; } else { ++removed_ctr; } - tail = cleanacc.last(); - tailptr = reinterpret_cast(tail); // NOLINT - tailts = tailptr->get_timestamp(); - timediff = headts - tailts; + head = cleanacc.first(); + headptr = reinterpret_cast(head); // NOLINT + headts = headptr->get_timestamp(); + timediff = 
tailts - headts; } TLOG() << tname << ": Cleared " << removed_ctr << " elements."; } @@ -187,9 +211,12 @@ main(int /*argc*/, char** /*argv[]*/) killswitch.join(); } - if (producer.joinable()) { - producer.join(); + for (auto& producer : producers){ + if (producer.joinable()) { + producer.join(); + } } + if (cleaner.joinable()) { cleaner.join(); diff --git a/test/apps/test_to_include_to_lcov_app.cxx b/test/apps/test_to_include_to_lcov_app.cxx new file mode 100644 index 0000000..9976b6e --- /dev/null +++ b/test/apps/test_to_include_to_lcov_app.cxx @@ -0,0 +1,86 @@ +/** + * @file test_to_include_to_lcov_app.cxx Test application for + * ratelimiter implementation + * + * This is part of the DUNE DAQ Application Framework, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + + +#include "datahandlinglibs/models/BinarySearchQueueModel.hpp" +#include "datahandlinglibs/models/DataHandlingModel.hpp" +#include "datahandlinglibs/models/DataSubscriberModel.hpp" +#include "datahandlinglibs/models/DefaultRequestHandlerModel.hpp" +#include "datahandlinglibs/models/DefaultSkipListRequestHandler.hpp" +#include "datahandlinglibs/models/EmptyFragmentRequestHandlerModel.hpp" +#include "datahandlinglibs/models/FixedRateQueueModel.hpp" +#include "datahandlinglibs/models/IterableQueueModel.hpp" +#include "datahandlinglibs/models/RecorderModel.hpp" + +#include "datahandlinglibs/models/SourceEmulatorModel.hpp" +#include "datahandlinglibs/models/TaskRawDataProcessorModel.hpp" +#include "datahandlinglibs/models/ZeroCopyRecordingRequestHandlerModel.hpp" +#include "datahandlinglibs/ReadoutTypes.hpp" + + + + +#include "datahandlinglibs/opmon/datahandling_info.pb.h" +#include "datahandlinglibs/models/SkipListLatencyBufferModel.hpp" +#include "datahandlinglibs/testutils/UnitTestUtilities.hpp" + +#include + + + +using namespace dunedaq::datahandlinglibs; +using namespace folly; + + + + +int +main(int /*argc*/, char** /*argv[]*/) +{ + + BinarySearchQueueModel binary_search_queue_model; + std::atomic flag{true}; + std::atomic run_marker; + + auto readout_model = std::make_shared>, + unittest::FakeLatencyBufferType, + unittest::FakeRawDataProcessorType, + unittest::FakeReadoutType>>(run_marker); + + DataSubscriberModel data_subsciber_model; + auto latency_buffer = std::make_shared>(); + auto error_registry = std::make_unique(); + + DefaultRequestHandlerModel> default_request_handler_model(latency_buffer, error_registry); + + auto latency_buffer2 = std::make_shared>(); + auto error_registry2 = std::make_unique(); + DefaultSkipListRequestHandler default_skiplist_request_handler(latency_buffer2,error_registry2); + + EmptyFragmentRequestHandlerModel >empty_fragment_request_handler_model(latency_buffer,error_registry); + + FixedRateQueueModel fixed_rate_queue_model; + IterableQueueModel iterable_queue_model; + RecorderModel recorder_model("name"); + SkipListLatencyBufferModel skiplist_latency_buffer_model; + SourceEmulatorPatternGenerator source_emulator_pattern_generator; + TaskRawDataProcessorModel task_rawdata_processor_model(error_registry,false); + ZeroCopyRecordingRequestHandlerModel > zero_copy_recording_request_hander_model(latency_buffer,error_registry); + + + + + + + + + return 0; +} // NOLINT(readability/fn_size) diff --git a/unittest/datahandlinglibs_BufferedReadWrite_test.cxx b/unittest/datahandlinglibs_BufferedReadWrite_test.cxx index 58704ec..6630846 100644 --- a/unittest/datahandlinglibs_BufferedReadWrite_test.cxx +++ 
b/unittest/datahandlinglibs_BufferedReadWrite_test.cxx @@ -13,9 +13,9 @@ #include "boost/test/unit_test.hpp" -#include "logging/Logging.hpp" #include "datahandlinglibs/utils/BufferedFileReader.hpp" #include "datahandlinglibs/utils/BufferedFileWriter.hpp" +#include "logging/Logging.hpp" #include #include @@ -34,7 +34,9 @@ test_read_write(BufferedFileWriter<>& writer, BufferedFileReader& reader, u for (uint i = 0; i < numbers.size(); ++i) { numbers[i] = i; write_successful = writer.write(reinterpret_cast(&i), sizeof(i)); - BOOST_REQUIRE(write_successful); + if (!write_successful) { + BOOST_FAIL("Failed to write to file at index " + std::to_string(i)); + } } writer.close(); @@ -43,10 +45,12 @@ test_read_write(BufferedFileWriter<>& writer, BufferedFileReader& reader, u bool read_successful = false; for (uint i = 0; i < numbers.size(); ++i) { read_successful = reader.read(read_value); - if (!read_successful) + if (!read_successful) { TLOG() << i << std::endl; - BOOST_REQUIRE(read_successful); - BOOST_REQUIRE_EQUAL(read_value, numbers[i]); + BOOST_FAIL("Failed to read file at index " + std::to_string(i)); + } + if (read_value != numbers[i]) + BOOST_FAIL("Data mismatch at index " + std::to_string(i)); } read_successful = reader.read(read_value); diff --git a/unittest/datahandlinglibs_DataHandlingModel_test.cxx b/unittest/datahandlinglibs_DataHandlingModel_test.cxx new file mode 100644 index 0000000..444c94b --- /dev/null +++ b/unittest/datahandlinglibs_DataHandlingModel_test.cxx @@ -0,0 +1,292 @@ +/** + * @file datahandlinglibs_DataHandlingModel_test.cxx Unit Tests for DataHandlingModel + * + * This is part of the DUNE DAQ Application Framework, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. 
+ */ + +#define BOOST_TEST_MODULE datahandlinglibs_DataHandlingModel_test // NOLINT + +#include "boost/test/unit_test.hpp" +#include "datahandlinglibs/models/BinarySearchQueueModel.hpp" +#include "datahandlinglibs/models/DataHandlingModel.hpp" +#include "datahandlinglibs/models/FixedRateQueueModel.hpp" +#include "datahandlinglibs/models/IterableQueueModel.hpp" +#include "datahandlinglibs/models/SkipListLatencyBufferModel.hpp" +#include "datahandlinglibs/opmon/datahandling_info.pb.h" +#include "datahandlinglibs/testutils/UnitTestUtilities.hpp" + +#include +#include + +using namespace dunedaq::datahandlinglibs; + +BOOST_AUTO_TEST_SUITE(datahandlinglibs_DataHandlingModel_test) + +using RDT = unittest::FakeReadoutType; +using RHT = unittest::FakeRequestHandlerType>; +using LBT = unittest::FakeLatencyBufferType; +using RPT = unittest::FakeRawDataProcessorType; + + + +BOOST_AUTO_TEST_CASE(DataHandlingModel_process_item_pre_post_process) +{ + std::atomic run_marker = true; + + auto model = unittest::FakeDataHandlingModel(run_marker); + + bool pre_func_one_called = false; + bool post_func_one_called = false; + model.initialize(true); + model.raw_processor(pre_func_one_called, post_func_one_called, 0); + + RDT elem; + elem.set_timestamp(2); + + model.public_process_item(std::move(elem)); + BOOST_REQUIRE(pre_func_one_called); + BOOST_REQUIRE_EQUAL(model.get_m_num_payloads(), 1); + BOOST_REQUIRE_EQUAL(model.get_m_sum_payloads(), 1); + BOOST_REQUIRE_EQUAL(model.get_m_stats_packet_count(), 1); +} + +BOOST_AUTO_TEST_CASE(DataHandlingModel_process_item_cutoff_triggered) +{ + std::atomic run_marker = true; + + auto model = unittest::FakeDataHandlingModel(run_marker); + + bool pre_func_one_called = false; + bool post_func_one_called = false; + model.initialize(true); + model.raw_processor(pre_func_one_called, post_func_one_called, 10); + + RDT elem; + elem.set_timestamp(2); + + setenv("DUNEDAQ_ERS_WARNING", "throw", 1); + BOOST_CHECK_THROW(model.public_process_item(std::move(elem)), std::exception); +} + +BOOST_AUTO_TEST_CASE(DataHandlingModel_transform_payload) +{ + std::atomic run_marker = true; + + auto model = unittest::FakeDataHandlingModel(run_marker); + + unittest::FakeSuperChunkReadoutType input; + input.set_timestamp(4); + auto output = model.public_transform_payload(input); + BOOST_REQUIRE((std::is_same_v)); +} + +BOOST_AUTO_TEST_CASE(DataHandlingModel_transform_and_process_same_type) +{ + std::atomic run_marker = true; + + auto model = unittest::FakeDataHandlingModel(run_marker); + model.initialize(true); + + bool pre_func_one_called = false; + bool post_func_one_called = false; + model.raw_processor(pre_func_one_called, post_func_one_called, 0); + + RDT input; + input.set_timestamp(4); + model.public_transform_and_process(std::move(input)); + BOOST_REQUIRE(pre_func_one_called); + BOOST_REQUIRE_EQUAL(model.get_m_num_payloads(), 1); +} + +BOOST_AUTO_TEST_CASE(DataHandlingModel_transform_and_process_different_type) +{ + std::atomic run_marker = true; + + auto model = unittest::FakeDataHandlingModel(run_marker); + model.initialize(true); + + bool pre_func_one_called = false; + bool post_func_one_called = false; + model.raw_processor(pre_func_one_called, post_func_one_called, 0); + + unittest::FakeSuperChunkReadoutType input; + input.set_timestamp(4); + model.public_transform_and_process(std::move(input)); + BOOST_REQUIRE(pre_func_one_called); + BOOST_REQUIRE_EQUAL(model.get_m_num_payloads(), 1); +} + +BOOST_AUTO_TEST_CASE(DataHandlingModel_consume_callback) +{ + std::atomic run_marker = true; + + 
auto model = unittest::FakeDataHandlingModel(run_marker); + model.initialize(true); + + bool pre_func_one_called = false; + bool post_func_one_called = false; + model.raw_processor(pre_func_one_called, post_func_one_called, 0); + + unittest::FakeSuperChunkReadoutType input; + input.set_timestamp(4); + model.public_consume_callback(std::move(input)); + BOOST_REQUIRE(pre_func_one_called); + BOOST_REQUIRE_EQUAL(model.get_m_num_payloads(), 1); +} + +BOOST_AUTO_TEST_CASE(DataHandlingModel_consume_callback_write_failure) +{ + std::atomic run_marker = true; + + auto model = unittest::FakeDataHandlingModel>, + BinarySearchQueueModel, + RPT, + RDT>(run_marker); + model.initialize(true); // default size of LB is 2 + + bool pre_func_one_called = false; + bool post_func_one_called = false; + model.raw_processor(pre_func_one_called, post_func_one_called, 0); + + for (int i = 100; i < 102; i++) { + RDT input; + input.set_timestamp(i); + model.public_consume_callback(std::move(input)); + } + + BOOST_REQUIRE(pre_func_one_called); + BOOST_REQUIRE_EQUAL(model.get_m_num_lb_insert_failures(), 1); +} + +BOOST_AUTO_TEST_CASE(DataHandlingModel_postprocess_schedule_SkipListLatencyBufferModel) +{ + std::atomic run_marker = true; + + auto model = unittest::FakeDataHandlingModel< + RDT, + unittest::FakeRequestHandlerType>, + SkipListLatencyBufferModel, + RPT, + RDT>(run_marker); + model.initialize(true); + + bool pre_func_one_called = false; + bool post_func_one_called = false; + model.post_schedule_init(pre_func_one_called, post_func_one_called, 0, 0); + + for (int i = 100; i < 105; i++) { + RDT input; + input.set_timestamp(i); + model.public_transform_and_process(std::move(input)); + } + // std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::thread postprocess([&]() { model.public_run_postprocess_scheduler(); }); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + for (int i = 105; i < 112; i++) { + RDT input; + input.set_timestamp(i); + model.public_transform_and_process(std::move(input)); + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + model.change_m_run_marker(false); + { + RDT input; + input.set_timestamp(112); + model.public_transform_and_process(std::move(input)); + } + postprocess.join(); + + BOOST_REQUIRE_EQUAL(model.get_m_num_post_processing_delay_max_waits(), 0); + BOOST_REQUIRE_EQUAL(model.get_m_num_payloads(), 8); // newest ts - delay ticks +} + +BOOST_AUTO_TEST_CASE(DataHandlingModel_postprocess_schedule_BinarySearchQueueModel) +{ + std::atomic run_marker = true; + + auto model = unittest::FakeDataHandlingModel>, + BinarySearchQueueModel, + RPT, + RDT>(run_marker); + model.initialize_iterable_queue(true, 32); + + bool pre_func_one_called = false; + bool post_func_one_called = false; + model.post_schedule_init(pre_func_one_called, post_func_one_called, 0, 0); + + for (int i = 100; i < 105; i++) { + RDT input; + input.set_timestamp(i); + model.public_transform_and_process(std::move(input)); + } + // std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::thread postprocess([&]() { model.public_run_postprocess_scheduler(); }); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + for (int i = 105; i < 112; i++) { + RDT input; + input.set_timestamp(i); + model.public_transform_and_process(std::move(input)); + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + model.change_m_run_marker(false); + { + RDT input; + input.set_timestamp(112); + model.public_transform_and_process(std::move(input)); + } + postprocess.join(); + 
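+  // The post-processing scheduler thread has been joined at this point, so the
+  // counters checked below are stable: no post-processing wait ever reached the
+  // configured maximum, and the expected number of payloads was handed to the
+  // raw-data processor.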
+ BOOST_REQUIRE_EQUAL(model.get_m_num_post_processing_delay_max_waits(), 0); + BOOST_REQUIRE_EQUAL(model.get_m_num_payloads(), 8); // newest ts - delay ticks +} + +BOOST_AUTO_TEST_CASE(DataHandlingModel_postprocess_schedule_FixedRateQueueModel) +{ + std::atomic run_marker = true; + + auto model = unittest::FakeDataHandlingModel>, + FixedRateQueueModel, + RPT, + RDT>(run_marker); + model.initialize_iterable_queue(true, 32); + + bool pre_func_one_called = false; + bool post_func_one_called = false; + model.post_schedule_init(pre_func_one_called, post_func_one_called, 0, 0); + + for (int i = 100; i < 105; i++) { + RDT input; + input.set_timestamp(i); + model.public_transform_and_process(std::move(input)); + } + // std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::thread postprocess([&]() { model.public_run_postprocess_scheduler(); }); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + for (int i = 105; i < 112; i++) { + RDT input; + input.set_timestamp(i); + model.public_transform_and_process(std::move(input)); + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + model.change_m_run_marker(false); + { + RDT input; + input.set_timestamp(112); + model.public_transform_and_process(std::move(input)); + } + postprocess.join(); + + BOOST_REQUIRE_EQUAL(model.get_m_num_post_processing_delay_max_waits(), 0); + BOOST_REQUIRE_EQUAL(model.get_m_num_payloads(), 8); // newest ts - delay ticks +} + +BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file diff --git a/unittest/datahandlinglibs_DataMoveCallbackRegistry_test.cxx b/unittest/datahandlinglibs_DataMoveCallbackRegistry_test.cxx index 26e719f..98955d7 100644 --- a/unittest/datahandlinglibs_DataMoveCallbackRegistry_test.cxx +++ b/unittest/datahandlinglibs_DataMoveCallbackRegistry_test.cxx @@ -15,6 +15,10 @@ #include "boost/test/unit_test.hpp" +#include + +#include + using namespace dunedaq::datahandlinglibs; BOOST_AUTO_TEST_SUITE(datahandlinglibs_DataMoveCallbackRegistry_test) @@ -41,6 +45,15 @@ BOOST_AUTO_TEST_CASE(DataMoveCallbackRegistry_get_callback) * @test The registered callback function's parameter type and template argument must match */ BOOST_CHECK_THROW(registry->get_callback("id"), GenericConfigurationError); + + /** + * @test Returned function must work + */ + int check; + registry->register_callback("cout", [&check](int&& num) { check = num; }); + std::shared_ptr> returned_func = registry->get_callback("cout"); + (*returned_func)(42); + BOOST_CHECK_EQUAL(check, 42); } BOOST_AUTO_TEST_SUITE_END() diff --git a/unittest/datahandlinglibs_DefaultRequestHandlerModel_test.cxx b/unittest/datahandlinglibs_DefaultRequestHandlerModel_test.cxx new file mode 100644 index 0000000..37ab494 --- /dev/null +++ b/unittest/datahandlinglibs_DefaultRequestHandlerModel_test.cxx @@ -0,0 +1,350 @@ +/** + * @file datahandlinglibs_DefaultRequestHandlerModel_test.cxx Unit Tests for DefaultRequestHandlerModel + * + * This is part of the DUNE DAQ Application Framework, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. 
+ */ + +#define BOOST_TEST_MODULE datahandlinglibs_DefaultRequestHandlerModel_test // NOLINT + +#include "boost/test/unit_test.hpp" +#include "datahandlinglibs/models/DefaultRequestHandlerModel.hpp" +#include "datahandlinglibs/models/SkipListLatencyBufferModel.hpp" +#include "datahandlinglibs/opmon/datahandling_info.pb.h" +#include "datahandlinglibs/testutils/UnitTestUtilities.hpp" + +#include "dfmessages/Types.hpp" + +#include "dfmessages/DataRequest.hpp" +#include +#include +#include + +using namespace dunedaq::datahandlinglibs; + +BOOST_AUTO_TEST_SUITE(datahandlinglibs_DefaultRequestHandlerModel_test) + +dunedaq::dfmessages::DataRequest +create_request(int req_number, int difference) +{ + dunedaq::dfmessages::DataRequest req; + + req.request_number = req_number; // Unique number for this request + req.trigger_number = req_number; // Trigger ID that caused this + req.run_number = req_number; // The current run being taken + req.trigger_timestamp = req_number; // When the event occurred + req.readout_type = dunedaq::dfmessages::ReadoutType::kInvalid; // Readout mode + req.sequence_number = req_number; // Sequence within this run + req.data_destination = "somewhere"; // Where to send the result + + req.request_information.window_begin = req_number; + req.request_information.window_end = req_number + difference; + return req; +} + +BOOST_AUTO_TEST_CASE(DefaultRequestHandlerModel_timestamp_virtuals) +{ + auto latency_buffer = std::make_shared>(); + auto error_registry = std::make_unique(); + + DefaultRequestHandlerModel> + req_handler(latency_buffer, error_registry); + + BOOST_REQUIRE_EQUAL(req_handler.get_cutoff_timestamp(), 0); + BOOST_REQUIRE(!req_handler.supports_cutoff_timestamp()); +} + +// create_fragment_header,issue_request, data_request +BOOST_AUTO_TEST_CASE(DefaultRequestHandlerModel_issue_request_empty_buffer) +{ + dunedaq::dfmessages::DataRequest req_1 = create_request(1, 1); + + auto latency_buffer = std::make_shared>(); + auto error_registry = std::make_unique(); + + unittest::FakeRequestHandlerType> + testhandler(latency_buffer, error_registry); + testhandler.test_start(); + + testhandler.issue_request(req_1, false); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + BOOST_REQUIRE_EQUAL(testhandler.get_handled_requests(), 0); + BOOST_REQUIRE(testhandler.get_response_time_acc() == 0); + BOOST_REQUIRE_EQUAL(testhandler.get_response_time_min(), std::numeric_limits::max()); + BOOST_REQUIRE_EQUAL(testhandler.get_response_time_max(), 0); +} + +BOOST_AUTO_TEST_CASE(DefaultRequestHandlerModel_issue_request_kPartiallyOld) +{ + auto latency_buffer = std::make_shared>(); + auto error_registry = std::make_unique(); + unittest::FakeRequestHandlerType> + testhandler(latency_buffer, error_registry); + + testhandler.test_start(); + unittest::FakeReadoutType elem; + elem.set_timestamp(2); + latency_buffer->write(std::move(elem)); + testhandler.issue_request(create_request(1, 1)); + + auto start = std::chrono::steady_clock::now(); + while (testhandler.get_handled_requests() < 1) { + std::this_thread::sleep_for(std::chrono::milliseconds(15)); + if (std::chrono::steady_clock::now() - start > std::chrono::seconds(6)) { + BOOST_FAIL("Timeout: handler never processed the request"); + } + } + BOOST_REQUIRE_EQUAL(testhandler.get_handled_requests(), 1); + BOOST_REQUIRE(testhandler.get_response_time_acc() > 0); + BOOST_REQUIRE_EQUAL(testhandler.get_response_time_acc(), testhandler.get_response_time_min()); + BOOST_REQUIRE_EQUAL(testhandler.get_response_time_acc(), 
testhandler.get_response_time_max()); +} + +BOOST_AUTO_TEST_CASE(DefaultRequestHandlerModel_issue_request_kFound) +{ + auto latency_buffer = std::make_shared>(); + auto error_registry = std::make_unique(); + unittest::FakeRequestHandlerType> + testhandler(latency_buffer, error_registry); + + testhandler.test_start(); + for (int i = 1; i < 7; i++) { + unittest::FakeReadoutType elem; + elem.set_timestamp(i); + latency_buffer->write(std::move(elem)); + } + + BOOST_REQUIRE_EQUAL(testhandler.get_num_requests_found(), 0); + + testhandler.issue_request(create_request(3, 2)); + + auto start = std::chrono::steady_clock::now(); + while (testhandler.get_handled_requests() < 1) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + if (std::chrono::steady_clock::now() - start > std::chrono::seconds(6)) { + BOOST_FAIL("Timeout: handler never processed the request"); + } + } + BOOST_REQUIRE_EQUAL(testhandler.get_handled_requests(), 1); + BOOST_REQUIRE(testhandler.get_response_time_acc() > 0); + BOOST_REQUIRE_EQUAL(testhandler.get_response_time_acc(), testhandler.get_response_time_min()); + BOOST_REQUIRE_EQUAL(testhandler.get_response_time_acc(), testhandler.get_response_time_max()); + BOOST_REQUIRE_EQUAL(testhandler.get_num_requests_found(), 1); +} + +BOOST_AUTO_TEST_CASE(DefaultRequestHandlerModel_issue_request_kNotFound) +{ + auto latency_buffer = std::make_shared>(); + auto error_registry = std::make_unique(); + unittest::FakeRequestHandlerType> + testhandler(latency_buffer, error_registry); + + testhandler.test_start(); + BOOST_REQUIRE_EQUAL(testhandler.get_num_requests_bad(), 0); + + testhandler.issue_request(create_request(3, 2)); + + auto start = std::chrono::steady_clock::now(); + while (testhandler.get_handled_requests() < 1) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + if (std::chrono::steady_clock::now() - start > std::chrono::seconds(6)) { + BOOST_FAIL("Timeout: handler never processed the request"); + } + } + BOOST_REQUIRE_EQUAL(testhandler.get_handled_requests(), 1); + BOOST_REQUIRE(testhandler.get_response_time_acc() > 0); + BOOST_REQUIRE_EQUAL(testhandler.get_response_time_acc(), testhandler.get_response_time_min()); + BOOST_REQUIRE_EQUAL(testhandler.get_response_time_acc(), testhandler.get_response_time_max()); + BOOST_REQUIRE_EQUAL(testhandler.get_num_requests_bad(), 1); +} + +BOOST_AUTO_TEST_CASE(DefaultRequestHandlerModel_issue_request_kNotYet) +{ + auto latency_buffer = std::make_shared>(); + auto error_registry = std::make_unique(); + unittest::FakeRequestHandlerType> + testhandler(latency_buffer, error_registry); + + testhandler.test_start(); + BOOST_REQUIRE_EQUAL(testhandler.get_num_requests_delayed(), 0); + for (int i = 1; i < 7; i++) { + unittest::FakeReadoutType elem; + elem.set_timestamp(i); + latency_buffer->write(std::move(elem)); + } + + testhandler.issue_request(create_request(8, 1)); + + auto start = std::chrono::steady_clock::now(); + while (testhandler.get_handled_requests() < 1) { + std::this_thread::sleep_for(std::chrono::milliseconds(15)); + if (std::chrono::steady_clock::now() - start > std::chrono::seconds(6)) { + BOOST_FAIL("Timeout: handler never processed the request"); + } + } + BOOST_REQUIRE_EQUAL(testhandler.get_handled_requests(), 1); + BOOST_REQUIRE_EQUAL(testhandler.get_num_requests_delayed(), 1); + BOOST_REQUIRE_EQUAL(testhandler.get_waiting_requests(), 1); +} + +BOOST_AUTO_TEST_CASE(DefaultRequestHandlerModel_issue_request_kTooOld) +{ + auto latency_buffer = std::make_shared>(); + auto error_registry = std::make_unique(); 
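+  // Scenario: the buffer is filled with timestamps 3..6 below, while the request
+  // window [1, 2] lies entirely before the oldest stored element, so the handler
+  // should classify it as an old-window request and also count it as bad.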
+ unittest::FakeRequestHandlerType> + testhandler(latency_buffer, error_registry); + + testhandler.test_start(); + for (int i = 3; i < 7; i++) { + unittest::FakeReadoutType elem; + elem.set_timestamp(i); + latency_buffer->write(std::move(elem)); + } + + BOOST_REQUIRE_EQUAL(testhandler.get_num_requests_old_window(), 0); + BOOST_REQUIRE_EQUAL(testhandler.get_num_requests_bad(), 0); + testhandler.issue_request(create_request(1, 1)); + + auto start = std::chrono::steady_clock::now(); + while (testhandler.get_handled_requests() < 1) { + std::this_thread::sleep_for(std::chrono::milliseconds(15)); + if (std::chrono::steady_clock::now() - start > std::chrono::seconds(6)) { + BOOST_FAIL("Timeout: handler never processed the request"); + } + } + BOOST_REQUIRE_EQUAL(testhandler.get_handled_requests(), 1); + BOOST_REQUIRE_EQUAL(testhandler.get_num_requests_old_window(), 1); + BOOST_REQUIRE_EQUAL(testhandler.get_num_requests_bad(), 1); +} + +BOOST_AUTO_TEST_CASE(DefaultRequestHandlerModel_issue_request_kPartial) +{ + auto latency_buffer = std::make_shared>(); + auto error_registry = std::make_unique(); + unittest::FakeRequestHandlerType> + testhandler(latency_buffer, error_registry); + + testhandler.test_start(); + for (int i = 1; i < 7; i++) { + unittest::FakeReadoutType elem; + elem.set_timestamp(i); + latency_buffer->write(std::move(elem)); + } + + BOOST_REQUIRE_EQUAL(testhandler.get_num_requests_delayed(), 0); + testhandler.issue_request(create_request(6, 1)); + + auto start = std::chrono::steady_clock::now(); + while (testhandler.get_handled_requests() < 1) { + std::this_thread::sleep_for(std::chrono::milliseconds(15)); + if (std::chrono::steady_clock::now() - start > std::chrono::seconds(6)) { + BOOST_FAIL("Timeout: handler never processed the request"); + } + } + BOOST_REQUIRE_EQUAL(testhandler.get_handled_requests(), 1); + BOOST_REQUIRE_EQUAL(testhandler.get_num_requests_delayed(), 1); +} + +BOOST_AUTO_TEST_CASE(DefaultRequestHandlerModel_cleanups) +{ + auto latency_buffer = std::make_shared>(); + auto error_registry = std::make_unique(); + unittest::FakeRequestHandlerType> + testhandler(latency_buffer, error_registry); + + testhandler.change_run_marker(true); + testhandler.test_start(); + + for (int i = 1; i < 7; i++) { + unittest::FakeReadoutType elem; + elem.set_timestamp(i); + latency_buffer->write(std::move(elem)); + } + std::thread cleanup_thread([&]() { testhandler.public_periodic_cleanups(); }); + + std::this_thread::sleep_for(std::chrono::seconds(2)); + testhandler.change_run_marker(false); + + cleanup_thread.join(); + + BOOST_REQUIRE_EQUAL(testhandler.get_num_buffer_cleanups(), 1); + BOOST_REQUIRE_EQUAL(testhandler.get_occupancy(), 3); + BOOST_REQUIRE_EQUAL(testhandler.get_pops_count(), 3); +} + +BOOST_AUTO_TEST_CASE(DefaultRequestHandlerModel_check_waiting_requests_window_end) +{ + auto latency_buffer = std::make_shared>(); + auto error_registry = std::make_unique(); + unittest::FakeRequestHandlerType> + testhandler(latency_buffer, error_registry); + + testhandler.change_run_marker(true); + testhandler.test_start(); + + for (int i = 1; i < 7; i++) { + unittest::FakeReadoutType elem; + elem.set_timestamp(i); + latency_buffer->write(std::move(elem)); + } + + BOOST_REQUIRE_EQUAL(testhandler.get_waiting_requests(), 0); + + testhandler.issue_request(create_request(6, 1)); + + auto start = std::chrono::steady_clock::now(); + while (testhandler.get_handled_requests() < 1) { + std::this_thread::sleep_for(std::chrono::milliseconds(15)); + if (std::chrono::steady_clock::now() - start > 
std::chrono::seconds(6)) { + BOOST_FAIL("Timeout: handler never processed the request"); + } + } + BOOST_REQUIRE_EQUAL(testhandler.get_waiting_requests(), 1); + + for (int i = 7; i < 9; i++) { + unittest::FakeReadoutType elem; + elem.set_timestamp(i); + latency_buffer->write(std::move(elem)); + } + + std::thread t([&]() { testhandler.public_check_waiting_requests(); }); + std::this_thread::sleep_for(std::chrono::seconds(1)); + testhandler.change_run_marker(false); + t.join(); + + BOOST_REQUIRE_EQUAL(testhandler.get_waiting_requests(), 0); + BOOST_REQUIRE_EQUAL(testhandler.get_handled_requests(), + 2); // first one is issue_request, second one is check_waiting_requests +} + +BOOST_AUTO_TEST_CASE(DefaultRequestHandlerModel_dump_to_buffer) +{ + auto latency_buffer = std::make_shared>(); + auto error_registry = std::make_unique(); + unittest::FakeRequestHandlerType> + testhandler(latency_buffer, error_registry); + + std::string data = "data to copy"; + std::string buffer = " "; + + testhandler.public_dump_to_buffer(data.data(), 4, buffer.data(), 0, buffer.size()); + BOOST_REQUIRE_EQUAL(buffer, "data"); +} + +BOOST_AUTO_TEST_CASE(DefaultRequestHandlerModel_create_empty_fragment) +{ + auto latency_buffer = std::make_shared>(); + auto error_registry = std::make_unique(); + unittest::FakeRequestHandlerType> + testhandler(latency_buffer, error_registry); + + auto fragment = testhandler.public_create_empty_fragment(create_request(1, 1)); + BOOST_REQUIRE_EQUAL(fragment->get_header().error_bits, 1); +} + +BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file diff --git a/unittest/datahandlinglibs_IterableQueueModel_test.cxx b/unittest/datahandlinglibs_IterableQueueModel_test.cxx new file mode 100644 index 0000000..6f910bc --- /dev/null +++ b/unittest/datahandlinglibs_IterableQueueModel_test.cxx @@ -0,0 +1,356 @@ +/** + * @file datahandlinglibs_IterableQueueModel_test.cxx Unit Tests for IterableQueueModel + * + * This is part of the DUNE DAQ Application Framework, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. 
+ */ + +/** + * @brief Name of this test module + */ +#define BOOST_TEST_MODULE datahandlinglibs_IterableQueueModel_test // NOLINT + +#include "boost/test/unit_test.hpp" + +#include "datahandlinglibs/models/IterableQueueModel.hpp" + +#include "appmodel/LatencyBuffer.hpp" +#include "conffwk/Configuration.hpp" +#include "conffwk/DalRegistry.hpp" + +#include + +using namespace dunedaq::datahandlinglibs; + +BOOST_AUTO_TEST_SUITE(datahandlinglibs_IterableQueueModel_test) + +/** + * Tests is_Empty, capacity, read, write, isFull, pop, popFront, flush functions + * and that capacity, occupancy and size is preserved + */ + +BOOST_AUTO_TEST_CASE(IterableQueueModel_initial_state) +{ + int size = 5; + IterableQueueModel queue(size, false, 0, false, 0); + + BOOST_REQUIRE(queue.isEmpty()); + BOOST_REQUIRE(!queue.isFull()); + BOOST_REQUIRE_EQUAL(queue.size(), size); + BOOST_REQUIRE_EQUAL(queue.capacity(), size - 1); +} + +BOOST_AUTO_TEST_CASE(IterableQueueModel_write_until_full) +{ + int size = 5; + IterableQueueModel queue(size, false, 0, false, 0); + + for (std::size_t i = 0; i < queue.capacity(); i++) + BOOST_REQUIRE(queue.write(std::move(i))); + + BOOST_REQUIRE(queue.isFull()); + BOOST_REQUIRE(!queue.write(10)); // Attempt to write when full + BOOST_REQUIRE_EQUAL(queue.occupancy(), queue.capacity()); + BOOST_REQUIRE_EQUAL(queue.capacity(), size - 1); + BOOST_REQUIRE_EQUAL(queue.size(), size); +} + +BOOST_AUTO_TEST_CASE(IterableQueueModel_read_until_empty) +{ + int size = 5; + IterableQueueModel queue(size, false, 0, false, 0); + + for (std::size_t i = 0; i < queue.capacity(); i++) + queue.write(std::move(i)); + + for (std::size_t i = 0; i < queue.capacity(); i++) { + int value_in_queue; + BOOST_REQUIRE(queue.read(value_in_queue)); + BOOST_REQUIRE_EQUAL(i, value_in_queue); + } + + BOOST_REQUIRE(queue.isEmpty()); +} + +BOOST_AUTO_TEST_CASE(IterableQueueModel_read_after_full_write) +{ + int size = 5; + IterableQueueModel queue(size, false, 0, false, 0); + + for (std::size_t i = 0; i < queue.capacity(); ++i) + queue.write(std::move(i)); + + int value; + BOOST_REQUIRE(queue.read(value)); + BOOST_REQUIRE_EQUAL(queue.capacity(), size - 1); + BOOST_REQUIRE_EQUAL(queue.size(), size); + BOOST_REQUIRE(!queue.isFull()); +} + +/** + * Tests write wraps correctly after reading from a full queue + */ +BOOST_AUTO_TEST_CASE(IterableQueueModel_write_wrap) +{ + int size = 5; + IterableQueueModel queue(size, false, 0, false, 0); + + for (std::size_t i = 0; i < queue.capacity(); ++i) + queue.write(std::move(i)); + int value; + queue.read(value); + + BOOST_REQUIRE(queue.write(std::move(size - 1))); + BOOST_REQUIRE(queue.isFull()); + BOOST_REQUIRE_EQUAL(queue.occupancy(), queue.capacity()); + BOOST_REQUIRE_EQUAL(queue.capacity(), size - 1); + BOOST_REQUIRE_EQUAL(queue.size(), size); +} + +BOOST_AUTO_TEST_CASE(IterableQueueModel_popFront) +{ + int size = 5; + IterableQueueModel queue(size, false, 0, false, 0); + + for (std::size_t i = 0; i < queue.capacity(); ++i) + queue.write(std::move(i)); + + int full_occupancy = queue.occupancy(); + + queue.popFront(); + BOOST_REQUIRE_EQUAL(queue.occupancy(), full_occupancy - 1); + BOOST_REQUIRE_EQUAL(queue.capacity(), size - 1); + BOOST_REQUIRE_EQUAL(queue.size(), size); +} + +BOOST_AUTO_TEST_CASE(IterableQueueModel_pop_multiple) +{ + int size = 5; + IterableQueueModel queue(size, false, 0, false, 0); + + for (std::size_t i = 0; i < queue.capacity(); ++i) + queue.write(std::move(i)); + + int full_occupancy = queue.occupancy(); + + queue.pop(3); + BOOST_REQUIRE_EQUAL(queue.occupancy(), 
full_occupancy - 3); + BOOST_REQUIRE_EQUAL(queue.capacity(), size - 1); + BOOST_REQUIRE_EQUAL(queue.size(), size); +} + +BOOST_AUTO_TEST_CASE(IterableQueueModel_flush_queue) +{ + int size = 5; + IterableQueueModel queue(size, false, 0, false, 0); + + for (std::size_t i = 0; i < queue.capacity(); ++i) + queue.write(std::move(i)); + + queue.flush(); + + BOOST_REQUIRE(queue.isEmpty()); + BOOST_REQUIRE_EQUAL(queue.occupancy(), 0); + BOOST_REQUIRE_EQUAL(queue.capacity(), size - 1); + BOOST_REQUIRE_EQUAL(queue.size(), size); +} + +/** + * Tests force pagefault function + */ +BOOST_AUTO_TEST_CASE(IterableQueueModel_force_pagefault) +{ + IterableQueueModel queue(6, false, 0, false, 0); + + queue.write(1); + BOOST_REQUIRE(!queue.isEmpty()); + queue.force_pagefault(); + BOOST_REQUIRE(queue.isEmpty()); +} + +/** + * Tests read wraps correctly + */ +BOOST_AUTO_TEST_CASE(IterableQueueModel_read_wrap) +{ + int size = 3; + IterableQueueModel queue(size, false, 0, false, 0); + + for (std::size_t i = 0; i < queue.capacity(); i++) { + queue.write(std::move(i)); + } + + int value_in_queue; + queue.read(value_in_queue); + BOOST_REQUIRE(queue.write(2)); + BOOST_REQUIRE(queue.read(value_in_queue)); + BOOST_REQUIRE(queue.read(value_in_queue)); + BOOST_REQUIRE_EQUAL(2, value_in_queue); + BOOST_REQUIRE(queue.isEmpty()); +} + +/** + * Tests pop wraps correctly + */ +BOOST_AUTO_TEST_CASE(IterableQueueModel_pop_wrap) +{ + int size = 3; + IterableQueueModel queue(size, false, 0, false, 0); + + for (std::size_t i = 0; i < queue.capacity(); i++) { + queue.write(std::move(i)); + } + + queue.popFront(); + BOOST_REQUIRE(queue.write(2)); + queue.popFront(); + queue.popFront(); + BOOST_REQUIRE(queue.isEmpty()); +} + +/** + * Tests mamory allocation + */ +BOOST_AUTO_TEST_CASE(IterableQueueModel_default_constructor) +{ + IterableQueueModel default_queue; + BOOST_REQUIRE_EQUAL(default_queue.size(), 2); + BOOST_REQUIRE(default_queue.isEmpty()); +} + +BOOST_AUTO_TEST_CASE(IterableQueueModel_allocate_memory_standard_allocation) +{ + IterableQueueModel queue; + queue.allocate_memory(4, false, 0, true, 0); + + BOOST_REQUIRE_EQUAL(queue.size(), 4); + BOOST_REQUIRE_EQUAL(queue.get_alignment_size(), 0); +} + +BOOST_AUTO_TEST_CASE(IterableQueueModel_allocate_memory_intrinsic_allocator_with_alignment) +{ + IterableQueueModel queue; + queue.allocate_memory(16, false, 0, true, 2); + + BOOST_REQUIRE_EQUAL(queue.size(), 16); + BOOST_REQUIRE_EQUAL(queue.get_alignment_size(), 2); +} + +BOOST_AUTO_TEST_CASE(IterableQueueModel_allocate_memory_aligned_allocation_without_intrinsic_allocator) +{ + IterableQueueModel queue; + queue.allocate_memory(8, false, 0, false, 4); + + BOOST_REQUIRE_EQUAL(queue.size(), 8); + BOOST_REQUIRE_EQUAL(queue.get_alignment_size(), 4); +} + +BOOST_AUTO_TEST_CASE(IterableQueueModel_allocate_memory_numa_aware_allocation) +{ + IterableQueueModel queue; + queue.allocate_memory(64, true, 4, false, 0); + + BOOST_REQUIRE_EQUAL(queue.size(), 64); + BOOST_REQUIRE_EQUAL(queue.get_alignment_size(), 0); +} + +BOOST_AUTO_TEST_CASE(IterableQueueModel_allocate_memory_basic_overload) +{ + IterableQueueModel queue; + queue.allocate_memory(42); + + BOOST_REQUIRE_EQUAL(queue.size(), 42); + BOOST_REQUIRE_EQUAL(queue.get_alignment_size(), 0); +} + +BOOST_AUTO_TEST_CASE(IterableQueueModel_free_memory) +{ + IterableQueueModel queue; + queue.allocate_memory(42); + queue.free_memory(); + queue.allocate_memory(8); + + BOOST_REQUIRE_EQUAL(queue.size(), 8); +} + +/** + * Tests queue pointers after write and read operations + */ 
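+// front() points at the oldest unread element and back() at the last written
+// element (see the updated comment in IterableQueueModel.hpp in this changeset);
+// start_of_buffer()/end_of_buffer() delimit the underlying allocated array.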
+
+/**
+ * Tests queue pointers after write and read operations
+ */
+BOOST_AUTO_TEST_CASE(IterableQueueModel_queue_pointers)
+{
+  IterableQueueModel<int> queue(5, false, 0, false, 0);
+  queue.write(0);
+  queue.write(1);
+  queue.write(2);
+  queue.write(3);
+
+  BOOST_REQUIRE_EQUAL(*queue.front(), 0);
+  BOOST_REQUIRE_EQUAL(*queue.back(), 3);
+
+  BOOST_CHECK_EQUAL(queue.front(), queue.start_of_buffer());
+  BOOST_CHECK_EQUAL(
+    queue.back() + 2,
+    queue.end_of_buffer()); // capacity is size - 1, plus one reserved slot:
+                            // the end of the buffer is 2 elements past the last written element when the queue is full
+
+  int value;
+  queue.read(value);
+  BOOST_REQUIRE_EQUAL(*queue.front(), 1);
+  BOOST_REQUIRE_EQUAL(queue.front(), queue.start_of_buffer() + 1);
+}
+
+/**
+ * Tests iterator functionality
+ */
+
+BOOST_AUTO_TEST_CASE(IterableQueueModel_iterators_empty_queue)
+{
+  IterableQueueModel<int> queue(5, false, 0, false, 0);
+  auto it = queue.begin();
+  BOOST_REQUIRE(it == queue.end());
+}
+
+BOOST_AUTO_TEST_CASE(IterableQueueModel_iterators_iterate_without_wrap)
+{
+  IterableQueueModel<int> queue(5, false, 0, false, 0);
+  queue.write(0);
+  queue.write(1);
+  queue.write(2);
+  queue.write(3);
+
+  auto it = queue.begin();
+  while (it != queue.end()) {
+    BOOST_REQUIRE(it.good());
+    ++it;
+  }
+  BOOST_REQUIRE(!it.good());
+}
+
+BOOST_AUTO_TEST_CASE(IterableQueueModel_iterators_iterate_after_wrap)
+{
+  IterableQueueModel<int> queue(5, false, 0, false, 0);
+  queue.write(0);
+  queue.write(1);
+  queue.write(2);
+  queue.write(3);
+  int value;
+  queue.read(value);
+  queue.write(4);
+
+  int last_value;
+  int i = 1;
+
+  auto it = queue.begin();
+  while (it != queue.end()) {
+    BOOST_REQUIRE(it.good());
+    last_value = *it;
+    BOOST_REQUIRE_EQUAL(last_value, i);
+    i++;
+    ++it;
+  }
+  BOOST_REQUIRE(!it.good());
+  BOOST_REQUIRE(it == queue.end());
+  BOOST_REQUIRE_EQUAL(last_value, *queue.back());
+}
+BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file
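Note on the assertions in the test file above: they rely on the usual single-producer/single-consumer ring-buffer convention of keeping one slot unused to distinguish "full" from "empty", so a buffer constructed with size N reports capacity() == N - 1. The following is a minimal standalone sketch of that occupancy arithmetic with plain indices; it is illustrative only and not the actual IterableQueueModel implementation, and all names in it are made up for the example.

#include <cassert>
#include <cstddef>

// Sketch only: occupancy of a ring buffer that keeps one slot free,
// computed from a read index and a write index (both < size).
std::size_t ring_occupancy(std::size_t read_idx, std::size_t write_idx, std::size_t size)
{
  // When write has not wrapped past read, occupancy is the plain difference;
  // otherwise add size to account for the wrap-around.
  return (write_idx >= read_idx) ? (write_idx - read_idx) : (size - read_idx + write_idx);
}

int main()
{
  const std::size_t size = 5;                     // 5 slots -> 4 usable elements
  assert(ring_occupancy(0, 0, size) == 0);        // empty
  assert(ring_occupancy(0, size - 1, size) == 4); // full: one slot stays reserved
  assert(ring_occupancy(3, 1, size) == 3);        // wrapped write index
  return 0;
}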
diff --git a/unittest/datahandlinglibs_ReadoutTypes_test.cxx b/unittest/datahandlinglibs_ReadoutTypes_test.cxx
new file mode 100644
index 0000000..33a1267
--- /dev/null
+++ b/unittest/datahandlinglibs_ReadoutTypes_test.cxx
@@ -0,0 +1,84 @@
+/**
+ * @file datahandlinglibs_ReadoutTypes_test.cxx Unit Tests for ReadoutTypes
+ *
+ * This is part of the DUNE DAQ Application Framework, copyright 2020.
+ * Licensing/copyright details are in the COPYING file that you should have
+ * received with this code.
+ */
+
+/**
+ * @brief Name of this test module
+ */
+#define BOOST_TEST_MODULE datahandlinglibs_ReadoutTypes_test // NOLINT
+
+#include "boost/test/unit_test.hpp"
+
+#include "datahandlinglibs/ReadoutTypes.hpp"
+
+#include
+#include
+
+using namespace dunedaq::datahandlinglibs;
+
+BOOST_AUTO_TEST_SUITE(datahandlinglibs_ReadoutTypes_test)
+
+/**
+ * @brief Tests setting and retrieving maximum timestamp and key values.
+ */
+BOOST_AUTO_TEST_CASE(ReadoutTypes_get_set_timestamp_max)
+{
+  types::DUMMY_FRAME_STRUCT test;
+
+  /**
+   * @test Ensures the set and get functions for the timestamp work for the maximum possible 64-bit unsigned integer
+   */
+  test.set_timestamp(UINT64_MAX);
+  BOOST_REQUIRE_EQUAL(test.get_timestamp(), UINT64_MAX);
+
+  /**
+   * @test Tests setting the maximum 64-bit unsigned integer for another_key;
+   * checks by accessing the member directly as there is no getter
+   */
+  test.set_another_key(UINT64_MAX);
+  BOOST_REQUIRE_EQUAL(test.another_key, UINT64_MAX);
+}
+
+/**
+ * @brief Tests the less-than operator;
+ * elements are comparable based on a composite key (timestamp + other unique keys)
+ */
+BOOST_AUTO_TEST_CASE(ReadoutTypes_less_than_operator)
+{
+  types::DUMMY_FRAME_STRUCT test1, test2;
+
+  test1.set_timestamp(UINT64_MAX);
+  test1.set_another_key(1000);
+
+  test2.set_timestamp(UINT64_MAX);
+  test2.set_another_key(500);
+
+  BOOST_REQUIRE_EQUAL(test1 < test2, false);
+  BOOST_REQUIRE_EQUAL(test2 < test1, true);
+}
+
+/**
+ * @brief Tests the begin() and end() methods of DUMMY_FRAME_STRUCT.
+ */
+BOOST_AUTO_TEST_CASE(ReadoutTypes_begin_end)
+{
+  types::DUMMY_FRAME_STRUCT frame;
+
+  /**
+   * @test 'begin()' returns a pointer to the frame itself
+   */
+  types::DUMMY_FRAME_STRUCT* fr_begin = frame.begin();
+  BOOST_REQUIRE_EQUAL(fr_begin, &frame);
+
+  /**
+   * @test 'end()' returns a pointer to one past the frame
+   */
+  types::DUMMY_FRAME_STRUCT* fr_end = frame.end();
+  BOOST_REQUIRE_EQUAL(fr_end, &frame + 1);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file
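The less-than assertions in the file above imply a lexicographic ordering on (timestamp, another_key). The sketch below shows that kind of composite-key comparator in isolation; it is an assumption-for-illustration only, the FakeFrame struct is invented here and DUMMY_FRAME_STRUCT's real operator< is the one defined in ReadoutTypes.hpp.

#include <cassert>
#include <cstdint>
#include <tuple>

// Sketch only: a frame-like struct ordered by (timestamp, another_key),
// mirroring what ReadoutTypes_less_than_operator checks.
struct FakeFrame
{
  uint64_t timestamp{ 0 };
  uint64_t another_key{ 0 };

  bool operator<(const FakeFrame& other) const
  {
    // std::tie gives lexicographic comparison: timestamp first, then another_key
    return std::tie(timestamp, another_key) < std::tie(other.timestamp, other.another_key);
  }
};

int main()
{
  FakeFrame a{ UINT64_MAX, 1000 };
  FakeFrame b{ UINT64_MAX, 500 };
  assert(!(a < b)); // same timestamp, larger key -> not less
  assert(b < a);
  return 0;
}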
diff --git a/unittest/datahandlinglibs_SkiplistLatencyBufferModel_test.cxx b/unittest/datahandlinglibs_SkiplistLatencyBufferModel_test.cxx
new file mode 100644
index 0000000..b9f9bb6
--- /dev/null
+++ b/unittest/datahandlinglibs_SkiplistLatencyBufferModel_test.cxx
@@ -0,0 +1,180 @@
+/**
+ * @file datahandlinglibs_SkiplistLatencyBufferModel_test.cxx Unit Tests for SkiplistLatencyBufferModel
+ *
+ * This is part of the DUNE DAQ Application Framework, copyright 2020.
+ * Licensing/copyright details are in the COPYING file that you should have
+ * received with this code.
+ */
+
+/**
+ * @brief Name of this test module
+ */
+#define BOOST_TEST_MODULE datahandlinglibs_SkiplistLatencyBufferModel_test // NOLINT
+
+#include "boost/test/unit_test.hpp"
+
+#include "datahandlinglibs/opmon/datahandling_info.pb.h"
+#include "datahandlinglibs/models/SkipListLatencyBufferModel.hpp"
+
+#include "datahandlinglibs/testutils/UnitTestUtilities.hpp"
+#include "folly/ConcurrentSkipList.h"
+
+using namespace dunedaq::datahandlinglibs;
+
+BOOST_AUTO_TEST_SUITE(datahandlinglibs_SkiplistLatencyBufferModel_test)
+
+/**
+ * @brief Tests the occupancy, write, put, read and flush functions
+ */
+BOOST_AUTO_TEST_CASE(SkiplistLatencyBufferModel_put)
+{
+  SkipListLatencyBufferModel<unittest::FakeReadoutType> skip_list;
+
+  unittest::FakeReadoutType dummy_put;
+  int size = 5;
+
+  for (int i = 0; i < size; i++) {
+    dummy_put.set_timestamp(i * i);
+    BOOST_REQUIRE(skip_list.put(dummy_put));
+  }
+
+  BOOST_REQUIRE_EQUAL(skip_list.occupancy(), size);
+}
+
+BOOST_AUTO_TEST_CASE(SkiplistLatencyBufferModel_write)
+{
+  SkipListLatencyBufferModel<unittest::FakeReadoutType> skip_list;
+  int size = 5;
+
+  for (int i = 0; i < size; i++) {
+    unittest::FakeReadoutType dummy_move;
+    dummy_move.set_timestamp(i * i);
+    BOOST_REQUIRE(skip_list.write(std::move(dummy_move)));
+  }
+
+  BOOST_REQUIRE_EQUAL(skip_list.occupancy(), size);
+}
+
+BOOST_AUTO_TEST_CASE(SkiplistLatencyBufferModel_pop)
+{
+  SkipListLatencyBufferModel<unittest::FakeReadoutType> skip_list;
+
+  unittest::FakeReadoutType dummy_put;
+  for (int i = 0; i < 10; i++) {
+    dummy_put.set_timestamp(i * i);
+    skip_list.put(dummy_put);
+  }
+
+  skip_list.pop(3);
+
+  BOOST_REQUIRE_EQUAL(skip_list.occupancy(), 7);
+}
+
+BOOST_AUTO_TEST_CASE(SkiplistLatencyBufferModel_read)
+{
+  SkipListLatencyBufferModel<unittest::FakeReadoutType> skip_list;
+
+  unittest::FakeReadoutType dummy_put;
+  for (int i = 0; i < 10; i++) {
+    dummy_put.set_timestamp(i * i);
+    skip_list.put(dummy_put);
+  }
+
+  unittest::FakeReadoutType dummy_read;
+
+  BOOST_REQUIRE(skip_list.read(dummy_read));
+  BOOST_CHECK_EQUAL(dummy_read.get_timestamp(), 0);
+  BOOST_REQUIRE_EQUAL(skip_list.occupancy(), 10);
+}
+
+BOOST_AUTO_TEST_CASE(SkiplistLatencyBufferModel_flush)
+{
+  SkipListLatencyBufferModel<unittest::FakeReadoutType> skip_list;
+
+  unittest::FakeReadoutType dummy_put;
+  for (int i = 0; i < 10; i++) {
+    dummy_put.set_timestamp(i * i);
+    skip_list.put(dummy_put);
+  }
+
+  skip_list.flush();
+
+  BOOST_REQUIRE_EQUAL(skip_list.occupancy(), 0);
+}
+
+BOOST_AUTO_TEST_CASE(SkiplistLatencyBufferModel_lower_bound)
+{
+  SkipListLatencyBufferModel<unittest::FakeReadoutType> skip_list;
+  unittest::FakeReadoutType dummy_put;
+
+  for (int i = 0; i < 5; i++) {
+    dummy_put.set_timestamp(i);
+    skip_list.put(dummy_put);
+  }
+
+  unittest::FakeReadoutType dummy_bound;
+  dummy_bound.set_timestamp(4);
+  auto it_lower = skip_list.lower_bound(dummy_bound);
+
+  unittest::FakeReadoutType& d = *it_lower;
+  BOOST_REQUIRE_EQUAL(d.get_timestamp(), 4);
+}
+BOOST_AUTO_TEST_CASE(SkiplistLatencyBufferModel_iterators)
+{
+  SkipListLatencyBufferModel<unittest::FakeReadoutType> skip_list;
+  unittest::FakeReadoutType dummy_put;
+
+  for (int i = 0; i < 5; i++) {
+    dummy_put.set_timestamp(i);
+    skip_list.put(dummy_put);
+  }
+
+  auto it = skip_list.begin();
+  auto it_end = skip_list.end();
+
+  int val = 0;
+  while (it != it_end) {
+    BOOST_REQUIRE_EQUAL(it->get_timestamp(), val);
+    BOOST_REQUIRE(it.good());
+    ++it;
+    ++val;
+  }
+  BOOST_REQUIRE(!it.good());
+}
+
+BOOST_AUTO_TEST_CASE(SkiplistLatencyBufferModel_front_and_back_pointers)
+{
+  SkipListLatencyBufferModel<unittest::FakeReadoutType> skip_list;
+  unittest::FakeReadoutType dummy_put;
+
+  for (int i = 0; i < 5; i++) {
+    dummy_put.set_timestamp(i);
+    skip_list.put(dummy_put);
+  }
+
+  const unittest::FakeReadoutType* front_ptr = skip_list.front();
+  const unittest::FakeReadoutType* back_ptr = skip_list.back();
+
+  BOOST_REQUIRE(front_ptr != nullptr);
+  BOOST_REQUIRE(back_ptr != nullptr);
+
+  BOOST_REQUIRE_EQUAL(front_ptr->get_timestamp(), 0);
+  BOOST_REQUIRE_EQUAL(back_ptr->get_timestamp(), 4);
+}
+
+BOOST_AUTO_TEST_CASE(SkiplistLatencyBufferModel_get_skiplist)
+{
+  SkipListLatencyBufferModel<unittest::FakeReadoutType> skip_list;
+  unittest::FakeReadoutType dummy_put;
+
+  for (int i = 0; i < 5; i++) {
+    dummy_put.set_timestamp(i);
+    skip_list.put(dummy_put);
+  }
+
+  folly::ConcurrentSkipList<unittest::FakeReadoutType>::Accessor acc(skip_list.get_skip_list());
+
+  BOOST_REQUIRE_EQUAL(skip_list.front(), acc.first());
+}
+
+BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file
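The lower_bound test in the file above checks the semantics a timestamp-ordered latency buffer needs: return the first element whose timestamp is not less than the requested bound. The sketch below demonstrates those semantics with std::set only; it is not the folly-backed buffer used by SkipListLatencyBufferModel and is included purely to make the expected behaviour explicit.

#include <cassert>
#include <cstdint>
#include <set>

// Sketch only: lower_bound on a timestamp-ordered container returns the first
// element not less than the requested bound, which is how a latency buffer can
// be positioned at the start of a readout window.
int main()
{
  std::set<uint64_t> timestamps{ 0, 1, 2, 3, 4 };

  auto it = timestamps.lower_bound(4);
  assert(it != timestamps.end() && *it == 4); // exact match found

  auto it2 = timestamps.lower_bound(10);
  assert(it2 == timestamps.end()); // bound past the newest element: nothing to return
  return 0;
}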
diff --git a/unittest/datahandlinglibs_TaskRawDataProcessorModel_test.cxx b/unittest/datahandlinglibs_TaskRawDataProcessorModel_test.cxx
new file mode 100644
index 0000000..23fb385
--- /dev/null
+++ b/unittest/datahandlinglibs_TaskRawDataProcessorModel_test.cxx
@@ -0,0 +1,241 @@
+/**
+ * @file datahandlinglibs_TaskRawDataProcessorModel_test.cxx Unit Tests for TaskRawDataProcessorModel
+ *
+ * This is part of the DUNE DAQ Application Framework, copyright 2020.
+ * Licensing/copyright details are in the COPYING file that you should have
+ * received with this code.
+ */
+
+/**
+ * @brief Name of this test module
+ */
+#define BOOST_TEST_MODULE datahandlinglibs_TaskRawDataProcessorModel_test // NOLINT
+
+#include "boost/test/unit_test.hpp"
+#include "datahandlinglibs/models/TaskRawDataProcessorModel.hpp"
+#include "datahandlinglibs/opmon/datahandling_info.pb.h"
+#include "datahandlinglibs/models/SkipListLatencyBufferModel.hpp"
+#include "datahandlinglibs/testutils/UnitTestUtilities.hpp"
+
+#include "utilities/ReusableThread.hpp"
+#include
+
+#include
+
+using namespace dunedaq::datahandlinglibs;
+
+BOOST_AUTO_TEST_SUITE(datahandlinglibs_TaskRawDataProcessorModel_test)
+
+using ROType = unittest::FakeReadoutType;
+
+template<typename Type>
+void
+pre_func_one([[maybe_unused]] Type* elem)
+{
+  *elem = (*elem) * (*elem);
+}
+template<typename Type>
+void
+pre_func_two([[maybe_unused]] Type* elem)
+{
+  *elem = (*elem) + 10;
+}
+
+BOOST_AUTO_TEST_CASE(TaskRawDataProcessorModel_get_last_timestamp)
+{
+  auto error_registry = std::make_unique();
+  TaskRawDataProcessorModel<ROType> processor(error_registry, false);
+  processor.reset_last_daq_time();
+  BOOST_REQUIRE_EQUAL(processor.get_last_daq_time(), 0);
+}
+
+BOOST_AUTO_TEST_CASE(TaskRawDataProcessorModel_invoke_preprocess)
+{
+  auto error_registry = std::make_unique();
+  TaskRawDataProcessorModel<ROType> processor(error_registry, false);
+  processor.add_preprocess_task(pre_func_one<ROType>);
+  processor.add_preprocess_task(pre_func_two<ROType>);
+
+  ROType* pre_pro_result = new ROType();
+  ROType* intended_result = new ROType();
+  pre_pro_result->set_timestamp(2);
+  intended_result->set_timestamp(pre_pro_result->get_timestamp());
+
+  pre_func_one(intended_result);
+  pre_func_two(intended_result);
+  processor.invoke_all_preprocess_functions(pre_pro_result);
+
+  BOOST_REQUIRE_EQUAL(*pre_pro_result, *intended_result);
+
+  delete pre_pro_result;
+  delete intended_result;
+}
+
+BOOST_AUTO_TEST_CASE(TaskRawDataProcessorModel_launch_preprocess)
+{
+  auto error_registry = std::make_unique();
+  TaskRawDataProcessorModel<ROType> processor(error_registry, false);
+  bool pre_func_one_called = false;
+  bool pre_func_two_called = false;
+  processor.add_preprocess_task([&pre_func_one_called](ROType* /*elem*/) { pre_func_one_called = true; });
+  processor.add_preprocess_task([&pre_func_two_called](ROType* /*elem*/) { pre_func_two_called = true; });
+
+  ROType* pre_pro_result = new ROType();
+  pre_pro_result->set_timestamp(2);
+  processor.launch_all_preprocess_functions(pre_pro_result);
+
+  BOOST_REQUIRE(pre_func_one_called);
+  BOOST_REQUIRE(pre_func_two_called);
+
+  delete pre_pro_result;
+}
+
+BOOST_AUTO_TEST_CASE(TaskRawDataProcessorModel_preprocess_item)
+{
+  auto error_registry = std::make_unique();
+  TaskRawDataProcessorModel<ROType> processor(error_registry, false);
+  processor.add_preprocess_task(pre_func_one<ROType>);
+  processor.add_preprocess_task(pre_func_two<ROType>);
+
+  ROType* pre_pro_result = new ROType();
+  ROType* intended_result = new ROType();
+  pre_pro_result->set_timestamp(2);
+  intended_result->set_timestamp(pre_pro_result->get_timestamp());
+
+  pre_func_one(intended_result);
+  pre_func_two(intended_result);
+  processor.preprocess_item(pre_pro_result);
+
+  BOOST_REQUIRE_EQUAL(*pre_pro_result, *intended_result);
+
+  delete pre_pro_result;
+  delete intended_result;
+}
+
+BOOST_AUTO_TEST_CASE(TaskRawDataProcessorModel_postprocess_add_task)
+{
+  auto error_registry = std::make_unique();
+  unittest::FakeRawDataProcessorType processor(error_registry, true);
+  bool post_func_one_called = false;
+  bool post_func_two_called = false;
+
+  processor.add_postprocess_task([&post_func_one_called](const ROType* /*elem*/) { post_func_one_called = true; });
+  processor.add_postprocess_task([&post_func_two_called](const ROType* /*elem*/) { post_func_two_called = true; });
+  BOOST_REQUIRE_EQUAL(processor.get_post_threads().size(), 2);
+  BOOST_REQUIRE_EQUAL(processor.get_post_functions().size(), 2);
+}
+
+BOOST_AUTO_TEST_CASE(TaskRawDataProcessorModel_postprocess_queue)
+{
+  auto error_registry = std::make_unique();
+  unittest::FakeRawDataProcessorType processor(error_registry, true);
+
+  bool post_func_one_called = false;
+  bool post_func_two_called = false;
+
+  processor.add_postprocess_task([&post_func_one_called](const ROType* /*elem*/) { post_func_one_called = true; });
+  processor.add_postprocess_task([&post_func_two_called](const ROType* /*elem*/) { post_func_two_called = true; });
+
+  processor.make_queues(32);
+  BOOST_REQUIRE_EQUAL(processor.get_post_queues().size(), 2);
+}
+
+BOOST_AUTO_TEST_CASE(TaskRawDataProcessorModel_add_to_queue)
+{
+  auto error_registry = std::make_unique();
+  unittest::FakeRawDataProcessorType processor(error_registry, true);
+
+  bool post_func_one_called = false;
+  bool post_func_two_called = false;
+
+  processor.add_postprocess_task([&post_func_one_called](const ROType* /*elem*/) { post_func_one_called = true; });
+  processor.add_postprocess_task([&post_func_two_called](const ROType* /*elem*/) { post_func_two_called = true; });
+  processor.make_queues(32);
+
+  ROType* post_pro_elem = new ROType();
+  post_pro_elem->set_timestamp(2);
+  processor.postprocess_item(post_pro_elem);
+
+  // check the size of each post-processing queue
+  for (unsigned long i = 0; i < processor.get_post_queues().size(); i++) {
+    BOOST_REQUIRE_EQUAL(processor.get_post_queues()[i]->sizeGuess(), 1);
+  }
+}
+
+BOOST_AUTO_TEST_CASE(TaskRawDataProcessorModel_postprocess_run_threads_after_queue)
+{
+  auto error_registry = std::make_unique();
+  unittest::FakeRawDataProcessorType processor(error_registry, true);
+
+  bool post_func_one_called = false;
+  bool post_func_two_called = false;
+
+  processor.add_postprocess_task([&post_func_one_called](const ROType* /*elem*/) { post_func_one_called = true; });
+  processor.add_postprocess_task([&post_func_two_called](const ROType* /*elem*/) { post_func_two_called = true; });
+  processor.make_queues(32);
+  ROType* post_pro_elem = new ROType();
+  post_pro_elem->set_timestamp(2);
+  processor.postprocess_item(post_pro_elem);
+
+  processor.public_post_processing_threads();
+  std::this_thread::sleep_for(std::chrono::milliseconds(100)); // give the post-processing threads time to run before the main thread finishes
+
+  BOOST_REQUIRE(post_func_one_called);
+  BOOST_REQUIRE(post_func_two_called);
+}
+
+BOOST_AUTO_TEST_CASE(TaskRawDataProcessorModel_postprocess_run_threads_before_queue)
+{
+  auto error_registry = std::make_unique();
+  unittest::FakeRawDataProcessorType processor(error_registry, true);
+
+  bool post_func_one_called = false;
+  bool post_func_two_called = false;
+
+  processor.add_postprocess_task([&post_func_one_called](const ROType* /*elem*/) { post_func_one_called = true; });
+  processor.add_postprocess_task([&post_func_two_called](const ROType* /*elem*/) { post_func_two_called = true; });
+  processor.make_queues(32);
+
+  processor.run_marker_set(true); // the post-processing loop keeps running while m_run_marker.load() || queue.sizeGuess() > 0
+  processor.public_post_processing_threads();
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait before adding elements to the queue
+
+  ROType* post_pro_elem = new ROType();
+  post_pro_elem->set_timestamp(2);
+  processor.postprocess_item(post_pro_elem);
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(100)); // give the post-processing threads time to run before the main thread finishes
+  processor.run_marker_set(false);
+
+  BOOST_REQUIRE(post_func_one_called);
+  BOOST_REQUIRE(post_func_two_called);
+}
+
+BOOST_AUTO_TEST_CASE(TaskRawDataProcessorModel_postprocess_fail_postprocess_item)
+{
+
+  setenv("DUNEDAQ_ERS_WARNING", "throw", 1);
+
+  auto error_registry = std::make_unique();
+  unittest::FakeRawDataProcessorType processor(error_registry, true);
+
+  bool post_func_one_called = false;
+  bool post_func_two_called = false;
+
+  processor.add_postprocess_task([&post_func_one_called](const ROType* /*elem*/) { post_func_one_called = true; });
+  processor.add_postprocess_task([&post_func_two_called](const ROType* /*elem*/) { post_func_two_called = true; });
+  processor.make_queues(2); // queue size is 2
+
+  ROType* post_pro_elem = new ROType();
+  post_pro_elem->set_timestamp(2);
+  processor.postprocess_item(post_pro_elem);
+
+  // when the queue is full, the write fails
+  ROType* fail_elem = new ROType();
+  fail_elem->set_timestamp(4);
+
+  BOOST_CHECK_THROW(processor.postprocess_item(fail_elem), std::exception);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
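For context on what the post-processing tests above exercise: each registered post-processing task gets its own bounded queue (the tests read their sizes via sizeGuess(), which suggests folly SPSC queues) and its own worker thread, and postprocess_item fans the element pointer out to every queue. The sketch below reproduces only the shape of that fan-out with the standard library; the FanOut type and its members are invented for illustration, it runs the tasks on the caller's thread instead of worker threads, and it is not the TaskRawDataProcessorModel implementation.

#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <vector>

// Sketch only: fan an element pointer out to one bounded queue per registered
// post-processing task, mirroring the shape of the postprocess_item tests above.
struct FanOut
{
  using Task = std::function<void(const int*)>;

  std::vector<Task> tasks;
  std::vector<std::queue<const int*>> queues;
  std::size_t capacity = 2; // bounded, like the size-2 queue in the failure test

  void add_task(Task t)
  {
    tasks.push_back(std::move(t));
    queues.emplace_back(); // one queue per task
  }

  // Returns false once a per-task queue is full (simplified: no rollback of
  // earlier pushes; the real model reports the failure as an ERS issue).
  bool enqueue(const int* elem)
  {
    for (auto& q : queues) {
      if (q.size() >= capacity) {
        return false;
      }
      q.push(elem);
    }
    return true;
  }

  // Drain every queue on the caller's thread (the model uses one worker per task).
  void drain()
  {
    for (std::size_t i = 0; i < queues.size(); ++i) {
      while (!queues[i].empty()) {
        tasks[i](queues[i].front());
        queues[i].pop();
      }
    }
  }
};

int main()
{
  bool called_one = false;
  bool called_two = false;
  FanOut fan;
  fan.add_task([&called_one](const int*) { called_one = true; });
  fan.add_task([&called_two](const int*) { called_two = true; });

  int element = 42;
  assert(fan.enqueue(&element));
  fan.drain();
  assert(called_one && called_two);
  return 0;
}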