Hi,
I am currently working on a simple application that requires an end-to-end
exactly-once guarantee.
I am reading data from Kafka and writing it back to Kafka.
When I use `DeliveryGuarantee.AT_LEAST_ONCE` at the Kafka Sink level,
everything works fine.
Here's the relevant code:
    KafkaSink<String> sink = KafkaSink.<String>builder()
        . . .
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        . . .
        .build();
Unfortunately, when I switch to `DeliveryGuarantee.EXACTLY_ONCE`, I encounter
the following error during error handling (High Availability mode in k8s):
Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
    at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266) ~[flink-connector-kafka-1.17.1.jar:1.17.1]
. . .
Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper
The code causing this issue is as follows
(org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer):
    Object transactionManager = this.getTransactionManager();
    synchronized(transactionManager) {
        Object topicPartitionBookkeeper = getField(transactionManager, "topicPartitionBookkeeper");
        transitionTransactionManagerStateTo(transactionManager, "INITIALIZING");
        invoke(topicPartitionBookkeeper, "reset");
        setField(transactionManager, "producerIdAndEpoch", createProducerIdAndEpoch(producerId, epoch));
I am using Apache Flink 1.17.1 and the Apache Kafka client
(org.apache.kafka:kafka-clients) 3.5.1.
I have examined the code of
org.apache.kafka.clients.producer.internals.TransactionManager, which is
used by org.apache.kafka.clients.producer.KafkaProducer.
I can see the producerIdAndEpoch field, but there is no
topicPartitionBookkeeper field.
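
For anyone who wants to repeat this check against the kafka-clients JAR that is
actually on the job's classpath, below is a minimal reflection sketch. The class
name FieldCheck and the helper hasDeclaredField are hypothetical names used only
for illustration; the only library name taken from this message is
org.apache.kafka.clients.producer.internals.TransactionManager. If kafka-clients
is not on the classpath, the program simply reports that.

```java
public class FieldCheck {
    // Class inspected by main(); this is the TransactionManager discussed above.
    static final String TARGET_CLASS =
            "org.apache.kafka.clients.producer.internals.TransactionManager";

    // Returns true if cls itself (not a superclass) declares a field with this name.
    static boolean hasDeclaredField(Class<?> cls, String fieldName) {
        try {
            cls.getDeclaredField(fieldName);
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        try {
            // Load the class without initializing it, so no static initializers run.
            Class<?> cls = Class.forName(TARGET_CLASS, false, FieldCheck.class.getClassLoader());
            for (String name : new String[] {"topicPartitionBookkeeper", "producerIdAndEpoch"}) {
                System.out.println(name + " present: " + hasDeclaredField(cls, name));
            }
        } catch (ClassNotFoundException e) {
            System.out.println("kafka-clients is not on the classpath: " + e.getMessage());
        }
    }
}
```

Running it with the same classpath as the Flink job should show which of the two
fields the bundled client version declares.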
Could you please advise which version of KafkaProducer is compatible with
flink-connector-kafka? Am I missing something in my configuration?
Kind regards,
Krzysztof