Hi Luke,

Thanks for the KIP. It looks like an interesting idea. I like the concept
of dynamically adjusting settings to handle load. I wonder whether other
client settings could also benefit from similar logic.
Just a couple of questions:
- When under load, the producer may allocate extra buffers. Are these
  buffers ever released if the load drops?
- Do we really need batch.initial.size? It's not clear that having this
  extra setting adds a lot of value.

Thanks,
Mickael

On Tue, Oct 26, 2021 at 11:12 AM Luke Chen <show...@gmail.com> wrote:
>
> Thank you, Artem!
>
> @devs, welcome to vote for this KIP.
> Key proposal:
> 1. Allocate multiple smaller initial-batch-size buffers in the producer,
> and link them together in a list when the batch expands, for better
> memory usage.
> 2. Add a max batch size config to the producer, so that when the produce
> rate is suddenly high, we can still get high throughput with a batch size
> larger than "batch.size" (and less than "batch.max.size", where
> "batch.size" is the soft limit and "batch.max.size" is the hard limit).
> Here's the updated KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
>
> Any comments and feedback are welcome.
>
> Thank you.
> Luke
>
> On Tue, Oct 26, 2021 at 6:35 AM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> > Hi Luke,
> >
> > I've looked at the updated KIP-782; it looks good to me.
> >
> > -Artem
> >
> > On Sun, Oct 24, 2021 at 1:46 AM Luke Chen <show...@gmail.com> wrote:
> >
> > > Hi Artem,
> > > Thanks for your good suggestion again.
> > > I've combined your idea into this KIP and updated it.
> > > Note that, in the end, I still keep the "batch.initial.size" config
> > > (default is 0, which means "batch.size" will be the initial batch
> > > size) for better memory conservation.
> > >
> > > A detailed description can be found here:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > >
> > > Let me know if you have other suggestions.
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Sat, Oct 23, 2021 at 10:50 AM Luke Chen <show...@gmail.com> wrote:
> > >
> > >> Hi Artem,
> > >> Thanks for the suggestion. Let me confirm my understanding is correct.
> > >> So, what you suggest is that "batch.size" is more like a "soft limit"
> > >> batch size, and the "hard limit" is "batch.max.size". When a buffer
> > >> reaches batch.size, it means the buffer is "ready" to be sent. But
> > >> before linger.ms is reached, if more data comes in, we can still
> > >> accumulate it into the same buffer, until it reaches "batch.max.size".
> > >> After it reaches "batch.max.size", we'll create another batch.
> > >>
> > >> So with your suggestion, we won't need "batch.initial.size", and we
> > >> can use "batch.size" as the initial batch size. We link "batch.size"
> > >> buffers together in a list, until we reach "batch.max.size".
> > >> Something like this:
> > >>
> > >> [image: image.png]
> > >> Is my understanding correct?
> > >> If so, that sounds good to me.
> > >> If not, please kindly explain more to me.
> > >>
> > >> Thank you.
> > >> Luke
> > >>
> > >> On Sat, Oct 23, 2021 at 2:13 AM Artem Livshits
> > >> <alivsh...@confluent.io.invalid> wrote:
> > >>
> > >>> Hi Luke,
> > >>>
> > >>> Nice suggestion. It should optimize how memory is used with
> > >>> different production rates, but I wonder if we can take this idea
> > >>> further and improve batching in general.
> > >>>
> > >>> Currently, batch.size is used in two conditions:
> > >>>
> > >>> 1. When we append records to a batch in the accumulator, we create
> > >>> a new batch if the current batch would exceed batch.size.
> > >>> 2. When we drain the batch from the accumulator, a batch becomes
> > >>> 'ready' when it reaches batch.size.
> > >>>
> > >>> The second condition works well with the current batch size,
> > >>> because if linger.ms is greater than 0, the send can be triggered
> > >>> by accomplishing the batching goal.
> > >>>
> > >>> The first condition, though, leads to creating many batches if the
> > >>> network latency or production rate (or both) is high, and with 5
> > >>> in-flight requests and 16KB batches we can only have 80KB of data
> > >>> in flight per partition. That means that with 50ms latency, we can
> > >>> only push ~1.6MB/sec per partition (this goes down with higher
> > >>> latencies, e.g. with 100ms we can only push ~0.8MB/sec).
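> > >>>
> > >>> To make that arithmetic concrete, here is a rough back-of-the-
> > >>> envelope sketch (assuming the producer defaults of
> > >>> max.in.flight.requests.per.connection=5 and batch.size=16KB, and
> > >>> simplifying to one batch per request; the numbers are illustrative,
> > >>> not measured):
> > >>>
> > >>>     // Ceiling on per-partition throughput when each request carries
> > >>>     // a single 16KB batch and at most 5 requests are in flight.
> > >>>     public class BatchThroughputSketch {
> > >>>         public static void main(String[] args) {
> > >>>             int maxInFlightRequests = 5;     // producer default
> > >>>             int batchSizeBytes = 16 * 1024;  // batch.size default
> > >>>             double latencySeconds = 0.050;   // 50ms round trip
> > >>>
> > >>>             // At most 5 * 16KB = 80KB can be awaiting acks at once.
> > >>>             int inFlightBytes = maxInFlightRequests * batchSizeBytes;
> > >>>
> > >>>             // 80KB per 50ms round trip is ~1.6MB/sec.
> > >>>             double mbPerSec =
> > >>>                 inFlightBytes / latencySeconds / (1024.0 * 1024.0);
> > >>>             System.out.printf("~%.1f MB/sec per partition%n", mbPerSec);
> > >>>         }
> > >>>     }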
> > >>>
> > >>> I think it would be great to separate the two sizes:
> > >>>
> > >>> 1. When appending records to a batch, create a new batch only if
> > >>> the current one would exceed a larger size (we can call it
> > >>> batch.max.size), say 256KB by default.
> > >>> 2. When we drain, consider a batch 'ready' if it exceeds
> > >>> batch.size, which is 16KB by default.
> > >>>
> > >>> For memory conservation we may introduce batch.initial.size if we
> > >>> want the flexibility to make it even smaller than batch.size, or we
> > >>> can just always use batch.size as the initial size (in which case
> > >>> we don't need a batch.initial.size config).
> > >>>
> > >>> -Artem
> > >>>
> > >>> On Fri, Oct 22, 2021 at 1:52 AM Luke Chen <show...@gmail.com> wrote:
> > >>>
> > >>> > Hi Kafka dev,
> > >>> > I'd like to start a vote for the proposal: KIP-782: Expandable
> > >>> > batch size in producer.
> > >>> >
> > >>> > The main purpose of this KIP is to achieve better memory usage in
> > >>> > the producer, and also to save users from the dilemma of setting
> > >>> > the batch size configuration. After this KIP, users can set a
> > >>> > higher batch.size without worries, and of course, with an
> > >>> > appropriate "batch.initial.size" (see the sketch in the P.S.
> > >>> > below).
> > >>> >
> > >>> > A detailed description can be found here:
> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > >>> >
> > >>> > Any comments and feedback are welcome.
> > >>> >
> > >>> > Thank you.
> > >>> > Luke
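> > >>> >
> > >>> > P.S. For illustration, a minimal sketch of how a producer might
> > >>> > be configured once this KIP lands. Note that "batch.initial.size"
> > >>> > and "batch.max.size" are names proposed by this KIP, not existing
> > >>> > producer configs, and the values here are only examples:
> > >>> >
> > >>> >     import java.util.Properties;
> > >>> >     import org.apache.kafka.clients.producer.KafkaProducer;
> > >>> >     import org.apache.kafka.clients.producer.ProducerConfig;
> > >>> >
> > >>> >     public class Kip782ConfigSketch {
> > >>> >         public static void main(String[] args) {
> > >>> >             Properties props = new Properties();
> > >>> >             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> > >>> >                 "localhost:9092");
> > >>> >             props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> > >>> >                 "org.apache.kafka.common.serialization.StringSerializer");
> > >>> >             props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> > >>> >                 "org.apache.kafka.common.serialization.StringSerializer");
> > >>> >
> > >>> >             // Existing config: the soft limit; a batch at or above
> > >>> >             // this size is considered "ready" to drain.
> > >>> >             props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);
> > >>> >
> > >>> >             // Proposed: initial buffer allocation; more buffers are
> > >>> >             // linked on as the batch grows (0 means use batch.size).
> > >>> >             props.put("batch.initial.size", 4 * 1024);
> > >>> >
> > >>> >             // Proposed: the hard limit; once a batch reaches this
> > >>> >             // size, a new batch is created.
> > >>> >             props.put("batch.max.size", 256 * 1024);
> > >>> >
> > >>> >             KafkaProducer<String, String> producer =
> > >>> >                 new KafkaProducer<>(props);
> > >>> >             producer.close();
> > >>> >         }
> > >>> >     }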