Hi Richard,
Thanks for replying.

> but I close the KafkaProducer inside the send
> callback.
> ...
>  Combined with idempotence enabled
> and max inflight set to 5 (the maximum for idempotence tracking) it gave me
> relatively good performance.

Yes, I also find that closing the KafkaProducer inside the send callback
can prevent additional records from being sent. But after some
investigation into the source code of KafkaProducer and Sender, I think
closing the producer in the callback is not 100% reliable in such cases. For
example, suppose you set max.in.flight.requests.per.connection to 5 and
send 5 batches: 1, 2, 3, 4, 5. If batch No. 2 fails with an exception in the
callback and you initiate the producer close inside that callback,
batch No. 3 might already be in flight and could still reach the broker.
Although I haven't observed this in my experiments, I am
still not sure the approach is reliable, since Kafka's official documentation
makes no guarantee about this behaviour.

In the source code of KafkaProducer and Sender, the
"guaranteeMessageOrder" property is set to true only when
max.in.flight.requests.per.connection is set to 1, which ensures that only
one request is in flight per partition.
kafka/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
at master · a0x8o/kafka
<https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L128>
kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
at master · a0x8o/kafka
<https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L538>
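
To make the setting above concrete, here is a minimal sketch of producer
properties that limit in-flight requests to 1. It uses only java.util.Properties
and the standard Kafka config key names; the class name StrictOrderingConfig,
the method build() and the broker address localhost:9092 are placeholders of
mine, not anything from Richard's code or my test program.

```java
import java.util.Properties;

// Hypothetical helper: builds producer settings that trade throughput for
// strict ordering by allowing a single in-flight request per connection.
class StrictOrderingConfig {

    static Properties build() {
        Properties props = new Properties();
        // Placeholder broker address (assumption, replace with your own).
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Idempotence and acks=all, as in the setup discussed in this thread.
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");
        // The value that makes the producer enable guaranteeMessageOrder.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println("max.in.flight.requests.per.connection = "
                + props.getProperty("max.in.flight.requests.per.connection"));
    }
}
```

These properties would be passed to the KafkaProducer constructor. The
trade-off is lower throughput than with the value 5 that Richard uses.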

Do you have any thoughts?

Thanks and regards,
William Lee

Richard Bosch <richard.bo...@axual.com> wrote on Wed, Mar 13, 2024 at 16:38:

> Hi William,
>
> I see from your example that you close the kafka producer in the send
> loop, based on the content of sendException that is used in the callback of
> the KafkaProducer send.
> Since your send loop runs on a different thread than the one the
> KafkaProducer uses to send, you will encounter race conditions on this
> close logic.
>
> I actually had a similar requirement as yours and solved it by using a
> sendException like you do, but I close the KafkaProducer inside the send
> callback. The send callback is executed as part of the produce thread, and
> closing the producer there will stop the processing of all subsequent batches,
> as the current batch isn't finished yet. Combined with idempotence enabled
> and max inflight set to 5 (the maximum for idempotence tracking) it gave me
> relatively good performance.
>
> Kind regards,
>
>
> Richard Bosch
>
> Developer Advocate
>
> Axual BV
>
> https://axual.com/
>
