36 commits
b13f4be
[pick-to-7.5] resource_control: add ru details in ExecDetails (#1070)…
nolouch Feb 18, 2024
fa310f8
add a helper function for new ru details (#1078) (#1159)
nolouch Feb 19, 2024
98ed21b
[Cherry-pick tidb-7.5] Check kill signals (#1158)
ekexium Feb 19, 2024
b4e2443
[Cherry-pick-to-7.5] Add grpc window size config and change the defau…
gengliqi Mar 21, 2024
2de9b7d
cherry pick #1127 to fix #1111 (#1324)
zyguan Apr 28, 2024
6aedd99
pick #1250 to tidb-7.5 (#1325)
zyguan Apr 29, 2024
e7f530c
save work (#1108) (#1334)
windtalker May 10, 2024
19ed51b
region_cache: cherry-pick #1262 to tidb-7.5 (#1347)
zyguan May 21, 2024
f41b068
tikvrpc: avoid data race on `XxxRequest.Context` (#1242) (#1351)
zyguan May 28, 2024
7a74511
fix rpc client panic cause by concurrent close (#1358)
crazycs520 May 31, 2024
199ac6f
region cache: check if the pd returned regions covers the ranges (#13…
you06 Jul 10, 2024
2cd3a74
Fix the problem that statement being oom-killed within DoneAggressive…
MyonKeminta Jul 16, 2024
73815ad
fix issue that store's liveness may incorrectly marked as unreachable…
crazycs520 Aug 7, 2024
f0e74f9
region_request: remove backoff for stale read (#1423) (#1428)
you06 Aug 21, 2024
491e9b1
OWNERS: Auto Sync OWNERS files from community membership (#1447)
ti-chi-bot Aug 27, 2024
c9c69f1
OWNERS: Auto Sync OWNERS files from community membership (#1425) (#1441)
ti-chi-bot Aug 27, 2024
bc6139e
improve region request log for diagnose #1300 (#1452)
crazycs520 Aug 28, 2024
db90b0d
memdb: fix memdb snapshot get/iter is not actually snapshot (#1393) (…
you06 Sep 18, 2024
3725b31
[pick-7.5]region_request: ignore resource group errors that not relat…
nolouch Sep 20, 2024
5ce6bf1
membuffer: fix memory leak in red-black tree (#1483) (#1498)
you06 Nov 25, 2024
0dc4129
Cherry pick bugfix #1054 & #1484 to tidb-7.5 branch (#1515)
MyonKeminta Dec 12, 2024
c1b98e6
Try to validate read ts for all RPC requests (#1513) (#1548)
you06 Jan 15, 2025
f90c6c6
(tidb-7.5) Do not set txn scope in snapshot (#1570)
ekexium Feb 13, 2025
d561c89
(tidb-7.5) Use global txn scope when validating ts (#1581)
ekexium Feb 18, 2025
d4383a9
(tidb-7.5) Validate ts only for stale read (#1594)
ekexium Feb 26, 2025
126eab4
merge from tidb-7.5
zeminzhou Jun 18, 2025
d2a2a19
fix lint
zeminzhou Jun 18, 2025
fef099e
fix lint
zeminzhou Jun 18, 2025
32b8794
fix lint
zeminzhou Jun 18, 2025
17c7172
skip test
zeminzhou Jun 18, 2025
e7559d4
fix it
zeminzhou Jun 18, 2025
e821dc3
skip test
zeminzhou Jun 18, 2025
fab6249
add it back
zeminzhou Jun 18, 2025
36824aa
add it back
zeminzhou Jun 18, 2025
2ff5e78
Merge branch 'tidb-cse-7.5' into rebase-tidb-7.5-2
zeminzhou Jun 20, 2025
4024592
Merge branch 'tidb-cse-7.5' into rebase-tidb-7.5-2
zeminzhou Jul 1, 2025
5 changes: 5 additions & 0 deletions .github/workflows/test.yml
@@ -49,6 +49,11 @@ jobs:
go generate ./...
git diff --exit-code

- name: Go generate and check diff
run: |
go generate ./...
git diff --exit-code

- name: Lint
uses: golangci/golangci-lint-action@v3
with:
1 change: 1 addition & 0 deletions .golangci.yml
@@ -4,6 +4,7 @@ linters:
disable-all: true
enable:
- bodyclose
#- depguard
- exportloopref
- gofmt
- goimports
1 change: 0 additions & 1 deletion examples/txnkv/async_commit/go.mod
@@ -30,7 +30,6 @@ require (
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20240509073708-a674e668f8a5 // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/v3 v3.5.10 // indirect
1 change: 0 additions & 1 deletion examples/txnkv/delete_range/go.mod
@@ -30,7 +30,6 @@ require (
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20240509073708-a674e668f8a5 // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/v3 v3.5.10 // indirect
2 changes: 0 additions & 2 deletions examples/txnkv/unsafedestoryrange/go.mod
@@ -16,7 +16,6 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
@@ -30,7 +29,6 @@
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20240509073708-a674e668f8a5 // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/v3 v3.5.10 // indirect
7 changes: 5 additions & 2 deletions integration_tests/go.mod
@@ -18,6 +18,11 @@ require (
go.uber.org/goleak v1.3.0
)

require (
github.com/dolthub/maphash v0.1.0 // indirect
github.com/dolthub/swiss v0.2.1 // indirect
)

require (
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/BurntSushi/toml v1.3.2 // indirect
@@ -39,8 +44,6 @@
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dolthub/maphash v0.1.0 // indirect
github.com/dolthub/swiss v0.2.1 // indirect
github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect
github.com/go-ldap/ldap/v3 v3.4.4 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
1 change: 1 addition & 0 deletions integration_tests/main_test.go
@@ -27,6 +27,7 @@ func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/pingcap/goleveldb/leveldb.(*DB).mpoolDrain"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"), // TODO: fix ttlManager goroutine leak
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/config/retry.(*Config).createBackoffFn.newBackoffFn.func2"),
}

goleak.VerifyTestMain(m, opts...)
4 changes: 2 additions & 2 deletions integration_tests/pd_api_test.go
@@ -213,15 +213,15 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
// Try to get the minimum resolved timestamp of the cluster from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == math.MaxUint64 {
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == 0 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
// Make sure the store's min resolved ts is not regarded as MaxUint64.
require.Equal(uint64(0), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))

// Fallback to KV Request when PD server not support get min resolved ts.
11 changes: 6 additions & 5 deletions integration_tests/snapshot_test.go
@@ -287,12 +287,13 @@ func (s *testSnapshotSuite) TestSnapshotThreadSafe() {

func (s *testSnapshotSuite) TestSnapshotRuntimeStats() {
reqStats := tikv.NewRegionRequestRuntimeStats()
tikv.RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Second)
tikv.RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Millisecond)
reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Second)
reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Millisecond)
snapshot := s.store.GetSnapshot(0)
snapshot.SetRuntimeStats(&txnkv.SnapshotRuntimeStats{})
snapshot.MergeRegionRequestStats(reqStats.Stats)
snapshot.MergeRegionRequestStats(reqStats.Stats)
runtimeStats := &txnkv.SnapshotRuntimeStats{}
snapshot.SetRuntimeStats(runtimeStats)
snapshot.MergeRegionRequestStats(reqStats)
snapshot.MergeRegionRequestStats(reqStats)
bo := tikv.NewBackofferWithVars(context.Background(), 2000, nil)
err := bo.BackoffWithMaxSleepTxnLockFast(5, errors.New("test"))
s.Nil(err)
86 changes: 70 additions & 16 deletions internal/locate/region_cache.go
@@ -1833,6 +1833,14 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
util.HexRegionKeyStr(c.codec.EncodeRegionKey(startKey)), util.HexRegionKeyStr(c.codec.EncodeRegionKey(endKey)),
)
}

if regionsHaveGapInRange(startKey, endKey, regionsInfo, limit) {
backoffErr = errors.Errorf(
"PD returned regions have gaps, startKey: %q, endKey: %q, limit: %d",
startKey, endKey, limit,
)
continue
}
regions := make([]*Region, 0, len(regionsInfo))
for _, r := range regionsInfo {
// Leader id = 0 indicates no leader.
@@ -1849,13 +1857,54 @@
return nil, errors.New("receive Regions with no peer")
}
if len(regions) < len(regionsInfo) {
logutil.Logger(context.Background()).Debug(
logutil.Logger(context.Background()).Warn(
"regionCache: scanRegion finished but some regions has no leader.")
}
return regions, nil
}
}

// regionsHaveGapInRange checks if the loaded regions can fully cover the key ranges.
// If there are any gaps between the regions, it returns true, then the requests might be retried.
// TODO: remove this function after PD client supports gap detection and handling it.
func regionsHaveGapInRange(start, end []byte, regionsInfo []*router.Region, limit int) bool {
if len(regionsInfo) == 0 {
return true
}
var lastEndKey []byte
for i, r := range regionsInfo {
if r.Meta == nil {
return true
}
if i == 0 {
if bytes.Compare(r.Meta.StartKey, start) > 0 {
// there is a gap between first returned region's start_key and start key.
return true
}
}
if i > 0 && bytes.Compare(r.Meta.StartKey, lastEndKey) > 0 {
// there is a gap between two regions.
return true
}
if len(r.Meta.EndKey) == 0 {
// the current region contains all the rest ranges.
return false
}
// note lastEndKey never be empty.
lastEndKey = r.Meta.EndKey
}
if limit > 0 && len(regionsInfo) == limit {
// the regionsInfo is limited by the limit, so there may be some ranges not covered.
// The rest range will be loaded in the next scanRegions call.
return false
}
if len(end) == 0 {
// the end key of the range is empty, but we can't cover it.
return true
}
return bytes.Compare(lastEndKey, end) < 0
}

// GetCachedRegionWithRLock returns region with lock.
func (c *RegionCache) GetCachedRegionWithRLock(regionID RegionVerID) (r *Region) {
c.mu.RLock()
@@ -2650,14 +2699,25 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
newStore.livenessState = atomic.LoadUint32(&s.livenessState)
newStore.unreachableSince = s.unreachableSince
if newStore.getLivenessState() != reachable {
newStore.unreachableSince = s.unreachableSince
go newStore.checkUntilHealth(c, newStore.getLivenessState(), storeReResolveInterval)
}
c.storeMu.Lock()
if s.addr == addr {
newStore.slowScore = s.slowScore
}
c.storeMu.stores[newStore.storeID] = newStore
c.storeMu.Unlock()
s.setResolveState(deleted)
logutil.BgLogger().Info("store address or labels changed, add new store and mark old store deleted",
zap.Uint64("store", s.storeID),
zap.String("old-addr", s.addr),
zap.Any("old-labels", s.labels),
zap.String("old-liveness", s.getLivenessState().String()),
zap.String("new-addr", newStore.addr),
zap.Any("new-labels", newStore.labels),
zap.String("new-liveness", newStore.getLivenessState().String()))
return false, nil
}
s.changeResolveStateTo(needCheck, resolved)
Expand Down Expand Up @@ -2809,6 +2869,8 @@ func (s livenessState) String() string {
}
}

var storeReResolveInterval = 30 * time.Second

func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) {
// This mechanism doesn't support non-TiKV stores currently.
if s.storeType != tikvrpc.TiKV {
@@ -2820,7 +2882,7 @@ func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessSt
// It may be already started by another thread.
if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) {
s.unreachableSince = time.Now()
reResolveInterval := 30 * time.Second
reResolveInterval := storeReResolveInterval
if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil {
if dur, err := time.ParseDuration(val.(string)); err == nil {
reResolveInterval = dur
@@ -2848,26 +2910,18 @@ func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResol
case <-c.ctx.Done():
return
case <-ticker.C:
if s.getResolveState() == deleted {
logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String()))
return
}
if time.Since(lastCheckPDTime) > reResolveInterval {
lastCheckPDTime = time.Now()

valid, err := s.reResolve(c)
if err != nil {
logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err))
} else if !valid {
if s.getResolveState() == deleted {
// if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve).
c.storeMu.RLock()
newStore := c.storeMu.stores[s.storeID]
c.storeMu.RUnlock()
logutil.BgLogger().Info("[health check] store meta changed",
zap.Uint64("storeID", s.storeID),
zap.String("oldAddr", s.addr),
zap.String("oldLabels", fmt.Sprintf("%v", s.labels)),
zap.String("newAddr", newStore.addr),
zap.String("newLabels", fmt.Sprintf("%v", newStore.labels)))
go newStore.checkUntilHealth(c, liveness, reResolveInterval)
}
logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String()))
return
}
}
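
For readers skimming the diff, the following is a minimal standalone sketch (not part of this PR's changes) of the coverage check that `regionsHaveGapInRange` performs. It uses a simplified `regionMeta` struct and a hypothetical `haveGapInRange` helper instead of the real `router.Region` type, but follows the same logic: the scanned regions must cover the requested range without holes unless the result was cut off by the limit.

```go
package main

import (
	"bytes"
	"fmt"
)

// regionMeta is a stand-in for the real router.Region metadata:
// only the start/end keys matter for the gap check.
type regionMeta struct {
	StartKey, EndKey []byte
}

// haveGapInRange mirrors the idea of regionsHaveGapInRange: report true if the
// returned regions leave any part of [start, end) uncovered.
func haveGapInRange(start, end []byte, regions []regionMeta, limit int) bool {
	if len(regions) == 0 {
		return true
	}
	var lastEnd []byte
	for i, r := range regions {
		if i == 0 && bytes.Compare(r.StartKey, start) > 0 {
			return true // gap before the first returned region
		}
		if i > 0 && bytes.Compare(r.StartKey, lastEnd) > 0 {
			return true // gap between two adjacent regions
		}
		if len(r.EndKey) == 0 {
			return false // this region extends to +inf, covering the rest
		}
		lastEnd = r.EndKey
	}
	if limit > 0 && len(regions) == limit {
		return false // truncated by limit; the rest is loaded by a later scan
	}
	if len(end) == 0 {
		return true // unbounded range requested, but coverage stopped early
	}
	return bytes.Compare(lastEnd, end) < 0
}

func main() {
	regions := []regionMeta{
		{StartKey: []byte("a"), EndKey: []byte("m")},
		{StartKey: []byte("n"), EndKey: []byte("z")}, // hole between "m" and "n"
	}
	fmt.Println(haveGapInRange([]byte("a"), []byte("z"), regions, 0)) // true
}
```

In the real code a detected gap turns into a backoff error, so `scanRegions` retries instead of caching an incomplete view of the key space.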
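
The re-resolve change above can also be read as a handoff: when `reResolve` replaces a store whose address or labels changed, the replacement starts its own health-check loop, and the old store's loop exits as soon as it observes the `deleted` state. A rough, simplified model of that handoff (not the actual client-go types, names, or intervals) might look like:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// store is a toy stand-in for the client's Store: just enough state to show
// how a health-check loop stops for a deleted store and continues on its
// replacement.
type store struct {
	id      uint64
	addr    string
	deleted atomic.Bool
	healthy atomic.Bool
}

func (s *store) checkUntilHealth(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		if s.deleted.Load() {
			fmt.Printf("store %d (%s): meta deleted, stop checking\n", s.id, s.addr)
			return
		}
		if s.healthy.Load() {
			fmt.Printf("store %d (%s): reachable again, stop checking\n", s.id, s.addr)
			return
		}
	}
}

// reResolve replaces old with a store at a new address and hands the
// health-check loop over to the replacement.
func reResolve(old *store, newAddr string) *store {
	newStore := &store{id: old.id, addr: newAddr}
	go newStore.checkUntilHealth(10 * time.Millisecond) // replacement keeps being checked
	old.deleted.Store(true)                             // old loop observes this and exits
	return newStore
}

func main() {
	s := &store{id: 1, addr: "tikv-0:20160"}
	go s.checkUntilHealth(10 * time.Millisecond)
	ns := reResolve(s, "tikv-0-new:20160")
	time.Sleep(50 * time.Millisecond)
	ns.healthy.Store(true)
	time.Sleep(50 * time.Millisecond)
}
```

The point of the PR's version is that the old goroutine no longer has to look up its replacement; it simply stops once its own store is marked deleted, which avoids the replacement store being left in an unreachable state with no checker attached.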