Hi Richard,

Thanks for replying.

> but I close the KafkaProducer inside the send
> callback.
> ...
> Combined with idempotence enabled
> and max inflight set to 5 (the maximum for idempotence tracking) it gave me
> relatively good performance.

Yes, I also find that closing the KafkaProducer inside the send callback can prevent additional records from being sent. But after some investigation into the source code of KafkaProducer and Sender, I think closing the Kafka producer in the callback is not 100% reliable in such cases.

For example, suppose you set max.in.flight.requests.per.connection to 5 and send 5 batches: 1, 2, 3, 4, 5. Say batch No. 2 fails with an exception in the callback and you initiate the producer close inside that callback. Batch No. 3 might already be in flight and could still be sent to the broker.

Even though I haven't observed such results during my experiments, I am still not sure this is reliable, since Kafka's official documentation gives no guarantee about this behaviour. In the source code of KafkaProducer and Sender, the "guaranteeMessageOrder" property is set to true only when max.in.flight.requests.per.connection is set to 1, which ensures that only one request is in flight per partition.

kafka/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java at master · a0x8o/kafka
<https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L128>

kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java at master · a0x8o/kafka
<https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L538>

Do you have any thoughts?

Thanks and regards,
William Lee

Richard Bosch <richard.bo...@axual.com> wrote on Wed, Mar 13, 2024 at 16:38:

> Hi WIlliam,
>
> I see from your example that you close the kafka producer in the send
> loop, based on the content of sendException that is used in the callback of
> the KafkaProducer send.
> Since your send loop is a different thread than the KafkaProducer uses to
> send you will encounter race conditions on this close logic.
>
> I actually had a similar requirement as yours and solved it by using a
> sendException like you do, but I close the KafkaProducer inside the send
> callback. The send callback is executed as part of the produce thread, and
> closing the consumer there will stop all subsequent batches of processing,
> as the current batch isn't finished yet. Combined with idempotence enabled
> and max inflight set to 5 (the maximum for idempotence tracking) it gave me
> relatively good performance.
>
> Kind regards,
>
>
> Richard Bosch
>
> Developer Advocate
>
> Axual BV
>
> https://axual.com/
>
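
For reference, the stricter alternative discussed in this thread (max.in.flight.requests.per.connection set to 1, so that the Sender keeps at most one request in flight per partition) could be configured roughly as below. This is only a sketch: the class name StrictOrderingConfig, the method strictOrderingProps and the broker address in the usage note are hypothetical. The configuration keys themselves are standard producer settings. It uses plain java.util.Properties so it compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): builds producer settings for the
// "one request in flight" case described in this thread.
class StrictOrderingConfig {

    static Properties strictOrderingProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // Idempotence requires acks=all and at most 5 in-flight requests.
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");

        // With exactly 1 in-flight request, a later batch cannot already be
        // on the wire when the callback of a failed batch closes the producer.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }
}
```

The resulting Properties object can be passed to the KafkaProducer constructor, for example new KafkaProducer<String, String>(StrictOrderingConfig.strictOrderingProps("localhost:9092")). The expected trade-off is lower throughput than with 5 in-flight requests, so whether it is acceptable depends on the workload.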