Hi Andrew,

Thanks for the review and sorry for the late reply. I took some time to think 
through how a partition-level approach could be implemented, what benefits it 
might bring, and to run additional experiments in low-latency environments.

AS1 & AS2: Since both comments converge on the idea of introducing 
partition-level configuration, I'll address them together.

1. Partition-level configuration does not satisfy all use cases. While a 
partition-level in-flight limit offers finer-grained control per partition, it 
doesn't cover the case where a user wants to bound the total number of 
in-flight requests on a single broker connection. These are two distinct 
concerns: per-partition flow control vs. per-connection back-pressure. A 
partition-level configuration alone cannot replace the connection-level limit 
that max.in.flight.requests.per.connection currently provides.

2. Having two levels of in-flight limits increases both user-facing and 
implementation complexity.

3. A broker-level configuration benefits both high-latency and low-latency 
environments. I've added localhost benchmark results to the KIP. Even in a 
low-latency environment, setting max.in.flight.requests.per.connection=1 causes 
each request to queue at the producer level, resulting in significantly higher 
latency compared to allowing more in-flight requests.

Thank you,
PoAn

> On Mar 2, 2026, at 7:11 AM, Andrew Schofield <[email protected]> wrote:
> 
> Hi PoAn,
> Thanks for your KIP. This is seems like a good area to improve, not just for 
> the high-latency connections between clients and brokers that you mentioned, 
> but also because diskless is introducing topics which have high write latency 
> too.
> 
> AS1: In general, I'm nervous of having to set broker configurations based on 
> knowledge of the client latency. If you have an asymmetrical configuration 
> with a mixture of high and low latency clients, you end up having to 
> configure for the worst case. I'd prefer the client to behave differently in 
> the event that it is experiencing high latency, and also to be responsive to 
> the difference in latency for specific topics which have higher latency, 
> rather than to change the broker configuration for all clients. wdyt?
> 
> AS2: If I understand the code correctly (and that's not guaranteed), 
> ProducerStateEntry.NUM_BATCHES_TO_RETAIN (5) is the number of batches per 
> producer ID per topic-partition that the broker can retain. The Java producer 
> client uses max.in-flight.batches.per.connection (also 5) to limit how many 
> requests it is prepared to have in flight, but this is at the level of the 
> entire connection. Would an alternative change be to switch the producer's 
> limit from a connection-level limit to a partition-level limit matching the 
> broker implementation? You could get a lot of in-flight requests by using 
> more partitions. The key is the amount of data in flight, not really the 
> number of batches. I may have misunderstood how this area works, but it 
> doesn't seem optimal.
> 
> Thanks,
> Andrew
> 
> On 2026/02/28 12:21:10 PoAn Yang wrote:
>> Hi Luke,
>> 
>> Thanks for the review.
>> 
>> 2 & 4. I add more background to Broker Configuration and
>> Dynamic Capacity Discovery paragraphs. In the initial state,
>> the producer can only send at most min(5,
>> max.in.flight.requests.per.connection) requests, so it doesn’t
>> break old brokers capacity.
>> 
>> Thank you,
>> PoAn
>> 
>>> On Feb 27, 2026, at 4:27 PM, Luke Chen <[email protected]> wrote:
>>> 
>>> Hi PoAn,
>>> 
>>>> 1. KAFKA-18905 or KAFKA-9199 are about leader changes cause
>>> OUT_OF_ORDER_SEQUENCE error. This KIP is to remove
>>> NUM_BATCHES_TO_RETAIN limitation. I think they’re not related.
>>> 
>>> OK, I see.
>>> 
>>>> Yes, if max.in.flight.requests.per.connection is larger than
>>> NUM_BATCHES_TO_RETAIN, the batches cannot be retained.
>>> That is why we have initial state to make sure the producer sends
>>> in flight requests less or equal to NUM_BATCHES_TO_RETAIN.
>>> Only if it finds a broker can retain more batches, it adjusts its
>>> limitation.
>>> 
>>> So, currently, when idempotent/transactional producer is enabled, we will
>>> throw exception if the max.in.flight.requests.per.connection > 5.
>>> When we allow users to configure the NUM_BATCHES_TO_RETAIN, the validation
>>> will not be applied before sending the produce request.
>>> And that's why we need the produce response to tell the producer what the
>>> setting in the broker side is.
>>> Could you make it more clear about this in the KIP?
>>> 
>>> Also, if the max.in.flight.requests.per.connection is set to 100,
>>> and NUM_BATCHES_TO_RETAIN is 5, then it means it's a little late when the
>>> first producer response is received if we already allow producers to send
>>> 100 requests in flight. If we want to adopt this solution, maybe we need to
>>> let the producer begins from max.in.flight.requests.per.connection = 1 and
>>> then adjust it to the expected value after the first producer response is
>>> received. Does that make sense?
>>> 
>>>> 4. We can adjust the default NUM_BATCHES_TO_RETAIN. However,
>>> if a broker works with old producers, it may waste memory. Old
>>> producers can't send more in flight requests cause of ConfigException.
>>> How about we still use 5 in 4.x and adjust to a larger value in 5.0?
>>> 
>>> Sounds good to me.
>>> 
>>> Thank you,
>>> Luke
>>> 
>>> 
>>> 
>>> On Thu, Feb 26, 2026 at 9:22 PM PoAn Yang <[email protected]> wrote:
>>> 
>>>> Hi Luke,
>>>> 
>>>> Thanks for the review and suggestions.
>>>> 
>>>> 1. KAFKA-18905 or KAFKA-9199 are about leader changes cause
>>>> OUT_OF_ORDER_SEQUENCE error. This KIP is to remove
>>>> NUM_BATCHES_TO_RETAIN limitation. I think they’re not related.
>>>> 
>>>> 2. Agree, transactional producers are based on idempotent producers.
>>>> Updated it.
>>>> 
>>>> 3.
>>>>> So, I'd like to know why we have to adjust the
>>>>> `max.in.flight.requests.per.connection` value in the producer side?
>>>> 
>>>> 
>>>> User doesn’t need to update max.in.flight.requests.per.connection in
>>>> this case. The producer will automatically adjust internal limitation of
>>>> in flight requests.
>>>> 
>>>>> Using the example above, after this KIP,
>>>>> the `max.in.flight.requests.per.connection=10` cannot be retained
>>>>> unless NUM_BATCHES_TO_RETAIN is set to 10, right?
>>>> 
>>>> 
>>>> Yes, if max.in.flight.requests.per.connection is larger than
>>>> NUM_BATCHES_TO_RETAIN, the batches cannot be retained.
>>>> That is why we have initial state to make sure the producer sends
>>>> in flight requests less or equal to NUM_BATCHES_TO_RETAIN.
>>>> Only if it finds a broker can retain more batches, it adjusts its
>>>> limitation.
>>>> 
>>>> 4. We can adjust the default NUM_BATCHES_TO_RETAIN. However,
>>>> if a broker works with old producers, it may waste memory. Old
>>>> producers can't send more in flight requests cause of ConfigException.
>>>> How about we still use 5 in 4.x and adjust to a larger value in 5.0?
>>>> 
>>>> Thank you,
>>>> PoAn
>>>> 
>>>>> On Feb 25, 2026, at 9:07 PM, Luke Chen <[email protected]> wrote:
>>>>> 
>>>>> Hi PoAn,
>>>>> 
>>>>> Thanks for the KIP!
>>>>> I agree the number of batches to retain should be configurable to improve
>>>>> the throughput.
>>>>> 
>>>>> Comments:
>>>>> 1. Could you add the issue: KAFKA-18905
>>>>> <https://issues.apache.org/jira/browse/KAFKA-18905> into the
>>>>> motivation section? I think this is the issue we want to address, right?
>>>>> 
>>>>> 2. > Introduce a new config on the broker, as the broker must know how
>>>> much
>>>>> memory to allocate. Operators can set a limitation on the broker side to
>>>>> prevent malicious producers. This configuration only takes effect for
>>>>> idempotent producers.
>>>>> I think not only the idempotent producers, but also the
>>>>> transactional producers, as long as they have the PID.
>>>>> 
>>>>> 3. About the producer response update, I'm wondering if it is necessary?
>>>>> Currently, when producer with `max.in.flight.requests.per.connection=10`
>>>>> and NUM_BATCHES_TO_RETAIN=5, we won't adjust the producer config to 5.
>>>>> Of course it is possible to the duplication cannot be detected, but that
>>>>> might be user's choice to improve the throughput (though it might be
>>>> rare).
>>>>> So, I'd like to know why we have to adjust the
>>>>> `max.in.flight.requests.per.connection` value in the producer side?
>>>>> Using the example above, after this KIP,
>>>>> the `max.in.flight.requests.per.connection=10` cannot be retained
>>>>> unless NUM_BATCHES_TO_RETAIN is set to 10, right?
>>>>> 
>>>>> 4. The default value of `max.idempotence.batches.to.retain`
>>>>> In the performance test you showed, it obviously shows
>>>>> larger `max.idempotence.batches.to.retain` will get better throughput.
>>>>> Also, the memory usage is small, do we have any reason we keep the
>>>> default
>>>>> value for 5?
>>>>> 
>>>>> Thank you,
>>>>> Luke
>>>>> 
>>>>> 
>>>>> 
>>>>> On Sun, Feb 22, 2026 at 9:48 PM PoAn Yang <[email protected]> wrote:
>>>>> 
>>>>>> Hi all,
>>>>>> 
>>>>>> I would like to start a discussion thread on KIP-1269. In this KIP, we
>>>> aim
>>>>>> to remove limitation of maximal number of batches to retain for a
>>>>>> idempotent producer. In our test, it can improve throughput and reduce
>>>>>> latency.
>>>>>> 
>>>>>> https://cwiki.apache.org/confluence/x/loI8G
>>>>>> 
>>>>>> Please take a look and feel free to share any thoughts.
>>>>>> 
>>>>>> Thanks.
>>>>>> PoAn
>>>> 
>>>> 
>> 
>> 

Reply via email to