Hi Krzysztof,

The bundled Flink Kafka connector for 1.17 uses Kafka 3.2.3, see
https://github.com/apache/flink/blob/release-1.17/flink-connectors/flink-connector-kafka/pom.xml#L38
That's also the case for the externalized Flink Kafka connector v3.0,
see https://github.com/apache/flink-connector-kafka/blob/v3.0/pom.xml#L53
The externalized Flink Kafka connector has been upgraded to use 3.4.0,
but that hasn't been released yet. See
https://github.com/apache/flink-connector-kafka/blob/main/pom.xml#L53
for the details.
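Until that release is out, one workaround (a sketch, not an official recommendation) is to pin kafka-clients in your own application to the version the connector was built against, so the reflection-based code finds the fields it expects:

```xml
<!-- Pin kafka-clients to the version flink-connector-kafka 1.17 was built against, -->
<!-- overriding the transitive 3.5.x dependency pulled in elsewhere. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.3</version>
</dependency>
```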

Best regards,

Martijn

On Tue, Sep 12, 2023 at 7:36 AM Krzysztof Jankiewicz
<jankiewicz.krzysz...@gmail.com> wrote:
>
> Hi, Hang.
>
> A number of changes were made to the TransactionManager in the Kafka
> client in 2022.
> (e.g. 
> https://github.com/apache/kafka/commit/3ea7b418fb3d7e9fc74c27751c1b02b04877f197).
>
> Version 3.2.3 was the last one in which the TransactionManager class contained
> the fields (e.g., topicPartitionBookkeeper) referenced by
> flink-connector-kafka (1.17.1).
>
> Thanks once again.
> Krzysztof
>
> On Mon, Sep 11, 2023 at 11:24 AM Hang Ruan <ruanhang1...@gmail.com> wrote:
>>
>> Hi, Krzysztof.
>>
>> I see that this part was changed in PR [1], which upgrades the Kafka
>> client version to 3.4.0.
>> That fix has not been released yet; maybe you can build and verify it yourself.
>>
>> Best,
>> Hang
>>
>> [1] https://github.com/apache/flink-connector-kafka/pull/11
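>> A rough sketch of building it yourself (assumes Git, Maven, and a JDK are installed):

```shell
# Clone the externalized connector and install a local snapshot build.
git clone https://github.com/apache/flink-connector-kafka.git
cd flink-connector-kafka
# Skip tests to speed up the build; artifacts land in the local Maven repository.
mvn clean install -DskipTests
```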
>>
>> On Sun, Sep 10, 2023 at 9:52 PM Krzysztof Jankiewicz <jankiewicz.krzysz...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am currently working on a simple application that requires an exactly-once
>>> end-to-end guarantee.
>>>
>>> I am reading data from Kafka and writing it back to Kafka.
>>>
>>> When I use `DeliveryGuarantee.AT_LEAST_ONCE` at the Kafka Sink level, 
>>> everything works fine.
>>> Here's the relevant code:
>>>
>>> KafkaSink<String> sink = KafkaSink.<String>builder()
>>>     . . .
>>>     .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>>>     . . .
>>>     .build();
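>>> For completeness, the EXACTLY_ONCE variant I switch to looks roughly like
>>> this (broker address, topic, prefix, and timeout values are placeholders;
>>> EXACTLY_ONCE also requires a transactional-id prefix, and
>>> transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms):

```java
// Sketch only; placeholder values throughout.
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("kafka:9092")                    // placeholder
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic")             // placeholder
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // Required when the delivery guarantee is EXACTLY_ONCE.
        .setTransactionalIdPrefix("my-app-tx")                // placeholder
        // Keep this at or below the broker's transaction.max.timeout.ms.
        .setProperty("transaction.timeout.ms", "600000")
        .build();
```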
>>>
>>> Unfortunately, when I switch to DeliveryGuarantee.EXACTLY_ONCE, I encounter
>>> the following error during failure recovery (High Availability mode on Kubernetes):
>>>
>>> Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
>>>   at 
>>> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266)
>>>  ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>>> . . .
>>> Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper
>>>
>>> The code causing this issue is as follows 
>>> (org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer):
>>>
>>> Object transactionManager = this.getTransactionManager();
>>> synchronized (transactionManager) {
>>>     Object topicPartitionBookkeeper =
>>>         getField(transactionManager, "topicPartitionBookkeeper");
>>>     transitionTransactionManagerStateTo(transactionManager, "INITIALIZING");
>>>     invoke(topicPartitionBookkeeper, "reset");
>>>     setField(transactionManager, "producerIdAndEpoch",
>>>         createProducerIdAndEpoch(producerId, epoch));
>>>     . . .
>>> }
>>>
>>> I am using Apache Flink 1.17.1 and the Apache Kafka client
>>> (org.apache.kafka:kafka-clients) 3.5.1.
>>> I have examined the code of 
>>> org.apache.kafka.clients.producer.internals.TransactionManager, which is 
>>> used by org.apache.kafka.clients.producer.KafkaProducer.
>>> I can see the producerIdAndEpoch field, but there is no 
>>> topicPartitionBookkeeper field.
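>>> To double-check which fields a given client exposes, I use this self-contained
>>> sketch (the nested Demo class is a hypothetical stand-in; against a real
>>> classpath, load org.apache.kafka.clients.producer.internals.TransactionManager
>>> via Class.forName instead):

```java
import java.lang.reflect.Field;

public class FieldCheck {
    // True if clazz declares a (possibly private) field with the given name.
    static boolean hasDeclaredField(Class<?> clazz, String name) {
        try {
            clazz.getDeclaredField(name);
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    // Hypothetical stand-in for kafka-clients' TransactionManager.
    static class Demo {
        private Object producerIdAndEpoch; // present in both old and new clients
    }

    public static void main(String[] args) {
        System.out.println(hasDeclaredField(Demo.class, "producerIdAndEpoch"));       // prints true
        System.out.println(hasDeclaredField(Demo.class, "topicPartitionBookkeeper")); // prints false
    }
}
```

>>> With kafka-clients 3.5.1 on the classpath, the same check against the real
>>> TransactionManager reports producerIdAndEpoch but not topicPartitionBookkeeper,
>>> which matches the NoSuchFieldException above.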
>>>
>>> Could you please advise which version of the Kafka client is compatible with
>>> flink-connector-kafka? And am I missing something in my configuration?
>>>
>>> Kind regards
>>> Krzysztof
