Skip to content

Commit 7c5fda8

Browse files
committed
store min and max range values in checkpoint
1 parent e2f0a20 commit 7c5fda8

File tree

10 files changed

+147
-32
lines changed

10 files changed

+147
-32
lines changed

go/base/context.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,6 @@ type MigrationContext struct {
195195
pointOfInterestTimeMutex *sync.Mutex
196196
lastHeartbeatOnChangelogTime time.Time
197197
lastHeartbeatOnChangelogMutex *sync.Mutex
198-
LastLockProcessedCoords mysql.BinlogCoordinates
199198
CurrentLag int64
200199
currentProgress uint64
201200
etaNanoseonds int64

go/cmd/gh-ost/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ func main() {
145145
flag.StringVar(&migrationContext.TriggerSuffix, "trigger-suffix", "", "Add a suffix to the trigger name (i.e '_v2'). Requires '--include-triggers'")
146146
flag.BoolVar(&migrationContext.RemoveTriggerSuffix, "remove-trigger-suffix-if-exists", false, "Remove given suffix from name of trigger. Requires '--include-triggers' and '--trigger-suffix'")
147147
flag.BoolVar(&migrationContext.SkipPortValidation, "skip-port-validation", false, "Skip port validation for MySQL connections")
148+
flag.BoolVar(&migrationContext.Checkpoint, "checkpoint", false, "Use migration checkpoints")
148149

149150
maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes")
150151
criticalLoad := flag.String("critical-load", "", "Comma delimited status-name=threshold, same format as --max-load. When status exceeds threshold, app panics and quits")

go/logic/applier.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/github/gh-ost/go/mysql"
2626
drivermysql "github.com/go-sql-driver/mysql"
2727
"github.com/openark/golib/sqlutils"
28+
"sync"
2829
)
2930

3031
const (
@@ -70,6 +71,13 @@ type Applier struct {
7071
finishedMigrating int64
7172
name string
7273

74+
CurrentCoordinatesMutex sync.Mutex
75+
CurrentCoordinates mysql.BinlogCoordinates
76+
77+
LastIterationRangeMutex sync.Mutex
78+
LastIterationRangeMinValues *sql.ColumnValues
79+
LastIterationRangeMaxValues *sql.ColumnValues
80+
7381
dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder
7482
dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder
7583
dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder
@@ -415,6 +423,8 @@ func (this *Applier) CreateChangelogTable() error {
415423
}
416424

417425
// Create the checkpoint table to store the chunk copy and applier state.
426+
// There are two sets of columns with the same types as the shared unique key,
427+
// one for IterationMinValues and one for IterationMaxValues.
418428
func (this *Applier) CreateCheckpointTable() error {
419429
if err := this.DropCheckpointTable(); err != nil {
420430
return err
@@ -428,12 +438,21 @@ func (this *Applier) CreateCheckpointTable() error {
428438
if col.MySQLType == "" {
429439
return fmt.Errorf("CreateCheckpoinTable: column %s has no type information. applyColumnTypes must be called", sql.EscapeName(col.Name))
430440
}
431-
colDef := fmt.Sprintf("%s %s", sql.EscapeName(col.Name), col.MySQLType)
441+
colDef := fmt.Sprintf("%s %s", sql.EscapeName(col.Name+"_min"), col.MySQLType)
442+
if !col.Nullable {
443+
colDef += " NOT NULL"
444+
}
445+
colDefs = append(colDefs, colDef)
446+
}
447+
448+
for _, col := range this.migrationContext.UniqueKey.Columns.Columns() {
449+
colDef := fmt.Sprintf("%s %s", sql.EscapeName(col.Name+"_max"), col.MySQLType)
432450
if !col.Nullable {
433451
colDef += " NOT NULL"
434452
}
435453
colDefs = append(colDefs, colDef)
436454
}
455+
437456
query := fmt.Sprintf("create /* gh-ost */ table %s.%s (\n %s\n)",
438457
sql.EscapeName(this.migrationContext.DatabaseName),
439458
sql.EscapeName(this.migrationContext.GetCheckpointTableName()),
@@ -597,6 +616,7 @@ func (this *Applier) WriteChangelogState(value string) (string, error) {
597616
func (this *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) {
598617
var insertId int64
599618
uniqueKeyArgs := sqlutils.Args(chk.IterationRangeMin.AbstractValues()...)
619+
uniqueKeyArgs = append(uniqueKeyArgs, chk.IterationRangeMax.AbstractValues()...)
600620
query, uniqueKeyArgs, err := this.checkpointInsertQueryBuilder.BuildQuery(uniqueKeyArgs)
601621
if err != nil {
602622
return insertId, err
@@ -611,11 +631,12 @@ func (this *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) {
611631
}
612632

613633
func (this *Applier) ReadLastCheckpoint(chk *Checkpoint) error {
614-
row := this.db.QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by id desc limit 1`, this.migrationContext.DatabaseName, this.migrationContext.GetCheckpointTableName()))
634+
row := this.db.QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by gh_ost_chk_id desc limit 1`, this.migrationContext.DatabaseName, this.migrationContext.GetCheckpointTableName()))
615635

616636
var coordStr string
617637
ptrs := []interface{}{&chk.Id, &coordStr, &chk.Iteration}
618638
ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...)
639+
ptrs = append(ptrs, chk.IterationRangeMax.ValuesPointers...)
619640
err := row.Scan(ptrs...)
620641
if err != nil {
621642
if errors.Is(err, gosql.ErrNoRows) {
@@ -777,6 +798,13 @@ func (this *Applier) ReadMigrationRangeValues() error {
777798
// no further chunk to work through, i.e. we're past the last chunk and are done with
778799
// iterating the range (and thus done with copying row chunks)
779800
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
801+
this.LastIterationRangeMutex.Lock()
802+
if this.migrationContext.MigrationIterationRangeMinValues != nil && this.migrationContext.MigrationIterationRangeMaxValues != nil {
803+
this.LastIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMinValues.Clone()
804+
this.LastIterationRangeMaxValues = this.migrationContext.MigrationIterationRangeMaxValues.Clone()
805+
}
806+
this.LastIterationRangeMutex.Unlock()
807+
780808
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
781809
if this.migrationContext.MigrationIterationRangeMinValues == nil {
782810
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues

go/logic/applier_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,7 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() {
689689
suite.Require().NoError(err)
690690

691691
// checkpoint table is empty
692-
gotChk := &Checkpoint{IterationRangeMin: sql.NewColumnValues(2)}
692+
gotChk := &Checkpoint{IterationRangeMin: sql.NewColumnValues(2), IterationRangeMax: sql.NewColumnValues(2)}
693693
err = applier.ReadLastCheckpoint(gotChk)
694694
suite.Require().ErrorIs(err, NoCheckpointFoundError)
695695

@@ -699,7 +699,8 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() {
699699

700700
chk := &Checkpoint{
701701
LastTrxCoords: coords,
702-
IterationRangeMin: applier.migrationContext.MigrationRangeMaxValues,
702+
IterationRangeMin: applier.migrationContext.MigrationRangeMinValues,
703+
IterationRangeMax: applier.migrationContext.MigrationRangeMaxValues,
703704
Iteration: 2,
704705
}
705706
id, err := applier.WriteCheckpoint(chk)
@@ -712,6 +713,7 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() {
712713
suite.Require().Equal(chk.Iteration, gotChk.Iteration)
713714
suite.Require().Equal(chk.LastTrxCoords.String(), gotChk.LastTrxCoords.String())
714715
suite.Require().Equal(chk.IterationRangeMin.String(), gotChk.IterationRangeMin.String())
716+
suite.Require().Equal(chk.IterationRangeMax.String(), gotChk.IterationRangeMax.String())
715717
}
716718

717719
func TestApplier(t *testing.T) {

go/logic/migrator.go

Lines changed: 77 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ var (
2626
ErrMigratorUnsupportedRenameAlter = errors.New("ALTER statement seems to RENAME the table. This is not supported, and you should run your RENAME outside gh-ost.")
2727
ErrMigrationNotAllowedOnMaster = errors.New("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (this reduces load from the master). To proceed please provide --allow-on-master.")
2828
RetrySleepFn = time.Sleep
29+
checkpointInterval = 10 * time.Second // 5 * time.Minute
30+
checkpointTimeout = 2 * time.Second
2931
)
3032

3133
type ChangelogState string
@@ -46,15 +48,16 @@ type tableWriteFunc func() error
4648
// applyEventStruct is one item placed on the migrator's apply-events queue.
// The constructors in this file populate either writeFunc or dmlEvent (plus
// coords), never both.
type applyEventStruct struct {
4749
writeFunc *tableWriteFunc // set by newApplyEventStructByFunc
4850
dmlEvent *binlog.BinlogDMLEvent // set by newApplyEventStructByDML
51+
coords mysql.BinlogCoordinates // coordinates of the binlog entry that carried dmlEvent; zero value for writeFunc items
4952
}
5053

5154
// newApplyEventStructByFunc wraps a table write function in an
// applyEventStruct. dmlEvent and coords are left at their zero values.
func newApplyEventStructByFunc(writeFunc *tableWriteFunc) *applyEventStruct {
5255
result := &applyEventStruct{writeFunc: writeFunc}
5356
return result
5457
}
5558

56-
func newApplyEventStructByDML(dmlEvent *binlog.BinlogDMLEvent) *applyEventStruct {
57-
result := &applyEventStruct{dmlEvent: dmlEvent}
59+
// newApplyEventStructByDML builds an applyEventStruct from a binlog entry,
// keeping both the entry's DML event and the entry's binlog coordinates.
// dmlEntry is dereferenced unconditionally, so it must not be nil.
func newApplyEventStructByDML(dmlEntry *binlog.BinlogEntry) *applyEventStruct {
60+
result := &applyEventStruct{dmlEvent: dmlEntry.DmlEvent, coords: dmlEntry.Coordinates}
5861
return result
5962
}
6063

@@ -431,6 +434,9 @@ func (this *Migrator) Migrate() (err error) {
431434
go this.iterateChunks()
432435
this.migrationContext.MarkRowCopyStartTime()
433436
go this.initiateStatus()
437+
if this.migrationContext.Checkpoint {
438+
go this.checkpointLoop()
439+
}
434440

435441
this.migrationContext.Log.Debugf("Operating until row copy is complete")
436442
this.consumeRowCopyComplete()
@@ -1086,7 +1092,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
10861092
atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied),
10871093
len(this.applyEventsQueue), cap(this.applyEventsQueue),
10881094
base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()),
1089-
currentBinlogCoordinates,
1095+
currentBinlogCoordinates.DisplayString(),
10901096
this.migrationContext.GetCurrentLagDuration().Seconds(),
10911097
this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(),
10921098
state,
@@ -1123,8 +1129,8 @@ func (this *Migrator) initiateStreaming() error {
11231129
false,
11241130
this.migrationContext.DatabaseName,
11251131
this.migrationContext.GetChangelogTableName(),
1126-
func(dmlEvent *binlog.BinlogDMLEvent) error {
1127-
return this.onChangelogEvent(dmlEvent)
1132+
func(dmlEntry *binlog.BinlogEntry) error {
1133+
return this.onChangelogEvent(dmlEntry.DmlEvent)
11281134
},
11291135
)
11301136

@@ -1157,8 +1163,8 @@ func (this *Migrator) addDMLEventsListener() error {
11571163
false,
11581164
this.migrationContext.DatabaseName,
11591165
this.migrationContext.OriginalTableName,
1160-
func(dmlEvent *binlog.BinlogDMLEvent) error {
1161-
this.applyEventsQueue <- newApplyEventStructByDML(dmlEvent)
1166+
func(dmlEntry *binlog.BinlogEntry) error {
1167+
this.applyEventsQueue <- newApplyEventStructByDML(dmlEntry)
11621168
return nil
11631169
},
11641170
)
@@ -1342,6 +1348,11 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
13421348
if err := this.retryOperation(applyEventFunc); err != nil {
13431349
return this.migrationContext.Log.Errore(err)
13441350
}
1351+
// update applier coordinates
1352+
this.applier.CurrentCoordinatesMutex.Lock()
1353+
this.applier.CurrentCoordinates = eventStruct.coords
1354+
this.applier.CurrentCoordinatesMutex.Unlock()
1355+
13451356
if nonDmlStructToApply != nil {
13461357
// We pulled DML events from the queue, and then we hit a non-DML event. Wait!
13471358
// We need to handle it!
@@ -1353,6 +1364,65 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
13531364
return nil
13541365
}
13551366

1367+
// Checkpoint captures the streamer's current binlog coordinates together with
// a copy of the applier's last iteration range, waits until the applier's
// CurrentCoordinates have caught up with the captured coordinates, and then
// persists the checkpoint through applier.WriteCheckpoint.
//
// Returns:
//   - (nil, error) when either of the applier's last iteration range values is
//     still nil;
//   - (nil, ctx.Err()) when ctx is done before the applier has caught up;
//   - (chk, err) otherwise, where chk.Id is the id returned by WriteCheckpoint
//     and err is WriteCheckpoint's error.
//
// NOTE(review): chk is returned non-nil even when WriteCheckpoint fails;
// callers must check err before using it.
// NOTE(review): ctx is only consulted between polls. The 500ms sleep is not
// interrupted by cancellation, and WriteCheckpoint is not given ctx, so the
// deadline does not bound the write itself.
func (this *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) {
1368+
coords := this.eventsStreamer.GetCurrentBinlogCoordinates()
1369+
this.applier.LastIterationRangeMutex.Lock()
1370+
if this.applier.LastIterationRangeMaxValues == nil || this.applier.LastIterationRangeMinValues == nil {
1371+
this.applier.LastIterationRangeMutex.Unlock()
1372+
return nil, errors.New("iteration range is empty, not checkpointing...")
1373+
}
1374+
// Clone the range values while the mutex is held so chk does not alias the
// applier's fields once the lock is released.
chk := &Checkpoint{
1375+
IterationRangeMin: this.applier.LastIterationRangeMinValues.Clone(),
1376+
IterationRangeMax: this.applier.LastIterationRangeMaxValues.Clone(),
1377+
LastTrxCoords: coords,
1378+
}
1379+
this.applier.LastIterationRangeMutex.Unlock()
1380+
1381+
// Poll until the applier's coordinates reach the captured coords, or ctx ends.
for {
1382+
select {
1383+
case <-ctx.Done():
1384+
return nil, ctx.Err()
1385+
default:
1386+
this.applier.CurrentCoordinatesMutex.Lock()
1387+
if coords.SmallerThanOrEquals(this.applier.CurrentCoordinates) {
1388+
// NOTE(review): the write happens while CurrentCoordinatesMutex is held, so
// onApplyEventStruct's coordinate update waits for it to finish.
id, err := this.applier.WriteCheckpoint(chk)
1389+
chk.Id = id
1390+
this.applier.CurrentCoordinatesMutex.Unlock()
1391+
return chk, err
1392+
}
1393+
this.applier.CurrentCoordinatesMutex.Unlock()
1394+
time.Sleep(500 * time.Millisecond)
1395+
}
1396+
}
1397+
}
1398+
1399+
// checkpointLoop attempts a checkpoint on every tick of a checkpointInterval
// ticker. It returns immediately for Noop migrations, and otherwise returns at
// the first tick on which finishedMigrating is greater than zero. Each attempt
// runs under a fresh context limited to checkpointTimeout; failures are logged
// and never returned, so one failed attempt does not end the loop.
//
// NOTE(review): the ticker is never stopped when the loop returns; consider
// calling ticker.Stop() on exit.
func (this *Migrator) checkpointLoop() {
1400+
if this.migrationContext.Noop {
1401+
this.migrationContext.Log.Debugf("Noop operation; not really checkpointing")
1402+
return
1403+
}
1404+
ticker := time.NewTicker(checkpointInterval)
1405+
for t := range ticker.C {
1406+
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
1407+
return
1408+
}
1409+
this.migrationContext.Log.Infof("starting checkpoint at %+v", t)
1410+
ctx, cancel := context.WithTimeout(context.Background(), checkpointTimeout)
1411+
chk, err := this.Checkpoint(ctx)
1412+
if err != nil {
1413+
// Log a timeout separately from any other failure.
if errors.Is(err, context.DeadlineExceeded) {
1414+
this.migrationContext.Log.Errorf("checkpoint attempt timed out after %+v", checkpointTimeout)
1415+
} else {
1416+
this.migrationContext.Log.Errorf("error attempting checkpoint: %+v", err)
1417+
}
1418+
} else {
1419+
this.migrationContext.Log.Infof("checkpoint success at coords=%+v range_min=%+v range_max=%+v",
1420+
chk.LastTrxCoords.DisplayString(), chk.IterationRangeMin.String(), chk.IterationRangeMax.String())
1421+
}
1422+
// Release the per-attempt context before waiting for the next tick.
cancel()
1423+
}
1424+
}
1425+
13561426
// executeWriteFuncs writes data via applier: both the rowcopy and the events backlog.
13571427
// This is where the ghost table gets the data. The function fills the data single-threaded.
13581428
// Both event backlog and rowcopy events are polled; the backlog events have precedence.

go/logic/streamer.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type BinlogEventListener struct {
2424
async bool
2525
databaseName string
2626
tableName string
27-
onDmlEvent func(event *binlog.BinlogDMLEvent) error
27+
onDmlEvent func(event *binlog.BinlogEntry) error
2828
}
2929

3030
const (
@@ -60,7 +60,7 @@ func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer
6060

6161
// AddListener registers a new listener for binlog events, on a per-table basis
6262
func (this *EventsStreamer) AddListener(
63-
async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) {
63+
async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogEntry) error) (err error) {
6464
this.listenersMutex.Lock()
6565
defer this.listenersMutex.Unlock()
6666

@@ -82,24 +82,24 @@ func (this *EventsStreamer) AddListener(
8282

8383
// notifyListeners will notify relevant listeners with given DML event. Only
8484
// listeners registered for changes on the table on which the DML operates are notified.
85-
func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) {
85+
func (this *EventsStreamer) notifyListeners(binlogEntry *binlog.BinlogEntry) {
8686
this.listenersMutex.Lock()
8787
defer this.listenersMutex.Unlock()
8888

8989
for _, listener := range this.listeners {
9090
listener := listener
91-
if !strings.EqualFold(listener.databaseName, binlogEvent.DatabaseName) {
91+
if !strings.EqualFold(listener.databaseName, binlogEntry.DmlEvent.DatabaseName) {
9292
continue
9393
}
94-
if !strings.EqualFold(listener.tableName, binlogEvent.TableName) {
94+
if !strings.EqualFold(listener.tableName, binlogEntry.DmlEvent.TableName) {
9595
continue
9696
}
9797
if listener.async {
9898
go func() {
99-
listener.onDmlEvent(binlogEvent)
99+
listener.onDmlEvent(binlogEntry)
100100
}()
101101
} else {
102-
listener.onDmlEvent(binlogEvent)
102+
listener.onDmlEvent(binlogEntry)
103103
}
104104
}
105105
}
@@ -176,7 +176,7 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
176176
go func() {
177177
for binlogEntry := range this.eventsChannel {
178178
if binlogEntry.DmlEvent != nil {
179-
this.notifyListeners(binlogEntry.DmlEvent)
179+
this.notifyListeners(binlogEntry)
180180
}
181181
}
182182
}()

go/logic/streamer_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ func (suite *EventsStreamerTestSuite) TestStreamEvents() {
9090
streamCtx, cancel := context.WithCancel(context.Background())
9191

9292
dmlEvents := make([]*binlog.BinlogDMLEvent, 0)
93-
err = streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, func(event *binlog.BinlogDMLEvent) error {
94-
dmlEvents = append(dmlEvents, event)
93+
err = streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, func(event *binlog.BinlogEntry) error {
94+
dmlEvents = append(dmlEvents, event.DmlEvent)
9595

9696
// Stop once we've collected three events
9797
if len(dmlEvents) == 3 {
@@ -165,8 +165,8 @@ func (suite *EventsStreamerTestSuite) TestStreamEventsAutomaticallyReconnects()
165165
streamCtx, cancel := context.WithCancel(context.Background())
166166

167167
dmlEvents := make([]*binlog.BinlogDMLEvent, 0)
168-
err = streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, func(event *binlog.BinlogDMLEvent) error {
169-
dmlEvents = append(dmlEvents, event)
168+
err = streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, func(event *binlog.BinlogEntry) error {
169+
dmlEvents = append(dmlEvents, event.DmlEvent)
170170

171171
// Stop once we've collected three events
172172
if len(dmlEvents) == 3 {

0 commit comments

Comments
 (0)