Hi Krzysztof,

I found that this part of the code was changed in PR [1], which updates the Kafka client version to 3.4.0. That fix has not been released yet, so you may want to build the connector yourself and check whether it resolves the issue.
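To confirm that this is what you are hitting, a small reflection check along these lines can tell you whether the kafka-clients jar on your classpath still declares the field the connector looks up. This is only a sketch: `ProducerFieldCheck` and `hasDeclaredField` are hypothetical names used for illustration and are not part of Flink or Kafka; the class and field names are the ones from your report.

```java
// Hypothetical helper for illustration; not part of Flink or Kafka.
final class ProducerFieldCheck {

    // Returns true only if the class is on the classpath and declares the field
    // (private fields included). Returns false if either is missing.
    static boolean hasDeclaredField(String className, String fieldName) {
        try {
            // initialize=false: load the class without running its static initializers
            Class<?> clazz = Class.forName(
                    className, false, ProducerFieldCheck.class.getClassLoader());
            clazz.getDeclaredField(fieldName);
            return true;
        } catch (ClassNotFoundException | NoSuchFieldException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class and field names taken from the stack trace and code in the report.
        String transactionManager =
                "org.apache.kafka.clients.producer.internals.TransactionManager";
        String[] fields = {"topicPartitionBookkeeper", "producerIdAndEpoch"};
        for (String field : fields) {
            System.out.println(
                    field + " present: " + hasDeclaredField(transactionManager, field));
        }
    }
}
```

Run it with the same classpath as your job. If `topicPartitionBookkeeper` is reported as missing, the kafka-clients version on the classpath does not match what flink-connector-kafka 1.17.1 accesses through reflection, which is consistent with the NoSuchFieldException you reported.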

Best,
Hang

[1] https://github.com/apache/flink-connector-kafka/pull/11

On Sun, Sep 10, 2023 at 21:52, Krzysztof Jankiewicz <jankiewicz.krzysz...@gmail.com> wrote:

> Hi,
>
> I am currently working on a simple application that requires an
> end-to-end exactly-once guarantee.
>
> I am reading data from Kafka and writing it back to Kafka.
>
> When I use `DeliveryGuarantee.AT_LEAST_ONCE` at the Kafka Sink level,
> everything works fine.
> Here's the relevant code:
>
> KafkaSink<String> sink = KafkaSink.<String>builder()
>     . . .
>     .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>     . . .
>     .build();
>
> Unfortunately, when I switch to DeliveryGuarantee.EXACTLY_ONCE, I
> encounter the following error during error handling (High Availability
> mode in k8s):
>
> Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
>     at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266) ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>     . . .
> Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper
>
> The code causing this issue is as follows
> (org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer):
>
> Object transactionManager = this.getTransactionManager();
> synchronized(transactionManager) {
>     Object topicPartitionBookkeeper = getField(transactionManager,
>             "topicPartitionBookkeeper");
>     transitionTransactionManagerStateTo(transactionManager,
>             "INITIALIZING");
>     invoke(topicPartitionBookkeeper, "reset");
>     setField(transactionManager, "producerIdAndEpoch",
>             createProducerIdAndEpoch(producerId, epoch));
>
> I am using Apache Flink 1.17.1 and Apache Kafka Client
> (org.apache.kafka:kafka-clients) 3.5.1.
> I have examined the code of
> org.apache.kafka.clients.producer.internals.TransactionManager, which is
> used by org.apache.kafka.clients.producer.KafkaProducer.
> I can see the producerIdAndEpoch field, but there is no
> topicPartitionBookkeeper field.
>
> Could you please advise which version of KafkaProducer is compatible with
> flink-connector-kafka? And am I missing something in my configuration?
>
> Kind regards
> Krzysztof