
Commit 0fdd255

metrics: clean up metrics when a store is removed from the cluster (#1751)
fix pingcap/tidb#63245

Signed-off-by: zyguan <[email protected]>
1 parent f3cd192 commit 0fdd255

File tree: 5 files changed (+294 -110 lines)
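The diffs below add the per-store metric plumbing; the periodic cleanup task that the commit title refers to lives in the changed files not reproduced on this page. As a rough sketch of the underlying mechanism only, stale per-store series can be dropped from a Prometheus vector with DeletePartialMatch once PD no longer reports the store; the metric and label names below are placeholders, not the ones client-go actually uses.

package example

import (
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
)

// sendReqHistogram stands in for a store-labelled vector such as
// metrics.TiKVSendReqHistogram; the metric and label names are assumptions.
var sendReqHistogram = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{Name: "tikv_request_seconds"},
	[]string{"store", "type", "stale_read", "scope"},
)

// dropStoreSeries deletes every child series whose "store" label matches the
// removed store and returns how many series were deleted. This is the general
// mechanism a periodic cleanup task can use once a store leaves the cluster.
func dropStoreSeries(storeID uint64) int {
	return sendReqHistogram.DeletePartialMatch(prometheus.Labels{
		"store": strconv.FormatUint(storeID, 10),
	})
}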


internal/client/client.go

Lines changed: 53 additions & 0 deletions
@@ -743,3 +743,56 @@ func (m *rpcMetrics) get(cmd tikvrpc.CmdType, stale bool, internal bool) prometh
 	}
 	return lat.(prometheus.Observer)
 }
+
+type storeMetrics struct {
+	storeID           uint64
+	rpcLatHist        *rpcMetrics
+	rpcSrcLatSum      sync.Map
+	rpcNetLatExternal prometheus.Observer
+	rpcNetLatInternal prometheus.Observer
+}
+
+func newStoreMetrics(storeID uint64) *storeMetrics {
+	store := strconv.FormatUint(storeID, 10)
+	m := &storeMetrics{
+		storeID:           storeID,
+		rpcLatHist:        deriveRPCMetrics(metrics.TiKVSendReqHistogram.MustCurryWith(prometheus.Labels{metrics.LblStore: store})),
+		rpcNetLatExternal: metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(store, "false"),
+		rpcNetLatInternal: metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(store, "true"),
+	}
+	return m
+}
+
+func (m *storeMetrics) updateRPCMetrics(req *tikvrpc.Request, resp *tikvrpc.Response, latency time.Duration) {
+	seconds := latency.Seconds()
+	stale := req.GetStaleRead()
+	source := req.GetRequestSource()
+	internal := util.IsInternalRequest(req.GetRequestSource())
+
+	m.rpcLatHist.get(req.Type, stale, internal).Observe(seconds)
+
+	srcLatSum, ok := m.rpcSrcLatSum.Load(source)
+	if !ok {
+		srcLatSum = deriveRPCMetrics(metrics.TiKVSendReqBySourceSummary.MustCurryWith(
+			prometheus.Labels{metrics.LblStore: strconv.FormatUint(m.storeID, 10), metrics.LblSource: source}))
+		m.rpcSrcLatSum.Store(source, srcLatSum)
+	}
+	srcLatSum.(*rpcMetrics).get(req.Type, stale, internal).Observe(seconds)
+
+	if execDetail := resp.GetExecDetailsV2(); execDetail != nil {
+		var totalRpcWallTimeNs uint64
+		if execDetail.TimeDetailV2 != nil {
+			totalRpcWallTimeNs = execDetail.TimeDetailV2.TotalRpcWallTimeNs
+		} else if execDetail.TimeDetail != nil {
+			totalRpcWallTimeNs = execDetail.TimeDetail.TotalRpcWallTimeNs
+		}
+		if totalRpcWallTimeNs > 0 {
+			lat := latency - time.Duration(totalRpcWallTimeNs)
+			if internal {
+				m.rpcNetLatInternal.Observe(lat.Seconds())
+			} else {
+				m.rpcNetLatExternal.Observe(lat.Seconds())
+			}
+		}
+	}
+}
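newStoreMetrics pre-binds the store label with MustCurryWith, so every observer later taken from the derived vector already carries it and updateRPCMetrics only supplies the request-specific labels. A minimal, self-contained sketch of that currying pattern, with metric and label names invented for the example rather than taken from client-go:

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// reqHistogram plays the role of metrics.TiKVSendReqHistogram in this sketch.
var reqHistogram = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{Name: "rpc_request_seconds"},
	[]string{"store", "type", "stale_read"},
)

func main() {
	// Bind the store label once, as newStoreMetrics does with LblStore.
	perStore := reqHistogram.MustCurryWith(prometheus.Labels{"store": "42"})

	// Observers taken from the curried vector only need the remaining labels;
	// they all report under store="42".
	perStore.With(prometheus.Labels{"type": "Get", "stale_read": "false"}).
		Observe((25 * time.Millisecond).Seconds())
}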

internal/client/conn_pool.go

Lines changed: 11 additions & 42 deletions
@@ -23,12 +23,9 @@ import (
 
 	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
 	"github.com/pkg/errors"
-	"github.com/prometheus/client_golang/prometheus"
 	"github.com/tikv/client-go/v2/config"
 	tikverr "github.com/tikv/client-go/v2/error"
-	"github.com/tikv/client-go/v2/metrics"
 	"github.com/tikv/client-go/v2/tikvrpc"
-	"github.com/tikv/client-go/v2/util"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/backoff"
 	"google.golang.org/grpc/credentials"
@@ -55,12 +52,7 @@ type connPool struct
 
 	monitor *connMonitor
 
-	metrics struct {
-		rpcLatHist        *rpcMetrics
-		rpcSrcLatSum      sync.Map
-		rpcNetLatExternal prometheus.Observer
-		rpcNetLatInternal prometheus.Observer
-	}
+	metrics atomic.Pointer[storeMetrics]
 }
 
 func newConnPool(maxSize uint, addr string, ver uint64, security config.Security,
@@ -74,9 +66,6 @@ func newConnPool(maxSize uint, addr string, ver uint64, security config.Security
 		dialTimeout: dialTimeout,
 		monitor:     m,
 	}
-	a.metrics.rpcLatHist = deriveRPCMetrics(metrics.TiKVSendReqHistogram.MustCurryWith(prometheus.Labels{metrics.LblStore: addr}))
-	a.metrics.rpcNetLatExternal = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(addr, "false")
-	a.metrics.rpcNetLatInternal = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(addr, "true")
 	if err := a.Init(addr, security, idleNotify, enableBatch, eventListener, opts...); err != nil {
 		return nil, err
 	}
@@ -221,35 +210,15 @@ func (a *connPool) Close() {
 }
 
 func (a *connPool) updateRPCMetrics(req *tikvrpc.Request, resp *tikvrpc.Response, latency time.Duration) {
-	seconds := latency.Seconds()
-	stale := req.GetStaleRead()
-	source := req.GetRequestSource()
-	internal := util.IsInternalRequest(req.GetRequestSource())
-
-	a.metrics.rpcLatHist.get(req.Type, stale, internal).Observe(seconds)
-
-	srcLatSum, ok := a.metrics.rpcSrcLatSum.Load(source)
-	if !ok {
-		srcLatSum = deriveRPCMetrics(metrics.TiKVSendReqSummary.MustCurryWith(
-			prometheus.Labels{metrics.LblStore: a.target, metrics.LblSource: source}))
-		a.metrics.rpcSrcLatSum.Store(source, srcLatSum)
-	}
-	srcLatSum.(*rpcMetrics).get(req.Type, stale, internal).Observe(seconds)
-
-	if execDetail := resp.GetExecDetailsV2(); execDetail != nil {
-		var totalRpcWallTimeNs uint64
-		if execDetail.TimeDetailV2 != nil {
-			totalRpcWallTimeNs = execDetail.TimeDetailV2.TotalRpcWallTimeNs
-		} else if execDetail.TimeDetail != nil {
-			totalRpcWallTimeNs = execDetail.TimeDetail.TotalRpcWallTimeNs
-		}
-		if totalRpcWallTimeNs > 0 {
-			lat := latency - time.Duration(totalRpcWallTimeNs)
-			if internal {
-				a.metrics.rpcNetLatInternal.Observe(lat.Seconds())
-			} else {
-				a.metrics.rpcNetLatExternal.Observe(lat.Seconds())
-			}
-		}
+	m := a.metrics.Load()
+	storeID := req.Context.GetPeer().GetStoreId()
+	if m == nil || m.storeID != storeID {
+		// The client selects a connPool by addr via RPCClient.getConnPool, so it's possible that the storeID of the
+		// selected connPool is not the same as the storeID in req.Context. We need to create a new storeMetrics for the
+		// new storeID. Note that connPool.metrics just works as a cache, the metric data is stored in corresponding
+		// MetricVec, so it's ok to overwrite it here.
+		m = newStoreMetrics(storeID)
+		a.metrics.Store(m)
 	}
+	m.updateRPCMetrics(req, resp, latency)
 }
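connPool.metrics is now a lock-free cache: a *storeMetrics held behind an atomic.Pointer and rebuilt whenever the store ID in the request context differs from the cached one. Stripped of the client-specific details, the pattern looks roughly like the sketch below (all types and names are invented for illustration):

package main

import (
	"fmt"
	"sync/atomic"
)

// storeStats is a stand-in for storeMetrics: an immutable bundle of derived
// handles keyed by the store it was built for.
type storeStats struct {
	storeID uint64
}

type pool struct {
	// stats works purely as a cache: losing a value only costs re-deriving
	// it, because the underlying metric data lives in the shared MetricVecs.
	stats atomic.Pointer[storeStats]
}

func (p *pool) statsFor(storeID uint64) *storeStats {
	s := p.stats.Load()
	if s == nil || s.storeID != storeID {
		// Rebuild and overwrite. A concurrent overwrite by another goroutine
		// is harmless for the same reason the cache itself is optional.
		s = &storeStats{storeID: storeID}
		p.stats.Store(s)
	}
	return s
}

func main() {
	var p pool
	fmt.Println(p.statsFor(1).storeID, p.statsFor(2).storeID)
}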

internal/locate/region_cache.go

Lines changed: 8 additions & 38 deletions
@@ -353,6 +353,11 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *router.Region) (*R
 		if err != nil {
 			return nil, err
 		}
+
+		if !exists {
+			updateStoreLivenessGauge(store)
+		}
+
 		// Filter out the peer on a tombstone or down store.
 		if addr == "" || slices.ContainsFunc(pdRegion.DownPeers, func(dp *metapb.Peer) bool { return isSamePeer(dp, p) }) {
 			continue
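updateStoreLivenessGauge is introduced elsewhere in this commit, so its body is not part of this hunk; here it is only called for stores the cache sees for the first time. Purely as an illustration of what such a helper could look like (the metric name and labels are assumptions, not the real ones):

package example

import "github.com/prometheus/client_golang/prometheus"

// storeLivenessGauge is a placeholder; the gauge actually updated by
// updateStoreLivenessGauge lives in the client-go metrics package and its
// name and labels may differ.
var storeLivenessGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "tikv_store_liveness"},
	[]string{"store", "state"},
)

// markStoreSeen records a newly discovered store as reachable. A cleanup pass
// for removed stores would delete the matching series again, mirroring the
// metric GC this commit is about.
func markStoreSeen(storeID string) {
	storeLivenessGauge.WithLabelValues(storeID, "reachable").Set(1)
}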
@@ -749,47 +754,11 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
 		// cache GC is incompatible with cache refresh
 		c.bg.schedule(c.gcRoundFunc(cleanRegionNumPerRound), cleanCacheInterval)
 	}
-	c.bg.schedule(
-		func(ctx context.Context, _ time.Time) bool {
-			refreshFullStoreList(ctx, c.stores)
-			return false
-		}, refreshStoreListInterval,
-	)
+	updater := &storeCacheUpdater{stores: c.stores}
+	c.bg.schedule(updater.tick, refreshStoreListInterval)
 	return c
 }
 
-// Try to refresh full store list. Errors are ignored.
-func refreshFullStoreList(ctx context.Context, stores storeCache) {
-	storeList, err := stores.fetchAllStores(ctx)
-	if err != nil {
-		logutil.Logger(ctx).Info("refresh full store list failed", zap.Error(err))
-		return
-	}
-	for _, store := range storeList {
-		_, exist := stores.get(store.GetId())
-		if exist {
-			continue
-		}
-		// GetAllStores is supposed to return only Up and Offline stores.
-		// This check is being defensive and to make it consistent with store resolve code.
-		if store == nil || store.GetState() == metapb.StoreState_Tombstone {
-			continue
-		}
-		addr := store.GetAddress()
-		if addr == "" {
-			continue
-		}
-		s := stores.getOrInsertDefault(store.GetId())
-		// TODO: maybe refactor this, together with other places initializing Store
-		s.addr = addr
-		s.peerAddr = store.GetPeerAddress()
-		s.saddr = store.GetStatusAddress()
-		s.storeType = tikvrpc.GetStoreTypeByMeta(store)
-		s.labels = store.GetLabels()
-		s.changeResolveStateTo(unresolved, resolved)
-	}
-}
-
 // only used fot test.
 func newTestRegionCache() *RegionCache {
 	c := &RegionCache{}
@@ -2760,6 +2729,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
 const cleanCacheInterval = time.Second
 const cleanRegionNumPerRound = 50
 const refreshStoreListInterval = 10 * time.Second
+const cleanStoreMetricsInterval = time.Minute
 
 // gcScanItemHook is only used for testing
 var gcScanItemHook = new(atomic.Pointer[func(*btreeItem)])
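The storeCacheUpdater referenced in NewRegionCache, and the code that consumes the new cleanStoreMetricsInterval constant, are defined in parts of the commit not shown here. The sketch below is only an assumed shape of such a task: a tick method matching the func(context.Context, time.Time) bool signature the background scheduler expects, refreshing the store list every round and garbage-collecting per-store metrics at most once per interval.

package example

import (
	"context"
	"time"
)

// storeLister abstracts the small slice of storeCache this sketch needs; the
// real storeCacheUpdater added by this commit is not shown in these hunks, so
// every name below is an assumption rather than the actual implementation.
type storeLister interface {
	fetchAllStoreIDs(ctx context.Context) ([]uint64, error)
}

type storeUpdater struct {
	stores     storeLister
	lastGC     time.Time
	gcInterval time.Duration // plays the role of cleanStoreMetricsInterval
}

// tick refreshes the store list every round and, at most once per gcInterval,
// would drop metric series belonging to stores PD no longer reports.
func (u *storeUpdater) tick(ctx context.Context, now time.Time) bool {
	ids, err := u.stores.fetchAllStoreIDs(ctx)
	if err != nil {
		return false // errors are ignored, as the old refreshFullStoreList did
	}
	if now.Sub(u.lastGC) >= u.gcInterval {
		u.lastGC = now
		_ = ids // the real code would delete per-store series not present in ids
	}
	return false
}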
