Hi Jun,

JR1: The example was about the case where the per-partition flow stays under the batch limit (those are the cases where we would see significant memory savings). But I do get your point and agree: in scenarios where the full batch is used, the dynamic strategy would end up using the same amount of memory. Still, in those cases the value comes from the predictability and tunability of buffer.memory (memory consumption depends on known factors, not workload-dependent ones). I clarified the first example and added a second one to showcase the case where it's not about memory gains but about predictability.

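To make the two JR1 cases concrete, here is a rough back-of-the-envelope sketch. It is not the KIP's implementation: the class and method names are hypothetical, the 16 KiB chunk size is an assumption for illustration, and the "dynamic" model simply assumes each partition holds only the whole chunks needed for its pending bytes, capped at batch.size.

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of the Kafka producer API.
final class PoolDemandSketch {
    static final long MIB = 1024L * 1024L;

    // Static strategy: every active partition holds a full batch.size buffer.
    static long staticDemand(long batchSize, int activePartitions) {
        return batchSize * activePartitions;
    }

    // Dynamic strategy (assumed model): each partition holds only the chunks
    // needed for its pending bytes, capped at batch.size.
    static long dynamicDemand(long batchSize, long chunkSize, long[] pendingBytesPerPartition) {
        long total = 0;
        for (long pending : pendingBytesPerPartition) {
            long capped = Math.min(pending, batchSize);
            long chunks = (capped + chunkSize - 1) / chunkSize; // round up to whole chunks
            total += chunks * chunkSize;
        }
        return total;
    }

    public static void main(String[] args) {
        long batchSize = 4 * MIB;
        long chunkSize = 16 * 1024L; // assumed chunk size, not taken from the KIP

        long[] light = new long[1000];
        Arrays.fill(light, 10 * 1024L); // 10 KiB pending per partition
        long[] full = new long[1000];
        Arrays.fill(full, batchSize);   // every partition fills its batch

        System.out.println("static MiB: " + staticDemand(batchSize, 1000) / MIB);                  // 4000
        System.out.println("dynamic, light KiB: " + dynamicDemand(batchSize, chunkSize, light) / 1024); // 16000
        System.out.println("dynamic, full MiB: " + dynamicDemand(batchSize, chunkSize, full) / MIB);    // 4000
    }
}
```

With light per-partition flow the assumed dynamic model needs a small fraction of the static demand; with full batches both converge on the same number, which is the case where the benefit is predictability rather than savings.
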
JR2: My main concern with keeping the same close-and-block behaviour as trunk was the change it would bring to the send() blocking pattern. On trunk, send() only blocks on memory for the first record of a batch, never mid-batch. Applying close-and-block to the dynamic strategy changes this: send() could block on any record, regardless of whether a batch is open. I initially leaned toward leaving the blocking behaviour unchanged (and paying for the extra direct allocation, with visibility), but on second thought I agree it's cleaner to surface the situation to the API: send() blocks, which trunk does on a new batch only and dynamic would do at the record level. It's not really a change to the send() or max.block.ms contract, just a different pattern that seems sensible given the "on-demand" allocation. I updated the KIP with this and left a rejected alternative for the record. With this, I also dropped the new metric I had proposed (it was mainly there to give visibility into the new direct-allocation path, which is now removed).

Hi TengYao,

TYC1: Interesting point. I agree that your suggested metric would give visibility into what's actually allocated from the pool (which is dynamic now; it didn't make much sense before because it was "static", ~batch.size). I believe some of the scenarios you shared are covered by metrics that already exist in trunk (e.g., bufferpool-wait-*, buffer-available/total-bytes, batch-size-avg). Still, the new strategy allocates from the pool differently, dynamically, and only a metric like the one you suggest would let us see how that goes (batch-size-avg is the closest, but it is post-compression, so not the same). I just wonder if it would make more sense to express it in bytes rather than in chunks (e.g., "batch-pool-bytes-avg"). It would align better with the existing metrics in this space, which are all in bytes.
Also, I expect operators think in bytes (not in a new "chunk" concept, which is just an internal grouping of bytes), and it's probably better not to expose chunks as a unit of measure, so the metric ages well even if the internal chunk details change. What do you think? I'll wait to hear back and align before updating the KIP.

Thanks both!

Cheers,
Lianet

On Wed, May 13, 2026 at 4:40 PM Jun Rao via dev <[email protected]> wrote:

> Hi, Lianet,
>
> Thanks for the reply.
>
> JR1. "As an example: a producer writing 10 MiB/s of aggregate throughput to
> a 1000-partition topic with RoundRobinPartitioner struggles to achieve a
> meaningful fraction of that at the default 16384 bytes "batch.size". Each
> partition only sends 16384 bytes at a time over a high-latency link, so
> per-partition throughput is bounded by "16384 bytes / RTT". Increasing
> "batch.size" to 4 MiB unblocks throughput but the producer would need 4 MiB
> × 1000 partitions = 4 GiB of pool memory to accommodate all partitions
> simultaneously (regardless of actual volume of data flowing per
> partition)."
> This example does not seem strong. In this case, the producer still
> requires 4GB of memory even with the proposed KIP to achieve high
> throughput because all 1000 partitions are active.
>
> JR2. "When a new record arrives mid-batch and the pool is exhausted, it
> will perform direct heap allocation to allocate all the chunks estimated
> needed for the record uncompressed size."
> Why do we need to introduce this new case for direct allocation? This case
> exists in the static allocation approach. If the buffer pool is exhausted,
> the send() call blocks but all pending batches become drainable to prevent
> deadlock. Is there any issue with using the same mechanism for dynamic
> allocation?
>
> Jun
>
>
> On Wed, May 13, 2026 at 8:53 AM Lianet Magrans <[email protected]> wrote:
>
> > Hi Jun,
> >
> > JR1: Agreed, I updated the motivation section to clarify the different
> > scenarios based on keys and partitioner, and under which situations it
> > becomes problematic.
> >
> > JR2: The KIP preserves the 2 existing direct allocation triggers you
> > mentioned (compressed data exceeding allocation and batch split), and also
> > introduces a new one (on new record mid-batch when pool exhausted,
> > basically due to the per-record reservation approach). To mitigate, direct
> > allocation is limited to one record's worth of growth per batch (batch
> > closed right after it), and we're also introducing the new metric to have
> > visibility and allow tuning buffer.memory. Under normal pool conditions,
> > direct allocations with the new strategy should happen less often than with
> > the current behaviour, mainly because of the proposed improvement to try
> > the pool first, non-blocking, before falling back to heap allocation. I
> > clarified it all in the Internal allocation strategy section (extending on
> > new sections "Blocking behaviour" and "Direct heap allocation"). Please
> > take a look and let me know.
> >
> > Thanks for the review!
> > Lianet
> >
> > PS: addressing TengYao's feedback shortly, thanks!
> >
> > On Tue, May 12, 2026 at 11:31 AM TengYao Chi <[email protected]> wrote:
> >
> > > Hi Lianet,
> > >
> > > Thanks for this great KIP.
> > >
> > > TYC1. I have one consideration regarding observability: Do we need a new
> > > metric for average-chunks-per-batch? With the introduction of the
> > > chunked-buffer strategy, memory usage per partition is no longer a fixed
> > > batch.size. While this significantly improves memory efficiency, it might
> > > be beneficial for operators to understand the actual "chunk utilization" or
> > > fragmentation under different workloads.
> > > Specifically, I think this metric
> > > would be valuable when combined with the proposed bufferpool-overflow
> > > metrics: it would help operators distinguish whether memory pressure is
> > > being driven by a large number of active partitions (many small batches) or
> > > by individual batches becoming unexpectedly large (many chunks per batch,
> > > perhaps due to large records or low compression ratios). What do you think?
> > >
> > > Best,
> > > TengYao Chi
> > >
> > > On 2026/05/11 23:03:53 Jun Rao via dev wrote:
> > > > Hi, Lianet,
> > > >
> > > > Thanks for the KIP.
> > > >
> > > > JR1. It would be useful to provide a bit more motivation for the KIP. The
> > > > batches allocated from the buffer pool are proportional to the number of
> > > > active partitions. For publishing records without keys, the active
> > > > partition is 1 by default, independent of the number of partitions in a
> > > > topic. It's only when publishing records with keys that the active
> > > > partition can be the total number of partitions in a topic. So, a possible
> > > > scenario is that a client publishes records without keys to one topic while
> > > > publishing records with keys to another.
> > > >
> > > > JR2. "Following records appended to the batch do not block or throw. They
> > > > attempt non-blocking pool allocation and fall back to direct heap if the
> > > > pool is exhausted.
> > > > Ensures not blocking on pool memory while already holding some for a batch".
> > > >
> > > > Currently, the producer only allocates memory exceeding the configured
> > > > buffer pool size in two cases.
> > > > (1) Compressed data exceeding the estimated size
> > > > (2) When a batch is too large for the broker's max.message.bytes and gets
> > > > split, each sub-batch is allocated via ByteBuffer.allocate(initialSize)
> > > > directly.
> > > >
> > > > With the KIP, are we introducing new cases in addition to the above two?
> > > >
> > > > Jun
> > > >
> > > > On Fri, May 1, 2026 at 6:03 AM Lianet Magrans <[email protected]> wrote:
> > > >
> > > > > Thanks for the feedback Jaisen! I like your proposed "static" for the
> > > > > current behaviour, it aligns nicely. All updated.
> > > > >
> > > > > Best!
> > > > > Lianet
> > > > >
> > > > > On Thu, Apr 30, 2026 at 4:27 PM Jaisen Mathai via dev <[email protected]> wrote:
> > > > >
> > > > > > Thanks Lianet.
> > > > > >
> > > > > > I like the proposal.
> > > > > >
> > > > > > I suggest a descriptive name such as static or fixed instead of legacy for
> > > > > > the default configuration value. I think these will age better while still
> > > > > > communicating that users should strongly consider using the non-default
> > > > > > value of dynamic.
> > > > > >
> > > > > > Jaisen
> > > > > >
> > > > > > On Thu, Apr 30, 2026 at 8:02 AM Lianet Magrans <[email protected]> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I would like to start a discussion on KIP-1332 that proposes a dynamic
> > > > > > > memory allocation strategy for the Kafka producer, to unlock high-latency
> > > > > > > scenarios increasingly common as Kafka moves toward object storage.
> > > > > > >
> > > > > > > https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/KAFKA/KIP-1332*3A*Dynamic*memory*allocation*for*the*Kafka*producer__;JSsrKysrKys!!Ayb5sqE7!t4yI-C5BwMxJ6dMJC7tuQhu94KuolbgKXyEnl4GChJGLYY2eS4NXk-GZYlnVPnuw3ESrGwKjyPDr5Bjp0Gk$
> > > > > > >
> > > > > > > Thanks!
> > > > > > > Lianet
