Hi Krzysztof,

The bundled Flink Kafka connector for 1.17 uses Kafka 3.2.3, see
https://github.com/apache/flink/blob/release-1.17/flink-connectors/flink-connector-kafka/pom.xml#L38

That's also the case for the externalized Flink Kafka connector v3.0, see
https://github.com/apache/flink-connector-kafka/blob/v3.0/pom.xml#L53

The externalized Flink Kafka connector has been upgraded to use 3.4.0, but
that hasn't been released yet. See
https://github.com/apache/flink-connector-kafka/blob/main/pom.xml#L53
for the details.

Best regards,

Martijn

On Tue, Sep 12, 2023 at 7:36 AM Krzysztof Jankiewicz <jankiewicz.krzysz...@gmail.com> wrote:
>
> Hi, Hang.
>
> There were a lot of changes made to the TransactionManager in the Kafka
> client in 2022 (e.g.
> https://github.com/apache/kafka/commit/3ea7b418fb3d7e9fc74c27751c1b02b04877f197).
>
> Version 3.2.3 was the last one in which the TransactionManager class contained
> the attributes (e.g., topicPartitionBookkeeper) referenced by
> flink-connector-kafka (1.17.1).
>
> Thanks once again.
> Krzysztof
>
> On Mon, Sep 11, 2023 at 11:24, Hang Ruan <ruanhang1...@gmail.com> wrote:
>>
>> Hi, Krzysztof.
>>
>> I found that this part was changed in PR [1] when the Kafka client
>> version was updated to 3.4.0.
>> This fix has not been released yet. Maybe you can package the connector
>> and check it yourself.
>>
>> Best,
>> Hang
>>
>> [1] https://github.com/apache/flink-connector-kafka/pull/11
>>
>> On Sun, Sep 10, 2023 at 21:52, Krzysztof Jankiewicz <jankiewicz.krzysz...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am currently working on a simple application that requires an
>>> exactly-once end-to-end guarantee.
>>>
>>> I am reading data from Kafka and writing it back to Kafka.
>>>
>>> When I use `DeliveryGuarantee.AT_LEAST_ONCE` at the Kafka Sink level,
>>> everything works fine.
>>> Here's the relevant code:
>>>
>>>     KafkaSink<String> sink = KafkaSink.<String>builder()
>>>         . . .
>>>         .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>>>         . . .
>>>         .build();
>>>
>>> Unfortunately, when I switch to `DeliveryGuarantee.EXACTLY_ONCE`, I
>>> encounter the following error during error handling (High Availability
>>> mode in k8s):
>>>
>>>     Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
>>>         at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266) ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>>>         . . .
>>>     Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper
>>>
>>> The code causing this issue is as follows
>>> (org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer):
>>>
>>>     Object transactionManager = this.getTransactionManager();
>>>     synchronized(transactionManager) {
>>>         Object topicPartitionBookkeeper = getField(transactionManager,
>>>             "topicPartitionBookkeeper");
>>>         transitionTransactionManagerStateTo(transactionManager,
>>>             "INITIALIZING");
>>>         invoke(topicPartitionBookkeeper, "reset");
>>>         setField(transactionManager, "producerIdAndEpoch",
>>>             createProducerIdAndEpoch(producerId, epoch));
>>>
>>> I am using Apache Flink 1.17.1 and Apache Kafka Client
>>> (org.apache.kafka:kafka-clients) 3.5.1.
>>> I have examined the code of
>>> org.apache.kafka.clients.producer.internals.TransactionManager, which is
>>> used by org.apache.kafka.clients.producer.KafkaProducer.
>>> I can see the producerIdAndEpoch field, but there is no
>>> topicPartitionBookkeeper field.
>>>
>>> Could you please advise which version of KafkaProducer is compatible with
>>> flink-connector-kafka? And am I missing something in my configuration?
>>>
>>> Kind regards
>>> Krzysztof
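
For anyone hitting the same NoSuchFieldException, the following is a minimal sketch (not part of Flink or Kafka) of how one might check whether the kafka-clients jar on the classpath still has the private field that flink-connector-kafka 1.17.1 looks up via reflection. The class name `FieldProbe`, its methods, and the nested `Sample` class are hypothetical names made up for this illustration; only the Kafka class name and the two field names come from the thread above.

```java
public class FieldProbe {

    /** Hypothetical stand-in that reuses the field names from the thread; not a Kafka class. */
    static class Sample {
        private Object topicPartitionBookkeeper;
        private Object producerIdAndEpoch;
    }

    /** Returns true if the class itself declares a field with this name (any visibility). */
    static boolean hasDeclaredField(Class<?> type, String fieldName) {
        try {
            type.getDeclaredField(fieldName);
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    /** Looks the class up by name without initializing it, then reports on the field. */
    static String probe(String className, String fieldName) {
        try {
            Class<?> type = Class.forName(className, false, FieldProbe.class.getClassLoader());
            return hasDeclaredField(type, fieldName) ? "present" : "missing";
        } catch (ClassNotFoundException e) {
            return "class not on classpath";
        }
    }

    public static void main(String[] args) {
        // Class and field names as quoted in the thread.
        String tm = "org.apache.kafka.clients.producer.internals.TransactionManager";
        System.out.println("topicPartitionBookkeeper: " + probe(tm, "topicPartitionBookkeeper"));
        System.out.println("producerIdAndEpoch: " + probe(tm, "producerIdAndEpoch"));
    }
}
```

Run it with the same classpath as the job. Going by the messages above, "missing" for topicPartitionBookkeeper would suggest a kafka-clients version newer than the 3.2.3 that the 1.17 connector is built against.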