Skip to content

Commit 546b5ac

Browse files
authored
ddl: speed up add index by split handle range from pd region. (#5964)
1 parent 27dc8f3 commit 546b5ac

23 files changed

+508
-97
lines changed

ddl/ddl.go

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -87,16 +87,18 @@ var (
8787
errIncorrectPrefixKey = terror.ClassDDL.New(codeIncorrectPrefixKey, "Incorrect prefix key; the used key part isn't a string, the used length is longer than the key part, or the storage engine doesn't support unique prefix keys")
8888
errTooLongKey = terror.ClassDDL.New(codeTooLongKey,
8989
fmt.Sprintf("Specified key was too long; max key length is %d bytes", maxPrefixLength))
90-
errKeyColumnDoesNotExits = terror.ClassDDL.New(codeKeyColumnDoesNotExits, "this key column doesn't exist in table")
91-
errDupKeyName = terror.ClassDDL.New(codeDupKeyName, "duplicate key name")
92-
errUnknownTypeLength = terror.ClassDDL.New(codeUnknownTypeLength, "Unknown length for type tp %d")
93-
errUnknownFractionLength = terror.ClassDDL.New(codeUnknownFractionLength, "Unknown Length for type tp %d and fraction %d")
94-
errInvalidJobVersion = terror.ClassDDL.New(codeInvalidJobVersion, "DDL job with version %d greater than current %d")
95-
errFileNotFound = terror.ClassDDL.New(codeFileNotFound, "Can't find file: './%s/%s.frm'")
96-
errErrorOnRename = terror.ClassDDL.New(codeErrorOnRename, "Error on rename of './%s/%s' to './%s/%s'")
97-
errBadField = terror.ClassDDL.New(codeBadField, "Unknown column '%s' in '%s'")
98-
errInvalidUseOfNull = terror.ClassDDL.New(codeInvalidUseOfNull, "Invalid use of NULL value")
99-
errTooManyFields = terror.ClassDDL.New(codeTooManyFields, "Too many columns")
90+
errKeyColumnDoesNotExits = terror.ClassDDL.New(codeKeyColumnDoesNotExits, "this key column doesn't exist in table")
91+
errDupKeyName = terror.ClassDDL.New(codeDupKeyName, "duplicate key name")
92+
errUnknownTypeLength = terror.ClassDDL.New(codeUnknownTypeLength, "Unknown length for type tp %d")
93+
errUnknownFractionLength = terror.ClassDDL.New(codeUnknownFractionLength, "Unknown Length for type tp %d and fraction %d")
94+
errInvalidJobVersion = terror.ClassDDL.New(codeInvalidJobVersion, "DDL job with version %d greater than current %d")
95+
errFileNotFound = terror.ClassDDL.New(codeFileNotFound, "Can't find file: './%s/%s.frm'")
96+
errErrorOnRename = terror.ClassDDL.New(codeErrorOnRename, "Error on rename of './%s/%s' to './%s/%s'")
97+
errBadField = terror.ClassDDL.New(codeBadField, "Unknown column '%s' in '%s'")
98+
errInvalidUseOfNull = terror.ClassDDL.New(codeInvalidUseOfNull, "Invalid use of NULL value")
99+
errTooManyFields = terror.ClassDDL.New(codeTooManyFields, "Too many columns")
100+
errInvalidSplitRegionRanges = terror.ClassDDL.New(codeInvalidRanges, "Failed to split region ranges")
101+
errReorgWorkerNotRunnable = terror.ClassDDL.New(codeReorgWorkerNotRunnable, "reorg worker is not runnable.")
100102

101103
// errWrongKeyColumn is for table column cannot be indexed.
102104
errWrongKeyColumn = terror.ClassDDL.New(codeWrongKeyColumn, mysql.MySQLErrName[mysql.ErrWrongKeyColumn])
@@ -273,7 +275,7 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
273275
lease: lease,
274276
ddlJobCh: make(chan struct{}, 1),
275277
ddlJobDoneCh: make(chan struct{}, 1),
276-
reorgCtx: &reorgCtx{notifyCancelReorgJob: make(chan struct{}, 1)},
278+
reorgCtx: &reorgCtx{notifyCancelReorgJob: 0},
277279
ownerManager: manager,
278280
schemaSyncer: syncer,
279281
workerVars: variable.NewSessionVars(),
@@ -500,17 +502,19 @@ func (d *ddl) WorkerVars() *variable.SessionVars {
500502

501503
// DDL error codes.
502504
const (
503-
codeInvalidWorker terror.ErrCode = 1
504-
codeNotOwner = 2
505-
codeInvalidDDLJob = 3
506-
codeInvalidJobFlag = 5
507-
codeRunMultiSchemaChanges = 6
508-
codeWaitReorgTimeout = 7
509-
codeInvalidStoreVer = 8
510-
codeUnknownTypeLength = 9
511-
codeUnknownFractionLength = 10
512-
codeInvalidJobVersion = 11
513-
codeCancelledDDLJob = 12
505+
codeInvalidWorker terror.ErrCode = 1
506+
codeNotOwner = 2
507+
codeInvalidDDLJob = 3
508+
codeInvalidJobFlag = 5
509+
codeRunMultiSchemaChanges = 6
510+
codeWaitReorgTimeout = 7
511+
codeInvalidStoreVer = 8
512+
codeUnknownTypeLength = 9
513+
codeUnknownFractionLength = 10
514+
codeInvalidJobVersion = 11
515+
codeCancelledDDLJob = 12
516+
codeInvalidRanges = 13
517+
codeReorgWorkerNotRunnable = 14
514518

515519
codeInvalidDBState = 100
516520
codeInvalidTableState = 101

ddl/ddl_db_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,11 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) {
313313
var checkErr error
314314
hook := &ddl.TestDDLCallback{}
315315
first := true
316+
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
317+
// let hook.OnJobUpdatedExported has chance to cancel the job.
318+
// the hook.OnJobUpdatedExported is called when the job is updated, runReorgJob will wait ddl.ReorgWaitTimeout, then return the ddl.runDDLJob.
319+
// After that ddl call d.hook.OnJobUpdated(job), so that we can canceled the job in this test case.
320+
ddl.ReorgWaitTimeout = 50 * time.Millisecond
316321
hook.OnJobUpdatedExported = func(job *model.Job) {
317322
addIndexNotFirstReorg := job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0
318323
// If the action is adding index and the state is writing reorganization, it want to test the case of cancelling the job when backfilling indexes.
@@ -391,6 +396,7 @@ LOOP:
391396
}
392397

393398
s.mustExec(c, "drop table t1")
399+
ddl.ReorgWaitTimeout = oldReorgWaitTimeout
394400
}
395401

396402
func (s *testDBSuite) TestAddAnonymousIndex(c *C) {

ddl/ddl_worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) (ver int64, err error) {
283283
// If the value of SnapshotVer isn't zero, it means the work is backfilling the indexes.
284284
if job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
285285
log.Infof("[ddl] run the cancelling DDL job %s", job)
286-
asyncNotify(d.reorgCtx.notifyCancelReorgJob)
286+
d.reorgCtx.notifyReorgCancel()
287287
} else {
288288
job.State = model.JobStateCancelled
289289
job.Error = errCancelledDDLJob

ddl/ddl_worker_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ type testDDLSuite struct{}
3737

3838
const testLease = 5 * time.Millisecond
3939

40+
func (s *testDDLSuite) SetUpSuite(c *C) {
41+
// set ReorgWaitTimeout to small value, make test to be faster.
42+
ReorgWaitTimeout = 50 * time.Millisecond
43+
}
44+
4045
func (s *testDDLSuite) TestCheckOwner(c *C) {
4146
defer testleak.AfterTest(c)()
4247
store := testCreateStore(c, "test_owner")

0 commit comments

Comments
 (0)