Skip to content

Conversation

@lvyanquan
Copy link
Contributor

@lvyanquan lvyanquan commented Jul 24, 2025

Fix potential exception when online schema change happened.

Why did this happen?
Because when making table schema changes, the original table was not locked.
And the online schema change corresponds to a set of SQL statements, which were not processed completely before being issued. There may be a situation where the table structure does not correspond.

private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$");

private static final Pattern OSC_TEMP_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(del|old)$");
private static final Pattern RDS_OGT_TEMP_TABLE_ID_PATTERN =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to support RDS OGT later as it could not be tested now.

Optional<String> finishedTables =
RecordUtils.parseOnLineSchemaRenameEvent(event.getRecord());
if (finishedTables.isPresent()) {
TableId tableId = RecordUtils.getTableId(event.getRecord());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When execute gh-ost /gh-ost --user="root" --password="XXXX" --host="localhost" --database="dinky" --table="dinky_user" --alter="ADD COLUMN czy04 DATETIME NULL AFTER update_time" --allow-on-master --ok-to-drop-table --initially-drop-ghost-table --initially-drop-old-table --execute,Have this error schema can't remove on BinlogSplitReader. pendingSchemaChangeEvents

If have del table,RecordUtils.getTableId have ',' with del and finishedTable.That will error record to ddl remove

image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2025-08-04 16:57:01,623 INFO  org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils  - Checking if DDL might be an OSC renaming event... rename /* gh-ost */ table `dinky`.`dinky_user` to `dinky`.`_dinky_user_del`
2025-08-04 16:57:01,624 INFO  org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils  - Determined the shorter TableId dinky_user is the renaming source.
2025-08-04 16:57:01,624 INFO  org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils  - Determined the longer TableId _dinky_user_del is the renaming target.
2025-08-04 16:57:01,624 INFO  org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils  - Renamed to TableId name _dinky_user_del matches OSC temporary TableId pattern, yield dinky_user.
2025-08-04 16:57:01,624 INFO  org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader  - Received the ending event of  tableId:dinky._dinky_user_del,dinky_user,finishedTableId:dinky.dinky_user
2025-08-04 16:57:01,624 INFO  org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader  - Received the ending event of table dinky.dinky_user. Emit corresponding DDL event now.
2025-08-04 16:57:01,624 INFO  org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader  - remove pendingSchemaChangeEvents:dinky.dinky_user
2025-08-04 16:57:01,626 INFO  org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils  - Checking if DDL might be an OSC renaming event... rename /* gh-ost */ table `dinky`.`_dinky_user_gho` to `dinky`.`dinky_user`
2025-08-04 16:57:01,626 INFO  org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils  - Determined the shorter TableId dinky_user is the renaming source.
2025-08-04 16:57:01,626 INFO  org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils  - Determined the longer TableId _dinky_user_del is the renaming target.
2025-08-04 16:57:01,626 INFO  org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils  - Renamed to TableId name _dinky_user_del matches OSC temporary TableId pattern, yield dinky_user.
2025-08-04 16:57:01,626 INFO  org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader  - Received the ending event of  tableId:dinky._dinky_user_del,dinky_user,finishedTableId:dinky.dinky_user
2025-08-04 16:57:01,626 INFO  org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader  - Received the ending event of table dinky.dinky_user. Emit corresponding DDL event now.
2025-08-04 16:57:01,626 ERROR org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader  - Error: met an unexpected osc finish event. Current pending events: {}, Record: DataChangeEvent [record=SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1754297821, file=mysql-bin.000003, pos=181321, server_id=1}} ConnectRecord{topic='mysql_binlog_source', kafkaPartition=0, key=Struct{databaseName=dinky}, value=Struct{source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1754297821468,db=dinky,table=_dinky_user_del,dinky_user,server_id=1,file=mysql-bin.000003,pos=181119,row=0},historyRecord={"source":{"file":"mysql-bin.000003","pos":181119,"server_id":1},"position

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @czy006 for reporting this, will try to fix this case.

value.getStruct(Envelope.FieldName.SOURCE)
.getString(TABLE_NAME_KEY)
.split(","));
if (tableNames.size() != 2) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some quick investigation I noticed that if we use wildcard matching (.*) to capture tables or scan.binlog.newly-added-tables.enabled, shadow tables will not be included in the tableNames field and will be regarded as two independent renaming events. Maybe we need to handle this case gracefully, too?

@beryllw
Copy link
Contributor

beryllw commented Nov 13, 2025

What if a gh-ost process is interrupted after executing alter /* gh-ost */ table db._tb1_gho add column c varchar(255), and after a while another gh-ost job is initiated on the same table? Would adding a unit test to cover this scenario be useful?

May be we could add a unit test like this:
The first gh-ost change is canceled, downstream not receive the schema change event. The second gh-ost change succeeds, and downstream receives the schema change event of the second change.

@beryllw
Copy link
Contributor

beryllw commented Nov 18, 2025

Not supporting tables with a primary key id AUTO_INCREMENT: gh-ost generates two schema change DDLs as shown below. We need to emit the first one.
Additionally, we might consider adding test cases for tables with a primary key id AUTO_INCREMENT.

16598 [Source Data Fetcher for Source: MySQL Source -> SchemaOperator -> PrePartition (1/1)#0] INFO  org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader - Received the start event of online schema change: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1763457658, file=mysql-bin.000003, pos=393080, gtids=df0b83c5-c45f-11f0-ac64-eab2010302a6:1-1088, server_id=223344}} ConnectRecord{topic='mysql_binlog_source', kafkaPartition=0, key=Struct{databaseName=customer_rsjl2b}, keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1763457658483,db=customer_rsjl2b,table=customers,server_id=223344,gtid=df0b83c5-c45f-11f0-ac64-eab2010302a6:1089,file=mysql-bin.000003,pos=392886,row=0},historyRecord={
  "source" : {
    "file" : "mysql-bin.000003",
    "pos" : 392886,
    "server_id" : 223344
  },
  "position" : {
    "transaction_id" : null,
    "ts_sec" : 1763457658,
    "file" : "mysql-bin.000003",
    "pos" : 393080,
    "gtids" : "df0b83c5-c45f-11f0-ac64-eab2010302a6:1-1088",
    "server_id" : 223344
  },
  "databaseName" : "customer_rsjl2b",
  "ddl" : "alter /* gh-ost */ table `customer_rsjl2b`.`customers` add column ext int first",
  "tableChanges" : [ ]
}}, valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}. Save it for later.
16599 [Source Data Fetcher for Source: MySQL Source -> SchemaOperator -> PrePartition (1/1)#0] INFO  org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader - Received the start event of online schema change: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1763457658, file=mysql-bin.000003, pos=394428, gtids=df0b83c5-c45f-11f0-ac64-eab2010302a6:1-1092, server_id=223344}} ConnectRecord{topic='mysql_binlog_source', kafkaPartition=0, key=Struct{databaseName=customer_rsjl2b}, keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1763457658486,db=customer_rsjl2b,table=customers,server_id=223344,gtid=df0b83c5-c45f-11f0-ac64-eab2010302a6:1093,file=mysql-bin.000003,pos=394239,row=0},historyRecord={
  "source" : {
    "file" : "mysql-bin.000003",
    "pos" : 394239,
    "server_id" : 223344
  },
  "position" : {
    "transaction_id" : null,
    "ts_sec" : 1763457658,
    "file" : "mysql-bin.000003",
    "pos" : 394428,
    "gtids" : "df0b83c5-c45f-11f0-ac64-eab2010302a6:1-1092",
    "server_id" : 223344
  },
  "databaseName" : "customer_rsjl2b",
  "ddl" : "alter /* gh-ost */ table `customer_rsjl2b`.`customers` AUTO_INCREMENT=5699",
  "tableChanges" : [ ]
}}, valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}. Save it for later.

Minor improvement, use List to cache multiple DDL events for online schema changes
beryllw@c621f7b

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants