diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 3f3c47246e4..0724d32910b 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -778,7 +778,8 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva F(type_wait_on_join_build, {"type", "wait_on_join_build"}), \ F(type_wait_on_join_probe, {"type", "wait_on_join_probe"}), \ F(type_wait_on_result_queue_write, {"type", "wait_on_result_queue_write"}), \ - F(type_type_wait_on_cte_read, {"type", "type_wait_on_cte_read"})) \ + F(type_wait_on_cte_read, {"type", "type_wait_on_cte_read"}), \ + F(type_wait_on_cte_io, {"type", "type_wait_on_cte_io"})) \ M(tiflash_pipeline_task_duration_seconds, \ "Bucketed histogram of pipeline task duration in seconds", \ Histogram, /* these command usually cost several hundred milliseconds to several seconds, increase the start bucket to 5ms */ \ diff --git a/dbms/src/Core/Spiller.h b/dbms/src/Core/Spiller.h index 435bcdc396a..f5bac67e3fc 100644 --- a/dbms/src/Core/Spiller.h +++ b/dbms/src/Core/Spiller.h @@ -161,7 +161,7 @@ class Spiller /// file is read, otherwise, the spilled file will be released when destruct the spiller. Currently, all the spilled /// file can be released on restore since it is only read once, but in the future if SharedScan(shared cte) need spill, /// the data may be restored multiple times and release_spilled_file_on_restore need to be set to false. - const bool release_spilled_file_on_restore; + bool release_spilled_file_on_restore; bool enable_append_write = false; }; diff --git a/dbms/src/DataStreams/NativeBlockOutputStream.cpp b/dbms/src/DataStreams/NativeBlockOutputStream.cpp index 4c981c06336..960d7a7cb8b 100644 --- a/dbms/src/DataStreams/NativeBlockOutputStream.cpp +++ b/dbms/src/DataStreams/NativeBlockOutputStream.cpp @@ -140,5 +140,4 @@ void NativeBlockOutputStream::write(const Block & block) } } } - } // namespace DB diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 275670f9c1a..6890df7849d 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -151,6 +151,7 @@ void updateSettingsFromTiDB(const grpc::ServerContext * grpc_context, ContextPtr std::make_pair("tidb_max_bytes_before_tiflash_external_join", "max_bytes_before_external_join"), std::make_pair("tidb_max_bytes_before_tiflash_external_group_by", "max_bytes_before_external_group_by"), std::make_pair("tidb_max_bytes_before_tiflash_external_sort", "max_bytes_before_external_sort"), + std::make_pair("tidb_max_bytes_before_tiflash_cte_spill", "max_bytes_before_cte_spill"), std::make_pair("tiflash_mem_quota_query_per_node", "max_memory_usage"), std::make_pair("tiflash_query_spill_ratio", "auto_memory_revoke_trigger_threshold"), std::make_pair("tiflash_use_hash_join_v2", "enable_hash_join_v2"), @@ -188,6 +189,11 @@ void updateSettingsForAutoSpill(ContextPtr & context, const LoggerPtr & log) need_log_warning = true; context->setSetting("max_bytes_before_external_join", "0"); } + if (context->getSettingsRef().max_bytes_before_cte_spill > 0) + { + need_log_warning = true; + context->setSetting("max_bytes_before_cte_spill", "0"); + } if (need_log_warning) LOG_WARNING(log, "auto spill is enabled, so per operator's memory threshold is disabled"); } diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index e9c0e864b32..f7194efbde2 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -43,7 +43,6 @@ #include #include - namespace DB { namespace ErrorCodes diff --git a/dbms/src/Flash/Pipeline/Schedule/Reactor/WaitReactor.cpp b/dbms/src/Flash/Pipeline/Schedule/Reactor/WaitReactor.cpp index 31724125e36..6dfe93cfaef 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Reactor/WaitReactor.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Reactor/WaitReactor.cpp @@ -40,7 +40,8 @@ WaitReactor::WaitReactor(TaskScheduler & scheduler_) GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_wait_on_join_probe).Set(0); GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_wait_on_result_queue_write).Set(0); GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_wait_on_shared_queue_write).Set(0); - GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_type_wait_on_cte_read).Set(0); + GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_wait_on_cte_read).Set(0); + GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_wait_on_cte_io).Set(0); thread = std::thread(&WaitReactor::loop, this); } diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h index a17fe2d4ab5..9904aa1ae98 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h @@ -58,7 +58,10 @@ class PipeConditionVariable GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_wait_on_shared_queue_write).Increment(change); break; case NotifyType::WAIT_ON_CTE_READ: - GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_type_wait_on_cte_read).Increment(change); + GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_wait_on_cte_read).Increment(change); + break; + case NotifyType::WAIT_ON_CTE_IO: + GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_wait_on_cte_io).Increment(change); break; case NotifyType::WAIT_ON_NOTHING: throw Exception("task notify type should be set before register or notify"); diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h index 35103787b90..8b525b662c1 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h @@ -59,6 +59,7 @@ enum class NotifyType WAIT_ON_JOIN_PROBE_FINISH, WAIT_ON_RESULT_QUEUE_WRITE, WAIT_ON_CTE_READ, + WAIT_ON_CTE_IO, WAIT_ON_NOTHING, }; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalCTESink.cpp b/dbms/src/Flash/Planner/Plans/PhysicalCTESink.cpp index 72b334d3de6..12caf8d5073 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalCTESink.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalCTESink.cpp @@ -52,6 +52,23 @@ void PhysicalCTESink::buildPipelineExecGroupImpl( builder.setSinkOp(std::make_unique(exec_context, log->identifier(), cte, id)); id++; }); + + const Settings & settings = context.getSettingsRef(); + SpillConfig spill_config( + context.getTemporaryPath(), + " ", + settings.max_cached_data_bytes_in_spiller, + settings.max_spilled_rows_per_file, + settings.max_spilled_bytes_per_file, + context.getFileProvider(), + settings.max_threads, + settings.max_block_size); + + cte->initCTESpillContextAndPartitionConfig( + spill_config, + group_builder.getCurrentHeader(), + settings.max_bytes_before_cte_spill, + context); } void PhysicalCTESink::finalizeImpl(const Names & parent_require) diff --git a/dbms/src/Interpreters/CTESpillContext.cpp b/dbms/src/Interpreters/CTESpillContext.cpp new file mode 100644 index 00000000000..c4cce759319 --- /dev/null +++ b/dbms/src/Interpreters/CTESpillContext.cpp @@ -0,0 +1,56 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include + +namespace DB +{ +Int64 CTESpillContext::triggerSpillImpl(Int64 expected_released_memories) +{ + size_t partition_num = this->partitions.size(); + std::vector> part_idx_with_mem_usages; + part_idx_with_mem_usages.reserve(partition_num); + for (size_t i = 0; i < partition_num; i++) + part_idx_with_mem_usages.push_back(std::make_pair(this->partitions[i]->memory_usage.load(), i)); + + std::sort( + part_idx_with_mem_usages.begin(), + part_idx_with_mem_usages.end(), + [](const std::pair & l, const std::pair & r) { return l.first > r.first; }); + + for (const auto & item : part_idx_with_mem_usages) + { + auto memory_usage = item.first; + const std::shared_ptr & partition = partitions[item.second]; + std::lock_guard aux_lock(*(partition->aux_lock)); + if (partition->status != CTEPartitionStatus::NORMAL) + continue; + + if (memory_usage == 0) + continue; + + partition->status = CTEPartitionStatus::NEED_SPILL; + + expected_released_memories = std::max(expected_released_memories - memory_usage, 0); + if (expected_released_memories <= 0) + return expected_released_memories; + } + return expected_released_memories; +} +} // namespace DB diff --git a/dbms/src/Interpreters/CTESpillContext.h b/dbms/src/Interpreters/CTESpillContext.h new file mode 100644 index 00000000000..12bb3f81dca --- /dev/null +++ b/dbms/src/Interpreters/CTESpillContext.h @@ -0,0 +1,55 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + +namespace DB +{ +class CTESpillContext final : public OperatorSpillContext +{ +public: + CTESpillContext( + UInt64 operator_spill_threshold_, + const String & query_id_and_cte_id_, + std::vector> partitions_) + : OperatorSpillContext(operator_spill_threshold_, "cte", Logger::get(query_id_and_cte_id_)) + , partitions(partitions_) + {} + + bool supportAutoTriggerSpill() const override { return true; } + Int64 triggerSpillImpl(Int64 expected_released_memories) override; + LoggerPtr getLog() const { return this->log; } + +protected: + Int64 getTotalRevocableMemoryImpl() override + { + Int64 total_memory = 0; + for (auto & partition : this->partitions) + total_memory += partition->memory_usage.load(); + return total_memory; + } + +private: + std::vector> partitions; +}; +} // namespace DB diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index c18ba194308..39f88275970 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -313,6 +313,7 @@ struct Settings M(SettingUInt64, async_cqs, 1, "grpc async cqs") \ M(SettingUInt64, preallocated_request_count_per_poller, 20, "grpc preallocated_request_count_per_poller") \ M(SettingUInt64, max_bytes_before_external_join, 0, "max bytes used by join before spill, 0 as the default value, 0 means no limit") \ + M(SettingUInt64, max_bytes_before_cte_spill, 0, "max bytes used by cte before spill, 0 as the default value, 0 means no limit") \ M(SettingInt64, join_restore_concurrency, 0, "join restore concurrency, negative value means restore join serially, 0 means TiFlash choose restore concurrency automatically, 0 as the default value") \ M(SettingUInt64, max_cached_data_bytes_in_spiller, 1024ULL * 1024 * 20, "Max cached data bytes in spiller before spilling, 20 MB as the default value, 0 means no limit") \ M(SettingUInt64, max_spilled_rows_per_file, 200000, "Max spilled data rows per spill file, 200000 as the default value, 0 means no limit.") \ diff --git a/dbms/src/Operators/CTE.cpp b/dbms/src/Operators/CTE.cpp index c706a01ffde..48b42d561ac 100644 --- a/dbms/src/Operators/CTE.cpp +++ b/dbms/src/Operators/CTE.cpp @@ -12,70 +12,153 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include #include -#include #include +#include #include #include +#include namespace DB { +void CTE::initCTESpillContextAndPartitionConfig( + const SpillConfig & spill_config, + const Block & spill_block_schema, + UInt64 operator_spill_threshold, + Context & context) +{ + std::unique_lock lock(this->rw_lock); + if (this->cte_spill_context) + // Initialization has been executed before + return; + + const auto & query_id_and_cte_id = context.getDAGContext()->getQueryIDAndCTEIDForSink(); + this->cte_spill_context + = std::make_shared(operator_spill_threshold, query_id_and_cte_id, this->partitions); + + this->partition_config = std::make_shared( + operator_spill_threshold == 0 ? 0 : operator_spill_threshold / this->partition_num, + spill_config, + spill_block_schema, + query_id_and_cte_id, + this->cte_spill_context->getLog(), + this->partition_num); + + for (auto & partition : this->partitions) + partition->setSharedConfig(this->partition_config); + + context.getDAGContext()->registerOperatorSpillContext(this->cte_spill_context); +} + CTEOpStatus CTE::tryGetBlockAt(size_t cte_reader_id, size_t partition_id, Block & block) { std::shared_lock rw_lock(this->rw_lock); + if unlikely (this->is_cancelled) + return CTEOpStatus::CANCELLED; if unlikely (!this->areAllSinksRegistered()) return CTEOpStatus::SINK_NOT_REGISTERED; - std::lock_guard lock(*this->partitions[partition_id].mu); - - auto status = this->checkBlockAvailableNoLock(cte_reader_id, partition_id); - if (status != CTEOpStatus::OK) + auto status = this->partitions[partition_id]->tryGetBlock(cte_reader_id, block); + std::lock_guard lock(this->mu_test); + switch (status) + { + case CTEOpStatus::OK: + // TODO delete --------------------- + { + auto [iter, _] = this->total_fetch_blocks.insert(std::make_pair(cte_reader_id, 0)); + iter->second.fetch_add(1); + } + { + auto [iter, _] = this->total_fetch_rows.insert(std::make_pair(cte_reader_id, 0)); + iter->second.fetch_add(block.rows()); + } + // --------------------- + return status; + case CTEOpStatus::BLOCK_NOT_AVAILABLE: + return this->is_eof ? CTEOpStatus::END_OF_FILE : CTEOpStatus::BLOCK_NOT_AVAILABLE; + default: return status; + } +} +template +CTEOpStatus CTE::pushBlock(size_t partition_id, const Block & block) +{ + std::shared_lock rw_lock(this->rw_lock); + if unlikely (this->is_cancelled) + return CTEOpStatus::CANCELLED; + + if unlikely (block.rows() == 0) + return CTEOpStatus::OK; - auto idx = this->partitions[partition_id].fetch_block_idxs[cte_reader_id]++; - block = this->partitions[partition_id].blocks[idx].block; - if ((--this->partitions[partition_id].blocks[idx].counter) == 0) - this->partitions[partition_id].blocks[idx].block.clear(); - return status; + // TODO delete ------------------ + this->total_recv_blocks.fetch_add(1); + this->total_recv_rows.fetch_add(block.rows()); + // ------------------ + + return this->partitions[partition_id]->pushBlock(block); } -template -bool CTE::pushBlock(size_t partition_id, const Block & block) +CTEOpStatus CTE::getBlockFromDisk(size_t cte_reader_id, size_t partition_id, Block & block) { { - std::shared_lock rw_lock(this->rw_lock); + std::shared_lock lock(this->rw_lock); if unlikely (this->is_cancelled) - return false; + return CTEOpStatus::CANCELLED; } - if unlikely (block.rows() == 0) - return true; - - std::lock_guard lock(*this->partitions[partition_id].mu); - this->partitions[partition_id].memory_usages += block.bytes(); - this->partitions[partition_id].blocks.push_back( - BlockWithCounter(block, static_cast(this->expected_source_num))); - if constexpr (for_test) - this->partitions[partition_id].cv_for_test->notify_all(); - else - this->partitions[partition_id].pipe_cv->notifyAll(); - return true; + auto ret = this->partitions[partition_id]->getBlockFromDisk(cte_reader_id, block); + // --------------------- + // TODO delete it + std::lock_guard lock(this->mu_test); + if (ret == CTEOpStatus::OK && block) + { + { + auto [iter, _] = this->total_fetch_blocks_in_disk.insert(std::make_pair(cte_reader_id, 0)); + iter->second.fetch_add(1); + } + { + auto [iter, _] = this->total_fetch_rows_in_disk.insert(std::make_pair(cte_reader_id, 0)); + iter->second.fetch_add(block.rows()); + } + } + // --------------------- + return ret; } -void CTE::registerTask(size_t partition_id, TaskPtr && task, NotifyType type) +CTEOpStatus CTE::spillBlocks(size_t partition_id) { - task->setNotifyType(type); - this->partitions[partition_id].pipe_cv->registerTask(std::move(task)); + { + std::shared_lock lock(this->rw_lock); + if unlikely (this->is_cancelled) + return CTEOpStatus::CANCELLED; + } + + return this->partitions[partition_id]->spillBlocks(this->total_spilled_blocks, this->total_spilled_rows); } void CTE::checkBlockAvailableAndRegisterTask(TaskPtr && task, size_t cte_reader_id, size_t partition_id) { std::shared_lock rw_lock(this->rw_lock); - std::lock_guard lock(*this->partitions[partition_id].mu); - CTEOpStatus status = this->checkBlockAvailableNoLock(cte_reader_id, partition_id); + if (this->is_cancelled) + { + this->notifyTaskDirectly(partition_id, std::move(task)); + return; + } + std::lock_guard aux_lock(*(this->partitions[partition_id]->aux_lock)); + if (this->partitions[partition_id]->status == CTEPartitionStatus::IN_SPILLING) + { + this->notifyTaskDirectly(partition_id, std::move(task)); + return; + } + + std::lock_guard lock(*(this->partitions[partition_id]->mu)); + CTEOpStatus status = this->checkBlockAvailableNoLock(cte_reader_id, partition_id); if (status == CTEOpStatus::BLOCK_NOT_AVAILABLE) { this->registerTask(partition_id, std::move(task), NotifyType::WAIT_ON_CTE_READ); @@ -85,13 +168,29 @@ void CTE::checkBlockAvailableAndRegisterTask(TaskPtr && task, size_t cte_reader_ this->notifyTaskDirectly(partition_id, std::move(task)); } +void CTE::checkInSpillingAndRegisterTask(TaskPtr && task, size_t partition_id) +{ + std::shared_lock rw_lock(this->rw_lock); + if (this->is_cancelled) + { + this->notifyTaskDirectly(partition_id, std::move(task)); + return; + } + + std::lock_guard aux_lock(*(this->partitions[partition_id]->aux_lock)); + if (this->partitions[partition_id]->status == CTEPartitionStatus::IN_SPILLING) + this->registerTask(partition_id, std::move(task), NotifyType::WAIT_ON_CTE_IO); + else + this->notifyTaskDirectly(partition_id, std::move(task)); +} + CTEOpStatus CTE::checkBlockAvailableForTest(size_t cte_reader_id, size_t partition_id) { std::shared_lock rw_lock(this->rw_lock); - std::lock_guard lock(*this->partitions[partition_id].mu); + std::lock_guard lock(*this->partitions[partition_id]->mu); return this->checkBlockAvailableNoLock(cte_reader_id, partition_id); } -template bool CTE::pushBlock(size_t, const Block &); -template bool CTE::pushBlock(size_t, const Block &); +template CTEOpStatus CTE::pushBlock(size_t, const Block &); +template CTEOpStatus CTE::pushBlock(size_t, const Block &); } // namespace DB diff --git a/dbms/src/Operators/CTE.h b/dbms/src/Operators/CTE.h index 5aa0154c20d..ca8439850d2 100644 --- a/dbms/src/Operators/CTE.h +++ b/dbms/src/Operators/CTE.h @@ -17,9 +17,13 @@ #include #include #include +#include +#include +#include #include #include +#include #include #include #include @@ -28,6 +32,15 @@ namespace DB { +// TODO delete +inline String genInfo(const String & name, const std::map & data) +{ + String info = fmt::format("{}: ", name); + for (const auto & item : data) + info = fmt::format("{}, <{}, {}>", info, item.first, item.second.load()); + return info; +} + class CTE { public: @@ -38,10 +51,11 @@ class CTE { for (size_t i = 0; i < this->partition_num; i++) { - this->partitions.push_back(CTEPartition()); - this->partitions.back().fetch_block_idxs.resize(this->expected_source_num, 0); - this->partitions.back().mu = std::make_unique(); - this->partitions.back().pipe_cv = std::make_unique(); + this->partitions.push_back(std::make_shared(i, expected_source_num_)); + for (size_t cte_reader_id = 0; cte_reader_id < expected_source_num_; cte_reader_id++) + this->partitions.back()->fetch_block_idxs.insert(std::make_pair(cte_reader_id, 0)); + this->partitions.back()->mu = std::make_unique(); + this->partitions.back()->pipe_cv = std::make_unique(); } } @@ -49,11 +63,62 @@ class CTE { for (size_t i = 0; i < this->partition_num; i++) { - this->partitions[i].mu_for_test = std::make_unique(); - this->partitions[i].cv_for_test = std::make_unique(); + this->partitions[i]->mu_for_test = std::make_unique(); + this->partitions[i]->cv_for_test = std::make_unique(); } } + // ------------------------ + // TODO remove, for test + std::mutex mu_test; + std::atomic_size_t total_recv_blocks = 0; + std::atomic_size_t total_recv_rows = 0; + std::atomic_size_t total_spilled_blocks = 0; + std::atomic_size_t total_spilled_rows = 0; + std::map total_fetch_blocks; + std::map total_fetch_rows; + std::map total_fetch_blocks_in_disk; + std::map total_fetch_rows_in_disk; + // ------------------------ + + ~CTE() + { + // TODO delete --------------- + String info; + info = fmt::format( + "total_recv_blocks: {}, total_recv_rows: {}, total_spilled_blocks: {}, total_spilled_rows: {}, ", + total_recv_blocks.load(), + total_recv_rows.load(), + total_spilled_blocks.load(), + total_spilled_rows.load()); + info = fmt::format("{} | {}", info, genInfo("total_fetch_blocks", this->total_fetch_blocks)); + info = fmt::format("{} | {}", info, genInfo("total_fetch_rows", this->total_fetch_rows)); + info = fmt::format("{} | {}", info, genInfo("total_fetch_blocks_in_disk", this->total_fetch_blocks_in_disk)); + info = fmt::format("{} | {}", info, genInfo("total_fetch_rows_in_disk", this->total_fetch_rows_in_disk)); + + auto * log = &Poco::Logger::get("LRUCache"); + LOG_INFO(log, fmt::format("xzxdebug CTE {}", info)); + + for (auto & p : this->partitions) + p->debugOutput(); + // TODO --------------- + } + + void initCTESpillContextAndPartitionConfig( + const SpillConfig & spill_config, + const Block & spill_block_schema, + UInt64 operator_spill_threshold, + Context & context); + + void checkPartitionNum(size_t partition_num) const + { + RUNTIME_CHECK_MSG( + this->partition_num == partition_num, + "expect partition num: {}, actual: {}", + this->partition_num, + partition_num); + } + size_t getCTEReaderID() { std::unique_lock lock(this->rw_lock); @@ -62,15 +127,12 @@ class CTE "next_cte_reader_id: {}, expected_source_num: {}", this->next_cte_reader_id, this->expected_source_num); - auto cte_reader_id = this->next_cte_reader_id; - this->next_cte_reader_id++; - return cte_reader_id; + return this->next_cte_reader_id++; } CTEOpStatus tryGetBlockAt(size_t cte_reader_id, size_t partition_id, Block & block); - template - bool pushBlock(size_t partition_id, const Block & block); + CTEOpStatus pushBlock(size_t partition_id, const Block & block); template void notifyEOF() { @@ -88,13 +150,23 @@ class CTE return this->err_msg; } - void checkBlockAvailableAndRegisterTask(TaskPtr && task, size_t cte_reader_id, size_t partition_id); CTEOpStatus checkBlockAvailableForTest(size_t cte_reader_id, size_t partition_id); - void registerTask(size_t partition_id, TaskPtr && task, NotifyType type); + void checkBlockAvailableAndRegisterTask(TaskPtr && task, size_t cte_reader_id, size_t partition_id); + void checkInSpillingAndRegisterTask(TaskPtr && task, size_t partition_id); + + CTEOpStatus getBlockFromDisk(size_t cte_reader_id, size_t partition_id, Block & block); + CTEOpStatus spillBlocks(size_t partition_id); + + void registerTask(size_t partition_id, TaskPtr && task, NotifyType type) + { + task->setNotifyType(type); + this->partitions[partition_id]->pipe_cv->registerTask(std::move(task)); + } + void notifyTaskDirectly(size_t partition_id, TaskPtr && task) { - this->partitions[partition_id].pipe_cv->notifyTaskDirectly(std::move(task)); + this->partitions[partition_id]->pipe_cv->notifyTaskDirectly(std::move(task)); } void addResp(const tipb::SelectResponse & resp) @@ -133,6 +205,8 @@ class CTE } } + LoggerPtr getLog() const { return this->partition_config->log; } + void checkSourceConcurrency(size_t concurrency) const { RUNTIME_CHECK_MSG( @@ -182,14 +256,17 @@ class CTE return this->checkBlockAvailableImpl(cte_reader_id, partition_id); } - CTEPartition & getPartitionForTest(size_t partition_idx) { return this->partitions[partition_idx]; } + std::shared_ptr & getPartitionForTest(size_t partition_idx) + { + return this->partitions[partition_idx]; + } private: template CTEOpStatus checkBlockAvailableImpl(size_t cte_reader_id, size_t partition_id) { std::shared_lock cte_lock(this->rw_lock, std::defer_lock); - std::unique_lock partition_lock(*(this->partitions[partition_id].mu), std::defer_lock); + std::unique_lock partition_lock(*(this->partitions[partition_id]->mu), std::defer_lock); if constexpr (need_lock) { @@ -200,11 +277,11 @@ class CTE if unlikely (this->is_cancelled) return CTEOpStatus::CANCELLED; - if (this->partitions[partition_id].blocks.size() - <= this->partitions[partition_id].fetch_block_idxs[cte_reader_id]) - return this->is_eof ? CTEOpStatus::END_OF_FILE : CTEOpStatus::BLOCK_NOT_AVAILABLE; + if (this->partitions[partition_id]->isBlockAvailableInDiskNoLock(cte_reader_id) + || this->partitions[partition_id]->isBlockAvailableInMemoryNoLock(cte_reader_id)) + return CTEOpStatus::OK; - return CTEOpStatus::OK; + return this->is_eof ? CTEOpStatus::END_OF_FILE : CTEOpStatus::BLOCK_NOT_AVAILABLE; } Int32 getTotalExitNumNoLock() const noexcept { return this->sink_exit_num + this->source_exit_num; } @@ -230,14 +307,14 @@ class CTE for (auto & partition : this->partitions) { if constexpr (for_test) - partition.cv_for_test->notify_all(); + partition->cv_for_test->notify_all(); else - partition.pipe_cv->notifyAll(); + partition->pipe_cv->notifyAll(); } } const size_t partition_num; - std::vector partitions; + std::vector> partitions; std::shared_mutex rw_lock; size_t next_cte_reader_id = 0; @@ -255,5 +332,26 @@ class CTE Int32 sink_exit_num = 0; Int32 source_exit_num = 0; + + std::shared_ptr cte_spill_context; + std::shared_ptr partition_config; +}; + +class CTEIONotifier : public NotifyFuture +{ +public: + CTEIONotifier(std::shared_ptr cte_, size_t partition_id_) + : cte(cte_) + , partition_id(partition_id_) + {} + + void registerTask(TaskPtr && task) override + { + this->cte->checkInSpillingAndRegisterTask(std::move(task), this->partition_id); + } + +private: + std::shared_ptr cte; + size_t partition_id; }; } // namespace DB diff --git a/dbms/src/Operators/CTEPartition.cpp b/dbms/src/Operators/CTEPartition.cpp new file mode 100644 index 00000000000..85b7e98cfef --- /dev/null +++ b/dbms/src/Operators/CTEPartition.cpp @@ -0,0 +1,294 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include +#include + +namespace DB +{ +size_t CTEPartition::getIdxInMemoryNoLock(size_t cte_reader_id) +{ + auto idx = this->fetch_block_idxs[cte_reader_id]; + auto total_evicted = this->getTotalEvictedBlockNumNoLock(); + RUNTIME_CHECK_MSG( + idx >= total_evicted, + "partition id: {}, idx: {}, total_evicted: {}", + this->partition_id, + idx, + total_evicted); + return idx - total_evicted; +} + +CTEOpStatus CTEPartition::tryGetBlock(size_t cte_reader_id, Block & block) +{ + std::lock_guard aux_lock(*(this->aux_lock)); + if (this->status == CTEPartitionStatus::IN_SPILLING) + return CTEOpStatus::WAIT_SPILL; + + std::lock_guard lock(*(this->mu)); + + this->putTmpBlocksIntoBlocksNoLock(); + + if (this->isBlockAvailableInDiskNoLock(cte_reader_id)) + return CTEOpStatus::IO_IN; + + if (!this->isBlockAvailableInMemoryNoLock(cte_reader_id)) + return CTEOpStatus::BLOCK_NOT_AVAILABLE; + + const auto idx = this->getIdxInMemoryNoLock(cte_reader_id); + block = this->blocks[idx].block; + assert(this->blocks[idx].counter > 0); + + if ((--this->blocks[idx].counter) == 0) + { + this->memory_usage.fetch_sub(this->blocks[idx].block.bytes()); + this->blocks[idx].block.clear(); + } + // TODO delete ------------- + { + auto [iter, _] = this->fetch_in_mem_idxs.insert(std::make_pair(cte_reader_id, 0)); + iter->second.push_back(this->fetch_block_idxs[cte_reader_id]); + } + // ------------- + this->addIdxNoLock(cte_reader_id); + return CTEOpStatus::OK; +} + +template +CTEOpStatus CTEPartition::pushBlock(const Block & block) +{ + std::unique_lock aux_lock(*(this->aux_lock)); + this->total_blocks.fetch_add(1); // TODO delete + CTEOpStatus ret_status = CTEOpStatus::OK; + if unlikely (this->status != CTEPartitionStatus::NORMAL) + this->tmp_blocks.push_back(block); + + switch (this->status) + { + case CTEPartitionStatus::NEED_SPILL: + return CTEOpStatus::NEED_SPILL; + case CTEPartitionStatus::IN_SPILLING: + return CTEOpStatus::WAIT_SPILL; + case CTEPartitionStatus::NORMAL: + break; + } + + // mu must be held after aux_lock so that we will not be blocked by spill. + // Blocked in cpu pool is very bad. + std::lock_guard lock(*(this->mu)); + + this->memory_usage.fetch_add(block.bytes()); + this->blocks.push_back(BlockWithCounter(block, static_cast(this->expected_source_num))); + if constexpr (for_test) + this->cv_for_test->notify_all(); + else + this->pipe_cv->notifyAll(); + + + if unlikely (this->exceedMemoryThreshold()) + { + this->status = CTEPartitionStatus::NEED_SPILL; + ret_status = CTEOpStatus::NEED_SPILL; + } + return ret_status; +} + +CTEOpStatus CTEPartition::spillBlocks(std::atomic_size_t & block_num, std::atomic_size_t & row_num) +{ + std::unique_lock lock(*(this->mu), std::defer_lock); + { + std::lock_guard aux_lock(*(this->aux_lock)); + switch (this->status) + { + case CTEPartitionStatus::NORMAL: + return CTEOpStatus::OK; + case CTEPartitionStatus::IN_SPILLING: + return CTEOpStatus::WAIT_SPILL; + case CTEPartitionStatus::NEED_SPILL: + this->status = CTEPartitionStatus::IN_SPILLING; + break; + } + + lock.lock(); + this->putTmpBlocksIntoBlocksNoLock(); + } + + if (this->first_log) + { + // TODO remove + LOG_INFO( + this->config->log, + fmt::format( + "xzxdebug Partition {} starts cte spill for {}", + this->partition_id, + this->config->query_id_and_cte_id)); + this->first_log = false; + } + + // Key represents logical index + // Value represents physical index in `this->blocks` + std::map split_idxs; + auto evicted_block_num = this->getTotalEvictedBlockNumNoLock(); + split_idxs.insert(std::make_pair(evicted_block_num, 0)); + for (const auto & [cte_reader_id, logical_idx] : this->fetch_block_idxs) + { + if (logical_idx > evicted_block_num) + split_idxs.insert(std::make_pair(logical_idx, logical_idx - evicted_block_num)); + } + + auto split_iter = split_idxs.begin(); + auto blocks_begin_iter = this->blocks.begin(); + auto total_block_in_memory_num = this->blocks.size(); + while (split_iter != split_idxs.end()) + { + // No more blocks can be spilled + if (split_iter->second == this->blocks.size()) + break; + + auto next_iter = std::next(split_iter); + + Blocks spilled_blocks; + auto iter = blocks_begin_iter + split_iter->second; + decltype(iter) end_iter; + if (next_iter == split_idxs.end() || next_iter->second >= total_block_in_memory_num) + { + this->spill_ranges.push_back( + std::make_pair(split_iter->first, this->blocks.size() - split_iter->second + split_iter->first)); + end_iter = this->blocks.end(); + } + else + { + this->spill_ranges.push_back(std::make_pair(split_iter->first, next_iter->first)); + end_iter = blocks_begin_iter + next_iter->second; + } + + bool counter_is_zero = false; + if (iter->counter == 0) + // In one slice, all blocks' counter should be 0 or not be 0. Check it. + counter_is_zero = true; + + while (iter != end_iter) + { + if (counter_is_zero) + { + RUNTIME_CHECK(iter->counter == 0); + this->total_block_released_num++; + } + else + { + RUNTIME_CHECK(iter->counter != 0); + spilled_blocks.push_back(iter->block); + } + ++iter; + } + + if (counter_is_zero) + { + split_iter = next_iter; + continue; + } + + RUNTIME_CHECK(!spilled_blocks.empty()); + + this->total_block_in_disk_num += spilled_blocks.size(); + + auto spiller = this->config->getSpiller(this->partition_id, this->spillers.size()); + + // TODO delete ----------------- + this->total_spill_blocks.fetch_add(spilled_blocks.size()); + block_num.fetch_add(spilled_blocks.size()); + for (auto & block : spilled_blocks) + row_num.fetch_add(block.rows()); + // ----------------- + + spiller->spillBlocks(std::move(spilled_blocks), this->partition_id); + spiller->finishSpill(); + this->spillers.insert(std::make_pair(split_iter->first, std::move(spiller))); + split_iter = next_iter; + } + + this->blocks.clear(); + this->memory_usage.store(0); + + std::lock_guard aux_lock(*(this->aux_lock)); + this->status = CTEPartitionStatus::NORMAL; + + // Many tasks may be waiting for the finish of spill + this->pipe_cv->notifyAll(); + return CTEOpStatus::OK; +} + +CTEOpStatus CTEPartition::getBlockFromDisk(size_t cte_reader_id, Block & block) +{ + std::unique_lock lock(*(this->mu), std::defer_lock); + { + std::lock_guard aux_lock(*(this->aux_lock)); + if (this->status == CTEPartitionStatus::IN_SPILLING) + return CTEOpStatus::WAIT_SPILL; + + lock.lock(); + } + + RUNTIME_CHECK_MSG(this->isSpillTriggeredNoLock(), "Spill should be triggered"); + RUNTIME_CHECK_MSG(this->isBlockAvailableInDiskNoLock(cte_reader_id), "Requested block is not in disk"); + + bool retried = false; + while (true) + { + auto [iter, _] = this->cte_reader_restore_streams.insert(std::make_pair(cte_reader_id, nullptr)); + if (iter->second == nullptr) + { + auto spiller_iter = this->spillers.find(this->fetch_block_idxs[cte_reader_id]); + if (spiller_iter == this->spillers.end()) + // All blocks in disk have been consumed + return CTEOpStatus::OK; + + auto streams = spiller_iter->second->restoreBlocks(this->partition_id, 1); + RUNTIME_CHECK(streams.size() == 1); + iter->second = streams[0]; + iter->second->readPrefix(); + } + + block = iter->second->read(); + if (!block) + { + RUNTIME_CHECK(!retried); + + iter->second->readSuffix(); + iter->second = nullptr; + retried = true; + continue; + } + + // TODO delete ------------- + { + auto [iter, _] = this->fetch_in_disk_idxs.insert(std::make_pair(cte_reader_id, 0)); + iter->second.push_back(this->fetch_block_idxs[cte_reader_id]); + } + // ------------- + this->addIdxNoLock(cte_reader_id); + break; + }; + + return CTEOpStatus::OK; +} + +template CTEOpStatus CTEPartition::pushBlock(const Block &); +template CTEOpStatus CTEPartition::pushBlock(const Block &); +} // namespace DB diff --git a/dbms/src/Operators/CTEPartition.h b/dbms/src/Operators/CTEPartition.h index c5eff67492d..902369acab3 100644 --- a/dbms/src/Operators/CTEPartition.h +++ b/dbms/src/Operators/CTEPartition.h @@ -14,23 +14,89 @@ #pragma once +#include +#include +#include #include #include -#include +#include +#include #include +#include +#include namespace DB { +enum CTEPartitionStatus +{ + NORMAL = 0, + NEED_SPILL, + IN_SPILLING, +}; + enum class CTEOpStatus { OK, - BLOCK_NOT_AVAILABLE, + BLOCK_NOT_AVAILABLE, // It means that we do not have specified block so far + WAIT_SPILL, + IO_IN, + NEED_SPILL, END_OF_FILE, CANCELLED, SINK_NOT_REGISTERED }; +struct CTEPartitionSharedConfig +{ + CTEPartitionSharedConfig( + size_t memory_threshold_, + SpillConfig spill_config_, + Block spill_block_schema_, + String query_id_and_cte_id_, + LoggerPtr log_, + size_t partition_num_) + : memory_threshold(memory_threshold_) + , spill_config(spill_config_) + , spill_block_schema(spill_block_schema_) + , query_id_and_cte_id(query_id_and_cte_id_) + , log(log_) + , partition_num(partition_num_) + { + auto * log = &Poco::Logger::get("LRUCache"); + LOG_INFO(log, fmt::format("xzxdebug memory threshold is set for {}", this->memory_threshold)); + } + + SpillerPtr getSpiller(size_t partition_id, size_t spill_id) + { + SpillConfig config( + this->spill_config.spill_dir, + fmt::format("cte_spill_{}_{}", partition_id, spill_id), + this->spill_config.max_cached_data_bytes_in_spiller, + this->spill_config.max_spilled_rows_per_file, + this->spill_config.max_spilled_bytes_per_file, + this->spill_config.file_provider, + this->spill_config.for_all_constant_max_streams, + this->spill_config.for_all_constant_block_size); + + return std::make_unique( + config, + false, + this->partition_num, + this->spill_block_schema, + this->log, + 1, + false); + } + + size_t memory_threshold; + SpillConfig spill_config; + Block spill_block_schema; + String query_id_and_cte_id; + LoggerPtr log; + size_t partition_num; +}; + struct BlockWithCounter { BlockWithCounter(const Block & block_, Int16 counter_) @@ -43,12 +109,135 @@ struct BlockWithCounter struct CTEPartition { + CTEPartition(size_t partition_id_, size_t expected_source_num_) + : partition_id(partition_id_) + , aux_lock(std::make_unique()) + , status(CTEPartitionStatus::NORMAL) + , mu(std::make_unique()) + , pipe_cv(std::make_unique()) + , expected_source_num(expected_source_num_) + {} + + // TODO remove it + void debugOutput() + { + // String total_info = fmt::format("total_blocks: {}, total_spill_blocks: {}", total_blocks, total_spill_blocks); + // String spill_ranges_info = "spill_ranges: "; + // for (auto & range : this->spill_ranges) + // spill_ranges_info = fmt::format("{}, ({} - {})", spill_ranges_info, range.first, range.second); + // String fetch_in_mem_idxs_info = "fetch_in_mem_idxs: "; + // for (const auto & v : this->fetch_in_mem_idxs) + // { + // String tmp_info; + // for (auto idx : v.second) + // tmp_info = fmt::format("{}, {}", tmp_info, idx); + // fetch_in_mem_idxs_info = fmt::format("{} <{}: {}>", fetch_in_mem_idxs_info, v.first, tmp_info); + // } + // String fetch_in_disk_idxs_info = "fetch_in_disk_idxs: "; + // for (const auto & v : this->fetch_in_disk_idxs) + // { + // String tmp_info; + // for (auto idx : v.second) + // tmp_info = fmt::format("{}, {}", tmp_info, idx); + // fetch_in_disk_idxs_info = fmt::format("{} <{}: {}>", fetch_in_disk_idxs_info, v.first, tmp_info); + // } + + // auto * log = &Poco::Logger::get("LRUCache"); + // LOG_INFO(log, fmt::format("xzxdebug | {} | {} | {} | {}", total_info, spill_ranges_info, fetch_in_mem_idxs_info, fetch_in_disk_idxs_info)); + } + + void setSharedConfig(std::shared_ptr config) { this->config = config; } + + size_t getIdxInMemoryNoLock(size_t cte_reader_id); + + UInt64 getTotalEvictedBlockNumNoLock() const + { + return this->total_block_released_num + this->total_block_in_disk_num; + } + + bool isBlockAvailableInDiskNoLock(size_t cte_reader_id) + { + auto idx = this->fetch_block_idxs[cte_reader_id]; + RUNTIME_CHECK_MSG( + idx >= this->total_block_released_num, + "partition: {}, idx: {}, total_block_released_num: {}", + this->partition_id, + idx, + this->total_block_released_num); + return idx < this->getTotalEvictedBlockNumNoLock(); + } + + bool isBlockAvailableInMemoryNoLock(size_t cte_reader_id) + { + return this->getIdxInMemoryNoLock(cte_reader_id) < this->blocks.size(); + } + + bool isSpillTriggeredNoLock() const { return this->total_block_in_disk_num > 0; } + void addIdxNoLock(size_t cte_reader_id) { ++this->fetch_block_idxs[cte_reader_id]; } + bool exceedMemoryThreshold() const + { + // config will be nullptr in test + if unlikely (this->config == nullptr) + return false; + + if (this->config->memory_threshold == 0) + return false; + return this->memory_usage.load() >= this->config->memory_threshold; + } + + template + CTEOpStatus pushBlock(const Block & block); + CTEOpStatus tryGetBlock(size_t cte_reader_id, Block & block); + CTEOpStatus spillBlocks(std::atomic_size_t & block_num, std::atomic_size_t & row_num); + CTEOpStatus getBlockFromDisk(size_t cte_reader_id, Block & block); + + // Need aux_lock and mu + void putTmpBlocksIntoBlocksNoLock() + { + for (const auto & block : this->tmp_blocks) + { + this->memory_usage.fetch_add(block.bytes()); + this->blocks.push_back(BlockWithCounter(block, static_cast(this->expected_source_num))); + } + tmp_blocks.clear(); + } + + // ----------- + // TODO delete them + std::atomic_size_t total_blocks = 0; + std::atomic_size_t total_spill_blocks = 0; + + std::vector> spill_ranges; + std::map> fetch_in_mem_idxs; + std::map> fetch_in_disk_idxs; + + bool first_log = true; + // ----------- + + size_t total_byte_usage = 0; + + size_t partition_id; + + // Protect `status` and `tmp_blocks` variables + std::unique_ptr aux_lock; + CTEPartitionStatus status; + Blocks tmp_blocks; + std::unique_ptr mu; std::vector blocks; - std::vector fetch_block_idxs; - size_t memory_usages = 0; + std::unordered_map fetch_block_idxs; std::unique_ptr pipe_cv; + std::atomic_size_t memory_usage = 0; + const size_t expected_source_num; + + std::unordered_map spillers; + std::unordered_map cte_reader_restore_streams; + UInt64 total_block_in_disk_num = 0; + UInt64 total_block_released_num = 0; + + std::shared_ptr config; + std::unique_ptr mu_for_test; std::unique_ptr cv_for_test; }; diff --git a/dbms/src/Operators/CTEReader.cpp b/dbms/src/Operators/CTEReader.cpp index 873a407d403..30f4ac509c9 100644 --- a/dbms/src/Operators/CTEReader.cpp +++ b/dbms/src/Operators/CTEReader.cpp @@ -31,18 +31,47 @@ CTEOpStatus CTEReader::fetchNextBlock(size_t source_id, Block & block) if (this->resp.execution_summaries_size() == 0) this->cte->tryToGetResp(this->resp); } - default: + case CTEOpStatus::WAIT_SPILL: + case CTEOpStatus::NEED_SPILL: + case CTEOpStatus::IO_IN: + case CTEOpStatus::SINK_NOT_REGISTERED: + case CTEOpStatus::BLOCK_NOT_AVAILABLE: + case CTEOpStatus::CANCELLED: + // TODO merge return ret; + case CTEOpStatus::OK: + // TODO remove + this->total_fetch_blocks.fetch_add(1); + this->total_fetch_rows.fetch_add(block.rows()); + this->total_fetch_from_mem.fetch_add(1); + return ret; + } + throw Exception("Should not reach here"); +} + +CTEOpStatus CTEReader::fetchBlockFromDisk(size_t source_id, Block & block) +{ + bool expected_bool = false; + if (this->is_first_log.compare_exchange_weak(expected_bool, true)) + LOG_INFO(this->cte->getLog(), "Begin restore data from disk for cte."); + auto ret = this->cte->getBlockFromDisk(this->cte_reader_id, source_id, block); + if (block) + { + // TODO remove + this->total_fetch_blocks.fetch_add(1); + this->total_fetch_rows.fetch_add(block.rows()); + this->total_fetch_from_disk.fetch_add(1); } + return ret; } CTEOpStatus CTEReader::waitForBlockAvailableForTest(size_t partition_idx) { auto & partition = this->cte->getPartitionForTest(partition_idx); - std::unique_lock lock(*(partition.mu_for_test)); + std::unique_lock lock(*(partition->mu_for_test)); while (true) { - partition.cv_for_test->wait(lock); + partition->cv_for_test->wait(lock); auto status = this->cte->checkBlockAvailable(this->cte_reader_id, partition_idx); switch (status) { diff --git a/dbms/src/Operators/CTEReader.h b/dbms/src/Operators/CTEReader.h index 40293079ade..00ec3a4c454 100644 --- a/dbms/src/Operators/CTEReader.h +++ b/dbms/src/Operators/CTEReader.h @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -39,6 +40,14 @@ class CTEReader RUNTIME_CHECK(cte); } + // TODO remove -------------- + std::atomic_size_t total_fetch_blocks = 0; + std::atomic_size_t total_fetch_rows = 0; + + std::atomic_size_t total_fetch_from_mem = 0; + std::atomic_size_t total_fetch_from_disk = 0; + // -------------- + // For Test CTEReader(const String & query_id_and_cte_id_, CTEManager * cte_manager_, std::shared_ptr cte_) : query_id_and_cte_id(query_id_and_cte_id_) @@ -51,9 +60,22 @@ class CTEReader { this->cte.reset(); this->cte_manager->releaseCTEBySource(this->query_id_and_cte_id); + + // TODO remove it + auto * log = &Poco::Logger::get("LRUCache"); + LOG_INFO( + log, + fmt::format( + "xzxdebug CTEReader {} fb: {} fr: {}, from mem: {}, from disk: {}", + this->cte_reader_id, + this->total_fetch_blocks.load(), + this->total_fetch_rows.load(), + this->total_fetch_from_mem.load(), + this->total_fetch_from_disk.load())); } CTEOpStatus fetchNextBlock(size_t source_id, Block & block); + CTEOpStatus fetchBlockFromDisk(size_t source_id, Block & block); bool getResp(tipb::SelectResponse & resp) { @@ -81,5 +103,7 @@ class CTEReader std::mutex mu; bool resp_fetched = false; tipb::SelectResponse resp; + + std::atomic_bool is_first_log = false; }; } // namespace DB diff --git a/dbms/src/Operators/CTESinkOp.cpp b/dbms/src/Operators/CTESinkOp.cpp index e3047844a33..88174fc583c 100644 --- a/dbms/src/Operators/CTESinkOp.cpp +++ b/dbms/src/Operators/CTESinkOp.cpp @@ -12,8 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include #include #include +#include + +#include namespace DB { @@ -28,8 +33,39 @@ OperatorStatus CTESinkOp::writeImpl(Block && block) return OperatorStatus::FINISHED; this->total_rows += block.rows(); - if (this->cte->pushBlock(this->id, block)) + auto status = this->cte->pushBlock(this->id, block); + switch (status) + { + case CTEOpStatus::WAIT_SPILL: + // CTE is spilling blocks to disk, we need to wait the finish of spill + setNotifyFuture(&(this->io_notifier)); + return OperatorStatus::WAIT_FOR_NOTIFY; + case CTEOpStatus::NEED_SPILL: + return OperatorStatus::IO_OUT; + case CTEOpStatus::OK: + return OperatorStatus::NEED_INPUT; + case CTEOpStatus::CANCELLED: + return OperatorStatus::CANCELLED; + default: + throw Exception(fmt::format("Get unexpected CTEOpStatus: {}", magic_enum::enum_name(status))); + } +} + +OperatorStatus CTESinkOp::executeIOImpl() +{ + auto status = this->cte->spillBlocks(this->id); + switch (status) + { + case CTEOpStatus::OK: return OperatorStatus::NEED_INPUT; - return OperatorStatus::CANCELLED; + case CTEOpStatus::WAIT_SPILL: + // CTE is spilling blocks to disk, we need to wait the finish of spill + setNotifyFuture(&(this->io_notifier)); + return OperatorStatus::WAIT_FOR_NOTIFY; + case CTEOpStatus::CANCELLED: + return OperatorStatus::CANCELLED; + default: + throw Exception(fmt::format("Unexpected status {}", magic_enum::enum_name(status))); + } } } // namespace DB diff --git a/dbms/src/Operators/CTESinkOp.h b/dbms/src/Operators/CTESinkOp.h index 01bb8fcd623..f6db5d7cbf4 100644 --- a/dbms/src/Operators/CTESinkOp.h +++ b/dbms/src/Operators/CTESinkOp.h @@ -27,6 +27,7 @@ class CTESinkOp : public SinkOp : SinkOp(exec_context_, req_id) , cte(cte_) , id(id_) + , io_notifier(this->cte, id) {} String getName() const override { return "CTESinkOp"; } @@ -34,10 +35,12 @@ class CTESinkOp : public SinkOp protected: void operateSuffixImpl() override; OperatorStatus writeImpl(Block && block) override; + OperatorStatus executeIOImpl() override; private: std::shared_ptr cte; size_t total_rows = 0; size_t id; + CTEIONotifier io_notifier; }; } // namespace DB diff --git a/dbms/src/Operators/CTESourceOp.cpp b/dbms/src/Operators/CTESourceOp.cpp index 8129c1269bb..f24e91c3219 100644 --- a/dbms/src/Operators/CTESourceOp.cpp +++ b/dbms/src/Operators/CTESourceOp.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -27,6 +28,12 @@ void CTESourceOp::operateSuffixImpl() OperatorStatus CTESourceOp::readImpl(Block & block) { + if unlikely (this->block_from_disk) + { + block.swap(block_from_disk); + return OperatorStatus::HAS_OUTPUT; + } + auto ret = this->cte_reader->fetchNextBlock(this->id, block); switch (ret) { @@ -36,14 +43,42 @@ OperatorStatus CTESourceOp::readImpl(Block & block) case CTEOpStatus::OK: this->total_rows += block.rows(); return OperatorStatus::HAS_OUTPUT; + case CTEOpStatus::IO_IN: + // Expected block is in disk, we need to read it from disk + return OperatorStatus::IO_IN; + case CTEOpStatus::WAIT_SPILL: + // CTE is spilling blocks to disk, we need to wait the finish of spill + DB::setNotifyFuture(&(this->io_notifier)); + return OperatorStatus::WAIT_FOR_NOTIFY; + case CTEOpStatus::CANCELLED: + return OperatorStatus::CANCELLED; case CTEOpStatus::BLOCK_NOT_AVAILABLE: DB::setNotifyFuture(&(this->notifier)); return OperatorStatus::WAIT_FOR_NOTIFY; case CTEOpStatus::SINK_NOT_REGISTERED: this->sw.start(); return OperatorStatus::WAITING; + default: + throw Exception("Should not reach here"); + } +} + +OperatorStatus CTESourceOp::executeIOImpl() +{ + RUNTIME_CHECK(!this->block_from_disk); + auto status = this->cte_reader->fetchBlockFromDisk(this->id, this->block_from_disk); + switch (status) + { + case CTEOpStatus::OK: + return OperatorStatus::HAS_OUTPUT; + case CTEOpStatus::WAIT_SPILL: + // CTE is spilling blocks to disk, we need to wait the finish of spill + DB::setNotifyFuture(&(this->io_notifier)); + return OperatorStatus::WAIT_FOR_NOTIFY; case CTEOpStatus::CANCELLED: - throw Exception(this->cte_reader->getCTE()->getError()); + return OperatorStatus::CANCELLED; + default: + throw Exception(fmt::format("Get unexpected status {}", magic_enum::enum_name(status))); } } } // namespace DB diff --git a/dbms/src/Operators/CTESourceOp.h b/dbms/src/Operators/CTESourceOp.h index ed5312c6a6c..16c8c13ea1d 100644 --- a/dbms/src/Operators/CTESourceOp.h +++ b/dbms/src/Operators/CTESourceOp.h @@ -58,11 +58,12 @@ class CTESourceOp : public SourceOp const NamesAndTypes & schema, const String & query_id_and_cte_id_) : SourceOp(exec_context_, req_id) + , query_id_and_cte_id(query_id_and_cte_id_) , cte_reader(cte_reader_) , io_profile_info(IOProfileInfo::createForRemote(profile_info_ptr, 1)) , id(id_) , notifier(this->cte_reader->getCTE(), this->cte_reader->getID(), this->id) - , query_id_and_cte_id(query_id_and_cte_id_) + , io_notifier(this->cte_reader->getCTE(), this->id) { setHeader(Block(getColumnWithTypeAndName(schema))); } @@ -74,6 +75,7 @@ class CTESourceOp : public SourceOp void operateSuffixImpl() override; OperatorStatus readImpl(Block & block) override; + OperatorStatus executeIOImpl() override; OperatorStatus awaitImpl() override { @@ -88,13 +90,17 @@ class CTESourceOp : public SourceOp } private: + String query_id_and_cte_id; + Block block_from_disk; + std::shared_ptr cte_reader; uint64_t total_rows{}; + IOProfileInfoPtr io_profile_info; tipb::SelectResponse resp; size_t id; CTESourceNotifyFuture notifier; + CTEIONotifier io_notifier; Stopwatch sw; - String query_id_and_cte_id; }; } // namespace DB diff --git a/dbms/src/Operators/tests/gtest_cte.cpp b/dbms/src/Operators/tests/gtest_cte.cpp index 7de68025632..3ba6cf47e02 100644 --- a/dbms/src/Operators/tests/gtest_cte.cpp +++ b/dbms/src/Operators/tests/gtest_cte.cpp @@ -92,7 +92,7 @@ try size_t row_num = di(dre); Blocks sink_blocks = generateBlocks(0, row_num); for (size_t i = 0; i < sink_blocks.size(); i++) - ASSERT_TRUE(cte->pushBlock(i % PARTITION_NUM, sink_blocks[i])); + ASSERT_EQ(CTEOpStatus::OK, cte->pushBlock(i % PARTITION_NUM, sink_blocks[i])); Blocks received_blocks; std::vector received_results; @@ -219,7 +219,10 @@ void concurrentTest() { std::lock_guard lock(*(source_mus[source_idx])); received_blocks[source_idx].push_back(block); + break; } + default: + throw Exception("Should not reach here"); } } if (partition_idx == 0)