Hi, Krzysztof.

I find that this part has been changed in PR[1] when updating the kafka
client version to 3.4.0.
This fix is not released yet. Maybe you can package and check it by
yourself.

Best,
Hang

[1] https://github.com/apache/flink-connector-kafka/pull/11

Krzysztof Jankiewicz <jankiewicz.krzysz...@gmail.com> 于2023年9月10日周日 21:52写道:

> Hi,
>
> I am currently working on a simple application that requires exactly-once
> end-to-end guarantee.
>
> I am reading data from Kafka and writing it back to Kafka.
>
> When I use `DeliveryGuarantee.AT_LEAST_ONCE` at the Kafka Sink level,
> everything works fine.
> Here's the relevant code:
>
> KafkaSink<String> sink = KafkaSink.<String>builder()
>     . . .
>     .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>     . . .
>     .build();
>
> Unfortunately, when I switch to DeliveryGuarantee.EXACTLY_ONCE, I
> encounter the following error during error handling (High Availability mode
> in k8s)::
>
> Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
>   at
> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266)
> ~[flink-connector-kafka-1.17.1.jar:1.17.1]
> . . .
> Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper
>
> The code causing this issue is as follows
> (org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer):
>
> Object transactionManager = this.getTransactionManager();
>         synchronized(transactionManager) {
>             Object topicPartitionBookkeeper = getField(transactionManager,
> "topicPartitionBookkeeper");
>             transitionTransactionManagerStateTo(transactionManager,
> "INITIALIZING");
>             invoke(topicPartitionBookkeeper, "reset");
>             setField(transactionManager, "producerIdAndEpoch",
> createProducerIdAndEpoch(producerId, epoch));
>
> I am using Apache Kafka 1.17.1 and Apache Kafka Client
> (org.apache.kafka:kafka-clients) 3.5.1.
> I have examined the code of
> org.apache.kafka.clients.producer.internals.TransactionManager, which is
> used by org.apache.kafka.clients.producer.KafkaProducer.
> I can see the producerIdAndEpoch field, but there is no
> topicPartitionBookkeeper field.
>
> Could you please advise which version of KafkaProducer is compatible with
> the flink-connector-kafka? And am I missing something in my configuration?
>
> Kind regards
> Krzysztof
>

Reply via email to