Skip to content

Commit d8b9afe

Browse files
committed
Add evict http api
Signed-off-by: JaySon-Huang <[email protected]>
1 parent 005352d commit d8b9afe

File tree

4 files changed

+174
-27
lines changed

4 files changed

+174
-27
lines changed

dbms/src/Storages/KVStore/FFI/ProxyFFIStatusService.cpp

Lines changed: 89 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <Storages/KVStore/FFI/ProxyFFICommon.h>
2424
#include <Storages/KVStore/KVStore.h>
2525
#include <Storages/KVStore/TMTContext.h>
26+
#include <Storages/S3/FileCache.h>
2627
#include <Storages/S3/S3GCManager.h>
2728
#include <TiDB/OwnerManager.h>
2829
#include <TiDB/Schema/SchemaSyncService.h>
@@ -32,6 +33,7 @@
3233

3334
#include <boost/algorithm/string.hpp>
3435
#include <magic_enum.hpp>
36+
#include <string>
3537

3638
namespace DB
3739
{
@@ -181,16 +183,19 @@ HttpRequestRes HandleHttpRequestStoreStatus(
181183

182184
// Check whether the disaggregated mode is enabled.
183185
// If not, return a HttpRequestRes with error message.
184-
std::optional<HttpRequestRes> allowDisaggAPI(Context & global_ctx, std::string_view message)
186+
std::optional<HttpRequestRes> allowDisaggAPI(
187+
Context & global_ctx,
188+
DisaggregatedMode expect_mode,
189+
std::string_view message)
185190
{
186-
if (!global_ctx.getSharedContextDisagg()->isDisaggregatedStorageMode())
191+
if (global_ctx.getSharedContextDisagg()->disaggregated_mode != expect_mode)
187192
{
188193
auto * body = RawCppString::New(fmt::format(
189194
R"json({{"message":"{}, disagg_mode={}"}})json",
190195
message,
191196
magic_enum::enum_name(global_ctx.getSharedContextDisagg()->disaggregated_mode)));
192197
return HttpRequestRes{
193-
.status = HttpRequestStatus::ErrorParam,
198+
.status = HttpRequestStatus::Ok,
194199
.res = CppStrWithView{
195200
.inner = GenRawCppPtr(body, RawCppPtrTypeImpl::String),
196201
.view = BaseBuffView{body->data(), body->size()},
@@ -209,7 +214,7 @@ HttpRequestRes HandleHttpRequestRemoteReUpload(
209214
std::string_view)
210215
{
211216
auto & global_ctx = server->tmt->getContext();
212-
if (auto err_resp = allowDisaggAPI(global_ctx, "can not sync remote store"); err_resp)
217+
if (auto err_resp = allowDisaggAPI(global_ctx, DisaggregatedMode::Storage, "can not sync remote store"); err_resp)
213218
{
214219
return err_resp.value();
215220
}
@@ -234,7 +239,7 @@ HttpRequestRes HandleHttpRequestRemoteOwnerInfo(
234239
std::string_view)
235240
{
236241
auto & global_ctx = server->tmt->getContext();
237-
if (auto err_resp = allowDisaggAPI(global_ctx, "can not get gc owner"); err_resp)
242+
if (auto err_resp = allowDisaggAPI(global_ctx, DisaggregatedMode::Storage, "can not get gc owner"); err_resp)
238243
{
239244
return err_resp.value();
240245
}
@@ -263,7 +268,7 @@ HttpRequestRes HandleHttpRequestRemoteOwnerResign(
263268
std::string_view)
264269
{
265270
auto & global_ctx = server->tmt->getContext();
266-
if (auto err_resp = allowDisaggAPI(global_ctx, "can not resign gc owner"); err_resp)
271+
if (auto err_resp = allowDisaggAPI(global_ctx, DisaggregatedMode::Storage, "can not resign gc owner"); err_resp)
267272
{
268273
return err_resp.value();
269274
}
@@ -290,7 +295,7 @@ HttpRequestRes HandleHttpRequestRemoteGC(
290295
std::string_view)
291296
{
292297
auto & global_ctx = server->tmt->getContext();
293-
if (auto err_resp = allowDisaggAPI(global_ctx, "can not trigger gc"); err_resp)
298+
if (auto err_resp = allowDisaggAPI(global_ctx, DisaggregatedMode::Storage, "can not trigger gc"); err_resp)
294299
{
295300
return err_resp.value();
296301
}
@@ -317,6 +322,79 @@ HttpRequestRes HandleHttpRequestRemoteGC(
317322
};
318323
}
319324

325+
HttpRequestRes HandleHttpRequestRemoteCacheEvict(
326+
EngineStoreServerWrap * server,
327+
std::string_view path,
328+
const std::string & api_name,
329+
std::string_view,
330+
std::string_view)
331+
{
332+
auto & global_ctx = server->tmt->getContext();
333+
if (auto err_resp = allowDisaggAPI(global_ctx, DisaggregatedMode::Compute, "can not trigger remote cache evict");
334+
err_resp)
335+
{
336+
return err_resp.value();
337+
}
338+
339+
auto log = Logger::get("HandleHttpRequestRemoteCacheEvict");
340+
LOG_INFO(log, "handling remote cache evict request, path={} api_name={}", path, api_name);
341+
FileSegment::FileType evict_until_type;
342+
{
343+
// schema: /tiflash/remote/cache/evict/{file_type_int}
344+
auto query = path.substr(api_name.size());
345+
if (query.empty() || query[0] != '/')
346+
{
347+
auto * body = RawCppString::New(
348+
fmt::format(R"json({{"message":"invalid remote cache evict request: {}"}})json", path));
349+
return HttpRequestRes{
350+
.status = HttpRequestStatus::Ok,
351+
.res = CppStrWithView{
352+
.inner = GenRawCppPtr(body, RawCppPtrTypeImpl::String),
353+
.view = BaseBuffView{body->data(), body->size()},
354+
},
355+
};
356+
}
357+
query.remove_prefix(1);
358+
std::optional<FileSegment::FileType> opt_evict_until_type = std::nullopt;
359+
try
360+
{
361+
auto itype = std::stoll(query.data());
362+
opt_evict_until_type = magic_enum::enum_cast<FileSegment::FileType>(itype);
363+
}
364+
catch (...)
365+
{
366+
// ignore
367+
}
368+
if (!opt_evict_until_type.has_value())
369+
{
370+
auto * body = RawCppString::New(
371+
fmt::format(R"json({{"message":"invalid file_type in remote cache evict request: {}"}})json", path));
372+
return HttpRequestRes{
373+
.status = HttpRequestStatus::Ok,
374+
.res = CppStrWithView{
375+
.inner = GenRawCppPtr(body, RawCppPtrTypeImpl::String),
376+
.view = BaseBuffView{body->data(), body->size()},
377+
},
378+
};
379+
}
380+
// successfully parse the file type
381+
evict_until_type = opt_evict_until_type.value();
382+
}
383+
384+
auto released_size = DB::FileCache::instance()->evictUntil(evict_until_type);
385+
auto * body = RawCppString::New(fmt::format(
386+
R"json({{"file_type":"{}","released_size":"{}"}})json",
387+
magic_enum::enum_name(evict_until_type),
388+
released_size));
389+
return HttpRequestRes{
390+
.status = HttpRequestStatus::Ok,
391+
.res = CppStrWithView{
392+
.inner = GenRawCppPtr(body, RawCppPtrTypeImpl::String),
393+
.view = BaseBuffView{body->data(), body->size()},
394+
},
395+
};
396+
}
397+
320398
// Acquiring the all the region ids created in this TiFlash node with given keyspace id.
321399
HttpRequestRes HandleHttpRequestSyncRegion(
322400
EngineStoreServerWrap * server,
@@ -502,6 +580,7 @@ static const std::map<std::string, HANDLE_HTTP_URI_METHOD> AVAILABLE_HTTP_URI =
502580
{"/tiflash/remote/owner/resign", HandleHttpRequestRemoteOwnerResign},
503581
{"/tiflash/remote/gc", HandleHttpRequestRemoteGC},
504582
{"/tiflash/remote/upload", HandleHttpRequestRemoteReUpload},
583+
{"/tiflash/remote/cache/evict", HandleHttpRequestRemoteCacheEvict},
505584
};
506585

507586
uint8_t CheckHttpUriAvailable(BaseBuffView path_)
@@ -530,9 +609,9 @@ HttpRequestRes HandleHttpRequest(
530609
return method(
531610
server,
532611
path,
533-
str,
534-
std::string_view(query.data, query.len),
535-
std::string_view(body.data, body.len));
612+
/*api_name=*/str,
613+
/*query=*/std::string_view(query.data, query.len),
614+
/*body=*/std::string_view(body.data, body.len));
536615
}
537616
}
538617
return HttpRequestRes{

dbms/src/Storages/S3/FileCache.cpp

Lines changed: 77 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -241,27 +241,27 @@ FileSegmentPtr FileCache::get(const S3::S3FilenameView & s3_fname, const std::op
241241
return nullptr;
242242
}
243243

244-
// File not exists, try to download and cache it in backgroud.
244+
// File not exists, try to download and cache it in background.
245245

246246
// We don't know the exact size of a object/file, but we need reserve space to save the object/file.
247247
// A certain amount of space is reserved for each file type.
248-
auto estimzted_size = filesize ? *filesize : getEstimatedSizeOfFileType(file_type);
249-
if (!reserveSpaceImpl(file_type, estimzted_size, EvictMode::TryEvict))
248+
auto estimated_size = filesize ? *filesize : getEstimatedSizeOfFileType(file_type);
249+
if (!reserveSpaceImpl(file_type, estimated_size, EvictMode::TryEvict))
250250
{
251251
// Space not enough.
252252
GET_METRIC(tiflash_storage_remote_cache, type_dtfile_full).Increment();
253253
LOG_DEBUG(
254254
log,
255-
"s3_key={} space not enough(capacity={} used={} estimzted_size={}), skip cache",
255+
"s3_key={} space not enough(capacity={} used={} estimated_size={}), skip cache",
256256
s3_key,
257257
cache_capacity,
258258
cache_used,
259-
estimzted_size);
259+
estimated_size);
260260
return nullptr;
261261
}
262262

263263
auto file_seg
264-
= std::make_shared<FileSegment>(toLocalFilename(s3_key), FileSegment::Status::Empty, estimzted_size, file_type);
264+
= std::make_shared<FileSegment>(toLocalFilename(s3_key), FileSegment::Status::Empty, estimated_size, file_type);
265265
table.set(s3_key, file_seg);
266266
bgDownload(s3_key, file_seg);
267267

@@ -300,7 +300,7 @@ FileSegmentPtr FileCache::getOrWait(const S3::S3FilenameView & s3_fname, const s
300300
GET_METRIC(tiflash_storage_remote_cache, type_dtfile_full).Increment();
301301
LOG_INFO(
302302
log,
303-
"s3_key={} space not enough(capacity={} used={} estimzted_size={}), skip cache",
303+
"s3_key={} space not enough(capacity={} used={} estimated_size={}), skip cache",
304304
s3_key,
305305
cache_capacity,
306306
cache_used,
@@ -378,7 +378,7 @@ std::pair<Int64, std::list<String>::iterator> FileCache::removeImpl(
378378
FileSegmentPtr & f,
379379
bool force)
380380
{
381-
// Except currenly thread and the FileTable,
381+
// Except current thread and the FileTable,
382382
// there are other threads hold this FileSegment object.
383383
if (f.use_count() > 2 && !force)
384384
{
@@ -422,30 +422,50 @@ bool FileCache::reserveSpaceImpl(FileType reserve_for, UInt64 size, EvictMode ev
422422

423423
// The basic evict logic:
424424
// Distinguish cache priority according to file type. The larger the file type, the lower the priority.
425+
// If evict_same_type_first is true,
425426
// First, try to evict files which not be used recently with the same type. => Try to evict old files.
426427
// Second, try to evict files with lower priority. => Try to evict lower priority files.
427428
// Finally, evict files with higher priority, if space is still not sufficient. Higher priority files
428429
// are usually smaller. If we don't evict them, it is very possible that cache is full of these higher
429430
// priority small files and we can't effectively cache any lower-priority large files.
430-
std::vector<FileType> FileCache::getEvictFileTypes(FileType evict_for)
431+
std::vector<FileType> FileCache::getEvictFileTypes(FileType evict_for, bool evict_same_type_first)
431432
{
432-
std::vector<FileType> evict_types;
433-
evict_types.push_back(evict_for); // First, try evict with the same file type.
434433
constexpr auto all_file_types = magic_enum::enum_values<FileType>(); // all_file_types are sorted by enum value.
435-
// Second, try evict from the lower proirity file type.
436-
for (auto itr = std::rbegin(all_file_types); itr != std::rend(all_file_types); ++itr)
434+
if (evict_same_type_first)
435+
{
436+
std::vector<FileType> evict_types;
437+
evict_types.push_back(evict_for); // First, try evict with the same file type.
438+
// Second, try evict from the lower priority file type.
439+
for (auto itr = std::rbegin(all_file_types); itr != std::rend(all_file_types); ++itr)
440+
{
441+
if (*itr != evict_for)
442+
evict_types.push_back(*itr);
443+
}
444+
return evict_types;
445+
}
446+
else
437447
{
438-
if (*itr != evict_for)
448+
std::vector<FileType> evict_types;
449+
// Evict from the lower priority file type first.
450+
for (auto itr = std::rbegin(all_file_types); itr != std::rend(all_file_types); ++itr)
451+
{
439452
evict_types.push_back(*itr);
453+
if (*itr == evict_for)
454+
{
455+
// Do not evict higher priority file type
456+
break;
457+
;
458+
}
459+
}
460+
return evict_types;
440461
}
441-
return evict_types;
442462
}
443463

444464
void FileCache::tryEvictFile(FileType evict_for, UInt64 size, EvictMode evict)
445465
{
446466
RUNTIME_CHECK(evict != EvictMode::NoEvict);
447467

448-
auto file_types = getEvictFileTypes(evict_for);
468+
auto file_types = getEvictFileTypes(evict_for, /*evict_same_type_first*/ true);
449469
for (auto evict_from : file_types)
450470
{
451471
auto evicted_size = tryEvictFrom(evict_for, size, evict_from);
@@ -1015,4 +1035,45 @@ void FileCache::updateConfig(const Settings & settings)
10151035
}
10161036
}
10171037

1038+
UInt64 FileCache::evictUntil(FileSegment::FileType file_type)
1039+
{
1040+
std::lock_guard lock(mtx);
1041+
auto file_types = getEvictFileTypes(file_type, /*evict_same_type_first*/ false);
1042+
UInt64 total_released_size = 0;
1043+
for (auto evict_from : file_types)
1044+
{
1045+
UInt64 curr_released_size = 0;
1046+
auto & table = tables[static_cast<UInt64>(evict_from)];
1047+
for (auto itr = table.begin(); itr != table.end();)
1048+
{
1049+
auto s3_key = *itr;
1050+
auto f = table.get(s3_key, /*update_lru*/ false);
1051+
auto [released_size, next_itr] = removeImpl(table, s3_key, f, /*force*/ true);
1052+
if (released_size < 0) // not remove
1053+
{
1054+
++itr;
1055+
}
1056+
else // removed
1057+
{
1058+
itr = next_itr;
1059+
curr_released_size += released_size;
1060+
}
1061+
}
1062+
total_released_size += curr_released_size;
1063+
LOG_INFO(
1064+
log,
1065+
"evictUntil layer evict finish, evict_from={} file_type={} released_size={} tot_release_size={}",
1066+
magic_enum::enum_name(evict_from),
1067+
magic_enum::enum_name(file_type),
1068+
curr_released_size,
1069+
total_released_size);
1070+
}
1071+
LOG_INFO(
1072+
log,
1073+
"evictUntil finish, file_type={} total_released_size={}",
1074+
magic_enum::enum_name(file_type),
1075+
total_released_size);
1076+
return total_released_size;
1077+
}
1078+
10181079
} // namespace DB

dbms/src/Storages/S3/FileCache.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,9 @@ class FileCache
261261

262262
void updateConfig(const Settings & settings);
263263

264+
// evict the cached files until no file of >= `file_type` is in cache.
265+
UInt64 evictUntil(FileSegment::FileType file_type);
266+
264267
#ifndef DBMS_PUBLIC_GTEST
265268
private:
266269
#else
@@ -346,7 +349,9 @@ class FileCache
346349
void releaseSpace(UInt64 size);
347350
bool reserveSpace(FileSegment::FileType reserve_for, UInt64 size, EvictMode evict);
348351
bool finalizeReservedSize(FileSegment::FileType reserve_for, UInt64 reserved_size, UInt64 content_length);
349-
static std::vector<FileSegment::FileType> getEvictFileTypes(FileSegment::FileType evict_for);
352+
static std::vector<FileSegment::FileType> getEvictFileTypes(
353+
FileSegment::FileType evict_for,
354+
bool evict_same_type_first);
350355
void tryEvictFile(FileSegment::FileType evict_for, UInt64 size, EvictMode evict);
351356
UInt64 tryEvictFrom(FileSegment::FileType evict_for, UInt64 size, FileSegment::FileType evict_from);
352357
UInt64 forceEvict(UInt64 size);

dbms/src/Storages/StorageDisaggregatedRemote.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ DM::SegmentReadTasks StorageDisaggregated::buildReadTaskWithBackoff(
209209
}
210210
}
211211

212+
LOG_INFO(log, "build read task done, num_read_tasks={} elapsed_s={:.3f}", read_task.size(), build_read_task_watch.elapsedSeconds());
212213
return read_task;
213214
}
214215

@@ -400,6 +401,7 @@ void StorageDisaggregated::buildReadTaskForWriteNode(
400401
elapsed_seconds = watch.elapsedSeconds();
401402
scan_context->disagg_parse_read_task_ms += elapsed_seconds * 1000;
402403
GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_parse_read_tasks).Observe(elapsed_seconds);
404+
LOG_INFO(log, "build read task for write node done, wn_addr={} elapsed_s={:.3f}", req->address(), elapsed_seconds);
403405
});
404406
// Now we have successfully established disaggregated read for this write node.
405407
// Let's parse the result and generate actual segment read tasks.

0 commit comments

Comments
 (0)