yuxiqian commented on PR #3364: URL: https://github.com/apache/flink-cdc/pull/3364#issuecomment-2139108465
Hi @hk-lrzy, I've compiled Flink CDC on your branch and tried to verify this patch with the following steps:

1. Create a MySQL source with tables (`alice`, `bob`, and `chris`) in the `fallen` DB.

2. Submit a pipeline job defined with the following YAML:

   ```yaml
   source:
     type: mysql
     tables: fallen.\.*
     # ...

   sink:
     type: values
   ```

3. Send 4 data records with tags 1 to 4 and wait for them to be received by the sink.

4. Stop the job and create a savepoint file with `./bin/flink stop`.

5. Create three more tables (`dorothy`, `eve`, and `faye`) in the same DB, and insert data records with tags 5 to 8.

6. Resubmit the pipeline job (which should now capture all 6 tables).

---

Expected result: the pipeline job successfully captured the newly added tables.

Actual result: the pipeline job failed with the following exception:

```
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Schema is never registered or outdated for table "fallen.eve"
    at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2055)
    at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966)
    at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3989)
    at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4950)
    at org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.partitionBy(PrePartitionOperator.java:102)
    at org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.processElement(PrePartitionOperator.java:92)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.cdc.runtime.operators.schema.SchemaOperator.processElement(SchemaOperator.java:187)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:147)
    at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4856)
    at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:93)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:120)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter.processElement(MySqlPipelineRecordEmitter.java:120)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:160)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalStateException: Schema is never registered or outdated for table "fallen.eve"
    at org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.loadLatestSchemaFromRegistry(PrePartitionOperator.java:123)
    at org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.recreateHashFunction(PrePartitionOperator.java:130)
    at org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.access$000(PrePartitionOperator.java:52)
    at org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator$1.load(PrePartitionOperator.java:140)
    at org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator$1.load(PrePartitionOperator.java:137)
    at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3533)
    at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2282)
    at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2159)
    at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049)
    ... 35 more
```

and the sink never received data records from the new tables.
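
For anyone reading the trace: the outer `UncheckedExecutionException` is Guava's `LoadingCache.get` wrapping the `IllegalStateException` thrown inside the cache loader (`PrePartitionOperator$1.load`), so the `Caused by` section is the part that matters. Below is a minimal sketch of that pattern, not the actual `PrePartitionOperator` code. All names in it (`SchemaLookupSketch`, `LoadOnMissCache`, `WrappedLoadException`, `cacheOver`) are hypothetical, the cache is a simplified stand-in for Guava's `LoadingCache`, and the schema value is a placeholder string.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class SchemaLookupSketch {

    /** Hypothetical stand-in for Guava's UncheckedExecutionException. */
    static class WrappedLoadException extends RuntimeException {
        WrappedLoadException(Throwable cause) {
            super(cause);
        }
    }

    /** Tiny load-on-miss cache; a simplified stand-in for Guava's LoadingCache. */
    static class LoadOnMissCache<K, V> {
        private final Map<K, V> values = new HashMap<>();
        private final Function<K, V> loader;

        LoadOnMissCache(Function<K, V> loader) {
            this.loader = loader;
        }

        V get(K key) {
            V cached = values.get(key);
            if (cached != null) {
                return cached;
            }
            try {
                V loaded = loader.apply(key);
                values.put(key, loaded);
                return loaded;
            } catch (RuntimeException e) {
                // An unchecked failure in the loader surfaces wrapped, with the original as cause.
                throw new WrappedLoadException(e);
            }
        }
    }

    /** Builds a cache whose loader looks up a table id in the given registry (assumed shape). */
    static LoadOnMissCache<String, String> cacheOver(Map<String, String> registry) {
        return new LoadOnMissCache<>(tableId -> {
            String schema = registry.get(tableId);
            if (schema == null) {
                throw new IllegalStateException(
                        "Schema is never registered or outdated for table \"" + tableId + "\"");
            }
            return schema;
        });
    }

    public static void main(String[] args) {
        Map<String, String> registry = new HashMap<>();
        registry.put("fallen.alice", "placeholder-schema"); // known before the savepoint
        LoadOnMissCache<String, String> cache = cacheOver(registry);

        System.out.println(cache.get("fallen.alice"));
        try {
            cache.get("fallen.eve"); // table added later, never registered
        } catch (WrappedLoadException e) {
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

In this sketch the lookup for `fallen.eve` fails only because nothing ever put that table into the registry, which matches what the trace reports for the tables added in step 5.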