linqigeng created FLINK-37537: --------------------------------- Summary: Duplicate key when job failover in SchemaOperator Key: FLINK-37537 URL: https://issues.apache.org/jira/browse/FLINK-37537 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.3.0 Reporter: linqigeng
When SchemaOperator receives SchameChangeEvent, if exceptions occur and cause failover, the latest CreateTableEvent will be automatically issued when the job is restored from the previous checkpoint. When the AddColumnEvent is processed in `SchemaMergingUtils#coerceRow` later would cause `Duplicate key` exception. {code:java} java.lang.IllegalStateException: Duplicate key not_show (attempted merging values TINYINT and TINYINT) at java.base/java.util.stream.Collectors.duplicateKeyException(Unknown Source) at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Unknown Source) at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(Unknown Source) at java.base/java.util.LinkedList$LLSpliterator.forEachRemaining(Unknown Source) at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source) at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source) at org.apache.flink.cdc.common.utils.SchemaMergingUtils.coerceRow(SchemaMergingUtils.java:270) at org.apache.flink.cdc.common.utils.SchemaMergingUtils.coerceRow(SchemaMergingUtils.java:253) at org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator.coerceDataRecord(SchemaDerivator.java:334) at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleDataChangeEvent(SchemaOperator.java:227) at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:152) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310) at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:147) at java.base/java.util.Collections$SingletonList.forEach(Unknown Source) at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:93) at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:120) at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101) at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter.processElement(MySqlPipelineRecordEmitter.java:121) at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73) at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46) at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144) at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:421) at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.base/java.lang.Thread.run(Unknown Source) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)