Hello all:

I am running **Flink 1.20** with **Flink CDC 3.3**, deployed on a
**Kubernetes cluster**, to synchronize data from **MySQL** to
**StarRocks**. The source table (`test.vip_record`) and the sink table
(`test.ods_vip_record`) currently have **exactly the same schema**, and no
DDL changes were made while the job was running.
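
For reference, the pipeline is defined with a standard Flink CDC YAML job
along these lines (a minimal sketch, not the exact file: hostnames,
credentials, and server IDs are placeholders, and the route rule matches
the table mapping shown in the exception below):

```yaml
source:
  type: mysql
  hostname: mysql.internal        # placeholder
  port: 3306
  username: flink_cdc             # placeholder
  password: "******"
  tables: test.vip_record
  server-id: 5400-5404            # placeholder range for incremental reads

sink:
  type: starrocks
  jdbc-url: jdbc:mysql://starrocks-fe.internal:9030   # placeholder
  load-url: starrocks-fe.internal:8030                # placeholder
  username: root                  # placeholder
  password: "******"

# Route the source table to the differently named sink table,
# matching the mapping in the exception message.
route:
  - source-table: test.vip_record
    sink-table: test.ods_vip_record

pipeline:
  name: mysql-to-starrocks-sync   # placeholder
  parallelism: 1
```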

However, the job keeps failing after running for some time, and the logs
show the following error:

```
2025-11-14 20:32:03
java.lang.IllegalStateException: Unable to coerce data record from test.vip_record (schema: null) to test.ods_vip_record (schema: null)
  at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.lambda$handleDataChangeEvent$1(SchemaOperator.java:235)
  at java.util.Optional.orElseThrow(Optional.java:290)
  at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleDataChangeEvent(SchemaOperator.java:232)
  at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:152)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
  at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
  at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
  at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
  at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:141)
  at java.util.Collections$SingletonList.forEach(Collections.java:4824)
  at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:93)
  at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:119)
  at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101)
  at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter.processElement(MySqlPipelineRecordEmitter.java:120)
  at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73)
  at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46)
  at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
  at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
  at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
  at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
  at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
  at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
  at java.lang.Thread.run(Thread.java:750)
```

The key part is:

```
schema: null
```

which suggests that the schemas of both the source and sink tables are
unexpectedly lost or not recognized at runtime.

---

### **Additional information**

* Deployment mode: **Flink on Kubernetes (Native K8s Mode)**
* Source: **MySQL 5.7**
* Sink: **StarRocks**
* After a restart, Flink CDC performs the snapshot and incremental reads normally
* After **1–2 hours** of normal operation, the job fails again with the error above
* No schema changes were made during this time

---

### **Questions**

1. Under what conditions would `SchemaOperator` encounter `(schema: null)`
for both source and sink?
2. Could this be related to state inconsistency or loss of schema metadata
inside a checkpoint/savepoint?
3. Is this a known issue in Flink CDC 3.3 with Flink 1.20?
4. Are there recommended configurations to ensure stable schema management
for MySQL → StarRocks pipelines? (The only schema-related option I have
found so far is sketched below.)
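
For context on question 4, the option I am referring to is the pipeline-level
`schema.change.behavior` setting; the sketch below shows it with its
documented values (the surrounding values are placeholders, and I have not
verified that changing this setting is relevant to the failure):

```yaml
pipeline:
  name: mysql-to-starrocks-sync   # placeholder
  parallelism: 1
  # Controls how the pipeline reacts to upstream schema changes.
  # Documented values: exception, evolve, try_evolve, lenient, ignore
  # (lenient is, I believe, the default in recent Flink CDC releases).
  schema.change.behavior: lenient
```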

Any help or guidance from the community would be greatly appreciated.
Thank you!
