Hello all, I am using **Flink 1.20** with **Flink CDC 3.3**, deployed on a **Kubernetes cluster**, to synchronize data from **MySQL** to **StarRocks**. The source table (`test.vip_record`) and the sink table (`test.ods_vip_record`) currently have **exactly the same schema**, and no DDL changes have been made while the job was running. For reference, the pipeline definition is sketched below.
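The job is defined with a standard Flink CDC pipeline YAML roughly along these lines; this is a sketch, with hostnames, ports, credentials, the `server-id` range, and the pipeline name replaced by placeholders rather than our actual values:

```yaml
# Sketch of the pipeline definition; endpoints and credentials are placeholders.
source:
  type: mysql
  hostname: mysql.example.internal
  port: 3306
  username: flink_cdc
  password: "******"
  tables: test.vip_record
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://starrocks-fe.example.internal:9030
  load-url: starrocks-fe.example.internal:8030
  username: flink_cdc
  password: "******"

# Route the source table to the differently named sink table.
route:
  - source-table: test.vip_record
    sink-table: test.ods_vip_record

pipeline:
  name: mysql-to-starrocks-vip-record
  parallelism: 1
  # schema.change.behavior is not overridden (left at its default)
```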
However, the job keeps failing after running for some time, and the logs show the following error:

```
2025-11-14 20:32:03
java.lang.IllegalStateException: Unable to coerce data record from test.vip_record (schema: null) to test.ods_vip_record (schema: null)
    at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.lambda$handleDataChangeEvent$1(SchemaOperator.java:235)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleDataChangeEvent(SchemaOperator.java:232)
    at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:152)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:141)
    at java.util.Collections$SingletonList.forEach(Collections.java:4824)
    at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:93)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:119)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter.processElement(MySqlPipelineRecordEmitter.java:120)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:750)
```

The key part is:

```
schema: null
```

which suggests that the schemas of both the source and sink tables are unexpectedly lost or not recognized at runtime.

---

### **Additional information**

* Deployment mode: **Flink on Kubernetes (Native K8s Mode)**
* Source: **MySQL 5.7**
* Sink: **StarRocks**
* Flink CDC performs the snapshot and incremental reading normally after a restart
* After **1–2 hours** of normal running, the job fails with the error above
* No schema changes were made during this time
* Checkpointing is enabled; a representative excerpt of the configuration is sketched below
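In case it is relevant to question 2 below, this is roughly what the checkpointing setup looks like; the keys are standard Flink 1.20 options, but the values and the checkpoint path here are illustrative placeholders, not our exact production settings:

```yaml
# Representative Flink configuration excerpt (values illustrative;
# the checkpoint directory is a placeholder, not our real bucket).
execution.checkpointing.interval: 3min
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
state.backend.type: rocksdb
state.checkpoints.dir: s3://example-bucket/flink/checkpoints
restart-strategy.type: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 30s
```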
---

### **Questions**

1. Under what conditions would `SchemaOperator` encounter `(schema: null)` for both the source and the sink?
2. Could this be related to state inconsistency or schema-metadata loss in a checkpoint/savepoint?
3. Is this a known issue in Flink CDC 3.3 with Flink 1.20?
4. Are there recommended configurations to ensure stable schema management for MySQL → StarRocks pipelines?

Any help or guidance from the community would be greatly appreciated. Thank you!
