Skip to content

Commit 713ce97

Browse files
committed
resume from checkpoint
1 parent 7c5fda8 commit 713ce97

File tree

8 files changed

+108
-59
lines changed

8 files changed

+108
-59
lines changed

go/base/context.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ type MigrationContext struct {
103103
GoogleCloudPlatform bool
104104
AzureMySQL bool
105105
AttemptInstantDDL bool
106+
Resume bool
106107

107108
// SkipPortValidation allows skipping the port validation in `ValidateConnection`
108109
// This is useful when connecting to a MySQL instance where the external port
@@ -240,6 +241,7 @@ type MigrationContext struct {
240241
Iteration int64
241242
MigrationIterationRangeMinValues *sql.ColumnValues
242243
MigrationIterationRangeMaxValues *sql.ColumnValues
244+
InitialStreamerCoords mysql.BinlogCoordinates
243245
ForceTmpTableName string
244246

245247
IncludeTriggers bool

go/cmd/gh-ost/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ func main() {
146146
flag.BoolVar(&migrationContext.RemoveTriggerSuffix, "remove-trigger-suffix-if-exists", false, "Remove given suffix from name of trigger. Requires '--include-triggers' and '--trigger-suffix'")
147147
flag.BoolVar(&migrationContext.SkipPortValidation, "skip-port-validation", false, "Skip port validation for MySQL connections")
148148
flag.BoolVar(&migrationContext.Checkpoint, "checkpoint", false, "Use migration checkpoints")
149+
flag.BoolVar(&migrationContext.Resume, "resume", false, "Attempt to resume migration from checkpoint")
149150

150151
maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes")
151152
criticalLoad := flag.String("critical-load", "", "Comma delimited status-name=threshold, same format as --max-load. When status exceeds threshold, app panics and quits")

go/logic/applier.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ func (this *Applier) tableExists(tableName string) (tableFound bool) {
239239
// ValidateOrDropExistingTables verifies ghost and changelog tables do not exist,
240240
// or attempts to drop them if instructed to.
241241
func (this *Applier) ValidateOrDropExistingTables() error {
242-
if this.migrationContext.InitiallyDropGhostTable {
242+
if this.migrationContext.InitiallyDropGhostTable && !this.migrationContext.Resume {
243243
if err := this.DropGhostTable(); err != nil {
244244
return err
245245
}
@@ -431,6 +431,7 @@ func (this *Applier) CreateCheckpointTable() error {
431431
}
432432
colDefs := []string{
433433
"`gh_ost_chk_id` bigint auto_increment primary key",
434+
"`gh_ost_chk_timestamp` bigint",
434435
"`gh_ost_chk_coords` varchar(4096)",
435436
"`gh_ost_chk_iteration` bigint",
436437
}
@@ -630,26 +631,40 @@ func (this *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) {
630631
return res.LastInsertId()
631632
}
632633

633-
func (this *Applier) ReadLastCheckpoint(chk *Checkpoint) error {
634+
func (this *Applier) ReadLastCheckpoint() (*Checkpoint, error) {
634635
row := this.db.QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by gh_ost_chk_id desc limit 1`, this.migrationContext.DatabaseName, this.migrationContext.GetCheckpointTableName()))
636+
chk := &Checkpoint{
637+
IterationRangeMin: sql.NewColumnValues(this.migrationContext.UniqueKey.Columns.Len()),
638+
IterationRangeMax: sql.NewColumnValues(this.migrationContext.UniqueKey.Columns.Len()),
639+
}
635640

636641
var coordStr string
637-
ptrs := []interface{}{&chk.Id, &coordStr, &chk.Iteration}
642+
var timestamp int64
643+
ptrs := []interface{}{&chk.Id, &timestamp, &coordStr, &chk.Iteration}
638644
ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...)
639645
ptrs = append(ptrs, chk.IterationRangeMax.ValuesPointers...)
640646
err := row.Scan(ptrs...)
641647
if err != nil {
642648
if errors.Is(err, gosql.ErrNoRows) {
643-
return NoCheckpointFoundError
649+
return nil, NoCheckpointFoundError
644650
}
645-
return err
651+
return nil, err
646652
}
647-
gtidCoords, err := mysql.NewGTIDBinlogCoordinates(coordStr)
648-
if err != nil {
649-
return err
653+
chk.Timestamp = time.Unix(timestamp, 0)
654+
if this.migrationContext.UseGTIDs {
655+
gtidCoords, err := mysql.NewGTIDBinlogCoordinates(coordStr)
656+
if err != nil {
657+
return nil, err
658+
}
659+
chk.LastTrxCoords = gtidCoords
660+
} else {
661+
fileCoords, err := mysql.ParseFileBinlogCoordinates(coordStr)
662+
if err != nil {
663+
return nil, err
664+
}
665+
chk.LastTrxCoords = fileCoords
650666
}
651-
chk.LastTrxCoords = gtidCoords
652-
return nil
667+
return chk, nil
653668
}
654669

655670
// InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.

go/logic/applier_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
/*
22
Copyright 2022 GitHub Inc.
3-
See https://github.com/github/gh-ost/blob/master/LICENSE
3+
See https://github.com/git
4+
hub/gh-ost/blob/master/LICENSE
45
*/
56

67
package logic
@@ -689,13 +690,11 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() {
689690
suite.Require().NoError(err)
690691

691692
// checkpoint table is empty
692-
gotChk := &Checkpoint{IterationRangeMin: sql.NewColumnValues(2), IterationRangeMax: sql.NewColumnValues(2)}
693-
err = applier.ReadLastCheckpoint(gotChk)
693+
_, err = applier.ReadLastCheckpoint()
694694
suite.Require().ErrorIs(err, NoCheckpointFoundError)
695695

696696
// write a checkpoint and read it back
697-
coords, err := mysql.NewGTIDBinlogCoordinates(`08dc06d7-c27c-11ea-b204-e4434b77a5ce:1-1497873603,0b4ff540-a712-11ea-9857-e4434b2a1c98:1-4315312982,19636248-246d-11e9-ab0d-0263df733a8e:1`)
698-
suite.Require().NoError(err)
697+
coords := mysql.NewFileBinlogCoordinates("mysql-bin.000003", int64(219202907))
699698

700699
chk := &Checkpoint{
701700
LastTrxCoords: coords,
@@ -707,7 +706,7 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() {
707706
suite.Require().NoError(err)
708707
suite.Require().Equal(int64(1), id)
709708

710-
err = applier.ReadLastCheckpoint(gotChk)
709+
gotChk, err := applier.ReadLastCheckpoint()
711710
suite.Require().NoError(err)
712711

713712
suite.Require().Equal(chk.Iteration, gotChk.Iteration)

go/logic/migrator.go

Lines changed: 60 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -352,8 +352,14 @@ func (this *Migrator) Migrate() (err error) {
352352
if err := this.initiateInspector(); err != nil {
353353
return err
354354
}
355-
if err := this.initiateStreaming(); err != nil {
356-
return err
355+
// If we are resuming, we will initiateStreaming later when we know
356+
// the coordinates to resume streaming.
357+
// If not resuming, the streamer must be initiated before the applier,
358+
// so that the "GhostTableMigrated" event gets processed.
359+
if !this.migrationContext.Resume {
360+
if err := this.initiateStreaming(); err != nil {
361+
return err
362+
}
357363
}
358364
if err := this.initiateApplier(); err != nil {
359365
return err
@@ -384,9 +390,11 @@ func (this *Migrator) Migrate() (err error) {
384390
}
385391

386392
initialLag, _ := this.inspector.getReplicationLag()
387-
this.migrationContext.Log.Infof("Waiting for ghost table to be migrated. Current lag is %+v", initialLag)
388-
<-this.ghostTableMigrated
389-
this.migrationContext.Log.Debugf("ghost table migrated")
393+
if !this.migrationContext.Resume {
394+
this.migrationContext.Log.Infof("Waiting for ghost table to be migrated. Current lag is %+v", initialLag)
395+
<-this.ghostTableMigrated
396+
this.migrationContext.Log.Debugf("ghost table migrated")
397+
}
390398
// Yay! We now know the Ghost and Changelog tables are good to examine!
391399
// When running on replica, this means the replica has those tables. When running
392400
// on master this is always true, of course, and yet it also implies this knowledge
@@ -395,14 +403,33 @@ func (this *Migrator) Migrate() (err error) {
395403
return err
396404
}
397405

398-
if this.migrationContext.Checkpoint {
406+
// We can prepare some of the queries on the applier
407+
if err := this.applier.prepareQueries(); err != nil {
408+
return err
409+
}
410+
411+
// inspectOriginalAndGhostTables must be called before creating checkpoint table.
412+
if this.migrationContext.Checkpoint && !this.migrationContext.Resume {
399413
if err := this.applier.CreateCheckpointTable(); err != nil {
400414
this.migrationContext.Log.Errorf("Unable to create checkpoint table, see further error deatils.")
401415
}
416+
402417
}
403-
// We can prepare some of the queries on the applier
404-
if err := this.applier.prepareQueries(); err != nil {
405-
return err
418+
419+
if this.migrationContext.Resume {
420+
lastCheckpoint, err := this.applier.ReadLastCheckpoint()
421+
if err != nil {
422+
return this.migrationContext.Log.Errorf("No checkpoint found, unable to resume: %+v", err)
423+
}
424+
this.migrationContext.Log.Infof("Resuming from checkpoint coords=%+v range_min=%+v range_max=%+v",
425+
lastCheckpoint.LastTrxCoords, lastCheckpoint.IterationRangeMin.String(), lastCheckpoint.IterationRangeMax.String())
426+
427+
this.migrationContext.MigrationIterationRangeMinValues = lastCheckpoint.IterationRangeMin
428+
this.migrationContext.MigrationIterationRangeMaxValues = lastCheckpoint.IterationRangeMax
429+
this.migrationContext.InitialStreamerCoords = lastCheckpoint.LastTrxCoords
430+
if err := this.initiateStreaming(); err != nil {
431+
return err
432+
}
406433
}
407434

408435
// Validation complete! We're good to execute this migration
@@ -1189,31 +1216,33 @@ func (this *Migrator) initiateApplier() error {
11891216
if err := this.applier.InitDBConnections(); err != nil {
11901217
return err
11911218
}
1192-
if err := this.applier.ValidateOrDropExistingTables(); err != nil {
1193-
return err
1194-
}
1195-
if err := this.applier.CreateChangelogTable(); err != nil {
1196-
this.migrationContext.Log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out")
1197-
return err
1198-
}
1199-
if err := this.applier.CreateGhostTable(); err != nil {
1200-
this.migrationContext.Log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
1201-
return err
1202-
}
1203-
if err := this.applier.AlterGhost(); err != nil {
1204-
this.migrationContext.Log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
1205-
return err
1206-
}
1207-
1208-
if this.migrationContext.OriginalTableAutoIncrement > 0 && !this.parser.IsAutoIncrementDefined() {
1209-
// Original table has AUTO_INCREMENT value and the -alter statement does not indicate any override,
1210-
// so we should copy AUTO_INCREMENT value onto our ghost table.
1211-
if err := this.applier.AlterGhostAutoIncrement(); err != nil {
1212-
this.migrationContext.Log.Errorf("Unable to ALTER ghost table AUTO_INCREMENT value, see further error details. Bailing out")
1219+
if !this.migrationContext.Resume {
1220+
if err := this.applier.ValidateOrDropExistingTables(); err != nil {
12131221
return err
12141222
}
1223+
if err := this.applier.CreateChangelogTable(); err != nil {
1224+
this.migrationContext.Log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out")
1225+
return err
1226+
}
1227+
if err := this.applier.CreateGhostTable(); err != nil {
1228+
this.migrationContext.Log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
1229+
return err
1230+
}
1231+
if err := this.applier.AlterGhost(); err != nil {
1232+
this.migrationContext.Log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
1233+
return err
1234+
}
1235+
1236+
if this.migrationContext.OriginalTableAutoIncrement > 0 && !this.parser.IsAutoIncrementDefined() {
1237+
// Original table has AUTO_INCREMENT value and the -alter statement does not indicate any override,
1238+
// so we should copy AUTO_INCREMENT value onto our ghost table.
1239+
if err := this.applier.AlterGhostAutoIncrement(); err != nil {
1240+
this.migrationContext.Log.Errorf("Unable to ALTER ghost table AUTO_INCREMENT value, see further error details. Bailing out")
1241+
return err
1242+
}
1243+
}
1244+
this.applier.WriteChangelogState(string(GhostTableMigrated))
12151245
}
1216-
this.applier.WriteChangelogState(string(GhostTableMigrated))
12171246
if err := this.applier.StateMetadataLockInstrument(); err != nil {
12181247
this.migrationContext.Log.Errorf("Unable to enable metadata lock instrument, see further error details. Bailing out")
12191248
return err

go/logic/streamer.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,13 @@ type EventsStreamer struct {
4949

5050
func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer {
5151
return &EventsStreamer{
52-
connectionConfig: migrationContext.InspectorConnectionConfig,
53-
migrationContext: migrationContext,
54-
listeners: [](*BinlogEventListener){},
55-
listenersMutex: &sync.Mutex{},
56-
eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
57-
name: "streamer",
52+
connectionConfig: migrationContext.InspectorConnectionConfig,
53+
migrationContext: migrationContext,
54+
listeners: [](*BinlogEventListener){},
55+
listenersMutex: &sync.Mutex{},
56+
eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
57+
name: "streamer",
58+
initialBinlogCoordinates: migrationContext.InitialStreamerCoords,
5859
}
5960
}
6061

@@ -114,8 +115,10 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
114115
return err
115116
}
116117
this.dbVersion = version
117-
if err := this.readCurrentBinlogCoordinates(); err != nil {
118-
return err
118+
if this.initialBinlogCoordinates == nil || this.initialBinlogCoordinates.IsEmpty() {
119+
if err := this.readCurrentBinlogCoordinates(); err != nil {
120+
return err
121+
}
119122
}
120123
if err := this.initBinlogReader(this.initialBinlogCoordinates); err != nil {
121124
return err

go/sql/builder.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,9 @@ func NewCheckpointQueryBuilder(databaseName, tableName string, uniqueKeyColumns
123123
stmt := fmt.Sprintf(`
124124
insert /* gh-ost */
125125
into %s.%s
126-
(gh_ost_chk_coords, gh_ost_chk_iteration, %s, %s)
126+
(gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, %s, %s)
127127
values
128-
(?, ?, %s, %s)`,
128+
(unix_timestamp(now()), ?, ?, %s, %s)`,
129129
databaseName, tableName,
130130
strings.Join(minUniqueColNames, ", "),
131131
strings.Join(maxUniqueColNames, ", "),

localtests/test.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ sysbench_prepare() {
151151
--mysql-password=opensesame \
152152
--mysql-db=test \
153153
--tables=1 \
154-
--table-size=20000 \
154+
--table-size=100000 \
155155
prepare
156156
}
157157

@@ -254,7 +254,6 @@ test_single() {
254254

255255
table_name="gh_ost_test"
256256
ghost_table_name="_gh_ost_test_gho"
257-
trap cleanup EXIT INT TERM
258257
# test with sysbench oltp write load
259258
if [[ "$test_name" == "sysbench" ]]; then
260259
if ! command -v sysbench &>/dev/null; then
@@ -273,6 +272,7 @@ test_single() {
273272
echo -n "Started sysbench (PID $sysbench_pid): "
274273
echo $load_cmd
275274
fi
275+
trap cleanup SIGINT
276276

277277
#
278278
cmd="GOTRACEBACK=crash $ghost_binary \

0 commit comments

Comments
 (0)