Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ grpc::Status FlashService::IsAlive(
return check_result;

auto & tmt_context = context->getTMTContext();
response->set_available(tmt_context.checkRunning());
response->set_available(tmt_context.checkRunning() && tmt_context.getMPPTaskManager()->isAvailable());
response->set_mpp_version(DB::GetMppVersion());
return grpc::Status::OK;
}
Expand Down
34 changes: 34 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/Stopwatch.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Mpp/MPPTask.h>
Expand Down Expand Up @@ -82,6 +83,39 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id)
return ptr;
}

void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr<Context> & global_context)
{
// The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down
static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown";
// The default value of flash.graceful_wait_before_shutdown
static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600;
auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64(
GRACEFUL_WIAT_BEFORE_SHUTDOWN,
DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN);
LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown);
Stopwatch watch;
// The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched
std::this_thread::sleep_for(std::chrono::seconds(1));
while (true)
{
auto elapsed_ms = watch.elapsedMilliseconds();
{
std::unique_lock lock(mu);
if (monitored_tasks.empty())
{
LOG_INFO(log, "All MPPTasks have finished after {}ms", elapsed_ms);
break;
}
}
if (elapsed_ms >= graceful_wait_before_shutdown * 1000)
{
LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", elapsed_ms);
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}

MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_)
: scheduler(std::move(scheduler_))
, aborted_query_gather_cache(ABORTED_MPPGATHER_CACHE_SIZE)
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ struct MPPTaskMonitor
return monitored_tasks.find(task_unique_id) != monitored_tasks.end();
}

void waitAllMPPTasksFinish(const std::unique_ptr<Context> & global_context);

std::mutex mu;
std::condition_variable cv;
bool is_shutdown = false;
Expand Down Expand Up @@ -220,6 +222,8 @@ class MPPTaskManager : private boost::noncopyable

std::shared_ptr<MPPTaskMonitor> monitor;

std::atomic<bool> is_available{true};

public:
explicit MPPTaskManager(MPPTaskSchedulerPtr scheduler);

Expand Down Expand Up @@ -270,6 +274,9 @@ class MPPTaskManager : private boost::noncopyable

bool isTaskExists(const MPPTaskId & id);

void setUnavailable() { is_available = false; }
bool isAvailable() { return is_available; }

private:
MPPQueryPtr addMPPQuery(
const MPPQueryId & query_id,
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1920,6 +1920,12 @@ try
LOG_INFO(log, "Start to wait for terminal signal");
waitForTerminationRequest();

// Note: `waitAllMPPTasksFinish` must be called before stopping the proxy.
// Otherwise, read index requests may fail, which can prevent TiFlash from shutting down gracefully.
LOG_INFO(log, "Set unavailable for MPPTask");
tmt_context.getMPPTaskManager()->setUnavailable();
tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context);

{
// Set limiters stopping and wakeup threads in waitting queue.
global_context->getIORateLimiter().setStop();
Expand Down