Skip to content

Commit c8fe853

Browse files
committed
save add block_wait
Signed-off-by: JaySon-Huang <[email protected]>
1 parent d8b9afe commit c8fe853

19 files changed

+117
-24
lines changed

dbms/src/Interpreters/Settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ struct Settings
249249
M(SettingDouble, dt_filecache_max_downloading_count_scale, 1.0, "Max downloading task count of FileCache = io thread count * dt_filecache_max_downloading_count_scale.") \
250250
M(SettingUInt64, dt_filecache_min_age_seconds, 1800, "Files of the same priority can only be evicted from files that were not accessed within `dt_filecache_min_age_seconds` seconds.") \
251251
M(SettingBool, dt_enable_fetch_memtableset, true, "Whether fetching delta cache in FetchDisaggPages") \
252+
M(SettingBool, dt_enable_block_wait, false, "Whether block wait until remote file is cached in compute node") \
252253
M(SettingUInt64, dt_fetch_pages_packet_limit_size, 512 * 1024, "Response packet bytes limit of FetchDisaggPages, 0 means one page per packet") \
253254
M(SettingDouble, dt_fetch_page_concurrency_scale, 4.0, "Concurrency of fetching pages of one query equals to num_streams * dt_fetch_page_concurrency_scale.") \
254255
M(SettingDouble, dt_prepare_stream_concurrency_scale, 2.0, "Concurrency of preparing streams of one query equals to num_streams * dt_prepare_stream_concurrency_scale.") \

dbms/src/Server/DTTool/DTToolInspect.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,8 @@ int inspectServiceMain(DB::Context & context, const InspectArgs & args)
288288
*dmfile,
289289
fp,
290290
nullptr,
291-
false,
291+
/*block_wait=*/false,
292+
/*set_cache_if_miss=*/false,
292293
col_id,
293294
nullptr,
294295
nullptr);

dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class MarkLoader
3535
auto mark_guard = S3::S3RandomAccessFile::setReadFileInfo({
3636
.size = reader.dmfile->getReadFileSize(col_id, colMarkFileName(file_name_base)),
3737
.scan_context = reader.scan_context,
38+
.block_wait = block_wait,
3839
});
3940

4041
if (likely(reader.dmfile->useMetaV2()))
@@ -59,17 +60,20 @@ class MarkLoader
5960
DMFileReader & reader_,
6061
ColId col_id_,
6162
const String & file_name_base_,
63+
bool block_wait_,
6264
const ReadLimiterPtr & read_limiter_)
6365
: reader(reader_)
6466
, col_id(col_id_)
6567
, file_name_base(file_name_base_)
6668
, read_limiter(read_limiter_)
69+
, block_wait(block_wait_)
6770
{}
6871

6972
DMFileReader & reader;
7073
ColId col_id;
7174
const String & file_name_base;
7275
ReadLimiterPtr read_limiter;
76+
bool block_wait;
7377

7478
private:
7579
MarksInCompressedFilePtr loadRawColMarkTo(const MarksInCompressedFilePtr & res, size_t bytes_size)
@@ -288,6 +292,7 @@ ColumnReadStream::ColumnReadStream(
288292
ColId col_id,
289293
const String & file_name_base,
290294
size_t max_read_buffer_size,
295+
bool block_wait,
291296
const LoggerPtr & log,
292297
const ReadLimiterPtr & read_limiter)
293298
: avg_size_hint(reader.dmfile->getColumnStat(col_id).avg_size)
@@ -297,9 +302,9 @@ ColumnReadStream::ColumnReadStream(
297302
if (reader.mark_cache)
298303
marks = reader.mark_cache->getOrSet(
299304
reader.dmfile->colMarkCacheKey(file_name_base),
300-
MarkLoader(reader, col_id, file_name_base, read_limiter));
305+
MarkLoader(reader, col_id, file_name_base, block_wait, read_limiter));
301306
else
302-
marks = MarkLoader(reader, col_id, file_name_base, read_limiter)();
307+
marks = MarkLoader(reader, col_id, file_name_base, block_wait, read_limiter)();
303308

304309
// skip empty dmfile
305310
size_t packs = reader.dmfile->getPacks();
@@ -309,6 +314,7 @@ ColumnReadStream::ColumnReadStream(
309314
auto data_guard = S3::S3RandomAccessFile::setReadFileInfo({
310315
.size = reader.dmfile->getReadFileSize(col_id, colDataFileName(file_name_base)),
311316
.scan_context = reader.scan_context,
317+
.block_wait = block_wait,
312318
});
313319

314320
// load column data read buffer

dbms/src/Storages/DeltaMerge/File/ColumnStream.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class ColumnReadStream
3636
ColId col_id,
3737
const String & file_name_base,
3838
size_t max_read_buffer_size,
39+
bool block_wait,
3940
const LoggerPtr & log,
4041
const ReadLimiterPtr & read_limiter);
4142

dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::buildNoLocalIndex(
9595
dmfile,
9696
read_columns,
9797
is_common_handle,
98+
block_wait,
9899
enable_handle_clean_read,
99100
enable_del_clean_read,
100101
is_fast_scan,
@@ -136,6 +137,7 @@ SkippableBlockInputStreamPtr createSimpleBlockInputStream(
136137

137138
DMFileBlockInputStreamBuilder & DMFileBlockInputStreamBuilder::setFromSettings(const Settings & settings)
138139
{
140+
block_wait = settings.dt_enable_block_wait;
139141
enable_column_cache = settings.dt_enable_stable_column_cache;
140142
max_read_buffer_size = settings.max_read_buffer_size;
141143
max_sharing_column_bytes_for_all = settings.dt_max_sharing_column_bytes_for_all;
@@ -217,6 +219,7 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::buildForVectorIndex(
217219
dmfile,
218220
*vec_index_ctx->rest_col_defs,
219221
is_common_handle,
222+
block_wait,
220223
enable_handle_clean_read,
221224
enable_del_clean_read,
222225
is_fast_scan,

dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ class DMFileBlockInputStreamBuilder
244244
IdSetPtr read_packs;
245245
MarkCachePtr mark_cache;
246246
MinMaxIndexCachePtr index_cache;
247+
bool block_wait = false;
247248
// column cache
248249
bool enable_column_cache = false;
249250
ColumnCachePtr column_cache;

dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ DMFilePackFilterResultPtr DMFilePackFilter::load()
5858
auto read_all_packs = (rowkey_ranges.size() == 1 && rowkey_ranges[0].all()) || rowkey_ranges.empty();
5959
if (!read_all_packs)
6060
{
61-
tryLoadIndex(result.param, MutSup::extra_handle_id);
61+
tryLoadIndex(result.param, MutSup::extra_handle_id, block_wait);
6262
std::vector<RSOperatorPtr> handle_filters;
6363
for (auto & rowkey_range : rowkey_ranges)
6464
handle_filters.emplace_back(toFilter(rowkey_range));
@@ -129,7 +129,7 @@ DMFilePackFilterResultPtr DMFilePackFilter::load()
129129
ColIds ids = filter->getColumnIDs();
130130
for (const auto & id : ids)
131131
{
132-
tryLoadIndex(result.param, id);
132+
tryLoadIndex(result.param, id, block_wait);
133133
}
134134

135135
const auto check_results = filter->roughCheck(0, pack_count, result.param);
@@ -189,13 +189,21 @@ void DMFilePackFilter::loadIndex(
189189
const DMFilePtr & dmfile,
190190
const FileProviderPtr & file_provider,
191191
const MinMaxIndexCachePtr & index_cache,
192+
bool block_wait,
192193
bool set_cache_if_miss,
193194
ColId col_id,
194195
const ReadLimiterPtr & read_limiter,
195196
const ScanContextPtr & scan_context)
196197
{
197-
auto [type, minmax_index]
198-
= loadIndex(*dmfile, file_provider, index_cache, set_cache_if_miss, col_id, read_limiter, scan_context);
198+
auto [type, minmax_index] = loadIndex(
199+
*dmfile,
200+
file_provider,
201+
index_cache,
202+
block_wait,
203+
set_cache_if_miss,
204+
col_id,
205+
read_limiter,
206+
scan_context);
199207
indexes.emplace(col_id, RSIndex(type, minmax_index));
200208
}
201209

@@ -214,6 +222,7 @@ class MinMaxIndexLoader
214222
auto index_guard = S3::S3RandomAccessFile::setReadFileInfo({
215223
.size = dmfile.getReadFileSize(col_id, colIndexFileName(file_name_base)),
216224
.scan_context = scan_context,
225+
.block_wait = block_wait,
217226
});
218227

219228
if (likely(dmfile.useMetaV2()))
@@ -238,6 +247,7 @@ class MinMaxIndexLoader
238247
const DMFile & dmfile_,
239248
const FileProviderPtr & file_provider_,
240249
ColId col_id_,
250+
bool block_wait_,
241251
const ReadLimiterPtr & read_limiter_,
242252
const ScanContextPtr & scan_context_)
243253
: dmfile(dmfile_)
@@ -246,6 +256,7 @@ class MinMaxIndexLoader
246256
, file_provider(file_provider_)
247257
, read_limiter(read_limiter_)
248258
, scan_context(scan_context_)
259+
, block_wait(block_wait_)
249260
{}
250261

251262
const DMFile & dmfile;
@@ -254,6 +265,7 @@ class MinMaxIndexLoader
254265
FileProviderPtr file_provider;
255266
ReadLimiterPtr read_limiter;
256267
ScanContextPtr scan_context;
268+
bool block_wait = false;
257269

258270
private:
259271
MinMaxIndexPtr loadRawMinMaxIndex(const DataTypePtr & type, size_t index_file_size) const
@@ -339,6 +351,7 @@ std::pair<DataTypePtr, MinMaxIndexPtr> DMFilePackFilter::loadIndex(
339351
const DMFile & dmfile,
340352
const FileProviderPtr & file_provider,
341353
const MinMaxIndexCachePtr & index_cache,
354+
bool block_wait,
342355
bool set_cache_if_miss,
343356
ColId col_id,
344357
const ReadLimiterPtr & read_limiter,
@@ -350,7 +363,7 @@ std::pair<DataTypePtr, MinMaxIndexPtr> DMFilePackFilter::loadIndex(
350363
MinMaxIndexPtr minmax_index;
351364
if (index_cache && set_cache_if_miss)
352365
{
353-
auto loader = MinMaxIndexLoader(dmfile, file_provider, col_id, read_limiter, scan_context);
366+
auto loader = MinMaxIndexLoader(dmfile, file_provider, col_id, block_wait, read_limiter, scan_context);
354367
minmax_index = index_cache->getOrSet(dmfile.colIndexCacheKey(file_name_base), loader);
355368
}
356369
else
@@ -359,12 +372,12 @@ std::pair<DataTypePtr, MinMaxIndexPtr> DMFilePackFilter::loadIndex(
359372
if (index_cache)
360373
minmax_index = index_cache->get(dmfile.colIndexCacheKey(file_name_base));
361374
if (minmax_index == nullptr)
362-
minmax_index = MinMaxIndexLoader(dmfile, file_provider, col_id, read_limiter, scan_context)();
375+
minmax_index = MinMaxIndexLoader(dmfile, file_provider, col_id, block_wait, read_limiter, scan_context)();
363376
}
364377
return {type, minmax_index};
365378
}
366379

367-
void DMFilePackFilter::tryLoadIndex(RSCheckParam & param, ColId col_id)
380+
void DMFilePackFilter::tryLoadIndex(RSCheckParam & param, ColId col_id, bool block_wait)
368381
{
369382
if (param.indexes.count(col_id))
370383
return;
@@ -373,7 +386,16 @@ void DMFilePackFilter::tryLoadIndex(RSCheckParam & param, ColId col_id)
373386
return;
374387

375388
Stopwatch watch;
376-
loadIndex(param.indexes, dmfile, file_provider, index_cache, set_cache_if_miss, col_id, read_limiter, scan_context);
389+
loadIndex(
390+
param.indexes,
391+
dmfile,
392+
file_provider,
393+
index_cache,
394+
block_wait,
395+
set_cache_if_miss,
396+
col_id,
397+
read_limiter,
398+
scan_context);
377399
}
378400

379401
std::pair<std::vector<DMFilePackFilter::Range>, DMFilePackFilterResults> DMFilePackFilter::getSkippedRangeAndFilter(

dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ class DMFilePackFilter
154154
const DMFile & dmfile,
155155
const FileProviderPtr & file_provider,
156156
const MinMaxIndexCachePtr & index_cache,
157+
bool block_wait,
157158
bool set_cache_if_miss,
158159
ColId col_id,
159160
const ReadLimiterPtr & read_limiter,
@@ -190,17 +191,19 @@ class DMFilePackFilter
190191
const DMFilePtr & dmfile,
191192
const FileProviderPtr & file_provider,
192193
const MinMaxIndexCachePtr & index_cache,
194+
bool block_wait,
193195
bool set_cache_if_miss,
194196
ColId col_id,
195197
const ReadLimiterPtr & read_limiter,
196198
const ScanContextPtr & scan_context);
197199

198-
void tryLoadIndex(RSCheckParam & param, ColId col_id);
200+
void tryLoadIndex(RSCheckParam & param, ColId col_id, bool block_wait);
199201

200202
private:
201203
DMFilePtr dmfile;
202204

203205
MinMaxIndexCachePtr index_cache;
206+
bool block_wait;
204207
bool set_cache_if_miss;
205208
RowKeyRanges rowkey_ranges;
206209
RSOperatorPtr filter;

dbms/src/Storages/DeltaMerge/File/DMFilePackFilterResult.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ void DMFilePackFilterResult::tryLoadIndex(
7878
dmfile,
7979
file_provider,
8080
index_cache,
81+
/*block_wait=*/false,
8182
/*set_cache_if_miss=*/true,
8283
col_id,
8384
read_limiter,

dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ DMFileReader::DMFileReader(
5454
const DMFilePtr & dmfile_,
5555
const ColumnDefines & read_columns_,
5656
bool is_common_handle_,
57+
bool block_wait,
5758
// clean read
5859
bool enable_handle_clean_read_,
5960
bool enable_del_clean_read_,
@@ -114,6 +115,7 @@ DMFileReader::DMFileReader(
114115
cd.id,
115116
stream_name,
116117
max_read_buffer_size,
118+
block_wait,
117119
log,
118120
read_limiter);
119121
column_streams.emplace(stream_name, std::move(stream));

0 commit comments

Comments
 (0)