[ 
https://issues.apache.org/jira/browse/FLINK-35720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Ferrario closed FLINK-35720.
------------------------------------
    Resolution: Not A Problem

> flink-connector-kafka fails to restore from checkpoint when using an 
> EXACTLY_ONCE sink
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-35720
>                 URL: https://issues.apache.org/jira/browse/FLINK-35720
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>         Environment: Flink 1.17, 1.18, 1.19
> Kubernetes
> flink-connector-kafka 3.1, 3.2
>            Reporter: Nicolas Ferrario
>            Priority: Critical
>
> Hi team, I noticed that the Kafka Client was changed from version 3.2.4 to 
> 3.4.0 in Flink Connector Kafka 3.1.
> This introduced a bug that prevents Flink from restoring from a checkpoint 
> when a Producer with Exactly Once semantics is used, because the sink uses 
> reflection to read the field {{txnPartitionMap}}.
> Another side-effect is that a downgrade from 3.1 to 3.0.2 (the last working 
> version) is not possible, as there was a version increase (from 1 to 2) in 
> the Source state serializer, effectively forcing a full bootstrap and losing 
> all state.
> *All flink-connector-kafka versions starting from 3.1 are affected.*
> How to reproduce:
>  # Start a pipeline with an EXACTLY ONCE Kafka Sink
>  # Wait for a checkpoint
>  # Stop the pipeline and restore from the checkpoint
>  # The sink fails to restore state, causing an exception
> The code-line causing the exception: 
> [https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java#L310]
> POM 3.0: https://github.com/apache/flink-connector-kafka/blob/v3.0/pom.xml#L53
> POM 3.1: https://github.com/apache/flink-connector-kafka/blob/v3.1/pom.xml#L54
> Exception:
> {quote}2024-06-18 19:48:17
> java.lang.IllegalStateException: Failed to commit 
> KafkaCommittable{producerId=67946, epoch=0, 
> transactionalId=smsc-toolbox-enrichment-toolbox-sms-0-438}
> at 
> org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:77)
> at 
> org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119)
> at 
> org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:126)
> at 
> org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
> at 
> org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
> at 
> org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
> at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
> at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
> at 
> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:285)
> at 
> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:272)
> at 
> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.resumeTransaction(FlinkKafkaInternalProducer.java:310)
> at 
> org.apache.flink.connector.kafka.sink.KafkaCommitter.getRecoveryProducer(KafkaCommitter.java:143)
> at 
> org.apache.flink.connector.kafka.sink.KafkaCommitter.lambda$commit$0(KafkaCommitter.java:72)
> at java.base/java.util.Optional.orElseGet(Unknown Source)
> at 
> org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:72)
> ... 16 more
> Caused by: java.lang.NoSuchFieldException: txnPartitionMap
> at java.base/java.lang.Class.getDeclaredField(Unknown Source)
> at 
> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:281)
> ... 22 more
> {quote}
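
The failure mode in the trace above (a reflective lookup of txnPartitionMap that ends in NoSuchFieldException and is rethrown as "Incompatible KafkaProducer version") can be reproduced in isolation. The following is a minimal sketch, not the connector's code: the classes ManagerWithField and ManagerWithoutField, the field name partitionStates, and the helper hasField are hypothetical stand-ins. Only the field name txnPartitionMap and the exception message are taken from the report.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class ReflectiveFieldLookup {

    // Hypothetical stand-in for a class that still declares the field.
    static class ManagerWithField {
        private final Map<String, Long> txnPartitionMap = new HashMap<>();
    }

    // Hypothetical stand-in for a class where the field has a different name.
    static class ManagerWithoutField {
        private final Map<String, Long> partitionStates = new HashMap<>();
    }

    // Reads a private field by name. A missing field surfaces as a
    // RuntimeException whose cause is NoSuchFieldException, the same
    // shape as the exception chain in the report.
    public static Object getField(Object target, String fieldName) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(target);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    // Hypothetical helper: checks whether a class declares a field,
    // without reading it.
    public static boolean hasField(Class<?> type, String fieldName) {
        try {
            type.getDeclaredField(fieldName);
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Succeeds: the field exists on this class.
        getField(new ManagerWithField(), "txnPartitionMap");
        System.out.println("lookup on ManagerWithField succeeded");

        // Fails: the same lookup against a class without that field.
        try {
            getField(new ManagerWithoutField(), "txnPartitionMap");
        } catch (RuntimeException e) {
            System.out.println(e.getMessage() + ", cause: " + e.getCause());
        }
    }
}
```

A check like hasField, run against the class that is actually loaded at runtime, is one way to see whether the client jar on the classpath declares the field a reflective caller expects.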



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
