Skip to content
Open
Show file tree
Hide file tree
Changes from 138 commits
Commits
Show all changes
139 commits
Select commit Hold shift + click to select a range
a5797b4
save
xzhangxian1008 Mar 25, 2025
385b051
save
xzhangxian1008 Mar 25, 2025
a8d3052
save
xzhangxian1008 Mar 28, 2025
dd96684
Merge branch 'master' of ssh://github.com/pingcap/tiflash into cte
xzhangxian1008 Mar 31, 2025
97f44e0
save
xzhangxian1008 Mar 31, 2025
00580c4
save
xzhangxian1008 Apr 1, 2025
966fa71
save
xzhangxian1008 Apr 2, 2025
c2fb233
save
xzhangxian1008 Apr 3, 2025
56df905
format
xzhangxian1008 Apr 3, 2025
265da87
save
xzhangxian1008 Apr 3, 2025
77d7652
save
xzhangxian1008 Apr 3, 2025
15af97f
save
xzhangxian1008 Apr 7, 2025
aa01ff6
tweaking
xzhangxian1008 Apr 7, 2025
218f0ed
tweaking
xzhangxian1008 Apr 7, 2025
eb04c29
tweaking
xzhangxian1008 Apr 8, 2025
e048782
Merge branch 'master' into cte
xzhangxian1008 Apr 8, 2025
c0d617a
fix
xzhangxian1008 Apr 8, 2025
c1512c4
tweaking
xzhangxian1008 Apr 8, 2025
6fb66a5
tweaking
xzhangxian1008 Apr 8, 2025
3dcad01
save
xzhangxian1008 Apr 9, 2025
d2efe61
Merge branch 'master' into cte
xzhangxian1008 Apr 14, 2025
3f78cfb
save
xzhangxian1008 Apr 16, 2025
bd30e21
save
xzhangxian1008 Apr 18, 2025
f3649f0
tweaking
xzhangxian1008 Apr 21, 2025
482daec
save
xzhangxian1008 Apr 21, 2025
6671078
save
xzhangxian1008 Apr 21, 2025
3610df7
save
xzhangxian1008 Apr 21, 2025
97e4611
save
xzhangxian1008 Apr 24, 2025
75e3855
save
xzhangxian1008 May 6, 2025
4515c3b
save
xzhangxian1008 May 8, 2025
25a9cf5
fix bugs
xzhangxian1008 May 8, 2025
6e903ef
fix more bugs
xzhangxian1008 May 9, 2025
7ad85b0
fix bugs
xzhangxian1008 May 12, 2025
4729e8e
fix bugs
xzhangxian1008 May 13, 2025
1052c61
moderate refactor
xzhangxian1008 May 13, 2025
e075e68
fix bug
xzhangxian1008 May 13, 2025
ccca5ab
Revert "fix bug"
xzhangxian1008 May 13, 2025
2393817
fix bugs
xzhangxian1008 May 13, 2025
5f0ace7
fix bug
xzhangxian1008 May 13, 2025
2a2d667
save
xzhangxian1008 May 15, 2025
9ff7f33
save
xzhangxian1008 May 15, 2025
8943a6c
minor fix
xzhangxian1008 May 15, 2025
53e4ff5
fix bug
xzhangxian1008 May 16, 2025
8396207
save
xzhangxian1008 May 27, 2025
ac93797
remove useless debug
xzhangxian1008 May 27, 2025
78f450a
fix bugs
xzhangxian1008 May 28, 2025
f6ee482
fix bugs
xzhangxian1008 May 29, 2025
751b852
fix
xzhangxian1008 May 29, 2025
e087d9e
fix some bugs
xzhangxian1008 May 29, 2025
a4cd337
fix cancel bug and remove xzxdebug
xzhangxian1008 Jun 3, 2025
1da2e3c
Merge branch 'master' into cte
xzhangxian1008 Jun 10, 2025
dbd68f1
merge
xzhangxian1008 Jun 10, 2025
8548f97
tweaking
xzhangxian1008 Jun 10, 2025
b8f30dd
tweaking
xzhangxian1008 Jun 10, 2025
e5e6378
update tipb
xzhangxian1008 Jun 10, 2025
2f81b3a
update tipb
xzhangxian1008 Jun 11, 2025
d203b70
Merge branch 'cte' into cte-spill
xzhangxian1008 Jun 11, 2025
03aa4e6
modify get chunk interface
xzhangxian1008 Jun 11, 2025
9632f48
merge cte branch
xzhangxian1008 Jun 11, 2025
6e8b133
tweaking
xzhangxian1008 Jun 11, 2025
676f743
refactor codes
xzhangxian1008 Jun 11, 2025
4ce7bde
Merge branch 'master' into cte
xzhangxian1008 Jun 11, 2025
1a23b99
refine
xzhangxian1008 Jun 12, 2025
2331d9a
merge cte
xzhangxian1008 Jun 12, 2025
2a1e45c
tweaking
xzhangxian1008 Jun 13, 2025
f3d8d5f
tweaking
xzhangxian1008 Jun 13, 2025
5dacc26
address comments
xzhangxian1008 Jun 16, 2025
fe3b63d
tweaking
xzhangxian1008 Jun 16, 2025
381944c
minor fix
xzhangxian1008 Jun 16, 2025
ebd7268
merge cte branch
xzhangxian1008 Jun 17, 2025
51ab8c1
tweaking
xzhangxian1008 Jun 17, 2025
cce3d0c
change WAITING to WAITING_FOR_NOTIFY
xzhangxian1008 Jun 17, 2025
ca61bd2
pass error message by cte
xzhangxian1008 Jun 19, 2025
e68baa2
refine
xzhangxian1008 Jun 20, 2025
53e4ea8
add concurrency for cte
xzhangxian1008 Jun 20, 2025
5f9b624
tweaking
xzhangxian1008 Jun 20, 2025
a3a96fd
tweaking
xzhangxian1008 Jun 22, 2025
8107ac7
fix hang
xzhangxian1008 Jun 22, 2025
52574dc
merge master
xzhangxian1008 Jun 24, 2025
a1c1d5d
tweaking
xzhangxian1008 Jun 24, 2025
55bccf5
remove fine grained
xzhangxian1008 Jun 24, 2025
2f59cc2
tweaking
xzhangxian1008 Jun 24, 2025
17ac8bd
fix ci
xzhangxian1008 Jun 25, 2025
bd86c78
address comments
xzhangxian1008 Jun 25, 2025
b34eec2
merge cte branch
xzhangxian1008 Jun 26, 2025
65548d3
save
xzhangxian1008 Jun 27, 2025
bff95e3
save
xzhangxian1008 Jun 27, 2025
c32757c
remove IdxWithPadding
xzhangxian1008 Jul 1, 2025
3b0fae4
merge cte branch
xzhangxian1008 Jul 1, 2025
d832edd
save
xzhangxian1008 Jul 1, 2025
0f6f8fa
add io_notifier
xzhangxian1008 Jul 1, 2025
19a3abc
fix hang
xzhangxian1008 Jul 2, 2025
00894bf
merge cte branch
xzhangxian1008 Jul 2, 2025
5c3918d
save
xzhangxian1008 Jul 2, 2025
1501ac6
address comments
xzhangxian1008 Jul 3, 2025
ef89300
Merge branch 'master' of https://github.com/pingcap/tiflash into cte
xzhangxian1008 Jul 3, 2025
3362d44
Merge branch 'cte' into cte-spill
xzhangxian1008 Jul 3, 2025
288b679
save
xzhangxian1008 Jul 3, 2025
bfd65c5
tweaking
xzhangxian1008 Jul 4, 2025
6f09386
fix bugs
xzhangxian1008 Jul 4, 2025
7f79f20
address comments
xzhangxian1008 Jul 4, 2025
aa27b1f
add log
xzhangxian1008 Jul 7, 2025
5d53c4c
fix
xzhangxian1008 Jul 7, 2025
81e5570
Merge branch 'cte' into cte-spill
xzhangxian1008 Jul 7, 2025
209e9ad
fix bugs, more bugs
xzhangxian1008 Jul 8, 2025
d20cf30
fix bugs
xzhangxian1008 Jul 9, 2025
f443e8e
remove
xzhangxian1008 Jul 9, 2025
9bb484c
fix bugs
xzhangxian1008 Jul 9, 2025
9b8a535
add debug logs
xzhangxian1008 Jul 14, 2025
b556b09
tweaking
xzhangxian1008 Jul 15, 2025
ee6917c
refactor
xzhangxian1008 Jul 15, 2025
0f1c0f2
fix exception
xzhangxian1008 Jul 16, 2025
562c402
merge cte branch
xzhangxian1008 Jul 16, 2025
a11fc8c
tweaking
xzhangxian1008 Jul 16, 2025
c5a12bb
Merge branch 'cte' into cte-spill
xzhangxian1008 Jul 16, 2025
a5160a5
fix
xzhangxian1008 Jul 16, 2025
8326817
remove useless codes
xzhangxian1008 Jul 16, 2025
7e8b7d6
remove useless codes
xzhangxian1008 Jul 16, 2025
80dc7b4
add log
xzhangxian1008 Jul 16, 2025
f091e35
support auto spill
xzhangxian1008 Jul 18, 2025
2a64cf3
fix ci
xzhangxian1008 Jul 21, 2025
607c0e4
Merge branch 'cte' into cte-spill
xzhangxian1008 Jul 21, 2025
6b90cad
fix
xzhangxian1008 Jul 21, 2025
4660656
fix ci
xzhangxian1008 Jul 22, 2025
0384d2d
add logs
xzhangxian1008 Jul 22, 2025
834b20d
fix block missing
xzhangxian1008 Jul 23, 2025
3a058e8
remove debug logs
xzhangxian1008 Jul 29, 2025
39209bd
fix bug
xzhangxian1008 Jul 29, 2025
fc7611a
tweaking
xzhangxian1008 Jul 30, 2025
7197740
merge and resolve conflicts
xzhangxian1008 Aug 25, 2025
5e288ad
fix and format
xzhangxian1008 Aug 25, 2025
dc65e96
refine
xzhangxian1008 Aug 25, 2025
c019712
fix bugs
xzhangxian1008 Sep 2, 2025
b97b49a
fix ut
xzhangxian1008 Sep 4, 2025
9e514e1
clean codes
xzhangxian1008 Sep 5, 2025
87743d9
refine
xzhangxian1008 Sep 5, 2025
ae768bd
fix bug
xzhangxian1008 Sep 8, 2025
53a4141
refine memory_usage
xzhangxian1008 Sep 11, 2025
f7373af
address comments
xzhangxian1008 Oct 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,8 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_wait_on_join_build, {"type", "wait_on_join_build"}), \
F(type_wait_on_join_probe, {"type", "wait_on_join_probe"}), \
F(type_wait_on_result_queue_write, {"type", "wait_on_result_queue_write"}), \
F(type_type_wait_on_cte_read, {"type", "type_wait_on_cte_read"})) \
F(type_wait_on_cte_read, {"type", "type_wait_on_cte_read"}), \
F(type_wait_on_cte_io, {"type", "type_wait_on_cte_io"})) \
M(tiflash_pipeline_task_duration_seconds, \
"Bucketed histogram of pipeline task duration in seconds", \
Histogram, /* these command usually cost several hundred milliseconds to several seconds, increase the start bucket to 5ms */ \
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Core/Spiller.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class Spiller
/// file is read, otherwise, the spilled file will be released when destruct the spiller. Currently, all the spilled
/// file can be released on restore since it is only read once, but in the future if SharedScan(shared cte) need spill,
/// the data may be restored multiple times and release_spilled_file_on_restore need to be set to false.
const bool release_spilled_file_on_restore;
bool release_spilled_file_on_restore;
bool enable_append_write = false;
};

Expand Down
1 change: 0 additions & 1 deletion dbms/src/DataStreams/NativeBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,4 @@ void NativeBlockOutputStream::write(const Block & block)
}
}
}

} // namespace DB
6 changes: 6 additions & 0 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ void updateSettingsFromTiDB(const grpc::ServerContext * grpc_context, ContextPtr
std::make_pair("tidb_max_bytes_before_tiflash_external_join", "max_bytes_before_external_join"),
std::make_pair("tidb_max_bytes_before_tiflash_external_group_by", "max_bytes_before_external_group_by"),
std::make_pair("tidb_max_bytes_before_tiflash_external_sort", "max_bytes_before_external_sort"),
std::make_pair("tidb_max_bytes_before_tiflash_cte_spill", "max_bytes_before_cte_spill"),
std::make_pair("tiflash_mem_quota_query_per_node", "max_memory_usage"),
std::make_pair("tiflash_query_spill_ratio", "auto_memory_revoke_trigger_threshold"),
std::make_pair("tiflash_use_hash_join_v2", "enable_hash_join_v2"),
Expand Down Expand Up @@ -188,6 +189,11 @@ void updateSettingsForAutoSpill(ContextPtr & context, const LoggerPtr & log)
need_log_warning = true;
context->setSetting("max_bytes_before_external_join", "0");
}
if (context->getSettingsRef().max_bytes_before_cte_spill > 0)
{
need_log_warning = true;
context->setSetting("max_bytes_before_cte_spill", "0");
}
if (need_log_warning)
LOG_WARNING(log, "auto spill is enabled, so per operator's memory threshold is disabled");
}
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
#include <ext/scope_guard.h>
#include <magic_enum.hpp>


namespace DB
{
namespace ErrorCodes
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Pipeline/Schedule/Reactor/WaitReactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ WaitReactor::WaitReactor(TaskScheduler & scheduler_)
GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_wait_on_join_probe).Set(0);
GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_wait_on_result_queue_write).Set(0);
GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_wait_on_shared_queue_write).Set(0);
GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_type_wait_on_cte_read).Set(0);
GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_wait_on_cte_read).Set(0);
GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_wait_on_cte_io).Set(0);
thread = std::thread(&WaitReactor::loop, this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ class PipeConditionVariable
GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_wait_on_shared_queue_write).Increment(change);
break;
case NotifyType::WAIT_ON_CTE_READ:
GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_type_wait_on_cte_read).Increment(change);
GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_wait_on_cte_read).Increment(change);
break;
case NotifyType::WAIT_ON_CTE_IO:
GET_METRIC(tiflash_pipeline_wait_on_notify_tasks, type_wait_on_cte_io).Increment(change);
break;
case NotifyType::WAIT_ON_NOTHING:
throw Exception("task notify type should be set before register or notify");
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ enum class NotifyType
WAIT_ON_JOIN_PROBE_FINISH,
WAIT_ON_RESULT_QUEUE_WRITE,
WAIT_ON_CTE_READ,
WAIT_ON_CTE_IO,
WAIT_ON_NOTHING,
};

Expand Down
17 changes: 17 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalCTESink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,23 @@ void PhysicalCTESink::buildPipelineExecGroupImpl(
builder.setSinkOp(std::make_unique<CTESinkOp>(exec_context, log->identifier(), cte, id));
id++;
});

const Settings & settings = context.getSettingsRef();
SpillConfig spill_config(
context.getTemporaryPath(),
" ",
settings.max_cached_data_bytes_in_spiller,
settings.max_spilled_rows_per_file,
settings.max_spilled_bytes_per_file,
context.getFileProvider(),
settings.max_threads,
settings.max_block_size);

cte->initCTESpillContextAndPartitionConfig(
spill_config,
group_builder.getCurrentHeader(),
settings.max_bytes_before_cte_spill,
context);
}

void PhysicalCTESink::finalizeImpl(const Names & parent_require)
Expand Down
56 changes: 56 additions & 0 deletions dbms/src/Interpreters/CTESpillContext.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Interpreters/CTESpillContext.h>

#include <algorithm>
#include <mutex>
#include <utility>

namespace DB
{
Int64 CTESpillContext::triggerSpillImpl(Int64 expected_released_memories)
{
size_t partition_num = this->partitions.size();
std::vector<std::pair<size_t, size_t>> part_idx_with_mem_usages;
part_idx_with_mem_usages.reserve(partition_num);
for (size_t i = 0; i < partition_num; i++)
part_idx_with_mem_usages.push_back(std::make_pair(this->partitions[i]->memory_usage.load(), i));

std::sort(
part_idx_with_mem_usages.begin(),
part_idx_with_mem_usages.end(),
[](const std::pair<size_t, size_t> & l, const std::pair<size_t, size_t> & r) { return l.first > r.first; });

for (const auto & item : part_idx_with_mem_usages)
{
auto memory_usage = item.first;
const std::shared_ptr<CTEPartition> & partition = partitions[item.second];
std::lock_guard<std::mutex> aux_lock(*(partition->aux_lock));
if (partition->status != CTEPartitionStatus::NORMAL)
continue;

if (memory_usage == 0)
continue;

partition->status = CTEPartitionStatus::NEED_SPILL;

expected_released_memories = std::max(expected_released_memories - memory_usage, 0);
if (expected_released_memories <= 0)
return expected_released_memories;
}
return expected_released_memories;
}
} // namespace DB
55 changes: 55 additions & 0 deletions dbms/src/Interpreters/CTESpillContext.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Core/Block.h>
#include <Core/OperatorSpillContext.h>
#include <Operators/CTEPartition.h>

#include <memory>
#include <mutex>

namespace DB
{
class CTESpillContext final : public OperatorSpillContext
{
public:
CTESpillContext(
UInt64 operator_spill_threshold_,
const String & query_id_and_cte_id_,
std::vector<std::shared_ptr<CTEPartition>> partitions_)
: OperatorSpillContext(operator_spill_threshold_, "cte", Logger::get(query_id_and_cte_id_))
, partitions(partitions_)
{}

bool supportAutoTriggerSpill() const override { return true; }
Int64 triggerSpillImpl(Int64 expected_released_memories) override;
LoggerPtr getLog() const { return this->log; }

protected:
Int64 getTotalRevocableMemoryImpl() override
{
Int64 total_memory = 0;
for (auto & partition : this->partitions)
total_memory += partition->memory_usage.load();
return total_memory;
}

private:
std::vector<std::shared_ptr<CTEPartition>> partitions;
};
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ struct Settings
M(SettingUInt64, async_cqs, 1, "grpc async cqs") \
M(SettingUInt64, preallocated_request_count_per_poller, 20, "grpc preallocated_request_count_per_poller") \
M(SettingUInt64, max_bytes_before_external_join, 0, "max bytes used by join before spill, 0 as the default value, 0 means no limit") \
M(SettingUInt64, max_bytes_before_cte_spill, 0, "max bytes used by cte before spill, 0 as the default value, 0 means no limit") \
M(SettingInt64, join_restore_concurrency, 0, "join restore concurrency, negative value means restore join serially, 0 means TiFlash choose restore concurrency automatically, 0 as the default value") \
M(SettingUInt64, max_cached_data_bytes_in_spiller, 1024ULL * 1024 * 20, "Max cached data bytes in spiller before spilling, 20 MB as the default value, 0 means no limit") \
M(SettingUInt64, max_spilled_rows_per_file, 200000, "Max spilled data rows per spill file, 200000 as the default value, 0 means no limit.") \
Expand Down
Loading