Hi Andrew,

Thanks for the review, and sorry for the late reply. I took some time to think through how a partition-level approach could be implemented and what benefits it might bring, and to run additional experiments in low-latency environments.
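Before getting to the individual comments, here is a back-of-the-envelope model for why the connection-level in-flight limit dominates latency and throughput even on a fast link. This is my own sketch, not one of the KIP's benchmarks, and the 0.5 ms RTT is a hypothetical figure:

```python
def max_requests_per_sec(max_in_flight: int, rtt_ms: float) -> float:
    """Upper bound on completed produce requests per second on one
    connection: at most `max_in_flight` requests can complete per
    broker round trip of `rtt_ms` milliseconds."""
    return max_in_flight * 1000.0 / rtt_ms

# Even on a hypothetical 0.5 ms localhost round trip, a limit of 1
# caps a connection at 2000 requests/s, while a limit of 5 allows
# 10000 requests/s; every batch beyond the limit queues in the
# producer and pays at least one extra round trip of latency.
low = max_requests_per_sec(1, 0.5)    # 2000.0
high = max_requests_per_sec(5, 0.5)   # 10000.0
```

The real producer pipelines batches per partition, so the KIP's benchmark numbers won't match this bound exactly, but it explains the shape of the results below.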
AS1 & AS2: Since both comments converge on the idea of introducing partition-level configuration, I'll address them together.

1. Partition-level configuration does not satisfy all use cases. While a partition-level in-flight limit offers finer-grained control per partition, it doesn't cover the case where a user wants to bound the total number of in-flight requests on a single broker connection. These are two distinct concerns: per-partition flow control vs. per-connection back-pressure. A partition-level configuration alone cannot replace the connection-level limit that max.in.flight.requests.per.connection currently provides.

2. Having two levels of in-flight limits increases both user-facing and implementation complexity.

3. A broker-level configuration benefits both high-latency and low-latency environments. I've added localhost benchmark results to the KIP. Even in a low-latency environment, setting max.in.flight.requests.per.connection=1 causes each request to queue at the producer level, resulting in significantly higher latency than allowing more requests in flight.

Thank you,
PoAn

> On Mar 2, 2026, at 7:11 AM, Andrew Schofield <[email protected]> wrote:
> 
> Hi PoAn,
> Thanks for your KIP. This seems like a good area to improve, not just for
> the high-latency connections between clients and brokers that you mentioned,
> but also because diskless is introducing topics which have high write latency
> too.
> 
> AS1: In general, I'm nervous about having to set broker configurations based
> on knowledge of the client latency. If you have an asymmetrical configuration
> with a mixture of high- and low-latency clients, you end up having to
> configure for the worst case. I'd prefer the client to behave differently
> when it is experiencing high latency, and also to be responsive to the
> difference in latency for specific topics which have higher latency,
> rather than to change the broker configuration for all clients. WDYT?
> 
> AS2: If I understand the code correctly (and that's not guaranteed),
> ProducerStateEntry.NUM_BATCHES_TO_RETAIN (5) is the number of batches per
> producer ID per topic-partition that the broker can retain. The Java producer
> client uses max.in.flight.requests.per.connection (also 5) to limit how many
> requests it is prepared to have in flight, but this is at the level of the
> entire connection. Would an alternative change be to switch the producer's
> limit from a connection-level limit to a partition-level limit matching the
> broker implementation? You could get a lot of in-flight requests by using
> more partitions. The key is the amount of data in flight, not really the
> number of batches. I may have misunderstood how this area works, but it
> doesn't seem optimal.
> 
> Thanks,
> Andrew
> 
> On 2026/02/28 12:21:10 PoAn Yang wrote:
>> Hi Luke,
>> 
>> Thanks for the review.
>> 
>> 2 & 4. I added more background to the Broker Configuration and
>> Dynamic Capacity Discovery paragraphs. In the initial state,
>> the producer can send at most min(5,
>> max.in.flight.requests.per.connection) requests, so it doesn't
>> exceed an old broker's capacity.
>> 
>> Thank you,
>> PoAn
>> 
>>> On Feb 27, 2026, at 4:27 PM, Luke Chen <[email protected]> wrote:
>>> 
>>> Hi PoAn,
>>> 
>>>> 1. KAFKA-18905 and KAFKA-9199 are about leader changes causing
>>>> OUT_OF_ORDER_SEQUENCE errors. This KIP is to remove the
>>>> NUM_BATCHES_TO_RETAIN limitation. I think they're not related.
>>> 
>>> OK, I see.
>>> 
>>>> Yes, if max.in.flight.requests.per.connection is larger than
>>>> NUM_BATCHES_TO_RETAIN, the batches cannot be retained.
>>>> That is why we have an initial state to make sure the producer sends
>>>> a number of in-flight requests less than or equal to NUM_BATCHES_TO_RETAIN.
>>>> Only if it finds a broker can retain more batches does it adjust its
>>>> limit.
>>> 
>>> So, currently, when an idempotent/transactional producer is enabled, we
>>> throw an exception if max.in.flight.requests.per.connection > 5.
>>> When we allow users to configure NUM_BATCHES_TO_RETAIN, that validation
>>> will no longer be applied before sending the produce request.
>>> And that's why we need the produce response to tell the producer what the
>>> setting on the broker side is.
>>> Could you make this clearer in the KIP?
>>> 
>>> Also, if max.in.flight.requests.per.connection is set to 100
>>> and NUM_BATCHES_TO_RETAIN is 5, then it's a little late when the
>>> first produce response is received if we already allow producers to send
>>> 100 requests in flight. If we want to adopt this solution, maybe we need to
>>> let the producer begin from max.in.flight.requests.per.connection = 1 and
>>> then adjust it to the expected value after the first produce response is
>>> received. Does that make sense?
>>> 
>>>> 4. We can adjust the default NUM_BATCHES_TO_RETAIN. However,
>>>> if a broker works with old producers, it may waste memory. Old
>>>> producers can't send more in-flight requests because of ConfigException.
>>>> How about we still use 5 in 4.x and adjust to a larger value in 5.0?
>>> 
>>> Sounds good to me.
>>> 
>>> Thank you,
>>> Luke
>>> 
>>> 
>>> 
>>> On Thu, Feb 26, 2026 at 9:22 PM PoAn Yang <[email protected]> wrote:
>>> 
>>>> Hi Luke,
>>>> 
>>>> Thanks for the review and suggestions.
>>>> 
>>>> 1. KAFKA-18905 and KAFKA-9199 are about leader changes causing
>>>> OUT_OF_ORDER_SEQUENCE errors. This KIP is to remove the
>>>> NUM_BATCHES_TO_RETAIN limitation. I think they're not related.
>>>> 
>>>> 2. Agree, transactional producers are based on idempotent producers.
>>>> Updated it.
>>>> 
>>>> 3.
>>>>> So, I'd like to know why we have to adjust the
>>>>> `max.in.flight.requests.per.connection` value on the producer side?
>>>> 
>>>> The user doesn't need to update max.in.flight.requests.per.connection in
>>>> this case. The producer will automatically adjust its internal limit on
>>>> in-flight requests.
>>>> 
>>>>> Using the example above, after this KIP,
>>>>> the `max.in.flight.requests.per.connection=10` cannot be retained
>>>>> unless NUM_BATCHES_TO_RETAIN is set to 10, right?
>>>> 
>>>> Yes, if max.in.flight.requests.per.connection is larger than
>>>> NUM_BATCHES_TO_RETAIN, the batches cannot be retained.
>>>> That is why we have an initial state to make sure the producer sends
>>>> a number of in-flight requests less than or equal to NUM_BATCHES_TO_RETAIN.
>>>> Only if it finds a broker can retain more batches does it adjust its
>>>> limit.
>>>> 
>>>> 4. We can adjust the default NUM_BATCHES_TO_RETAIN. However,
>>>> if a broker works with old producers, it may waste memory. Old
>>>> producers can't send more in-flight requests because of ConfigException.
>>>> How about we still use 5 in 4.x and adjust to a larger value in 5.0?
>>>> 
>>>> Thank you,
>>>> PoAn
>>>> 
>>>>> On Feb 25, 2026, at 9:07 PM, Luke Chen <[email protected]> wrote:
>>>>> 
>>>>> Hi PoAn,
>>>>> 
>>>>> Thanks for the KIP!
>>>>> I agree the number of batches to retain should be configurable to
>>>>> improve throughput.
>>>>> 
>>>>> Comments:
>>>>> 1. Could you add the issue KAFKA-18905
>>>>> <https://issues.apache.org/jira/browse/KAFKA-18905> to the
>>>>> motivation section? I think this is the issue we want to address, right?
>>>>> 
>>>>> 2. > Introduce a new config on the broker, as the broker must know how
>>>>> much memory to allocate. Operators can set a limitation on the broker
>>>>> side to prevent malicious producers. This configuration only takes
>>>>> effect for idempotent producers.
>>>>> I think not only the idempotent producers, but also the
>>>>> transactional producers, as long as they have a PID.
>>>>> 
>>>>> 3. About the produce response update, I'm wondering if it is necessary.
>>>>> Currently, when a producer has `max.in.flight.requests.per.connection=10`
>>>>> and NUM_BATCHES_TO_RETAIN=5, we won't adjust the producer config to 5.
>>>>> Of course it is possible that the duplication cannot be detected, but
>>>>> that might be the user's choice to improve throughput (though it might
>>>>> be rare).
>>>>> So, I'd like to know why we have to adjust the
>>>>> `max.in.flight.requests.per.connection` value on the producer side?
>>>>> Using the example above, after this KIP,
>>>>> `max.in.flight.requests.per.connection=10` cannot be retained
>>>>> unless NUM_BATCHES_TO_RETAIN is set to 10, right?
>>>>> 
>>>>> 4. The default value of `max.idempotence.batches.to.retain`:
>>>>> The performance test you showed clearly indicates that a
>>>>> larger `max.idempotence.batches.to.retain` gets better throughput.
>>>>> Also, the memory usage is small. Do we have any reason to keep the
>>>>> default value at 5?
>>>>> 
>>>>> Thank you,
>>>>> Luke
>>>>> 
>>>>> 
>>>>> 
>>>>> On Sun, Feb 22, 2026 at 9:48 PM PoAn Yang <[email protected]> wrote:
>>>>> 
>>>>>> Hi all,
>>>>>> 
>>>>>> I would like to start a discussion thread on KIP-1269. In this KIP, we
>>>>>> aim to remove the limitation on the maximal number of batches to retain
>>>>>> for an idempotent producer. In our tests, it can improve throughput and
>>>>>> reduce latency.
>>>>>> 
>>>>>> https://cwiki.apache.org/confluence/x/loI8G
>>>>>> 
>>>>>> Please take a look and feel free to share any thoughts.
>>>>>> 
>>>>>> Thanks.
>>>>>> PoAn
