cdc with pipeline-transform failed because schema issue, is this a problem? #4163
Unanswered
liuyq2step
asked this question in
General
Replies: 1 comment 10 replies
-
|
Seems your upstream table doesn't hit that transform rule. Could you please try this? transform:
- - source-table: t_flinktest
+ - source-table: public.t_flinktest
projection: id,txt,CAST(dt as VARCHAR) as dt
|
Beta Was this translation helpful? Give feedback.
10 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment

Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
test postgres db table with a timestamptz column , pipeline yaml is
`
source:
scan.startup.mode: committed-offset
....
transform:
projection: id,txt,CAST(dt as VARCHAR) as dt
--projection: id,txt,DATE_FORMAT(dt , ‘yyyy-MM-dd hh:mi:ss’) as dt
--projection: id,txt,CAST(id as VARCHAR) as dt
....
pipeline:
name: test PGSQL to StarRocks Pieline
schema.change.behavior: ignore
parallelism: 1
`
because starrocks dont have type that fit timetamptz, starrocks connector throws exception.
`
2025-10-24 13:47:37,065 ERROR org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry [] - Current schema manager state: Schema Manager 427242635:
original schema map:
- table public.t_flinktest: {public.t_flinktest={0=columns={id BIGINT NOT NULL,txt VARCHAR(255),dt TIMESTAMP(6) WITH TIME ZONE now()}, primaryKeys=id, options=()}}
evolved schema map:
2025-10-24 13:47:37,071 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Transform:Data -> SchemaOperator -> PrePartition' (operator 9899a42c64d67ef3172b7e3be3c1bbb9).
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:651) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry.failJob(SchemaRegistry.java:392) ~[?:?]
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator.lambda$startSchemaChangesEvolve$1(SchemaCoordinator.java:249) ~[?:?]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:842) ~[?:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to apply schema change event.
... 6 more
Caused by: java.lang.UnsupportedOperationException: Unsupported CDC data type TIMESTAMP(6) WITH TIME ZONE
at org.apache.flink.cdc.connectors.starrocks.sink.StarRocksUtils$CdcDataTypeTransformer.defaultMethod(StarRocksUtils.java:388) ~[?:?]
at org.apache.flink.cdc.connectors.starrocks.sink.StarRocksUtils$CdcDataTypeTransformer.defaultMethod(StarRocksUtils.java:241) ~[?:?]
at org.apache.flink.cdc.common.types.DataTypeDefaultVisitor.visit(DataTypeDefaultVisitor.java:105) ~[?:?]
at org.apache.flink.cdc.common.types.ZonedTimestampType.accept(ZonedTimestampType.java:110) ~[?:?]
at org.apache.flink.cdc.connectors.starrocks.sink.StarRocksUtils.toStarRocksDataType(StarRocksUtils.java:119) ~[?:?]
at org.apache.flink.cdc.connectors.starrocks.sink.StarRocksUtils.toStarRocksTable(StarRocksUtils.java:93) ~[?:?]
at org.apache.flink.cdc.connectors.starrocks.sink.StarRocksMetadataApplier.applyCreateTable(StarRocksMetadataApplier.java:138) ~[?:?]
at org.apache.flink.cdc.connectors.starrocks.sink.StarRocksMetadataApplier.lambda$applySchemaChange$2(StarRocksMetadataApplier.java:115) ~[?:?]
at org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor.visit(SchemaChangeEventVisitor.java:57) ~[?:?]
at org.apache.flink.cdc.connectors.starrocks.sink.StarRocksMetadataApplier.applySchemaChange(StarRocksMetadataApplier.java:104) ~[?:?]
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator.applyAndUpdateEvolvedSchemaChange(SchemaCoordinator.java:437) ~[?:?]
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator.applySchemaChange(SchemaCoordinator.java:406) ~[?:?]
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator.lambda$startSchemaChangesEvolve$1(SchemaCoordinator.java:247) ~[?:?]
... 5 more
`
is this a problem?
Beta Was this translation helpful? Give feedback.
All reactions