Hi Luke,

I don't mind increasing the max.request.size to a higher number, e.g. 2MB could be good. I think we should also run some benchmarks to see the effects of different sizes.
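For reference, here's a minimal sketch of the producer settings such a benchmark might start from (a rough illustration only -- the class name is made up, and only max.request.size is changed from the defaults, using the 2MB figure above):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class MaxRequestSizeBenchmark {
        static KafkaProducer<byte[], byte[]> createProducer(String bootstrapServers) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            // Candidate value discussed above: raise max.request.size from the 1MB default to 2MB.
            props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2 * 1024 * 1024);
            return new KafkaProducer<>(props);
        }
    }

The benchmark itself would then just vary this one setting (e.g. 1MB, 2MB, 4MB) while keeping the workload fixed, and compare throughput and latency.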
I agree that changing round robin to random solves an independent existing issue; however, the logic in this KIP exacerbates the issue, so there is some dependency.

-Artem

On Wed, Nov 24, 2021 at 12:43 AM Luke Chen <show...@gmail.com> wrote:

> Hi Artem,
> Yes, I agree that if we go with random selection instead of round-robin selection, the latency issue will be more fair. That is, if there are 10 partitions, the 10th partition will always be the last choice in each round in the current design, but with random selection the chance of being selected is more fair.
>
> However, I think that's kind of out of scope for this KIP. This is an existing issue, and it might need further discussion to decide if this change is necessary.
>
> I agree the default 32KB for "batch.max.size" might not be a huge improvement compared with 256KB. I'm thinking, maybe default to "64KB" for "batch.max.size", and make the documentation clear that if "batch.max.size" is increased, there might be chances that the "ready" partitions need to wait for the next request to send to the broker, because of the "max.request.size" (default 1MB) limitation. "max.request.size" can also be considered for increase to avoid this issue. What do you think?
>
> Thank you.
> Luke
>
> On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
>
> > > maybe I can firstly decrease the "batch.max.size" to 32KB
> >
> > I think 32KB is too small. With 5 in-flight and 100ms latency we can produce 1.6MB/s per partition. With 256KB we can produce 12.8MB/s per partition. We should probably set up some testing and see if 256KB has problems.
> >
> > To illustrate latency dynamics, let's consider a simplified model: 1 in-flight request per broker, produce latency 125ms, 256KB max request size, 16 partitions assigned to the same broker, every second 128KB is produced to each partition (total production rate is 2MB/sec).
> >
> > If the batch size is 16KB, then the pattern would be the following:
> >
> > 0ms - produce 128KB into each partition
> > 0ms - take 16KB from each partition and send (total 256KB)
> > 125ms - complete first 16KB from each partition, send next 16KB
> > 250ms - complete second 16KB, send next 16KB
> > ...
> > 1000ms - complete 8th 16KB from each partition
> >
> > From this model it's easy to see that there are 256KB that are sent immediately, 256KB that are sent in 125ms, ... 256KB that are sent in 875ms.
> >
> > If the batch size is 256KB, then the pattern would be the following:
> >
> > 0ms - produce 128KB into each partition
> > 0ms - take 128KB each from the first 2 partitions and send (total 256KB)
> > 125ms - complete first 2 partitions, send data from next 2 partitions
> > ...
> > 1000ms - complete last 2 partitions
> >
> > Even though the pattern is different, there are still 256KB that are sent immediately, 256KB that are sent in 125ms, ... 256KB that are sent in 875ms.
> >
> > Now, in this example, if we do strictly round-robin (current implementation) and we have this exact pattern (not sure how often such a regular pattern would happen in practice -- I would expect it would be a bit more random), some partitions would experience higher latency than others (not sure how much it would matter in practice -- at the end of the day some bytes produced to a topic would have higher latency and some bytes would have lower latency).
> > This pattern is easily fixed by choosing the next partition randomly instead of using round-robin.
> >
> > -Artem
> >
> > On Tue, Nov 23, 2021 at 12:08 AM Luke Chen <show...@gmail.com> wrote:
> >
> > > Hi Tom,
> > > Thanks for your comments. And thanks for Artem's explanation.
> > > Below is my response:
> > >
> > > > Currently because buffers are allocated using batch.size it means we can handle records that are that large (e.g. one big record per batch). Doesn't the introduction of smaller buffer sizes (batch.initial.size) mean a corresponding decrease in the maximum record size that the producer can handle?
> > >
> > > Actually, "batch.size" is only a threshold to decide if the batch is "ready to be sent". That is, even if you set "batch.size=16KB" (the default value), users can still send one record sized at 20KB, as long as the size is less than "max.request.size" in the producer (default 1MB). Therefore, the introduction of "batch.initial.size" won't decrease the maximum record size that the producer can handle.
> > >
> > > > But isn't there the risk that drainBatchesForOneNode would end up not sending ready batches well past when they ought to be sent (according to their linger.ms), because it's sending buffers for earlier partitions too aggressively?
> > >
> > > Did you mean that we have a "max.request.size" per request (default 1MB), and before this KIP the request could include 64 batches in a single request ["batch.size"(16KB) * 64 = 1MB], but now we might only be able to include 32 batches or fewer, because we aggressively send more records in one batch? Is that what you meant? That's a really good point that I've never thought about. I think your suggestion to go through the other partitions that just fit "batch.size", or that expire "linger.ms" first, before handling the one that is over the "batch.size" limit is not a good way, because it might leave the one with size > "batch.size" always at the lowest priority, and cause a starvation issue where that batch never gets a chance to be sent.
> > >
> > > I don't have a better solution for it, but maybe I can firstly decrease "batch.max.size" to 32KB, instead of the aggressive 256KB in the KIP. That should alleviate the problem and still improve the throughput. What do you think?
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Tue, Nov 23, 2021 at 9:04 AM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
> > >
> > > > > I think this KIP would change the behaviour of producers when there are multiple partitions ready to be sent
> > > >
> > > > This is correct, the pattern changes and becomes more coarse-grained. But I don't think it changes fairness over the long run. I think it's a good idea to change drainIndex to be random rather than round robin to avoid forming patterns where some partitions would consistently get higher latencies than others because they wait longer for their turn.
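(As a side illustration of the round-robin vs. random drain-start idea above -- a simplified sketch only, not the actual RecordAccumulator code, with made-up names:)

    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;

    // Simplified model of choosing which partition to start draining from for a node.
    final class DrainStartChooser {
        private int drainIndex = 0; // current behaviour: rotating round-robin cursor

        // Round-robin: each drain pass starts one partition further along, so with a
        // perfectly regular production pattern the same partitions always wait longest.
        int nextStartRoundRobin(List<Integer> partitions) {
            int start = drainIndex % partitions.size();
            drainIndex = start + 1;
            return start;
        }

        // Random: the starting partition changes unpredictably between requests, so no
        // partition is consistently drained last.
        int nextStartRandom(List<Integer> partitions) {
            return ThreadLocalRandom.current().nextInt(partitions.size());
        }
    }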
> > > > If we really wanted to preserve the exact patterns, we could either try to support multiple 16KB batches from one partition per request (which would probably require a protocol change to adjust the broker's duplicate detection logic), or try to re-batch 16KB batches from the accumulator into larger batches during send (additional computations), or try to consider all partitions assigned to a broker when checking whether a new batch needs to be created (i.e. compare the cumulative batch size from all partitions assigned to a broker and create a new batch when the cumulative size is 1MB; more complex).
> > > >
> > > > Overall, it seems like just increasing the max batch size is a simpler solution, and it does favor larger batch sizes, which is beneficial not just for production.
> > > >
> > > > > ready batches well past when they ought to be sent (according to their linger.ms)
> > > >
> > > > The trigger for marking batches ready to be sent isn't changed - a batch is ready to be sent once it reaches 16KB, so by the time larger batches start forming, linger.ms wouldn't matter much because the batching goal is met and the batch can be sent immediately. Larger batches start forming once the client starts waiting for the server, in which case some data will wait its turn to be sent. This will happen for some data regardless of how we pick data to send; the question is just whether we'd have some scenarios where some partitions would consistently experience higher latency than others. I think picking drainIndex randomly would prevent such scenarios.
> > > >
> > > > -Artem
> > > >
> > > > On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <tbent...@redhat.com> wrote:
> > > >
> > > > > Hi Luke,
> > > > >
> > > > > Thanks for the KIP!
> > > > >
> > > > > Currently because buffers are allocated using batch.size it means we can handle records that are that large (e.g. one big record per batch). Doesn't the introduction of smaller buffer sizes (batch.initial.size) mean a corresponding decrease in the maximum record size that the producer can handle? That might not be a problem if the user knows their maximum record size and has tuned batch.initial.size accordingly, but if the default for batch.initial.size < batch.size it could cause regressions for existing users with a large record size, I think. It should be enough for batch.initial.size to default to batch.size, allowing users who care about the memory saving in the off-peak throughput case to do the tuning, but not causing a regression for existing users.
> > > > >
> > > > > I think this KIP would change the behaviour of producers when there are multiple partitions ready to be sent: by sending all the ready buffers (which may now be > batch.size) for the first partition, we could end up excluding ready buffers for other partitions from the current send. In other words, as I understand the KIP currently, there's a change in fairness.
> > > > > I think the code in RecordAccumulator#drainBatchesForOneNode will ensure fairness in the long run, because the drainIndex will ensure that those other partitions each get their turn at being the first. But isn't there the risk that drainBatchesForOneNode would end up not sending ready batches well past when they ought to be sent (according to their linger.ms), because it's sending buffers for earlier partitions too aggressively? Or, to put it another way, perhaps the RecordAccumulator should round-robin the ready buffers for _all_ the partitions before trying to fill the remaining space with the extra buffers (beyond the batch.size limit) for the first partitions?
> > > > >
> > > > > Kind regards,
> > > > >
> > > > > Tom
> > > > >
> > > > > On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <show...@gmail.com> wrote:
> > > > >
> > > > > > Hi Ismael and all devs,
> > > > > > Are there any comments/suggestions on this KIP?
> > > > > > If not, I'm going to update the KIP based on my previous mail, and start a vote tomorrow or next week.
> > > > > >
> > > > > > Thank you.
> > > > > > Luke
> > > > > >
> > > > > > On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <show...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Ismael,
> > > > > > > Thanks for your comments.
> > > > > > >
> > > > > > > 1. Why do we have to reallocate the buffer? We can keep a list of buffers instead and avoid reallocation.
> > > > > > > -> Do you mean we allocate multiple buffers with "batch.initial.size", and link them together (with a linked list)?
> > > > > > > ex:
> > > > > > > a. We allocate a 4KB initial buffer
> > > > > > > | 4KB |
> > > > > > >
> > > > > > > b. When new records arrive and the remaining buffer is not enough for the records, we create another "batch.initial.size" buffer
> > > > > > > ex: we already have 3KB of data in the 1st buffer, and here comes a 2KB record
> > > > > > >
> > > > > > > | 4KB (1KB remaining) |
> > > > > > > now, record: 2KB coming
> > > > > > > We fill the first 1KB into the 1st buffer, create a new buffer linked to it, and fill the rest of the data into it
> > > > > > > | 4KB (full) | ---> | 4KB (3KB remaining) |
> > > > > > >
> > > > > > > Is that what you mean?
> > > > > > > If so, I think I like this idea!
> > > > > > > If not, please explain it in more detail.
> > > > > > > Thank you.
> > > > > > >
> > > > > > > 2. I think we should also consider tweaking the semantics of batch.size so that the sent batches can be larger if the batch is not ready to be sent (while still respecting max.request.size and perhaps a new max.batch.size).
> > > > > > >
> > > > > > > --> In the KIP, I was trying to make "batch.size" the upper bound of the batch size, and introduce "batch.initial.size" as the initial batch size.
> > > > > > > So are you saying that we can keep "batch.size" as the initial batch size and introduce "max.batch.size" as the upper bound?
> > > > > > > That's a good suggestion, but it would change the semantics of "batch.size", which might surprise some users.
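(To make the buffer-list idea in point 1 above concrete, a rough sketch of appending records into linked fixed-size segments instead of reallocating one growing buffer -- names and sizes are illustrative, not the KIP's actual implementation:)

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Batch buffer built from small linked segments (e.g. batch.initial.size = 4KB each).
    final class SegmentedBatchBuffer {
        private final int segmentSize;
        private final Deque<ByteBuffer> segments = new ArrayDeque<>();

        SegmentedBatchBuffer(int segmentSize) {
            this.segmentSize = segmentSize;
            segments.add(ByteBuffer.allocate(segmentSize));
        }

        // Appends record bytes, spilling into a newly allocated segment when the current
        // one fills up, e.g. 1KB into the tail of the first 4KB segment and the remaining
        // 1KB of a 2KB record into a fresh segment -- no copying of already-written data.
        void append(byte[] record) {
            int offset = 0;
            while (offset < record.length) {
                ByteBuffer current = segments.peekLast();
                if (!current.hasRemaining()) {
                    current = ByteBuffer.allocate(segmentSize);
                    segments.addLast(current);
                }
                int n = Math.min(current.remaining(), record.length - offset);
                current.put(record, offset, n);
                offset += n;
            }
        }
    }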
> > > > > > > I think my original proposal ("batch.initial.size") is safer for users. What do you think?
> > > > > > >
> > > > > > > Thank you.
> > > > > > > Luke
> > > > > > >
> > > > > > > On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > >
> > > > > > >> I think we should also consider tweaking the semantics of batch.size so that the sent batches can be larger if the batch is not ready to be sent (while still respecting max.request.size and perhaps a new max.batch.size).
> > > > > > >>
> > > > > > >> Ismael
> > > > > > >>
> > > > > > >> On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > >>
> > > > > > >> > Hi Luke,
> > > > > > >> >
> > > > > > >> > Thanks for the KIP. Why do we have to reallocate the buffer? We can keep a list of buffers instead and avoid reallocation.
> > > > > > >> >
> > > > > > >> > Ismael
> > > > > > >> >
> > > > > > >> > On Sun, Oct 17, 2021, 2:02 AM Luke Chen <show...@gmail.com> wrote:
> > > > > > >> >
> > > > > > >> >> Hi Kafka dev,
> > > > > > >> >> I'd like to start the discussion for the proposal: KIP-782: Expandable batch size in producer.
> > > > > > >> >>
> > > > > > >> >> The main purpose of this KIP is to have better memory usage in the producer, and also to save users from the dilemma of setting the batch size configuration. After this KIP, users can set a higher batch.size without worries, and of course, with an appropriate "batch.initial.size" and "batch.reallocation.factor".
> > > > > > >> >>
> > > > > > >> >> Detailed description can be found here:
> > > > > > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > > > > > >> >>
> > > > > > >> >> Any comments and feedback are welcome.
> > > > > > >> >>
> > > > > > >> >> Thank you.
> > > > > > >> >> Luke
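(For anyone skimming the thread, a rough sketch of how the settings discussed above could fit together if the KIP lands. Only batch.size and max.request.size exist today; "batch.initial.size", "batch.reallocation.factor" and "batch.max.size" are names proposed in this discussion, and all values below are illustrative:)

    import java.util.Properties;

    public class Kip782ConfigSketch {
        static Properties proposedSettings() {
            Properties props = new Properties();
            props.put("max.request.size", String.valueOf(2 * 1024 * 1024)); // existing config, raised as discussed above
            props.put("batch.size", String.valueOf(256 * 1024));            // under the KIP's original framing, an upper bound rather than the allocation size
            props.put("batch.initial.size", String.valueOf(4 * 1024));      // proposed: small initial allocation per batch
            props.put("batch.reallocation.factor", "2");                    // proposed: growth factor when a batch expands
            return props;
        }
    }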