yuxiqian commented on PR #3364:
URL: https://github.com/apache/flink-cdc/pull/3364#issuecomment-2139108465

   Hi @hk-lrzy, I've compiled Flink CDC on your branch and tried to verify this patch with the following steps:
   
   1. Create a MySQL source with three tables (`alice`, `bob`, and `chris`) in the `fallen` database.
   2. Submit a pipeline job defined with the following YAML:
   ```yaml
   source:
     type: mysql
     tables: fallen.\.*
     # ...
   
   sink:
     type: values
   ```
   
   3. Send 4 data records with tags 1 to 4 and wait for them to be received by the sink.
   4. Stop the job and create a savepoint file with `./bin/flink stop`.
   5. Create three more tables (`dorothy`, `eve`, and `faye`) in the same database, and insert data records with tags 5 to 8.
   6. Resubmit the pipeline job (it should now capture all 6 tables).
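   For completeness, steps 4–6 can be sketched as shell commands. This is only an illustrative sketch: the savepoint path, job ID, credentials, and the columns of the new table are placeholders I made up for the example, not values from the actual test run; only the table and database names come from the steps above.

   ```shell
   # Illustrative sketch; paths, job ID, credentials, and column layout
   # are placeholders — only table/database names come from the test.

   # Step 4: stop the running job and take a savepoint
   ./bin/flink stop --savepointPath /tmp/savepoints <jobId>

   # Step 5: create one of the new tables in the `fallen` database and
   # insert a tagged record (the schema here is assumed)
   mysql -u root fallen <<'SQL'
   CREATE TABLE dorothy (id INT PRIMARY KEY, tag INT);
   INSERT INTO dorothy VALUES (1, 5);
   SQL

   # Step 6: resubmit the pipeline, restoring from the savepoint
   ./bin/flink-cdc.sh --from-savepoint /tmp/savepoints/savepoint-xxxx pipeline.yaml
   ```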
   
   ---
   
   Expected result: the pipeline job successfully captures the newly added tables.
   
   Actual result: the pipeline job failed with the following exception:
   
   ```
    org.apache.flink.shaded.guava31.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Schema is never registered or outdated for table "fallen.eve"
        at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2055)
        at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966)
        at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3989)
        at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4950)
        at org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.partitionBy(PrePartitionOperator.java:102)
        at org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.processElement(PrePartitionOperator.java:92)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
        at org.apache.flink.cdc.runtime.operators.schema.SchemaOperator.processElement(SchemaOperator.java:187)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
        at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
        at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
        at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:147)
        at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4856)
        at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:93)
        at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:120)
        at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101)
        at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter.processElement(MySqlPipelineRecordEmitter.java:120)
        at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73)
        at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:160)
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.lang.IllegalStateException: Schema is never registered or outdated for table "fallen.eve"
        at org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.loadLatestSchemaFromRegistry(PrePartitionOperator.java:123)
        at org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.recreateHashFunction(PrePartitionOperator.java:130)
        at org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.access$000(PrePartitionOperator.java:52)
        at org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator$1.load(PrePartitionOperator.java:140)
        at org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator$1.load(PrePartitionOperator.java:137)
        at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3533)
        at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2282)
        at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2159)
        at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049)
        ... 35 more
   ```
   
   and the sink never received any data records from the new tables.

