Hi Andrew,

Thanks for the review and suggestions. I have updated the KIP accordingly. Here is a summary of the changes:
1. Term change

   max.idempotence.batches.to.retain -> producer.state.batches.to.retain
   MaxIdempotenceBatchesToRetain -> ProducerStateBatchesToRetain

2. Topic-level configuration

   The configuration has been changed from a broker-level config to a
   topic-level config with a server default. The ProducerStateBatchesToRetain
   field in ProduceResponse is now placed inside TopicProduceResponse rather
   than as a top-level tagged field, since different partitions on the same
   broker can have different values.

3. Two-level in-flight check

   To support per-topic deduplication window sizes, the producer now enforces
   two independent in-flight checks:

   Per-partition check (new): the number of in-flight batches to a specific
   partition must not exceed that partition's discovered
   ProducerStateBatchesToRetain limit.

   Per-connection check (existing): the total number of in-flight requests
   to a broker node must not exceed max.in.flight.requests.per.connection.

Kind regards,
PoAn

> On Mar 9, 2026, at 8:43 PM, Andrew Schofield <[email protected]> wrote:
>
> Hi PoAn,
> Thanks for your response. I'm going to try again :)
>
> AS2: ProducerStateEntry#NUM_BATCHES_TO_RETAIN is already a partition-level
> concept. The new max.idempotence.batches.to.retain acts per-partition as far
> as I understand. The fact that we have connection-level and partition-level
> concepts colliding is weird to me.
>
> If I created 20 partitions and set max.in.flight.requests.per.connection to
> 100, and also changed the producer to limit its in-flight requests at the
> partition level, couldn't I get the best of both worlds? I could use the
> usual scaling mechanism of adding partitions to get higher throughput and I
> could continue to use 5 requests per partition.
>
> I don't mind adding a config to set the number of batches to retain, but I
> think that's only half the problem.
>
> Thanks,
> Andrew
>
> On 2026/03/08 06:33:35 PoAn Yang wrote:
>> Hi Andrew,
>>
>> Thanks for the review and sorry for the late reply.
>> I took some time to think through how a partition-level approach could be
>> implemented, what benefits it might bring, and to run additional
>> experiments in low-latency environments.
>>
>> AS1 & AS2: Since both comments converge on the idea of introducing
>> partition-level configuration, I'll address them together.
>>
>> 1. Partition-level configuration does not satisfy all use cases. While a
>> partition-level in-flight limit offers finer-grained control per
>> partition, it doesn't cover the case where a user wants to bound the total
>> number of in-flight requests on a single broker connection. These are two
>> distinct concerns: per-partition flow control vs. per-connection
>> back-pressure. A partition-level configuration alone cannot replace the
>> connection-level limit that max.in.flight.requests.per.connection
>> currently provides.
>>
>> 2. Having two levels of in-flight limits increases both user-facing and
>> implementation complexity.
>>
>> 3. A broker-level configuration benefits both high-latency and low-latency
>> environments. I've added localhost benchmark results to the KIP. Even in a
>> low-latency environment, setting max.in.flight.requests.per.connection=1
>> causes each request to queue at the producer level, resulting in
>> significantly higher latency compared to allowing more in-flight requests.
>>
>> Thank you,
>> PoAn
>>
>>> On Mar 2, 2026, at 7:11 AM, Andrew Schofield <[email protected]> wrote:
>>>
>>> Hi PoAn,
>>> Thanks for your KIP. This seems like a good area to improve, not just
>>> for the high-latency connections between clients and brokers that you
>>> mentioned, but also because diskless is introducing topics which have
>>> high write latency too.
>>>
>>> AS1: In general, I'm nervous of having to set broker configurations based
>>> on knowledge of the client latency. If you have an asymmetrical
>>> configuration with a mixture of high and low latency clients, you end up
>>> having to configure for the worst case.
>>> I'd prefer the client to behave differently in the event that it is
>>> experiencing high latency, and also to be responsive to the difference in
>>> latency for specific topics which have higher latency, rather than to
>>> change the broker configuration for all clients. wdyt?
>>>
>>> AS2: If I understand the code correctly (and that's not guaranteed),
>>> ProducerStateEntry.NUM_BATCHES_TO_RETAIN (5) is the number of batches per
>>> producer ID per topic-partition that the broker can retain. The Java
>>> producer client uses max.in.flight.requests.per.connection (also 5) to
>>> limit how many requests it is prepared to have in flight, but this is at
>>> the level of the entire connection. Would an alternative change be to
>>> switch the producer's limit from a connection-level limit to a
>>> partition-level limit matching the broker implementation? You could get a
>>> lot of in-flight requests by using more partitions. The key is the amount
>>> of data in flight, not really the number of batches. I may have
>>> misunderstood how this area works, but it doesn't seem optimal.
>>>
>>> Thanks,
>>> Andrew
>>>
>>> On 2026/02/28 12:21:10 PoAn Yang wrote:
>>>> Hi Luke,
>>>>
>>>> Thanks for the review.
>>>>
>>>> 2 & 4. I added more background to the Broker Configuration and
>>>> Dynamic Capacity Discovery paragraphs. In the initial state,
>>>> the producer can only send at most min(5,
>>>> max.in.flight.requests.per.connection) requests, so it doesn't
>>>> break old brokers' capacity.
>>>>
>>>> Thank you,
>>>> PoAn
>>>>
>>>>> On Feb 27, 2026, at 4:27 PM, Luke Chen <[email protected]> wrote:
>>>>>
>>>>> Hi PoAn,
>>>>>
>>>>>> 1. KAFKA-18905 or KAFKA-9199 are about leader changes causing
>>>>>> OUT_OF_ORDER_SEQUENCE errors. This KIP is to remove the
>>>>>> NUM_BATCHES_TO_RETAIN limitation. I think they're not related.
>>>>>
>>>>> OK, I see.
>>>>>
>>>>>> Yes, if max.in.flight.requests.per.connection is larger than
>>>>>> NUM_BATCHES_TO_RETAIN, the batches cannot be retained.
>>>>>> That is why we have initial state to make sure the producer sends
>>>>>> in-flight requests less than or equal to NUM_BATCHES_TO_RETAIN.
>>>>>> Only if it finds a broker can retain more batches, it adjusts its
>>>>>> limitation.
>>>>>
>>>>> So, currently, when an idempotent/transactional producer is enabled, we
>>>>> will throw an exception if max.in.flight.requests.per.connection > 5.
>>>>> When we allow users to configure NUM_BATCHES_TO_RETAIN, the validation
>>>>> will not be applied before sending the produce request.
>>>>> And that's why we need the produce response to tell the producer what
>>>>> the setting on the broker side is.
>>>>> Could you make this clearer in the KIP?
>>>>>
>>>>> Also, if max.in.flight.requests.per.connection is set to 100,
>>>>> and NUM_BATCHES_TO_RETAIN is 5, then it's a little late when the first
>>>>> producer response is received if we already allow producers to send
>>>>> 100 requests in flight. If we want to adopt this solution, maybe we need
>>>>> to let the producer begin from max.in.flight.requests.per.connection = 1
>>>>> and then adjust it to the expected value after the first producer
>>>>> response is received. Does that make sense?
>>>>>
>>>>>> 4. We can adjust the default NUM_BATCHES_TO_RETAIN. However,
>>>>>> if a broker works with old producers, it may waste memory. Old
>>>>>> producers can't send more in-flight requests because of a
>>>>>> ConfigException. How about we still use 5 in 4.x and adjust to a
>>>>>> larger value in 5.0?
>>>>>
>>>>> Sounds good to me.
>>>>>
>>>>> Thank you,
>>>>> Luke
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Feb 26, 2026 at 9:22 PM PoAn Yang <[email protected]> wrote:
>>>>>
>>>>>> Hi Luke,
>>>>>>
>>>>>> Thanks for the review and suggestions.
>>>>>>
>>>>>> 1. KAFKA-18905 or KAFKA-9199 are about leader changes causing
>>>>>> OUT_OF_ORDER_SEQUENCE errors. This KIP is to remove the
>>>>>> NUM_BATCHES_TO_RETAIN limitation. I think they're not related.
>>>>>>
>>>>>> 2.
>>>>>> Agree, transactional producers are based on idempotent producers.
>>>>>> Updated it.
>>>>>>
>>>>>> 3.
>>>>>>> So, I'd like to know why we have to adjust the
>>>>>>> `max.in.flight.requests.per.connection` value in the producer side?
>>>>>>
>>>>>> Users don't need to update max.in.flight.requests.per.connection in
>>>>>> this case. The producer will automatically adjust its internal
>>>>>> limitation on in-flight requests.
>>>>>>
>>>>>>> Using the example above, after this KIP,
>>>>>>> the `max.in.flight.requests.per.connection=10` cannot be retained
>>>>>>> unless NUM_BATCHES_TO_RETAIN is set to 10, right?
>>>>>>
>>>>>> Yes, if max.in.flight.requests.per.connection is larger than
>>>>>> NUM_BATCHES_TO_RETAIN, the batches cannot be retained.
>>>>>> That is why we have initial state to make sure the producer sends
>>>>>> in-flight requests less than or equal to NUM_BATCHES_TO_RETAIN.
>>>>>> Only if it finds a broker can retain more batches, it adjusts its
>>>>>> limitation.
>>>>>>
>>>>>> 4. We can adjust the default NUM_BATCHES_TO_RETAIN. However,
>>>>>> if a broker works with old producers, it may waste memory. Old
>>>>>> producers can't send more in-flight requests because of a
>>>>>> ConfigException. How about we still use 5 in 4.x and adjust to a
>>>>>> larger value in 5.0?
>>>>>>
>>>>>> Thank you,
>>>>>> PoAn
>>>>>>
>>>>>>> On Feb 25, 2026, at 9:07 PM, Luke Chen <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi PoAn,
>>>>>>>
>>>>>>> Thanks for the KIP!
>>>>>>> I agree the number of batches to retain should be configurable to
>>>>>>> improve the throughput.
>>>>>>>
>>>>>>> Comments:
>>>>>>> 1. Could you add the issue: KAFKA-18905
>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-18905> into the
>>>>>>> motivation section? I think this is the issue we want to address,
>>>>>>> right?
>>>>>>>
>>>>>>> 2. > Introduce a new config on the broker, as the broker must know
>>>>>>> how much memory to allocate.
>>>>>>> Operators can set a limitation on the broker side to
>>>>>>> prevent malicious producers. This configuration only takes effect for
>>>>>>> idempotent producers.
>>>>>>> I think not only the idempotent producers, but also the
>>>>>>> transactional producers, as long as they have the PID.
>>>>>>>
>>>>>>> 3. About the producer response update, I'm wondering if it is
>>>>>>> necessary? Currently, when a producer runs with
>>>>>>> `max.in.flight.requests.per.connection=10`
>>>>>>> and NUM_BATCHES_TO_RETAIN=5, we won't adjust the producer config to 5.
>>>>>>> Of course it is possible that the duplication cannot be detected, but
>>>>>>> that might be the user's choice to improve the throughput (though it
>>>>>>> might be rare).
>>>>>>> So, I'd like to know why we have to adjust the
>>>>>>> `max.in.flight.requests.per.connection` value in the producer side?
>>>>>>> Using the example above, after this KIP,
>>>>>>> the `max.in.flight.requests.per.connection=10` cannot be retained
>>>>>>> unless NUM_BATCHES_TO_RETAIN is set to 10, right?
>>>>>>>
>>>>>>> 4. The default value of `max.idempotence.batches.to.retain`
>>>>>>> In the performance test you showed, it obviously shows that a
>>>>>>> larger `max.idempotence.batches.to.retain` will get better throughput.
>>>>>>> Also, the memory usage is small; do we have any reason to keep the
>>>>>>> default value at 5?
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Luke
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Feb 22, 2026 at 9:48 PM PoAn Yang <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I would like to start a discussion thread on KIP-1269. In this KIP,
>>>>>>>> we aim to remove the limitation on the maximal number of batches to
>>>>>>>> retain for an idempotent producer. In our tests, it can improve
>>>>>>>> throughput and reduce latency.
>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/x/loI8G
>>>>>>>>
>>>>>>>> Please take a look and feel free to share any thoughts.
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>> PoAn
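
For readers following the thread, the two-level in-flight check and the capacity discovery summarized at the top of this message can be sketched roughly as below. This is an illustrative sketch only, not the actual Kafka producer implementation: the class, field, and method names (InFlightLimiter, canSend, etc.) are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InFlightLimiter {
    // Conservative default until the broker advertises a larger limit
    // via ProducerStateBatchesToRetain in the ProduceResponse.
    private static final int DEFAULT_BATCHES_TO_RETAIN = 5;

    private final int maxInFlightPerConnection; // max.in.flight.requests.per.connection
    private final Map<String, Integer> partitionLimits = new ConcurrentHashMap<>();
    private final Map<String, Integer> inFlightPerPartition = new ConcurrentHashMap<>();
    private final Map<Integer, Integer> inFlightPerNode = new ConcurrentHashMap<>();

    public InFlightLimiter(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    /** Capacity discovery: called when a ProduceResponse advertises a limit. */
    public void updatePartitionLimit(String topicPartition, int batchesToRetain) {
        partitionLimits.put(topicPartition, batchesToRetain);
    }

    /** Both checks must pass before another batch may be sent. */
    public boolean canSend(String topicPartition, int nodeId) {
        // Per-partition check (new): initial state is min(5, connection limit).
        int partitionLimit = partitionLimits.getOrDefault(topicPartition,
                Math.min(DEFAULT_BATCHES_TO_RETAIN, maxInFlightPerConnection));
        int partitionInFlight = inFlightPerPartition.getOrDefault(topicPartition, 0);
        // Per-connection check (existing): total requests to the broker node.
        int nodeInFlight = inFlightPerNode.getOrDefault(nodeId, 0);
        return partitionInFlight < partitionLimit
                && nodeInFlight < maxInFlightPerConnection;
    }

    public void onSend(String topicPartition, int nodeId) {
        inFlightPerPartition.merge(topicPartition, 1, Integer::sum);
        inFlightPerNode.merge(nodeId, 1, Integer::sum);
    }

    public void onComplete(String topicPartition, int nodeId) {
        inFlightPerPartition.merge(topicPartition, -1, Integer::sum);
        inFlightPerNode.merge(nodeId, -1, Integer::sum);
    }
}
```

The point of the sketch is that the per-partition counter starts against the conservative default of min(5, connection limit) and only loosens once the broker advertises a larger per-topic value, while the per-connection counter is enforced independently throughout.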
