Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions cloud/blockstore/libs/storage/api/volume.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,20 +143,14 @@ struct TEvVolume

struct TDiskRegistryBasedPartitionCounters
{
TPartitionDiskCountersPtr DiskCounters;
TString DiskId;
ui64 NetworkBytes = 0;
TDuration CpuUsage;
TPartNonreplCountersData CountersData;

TDiskRegistryBasedPartitionCounters(
TPartitionDiskCountersPtr diskCounters,
TString diskId,
ui64 networkBytes,
TDuration cpuUsage)
: DiskCounters(std::move(diskCounters))
, DiskId(std::move(diskId))
, NetworkBytes(networkBytes)
, CpuUsage(cpuUsage)
TPartNonreplCountersData countersData)
: DiskId(std::move(diskId))
, CountersData(std::move(countersData))
{}
};

Expand Down
22 changes: 17 additions & 5 deletions cloud/blockstore/libs/storage/core/disk_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ namespace NCloud::NBlockStore::NStorage {

////////////////////////////////////////////////////////////////////////////////

struct TPartitionDiskCounters;
struct TVolumeSelfCounters;

using TPartitionDiskCountersPtr = std::unique_ptr<TPartitionDiskCounters>;
using TVolumeSelfCountersPtr = std::unique_ptr<TVolumeSelfCounters>;

////////////////////////////////////////////////////////////////////////////////

enum class EPublishingPolicy
{
All,
Expand Down Expand Up @@ -835,6 +843,15 @@ struct TPartitionDiskCounters

////////////////////////////////////////////////////////////////////////////////

// Bundle of statistics that travels between partition actors and the volume:
// it is the payload of TDiskRegistryBasedPartitionCounters and the value
// returned by TMirrorPartitionActor::ExtractPartCounters().
struct TPartNonreplCountersData
{
// Owning pointer (std::unique_ptr) to the partition disk counters.
// May be null, e.g. in a default-constructed instance, so consumers
// have to check it before dereferencing.
TPartitionDiskCountersPtr DiskCounters;
// Network bytes figure reported alongside DiskCounters; defaults to 0.
ui64 NetworkBytes = 0;
// CPU usage reported alongside DiskCounters; default-constructed (zero)
// unless set explicitly.
TDuration CpuUsage;
};

////////////////////////////////////////////////////////////////////////////////

struct TVolumeSelfCounters
{
TVolumeSelfSimpleCounters Simple;
Expand All @@ -861,11 +878,6 @@ struct TVolumeSelfCounters

////////////////////////////////////////////////////////////////////////////////

using TPartitionDiskCountersPtr = std::unique_ptr<TPartitionDiskCounters>;
using TVolumeSelfCountersPtr = std::unique_ptr<TVolumeSelfCounters>;

////////////////////////////////////////////////////////////////////////////////

TPartitionDiskCountersPtr CreatePartitionDiskCounters(
EPublishingPolicy policy,
EHistogramCounterOptions histCounterOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ TMirrorPartitionActor::~TMirrorPartitionActor() = default;
void TMirrorPartitionActor::Bootstrap(const TActorContext& ctx)
{
SetupPartitions(ctx);
ScheduleCountersUpdate(ctx);

if (!Config->GetUsePullSchemeForVolumeStatistics()) {
ScheduleCountersUpdate(ctx);
}
if (Config->GetDataScrubbingEnabled() && !ResyncActorId) {
StartScrubbingRange(ctx, 0);
}
Expand Down Expand Up @@ -887,6 +888,15 @@ STFUNC(TMirrorPartitionActor::StateWork)

HFunc(TEvPartition::TEvReleaseRange, HandleReleaseRange);

HFunc(
TEvNonreplPartitionPrivate::
TEvGetDiskRegistryBasedPartCountersRequest,
HandleGetDiskRegistryBasedPartCounters);
HFunc(
TEvNonreplPartitionPrivate::
TEvDiskRegistryBasedPartCountersCombined,
HandleDiskRegistryBasedPartCountersCombined);

HFunc(TEvents::TEvPoisonPill, HandlePoisonPill);
IgnoreFunc(TEvents::TEvPoisonTaken);

Expand Down Expand Up @@ -945,6 +955,11 @@ STFUNC(TMirrorPartitionActor::StateZombie)

IgnoreFunc(TEvPartition::TEvReleaseRange);

IgnoreFunc(TEvNonreplPartitionPrivate::
TEvGetDiskRegistryBasedPartCountersRequest);
IgnoreFunc(TEvNonreplPartitionPrivate::
TEvDiskRegistryBasedPartCountersCombined);

IgnoreFunc(TEvents::TEvPoisonPill);
HFunc(TEvents::TEvPoisonTaken, HandlePoisonTaken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class TMirrorPartitionActor final
const size_t MultiAgentWriteRequestSizeThreshold = 0;
size_t MultiAgentWriteRoundRobinSeed = 0;

TRequestInfoPtr StatisticRequestInfo;

public:
TMirrorPartitionActor(
TStorageConfigPtr config,
Expand Down Expand Up @@ -150,6 +152,12 @@ class TMirrorPartitionActor final
EWriteRequestType SuggestWriteRequestType(
const NActors::TActorContext& ctx,
TBlockRange64 range);
TPartNonreplCountersData ExtractPartCounters(
const NActors::TActorContext& ctx);
void UpdateCounters(
const NActors::TActorContext& ctx,
const NActors::TActorId& sender,
TPartNonreplCountersData partCountersData);

private:
STFUNC(StateWork);
Expand Down Expand Up @@ -228,6 +236,16 @@ class TMirrorPartitionActor final
const NActors::TEvents::TEvPoisonTaken::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleGetDiskRegistryBasedPartCounters(
const TEvNonreplPartitionPrivate::
TEvGetDiskRegistryBasedPartCountersRequest::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleDiskRegistryBasedPartCountersCombined(
const TEvNonreplPartitionPrivate::
TEvDiskRegistryBasedPartCountersCombined::TPtr& ev,
const NActors::TActorContext& ctx);

template <typename TMethod>
void MirrorRequest(
const typename TMethod::TRequest::TPtr& ev,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,31 @@

#include <cloud/blockstore/libs/storage/api/volume.h>
#include <cloud/blockstore/libs/storage/core/config.h>
#include <cloud/blockstore/libs/storage/volume/actors/disk_registry_based_partition_statistics_collector_actor.h>

namespace NCloud::NBlockStore::NStorage {

using namespace NActors;

////////////////////////////////////////////////////////////////////////////////

void TMirrorPartitionActor::HandlePartCounters(
const TEvVolume::TEvDiskRegistryBasedPartitionCounters::TPtr& ev,
const TActorContext& ctx)
void TMirrorPartitionActor::UpdateCounters(
const TActorContext& ctx,
const TActorId& sender,
TPartNonreplCountersData partCountersData)
{
auto* msg = ev->Get();

const ui32 replicaIndex = State.GetReplicaIndex(ev->Sender);
const ui32 replicaIndex = State.GetReplicaIndex(sender);
if (replicaIndex < ReplicaCounters.size()) {
ReplicaCounters[replicaIndex] = std::move(msg->DiskCounters);
NetworkBytes += msg->NetworkBytes;
CpuUsage += CpuUsage;
ReplicaCounters[replicaIndex] =
std::move(partCountersData.DiskCounters);
NetworkBytes += partCountersData.NetworkBytes;
CpuUsage += partCountersData.CpuUsage;
} else {
LOG_INFO(ctx, TBlockStoreComponents::PARTITION,
LOG_INFO(
ctx,
TBlockStoreComponents::PARTITION,
"Partition %s for disk %s counters not found",
ToString(ev->Sender).c_str(),
ToString(sender).c_str(),
State.GetReplicaInfos()[0].Config->GetName().Quote().c_str());

Y_DEBUG_ABORT_UNLESS(0);
Expand All @@ -32,12 +35,19 @@ void TMirrorPartitionActor::HandlePartCounters(

////////////////////////////////////////////////////////////////////////////////

void TMirrorPartitionActor::SendStats(const TActorContext& ctx)
void TMirrorPartitionActor::HandlePartCounters(
const TEvVolume::TEvDiskRegistryBasedPartitionCounters::TPtr& ev,
const TActorContext& ctx)
{
if (!StatActorId) {
return;
}
auto* msg = ev->Get();
UpdateCounters(ctx, ev->Sender, std::move(msg->CountersData));
}

////////////////////////////////////////////////////////////////////////////////

TPartNonreplCountersData TMirrorPartitionActor::ExtractPartCounters(
const TActorContext& ctx)
{
auto stats = CreatePartitionDiskCounters(
EPublishingPolicy::DiskRegistryBased,
DiagnosticsConfig->GetHistogramCounterOptions());
Expand All @@ -54,35 +64,20 @@ void TMirrorPartitionActor::SendStats(const TActorContext& ctx)
stats->Simple.IORequestsInFlight.Reset();
for (const auto& counters: ReplicaCounters) {
if (counters) {
stats->Simple.BytesCount.Value = Max(
stats->Simple.BytesCount.Value,
counters->Simple.BytesCount.Value);
stats->Simple.IORequestsInFlight.Value = Max(
stats->Simple.IORequestsInFlight.Value,
counters->Simple.IORequestsInFlight.Value);
stats->Simple.BytesCount.Value =
Max(stats->Simple.BytesCount.Value,
counters->Simple.BytesCount.Value);
stats->Simple.IORequestsInFlight.Value =
Max(stats->Simple.IORequestsInFlight.Value,
counters->Simple.IORequestsInFlight.Value);
}
}

stats->Simple.ChecksumMismatches.Value = ChecksumMismatches;
stats->Simple.ScrubbingProgress.Value =
100 * GetScrubbingRange().Start / State.GetBlockCount();
stats->Cumulative.ScrubbingThroughput.Value = ScrubbingThroughput;
auto request =
std::make_unique<TEvVolume::TEvDiskRegistryBasedPartitionCounters>(
MakeIntrusive<TCallContext>(),
std::move(stats),
DiskId,
NetworkBytes,
CpuUsage);

NetworkBytes = 0;
CpuUsage = {};
ScrubbingThroughput = 0;

NCloud::Send(
ctx,
StatActorId,
std::move(request));
stats->Cumulative.ScrubbingThroughput.Value =
std::exchange(ScrubbingThroughput, {});

const bool scrubbingEnabled =
Config->GetDataScrubbingEnabled() && !ResyncActorId;
Expand All @@ -95,6 +90,121 @@ void TMirrorPartitionActor::SendStats(const TActorContext& ctx)
std::move(Fixed),
std::move(FixedPartial));
NCloud::Send(ctx, StatActorId, std::move(scrubberCounters));

return {
.DiskCounters = std::move(stats),
.NetworkBytes = std::exchange(NetworkBytes, {}),
.CpuUsage = std::exchange(CpuUsage, {}),
};
}

void TMirrorPartitionActor::SendStats(const TActorContext& ctx)
{
if (!StatActorId) {
return;
}

auto request =
std::make_unique<TEvVolume::TEvDiskRegistryBasedPartitionCounters>(
MakeIntrusive<TCallContext>(),
DiskId,
ExtractPartCounters(ctx));

NCloud::Send(
ctx,
StatActorId,
std::move(request));

}

////////////////////////////////////////////////////////////////////////////////

// Handles a pull-style request for the mirror partition counters.
//
// Only one statistics request is tracked at a time: if a previous one is still
// pending, it is answered with E_REJECTED and empty counters before the new
// request is processed. When there are no replica actors to poll, the request
// is answered right away with E_INVALID_STATE and whatever
// ExtractPartCounters() yields. Otherwise the requester is remembered in
// StatisticRequestInfo and a collector actor is registered to gather counters
// from the replicas on behalf of this actor.
void TMirrorPartitionActor::HandleGetDiskRegistryBasedPartCounters(
    const TEvNonreplPartitionPrivate::
        TEvGetDiskRegistryBasedPartCountersRequest::TPtr& ev,
    const TActorContext& ctx)
{
    using TResponse = TEvNonreplPartitionPrivate::
        TEvGetDiskRegistryBasedPartCountersResponse;

    // Finish off the request that is being superseded by this one.
    if (StatisticRequestInfo) {
        NCloud::Reply(
            ctx,
            *StatisticRequestInfo,
            std::make_unique<TResponse>(
                MakeError(E_REJECTED, "Mirror actor got new request"),
                SelfId(),
                DiskId,
                TPartNonreplCountersData{}));
        StatisticRequestInfo.Reset();
    }

    auto replicaActorIds = State.GetReplicaActorsBypassingProxies();

    // Nothing to collect from: answer immediately instead of spawning
    // a collector.
    if (replicaActorIds.empty()) {
        NCloud::Reply(
            ctx,
            *ev,
            std::make_unique<TResponse>(
                MakeError(E_INVALID_STATE, "Mirror actor hasn't replicas"),
                SelfId(),
                DiskId,
                ExtractPartCounters(ctx)));
        return;
    }

    // Remember whom to answer once the combined counters arrive.
    StatisticRequestInfo =
        CreateRequestInfo(ev->Sender, ev->Cookie, ev->Get()->CallContext);

    NCloud::Register<TDiskRegistryBasedPartitionStatisticsCollectorActor>(
        ctx,
        SelfId(),
        std::move(replicaActorIds));
}

void TMirrorPartitionActor::HandleDiskRegistryBasedPartCountersCombined(
const TEvNonreplPartitionPrivate::TEvDiskRegistryBasedPartCountersCombined::
TPtr& ev,
const TActorContext& ctx)
{
if (!StatisticRequestInfo) {
LOG_ERROR(
ctx,
TBlockStoreComponents::PARTITION_NONREPL,
"[%s] Failed to send mirror actor statistics due to empty "
"StatisticRequestInfo.",
DiskId.Quote().c_str());
return;
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Здесь (и не только здесь) нужно условие про HasError(msg->GetError()). Если ошибка есть, залогировать её WARN'ом.

auto* msg = ev->Get();

if(HasError(msg->Error)) {
LOG_WARN(
ctx,
TBlockStoreComponents::PARTITION_NONREPL,
"[%s] Failed to send mirror actor statistics due to error: %s",
DiskId.Quote().c_str(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LogTitle.GetWithTime().c_str()

msg->Error.GetMessage().c_str());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FormatError(msg->Error).c_str()

}

for (auto& counters: msg->Counters) {
if(!counters.CountersData.DiskCounters) {
continue;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Такой код может привести к неверной статистике. Если не вызвать UpdateCounters(), то внутри деки ReplicaCounters останутся старые счетчики, которые снова учтутся при суммировании (внутри метода ExtractPartCounters()).

UpdateCounters(ctx, counters.ActorId, std::move(counters.CountersData));
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

тут (и не только тут) нужно сделать проверку на nullptr у счетчиков


NCloud::Reply(
ctx,
*StatisticRequestInfo,
std::make_unique<TEvNonreplPartitionPrivate::
TEvGetDiskRegistryBasedPartCountersResponse>(
msg->Error,
SelfId(),
DiskId,
ExtractPartCounters(ctx)));

StatisticRequestInfo.Reset();
}

} // namespace NCloud::NBlockStore::NStorage
Loading
Loading