linqigeng created FLINK-37537:
---------------------------------

             Summary: Duplicate key when job failover in SchemaOperator
                 Key: FLINK-37537
                 URL: https://issues.apache.org/jira/browse/FLINK-37537
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.3.0
            Reporter: linqigeng


When SchemaOperator receives a SchemaChangeEvent and an exception triggers a
failover, the latest CreateTableEvent is automatically emitted again when the job
is restored from the previous checkpoint. Processing the AddColumnEvent afterwards
then leads to a `Duplicate key` exception in `SchemaMergingUtils#coerceRow`:
{code:java}
java.lang.IllegalStateException: Duplicate key not_show (attempted merging values TINYINT and TINYINT)
    at java.base/java.util.stream.Collectors.duplicateKeyException(Unknown Source)
    at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Unknown Source)
    at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(Unknown Source)
    at java.base/java.util.LinkedList$LLSpliterator.forEachRemaining(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
    at org.apache.flink.cdc.common.utils.SchemaMergingUtils.coerceRow(SchemaMergingUtils.java:270)
    at org.apache.flink.cdc.common.utils.SchemaMergingUtils.coerceRow(SchemaMergingUtils.java:253)
    at org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator.coerceDataRecord(SchemaDerivator.java:334)
    at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleDataChangeEvent(SchemaOperator.java:227)
    at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:152)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:147)
    at java.base/java.util.Collections$SingletonList.forEach(Unknown Source)
    at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:93)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:120)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter.processElement(MySqlPipelineRecordEmitter.java:121)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:421)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Unknown Source)
{code}
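
The top frames of the trace (`Collectors.duplicateKeyException` reached via `uniqKeysMapAccumulator`) are consistent with a JDK `Collectors.toMap` call without a merge function meeting the same key twice. The minimal sketch below is not Flink CDC code; it reproduces the same message under one assumption: after failover, the schema rebuilt from the re-emitted CreateTableEvent already contains `not_show`, so replaying the AddColumnEvent appends a second column with that name. `Column`, `indexByName`, `applyAddColumnNaive` and `applyAddColumnIdempotent` are hypothetical helpers, and the idempotent variant only illustrates one possible mitigation (skip a column that already exists), not necessarily the upstream fix.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class DuplicateColumnSketch {

    // Hypothetical stand-in for a schema column (not the Flink CDC Column class).
    static final class Column {
        private final String name;
        private final String type;

        Column(String name, String type) {
            this.name = name;
            this.type = type;
        }

        String name() { return name; }
        String type() { return type; }
    }

    // Indexes columns by name with the two-argument Collectors.toMap, which has no
    // merge function and therefore throws IllegalStateException on a repeated name.
    static Map<String, String> indexByName(List<Column> columns) {
        return columns.stream().collect(Collectors.toMap(Column::name, Column::type));
    }

    // Hypothetical naive replay: appends the column even if the schema already has it.
    static List<Column> applyAddColumnNaive(List<Column> schema, Column added) {
        List<Column> result = new ArrayList<>(schema);
        result.add(added);
        return result;
    }

    // Hypothetical idempotent replay: returns the schema unchanged when a column
    // with the same name is already present.
    static List<Column> applyAddColumnIdempotent(List<Column> schema, Column added) {
        boolean exists = schema.stream().anyMatch(c -> c.name().equals(added.name()));
        if (exists) {
            return schema;
        }
        List<Column> result = new ArrayList<>(schema);
        result.add(added);
        return result;
    }

    public static void main(String[] args) {
        // Assumed state after failover: the re-emitted CreateTableEvent already carries not_show.
        List<Column> restored = List.of(new Column("id", "INT"), new Column("not_show", "TINYINT"));
        // The AddColumnEvent for not_show is then applied a second time.
        Column added = new Column("not_show", "TINYINT");

        try {
            indexByName(applyAddColumnNaive(restored, added));
        } catch (IllegalStateException e) {
            // On JDK 9+: Duplicate key not_show (attempted merging values TINYINT and TINYINT)
            System.out.println(e.getMessage());
        }

        // The idempotent replay keeps a single not_show column, so indexing succeeds (prints 2).
        System.out.println(indexByName(applyAddColumnIdempotent(restored, added)).size());
    }
}
{code}

Skipping by name alone treats a same-name column with a different type as already applied; a real fix would likely need to decide how such a conflict should be handled.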



--
This message was sent by Atlassian Jira
(v8.20.10#820010)