Hi Luke,

I don't mind increasing the max.request.size to a higher number, e.g. 2MB
could be good.  I think we should also run some benchmarks to see the
effects of different sizes.

I agree that changing round robin to random solves an independent existing
issue, however the logic in this KIP exacerbates the issue, so there is
some dependency.

-Artem

On Wed, Nov 24, 2021 at 12:43 AM Luke Chen <show...@gmail.com> wrote:

> Hi Artem,
> Yes, I agree that if we go with random selection instead of round-robin
> selection, latency will be distributed more fairly. That is, with 10
> partitions, the 10th partition is always the last choice in each round
> in the current design, but with random selection every partition has a
> fair chance of being selected.
>
> However, I think that's out of scope for this KIP. This is an
> existing issue, and it might need further discussion to decide whether
> this change is necessary.
>
> I agree the default of 32KB for "batch.max.size" might not be a huge
> improvement compared with 256KB. I'm thinking of defaulting
> "batch.max.size" to 64KB and making the documentation clear that if
> "batch.max.size" is increased, "ready" partitions may have to wait for
> the next request to the broker because of the "max.request.size"
> (default 1MB) limit. Increasing "max.request.size" can also be
> considered to avoid this issue. What do you think?
>
> Thank you.
> Luke
>
> On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> > >  maybe I can firstly decrease the "batch.max.size" to 32KB
> >
> > I think 32KB is too small.  With 5 in-flight and 100ms latency we can
> > produce 1.6MB/s per partition.  With 256KB we can produce 12.8MB/s per
> > partition.  We should probably set up some testing and see if 256KB has
> > problems.
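
As a rough check on the figures above: the per-partition ceiling is (in-flight requests x batch size) / latency. A minimal Python sketch, assuming one batch per partition per request and decimal units (1MB = 1000KB); the helper name is hypothetical and not part of Kafka:

```python
def max_throughput_mb_per_s(in_flight, batch_kb, latency_ms):
    """Upper bound on per-partition throughput in MB/s (1MB = 1000KB)."""
    kb_per_s = in_flight * batch_kb * 1000 / latency_ms
    return kb_per_s / 1000


print(max_throughput_mb_per_s(5, 32, 100))   # 1.6
print(max_throughput_mb_per_s(5, 256, 100))  # 12.8
```
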
> >
> > To illustrate latency dynamics, let's consider a simplified model: 1
> > in-flight request per broker, produce latency 125ms, 256KB max request
> > size, 16 partitions assigned to the same broker, every second 128KB is
> > produced to each partition (total production rate is 2MB/sec).
> >
> > If the batch size is 16KB, then the pattern would be the following:
> >
> > 0ms - produce 128KB into each partition
> > 0ms - take 16KB from each partition and send (total 256KB)
> > 125ms - complete first 16KB from each partition, send next 16KB
> > 250ms - complete second 16KB, send next 16KB
> > ...
> > 1000ms - complete 8th 16KB from each partition
> >
> > from this model it's easy to see that there are 256KB that are sent
> > immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> > 875ms.
> >
> > If the batch size is 256KB, then the pattern would be the following:
> >
> > 0ms - produce 128KB into each partition
> > 0ms - take 128KB each from first 2 partitions and send (total 256KB)
> > 125ms - complete 2 first partitions, send data from next 2 partitions
> > ...
> > 1000ms - complete last 2 partitions
> >
> > even though the pattern is different, there are still 256KB that are sent
> > immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> > 875ms.
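
The simplified model above can be replayed in a few lines. This is only a sketch under the stated assumptions (1 in-flight request, 125ms latency, 256KB max request size, 16 partitions with 128KB each, at most one batch per partition per request); the fixed scan order starting at partition 0 is a simplification of the round-robin drain, and all names are hypothetical:

```python
def simulate(batch_kb, partitions=16, produced_kb=128,
             request_kb=256, latency_ms=125):
    """Return [(send_time_ms, {partition: kb_taken}), ...], one entry per request."""
    remaining = [produced_kb] * partitions
    requests = []
    now_ms = 0
    while any(remaining):
        room = request_kb
        taken = {}
        # Fixed scan order from partition 0, at most one batch per partition.
        for p in range(partitions):
            if remaining[p] == 0:
                continue
            take = min(batch_kb, remaining[p])
            if take > room:
                break  # request is full
            taken[p] = take
            remaining[p] -= take
            room -= take
        if not taken:
            raise ValueError("a single batch does not fit in one request")
        requests.append((now_ms, taken))
        now_ms += latency_ms  # one in-flight request: next send waits for completion
    return requests


def send_times(requests, partition):
    """Send times (ms) of every request that carried data for `partition`."""
    return [t for t, taken in requests if partition in taken]


small = simulate(batch_kb=16)
large = simulate(batch_kb=256)

# Same request-level schedule: 256KB at 0ms, 125ms, ..., 875ms in both cases.
print([(t, sum(taken.values())) for t, taken in small])
print([(t, sum(taken.values())) for t, taken in large])

# Different per-partition picture for the last partition in the scan order.
print(send_times(small, 15))  # spread over all eight requests
print(send_times(large, 15))  # only the final request, at 875ms
```

The request-level schedule is identical for both batch sizes; what changes is which partitions' bytes sit in the late requests.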
> >
> > Now, in this example, if we do strict round-robin (the current
> > implementation) and we have this exact pattern (not sure how often such
> > a regular pattern would happen in practice -- I would expect it to be a
> > bit more random), some partitions would experience higher latency than
> > others (not sure how much it would matter in practice -- at the end of
> > the day some bytes produced to a topic would have higher latency and
> > some bytes would have lower latency).  This pattern is easily fixed by
> > choosing the next partition randomly instead of using round-robin.
> >
> > -Artem
> >
> > On Tue, Nov 23, 2021 at 12:08 AM Luke Chen <show...@gmail.com> wrote:
> >
> > > Hi Tom,
> > > Thanks for your comments. And thanks for Artem's explanation.
> > > Below is my response:
> > >
> > > > Currently because buffers are allocated using batch.size it means we
> > > > can handle records that are that large (e.g. one big record per
> > > > batch). Doesn't the introduction of smaller buffer sizes
> > > > (batch.initial.size) mean a corresponding decrease in the maximum
> > > > record size that the producer can handle?
> > >
> > > Actually, "batch.size" is only a threshold for deciding whether the
> > > batch is "ready to be sent". That is, even if you set
> > > "batch.size=16KB" (the default value), users can still send a single
> > > 20KB record, as long as its size is less than "max.request.size" in
> > > the producer (default 1MB). Therefore, the introduction of
> > > "batch.initial.size" won't decrease the maximum record size that the
> > > producer can handle.
> > >
> > > > But isn't there the risk that drainBatchesForOneNode would end up not
> > > > sending ready batches well past when they ought to be sent (according
> > > > to their linger.ms), because it's sending buffers for earlier
> > > > partitions too aggressively?
> > >
> > > Did you mean that we have a "max.request.size" per request (default is
> > > 1MB), and before this KIP a single request could include 64 batches
> > > ["batch.size"(16KB) * 64 = 1MB], but now we might only be able to
> > > include 32 batches or fewer, because we aggressively put more records
> > > into one batch? Is that what you meant? That's a really good point that
> > > I've never thought about. I don't think your suggestion is a good way
> > > to go, though: going through the other partitions that just fit
> > > "batch.size", or whose "linger.ms" has expired, before handling the one
> > > that is over the "batch.size" limit might always leave the batch with
> > > size > "batch.size" at the lowest priority, and cause a starvation
> > > issue where that batch never gets a chance to be sent.
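
For reference, the batch counts in the paragraph above follow from dividing the request limit by the batch size. A small Python sketch, assuming the 1MB "max.request.size" default mentioned in this thread and ignoring protocol overhead; the helper name is hypothetical:

```python
MAX_REQUEST_SIZE_KB = 1024  # "max.request.size" default of 1MB, as stated in the thread


def batches_per_request(batch_kb, max_request_kb=MAX_REQUEST_SIZE_KB):
    """How many full batches of batch_kb fit into one produce request."""
    return max_request_kb // batch_kb


for size_kb in (16, 32, 64, 256):
    print(size_kb, batches_per_request(size_kb))
```

With 16KB batches this gives the 64 batches per request quoted above; at 256KB only 4 full batches fit, which is the fairness concern under discussion.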
> > >
> > > I don't have a better solution for it, but maybe I can first decrease
> > > "batch.max.size" to 32KB, instead of the aggressive 256KB in the KIP.
> > > That should alleviate the problem and still improve the throughput.
> > > What do you think?
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Tue, Nov 23, 2021 at 9:04 AM Artem Livshits
> > > <alivsh...@confluent.io.invalid> wrote:
> > >
> > > > > I think this KIP would change the behaviour of producers when there
> > > > > are multiple partitions ready to be sent
> > > >
> > > > This is correct, the pattern changes and becomes more coarse-grained.
> > > > But I don't think it changes fairness over the long run.  I think it's
> > > > a good idea to change drainIndex to be random rather than round robin
> > > > to avoid forming patterns where some partitions would consistently get
> > > > higher latencies than others because they wait longer for their turn.
> > > >
> > > > If we really wanted to preserve the exact patterns, we could either
> > > > try to support multiple 16KB batches from one partition per request
> > > > (probably would require a protocol change to change logic on the
> > > > broker for duplicate detection), or try to re-batch 16KB batches from
> > > > the accumulator into larger batches during send (additional
> > > > computations), or try to consider all partitions assigned to a broker
> > > > to check if a new batch needs to be created (i.e. compare the
> > > > cumulative batch size from all partitions assigned to a broker and
> > > > create a new batch when the cumulative size is 1MB; more complex).
> > > >
> > > > Overall, it seems like just increasing the max batch size is a
> > > > simpler solution and it does favor larger batch sizes, which is
> > > > beneficial not just for production.
> > > >
> > > > > ready batches well past when they ought to be sent (according to
> > > > > their linger.ms)
> > > >
> > > > The trigger for marking batches ready to be sent isn't changed - a
> > > > batch is ready to be sent once it reaches 16KB, so by the time larger
> > > > batches start forming, linger.ms wouldn't matter much because the
> > > > batching goal is met and the batch can be sent immediately.  Larger
> > > > batches start forming once the client starts waiting for the server,
> > > > in which case some data will wait its turn to be sent.  This will
> > > > happen for some data regardless of how we pick data to send, the
> > > > question is just whether we'd have some scenarios where some
> > > > partitions would consistently experience higher latency than others.
> > > > I think picking drainIndex randomly would prevent such scenarios.
> > > >
> > > > -Artem
> > > >
> > > > On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <tbent...@redhat.com> wrote:
> > > >
> > > > > Hi Luke,
> > > > >
> > > > > Thanks for the KIP!
> > > > >
> > > > > Currently because buffers are allocated using batch.size it means
> > > > > we can handle records that are that large (e.g. one big record per
> > > > > batch). Doesn't the introduction of smaller buffer sizes
> > > > > (batch.initial.size) mean a corresponding decrease in the maximum
> > > > > record size that the producer can handle? That might not be a
> > > > > problem if the user knows their maximum record size and has tuned
> > > > > batch.initial.size accordingly, but if the default for
> > > > > batch.initial.size < batch.size it could cause regressions for
> > > > > existing users with a large record size, I think. It should be
> > > > > enough for batch.initial.size to default to batch.size, allowing
> > > > > users who care about the memory saving in the off-peak throughput
> > > > > case to do the tuning, but not causing a regression for existing
> > > > > users.
> > > > >
> > > > > I think this KIP would change the behaviour of producers when there
> > > > > are multiple partitions ready to be sent: By sending all the ready
> > > > > buffers (which may now be > batch.size) for the first partition, we
> > > > > could end up excluding ready buffers for other partitions from the
> > > > > current send. In other words, as I understand the KIP currently,
> > > > > there's a change in fairness. I think the code in
> > > > > RecordAccumulator#drainBatchesForOneNode will ensure fairness in the
> > > > > long run, because the drainIndex will ensure that those other
> > > > > partitions each get their turn at being the first. But isn't there
> > > > > the risk that drainBatchesForOneNode would end up not sending ready
> > > > > batches well past when they ought to be sent (according to their
> > > > > linger.ms), because it's sending buffers for earlier partitions too
> > > > > aggressively? Or, to put it another way, perhaps the
> > > > > RecordAccumulator should round-robin the ready buffers for _all_ the
> > > > > partitions before trying to fill the remaining space with the extra
> > > > > buffers (beyond the batch.size limit) for the first partitions?
> > > > >
> > > > > Kind regards,
> > > > >
> > > > > Tom
> > > > >
> > > > > On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <show...@gmail.com> wrote:
> > > > >
> > > > > > Hi Ismael and all devs,
> > > > > > Are there any comments/suggestions on this KIP?
> > > > > > If not, I'm going to update the KIP based on my previous mail, and
> > > > > > start a vote tomorrow or next week.
> > > > > >
> > > > > > Thank you.
> > > > > > Luke
> > > > > >
> > > > > > On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <show...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Ismael,
> > > > > > > Thanks for your comments.
> > > > > > >
> > > > > > > 1. Why do we have to reallocate the buffer? We can keep a list of
> > > > > > > buffers instead and avoid reallocation.
> > > > > > > -> Do you mean we allocate multiple buffers with
> > > > > > > "batch.initial.size", and link them together (with a linked list)?
> > > > > > > ex:
> > > > > > > a. We allocate a 4KB initial buffer
> > > > > > > | 4KB |
> > > > > > >
> > > > > > > b. When new records arrive and the remaining buffer is not enough
> > > > > > > for the records, we create another batch with a
> > > > > > > "batch.initial.size" buffer
> > > > > > > ex: we already have 3KB of data in the 1st buffer, and here comes
> > > > > > > a 2KB record
> > > > > > >
> > > > > > > | 4KB (1KB remaining) |
> > > > > > > now, record: 2KB coming
> > > > > > > We fill the first 1KB into the 1st buffer, create a new buffer
> > > > > > > linked to it, and fill the rest of the data into the new buffer
> > > > > > > | 4KB (full) | ---> | 4KB (3KB remaining) |
> > > > > > >
> > > > > > > Is that what you mean?
> > > > > > > If so, I think I like this idea!
> > > > > > > If not, please explain more detail about it.
> > > > > > > Thank you.
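
The linked-buffer idea in the example above can be sketched as follows. This is an illustration only, not the KIP's implementation: the class and method names are hypothetical, a Python bytearray stands in for a preallocated buffer, and chunk_size plays the role of "batch.initial.size" (4KB in the example):

```python
class ChainedBuffer:
    """Fixed-size chunks kept in a list; a full chunk is never reallocated."""

    def __init__(self, chunk_size=4096):
        self.chunk_size = chunk_size
        self.chunks = [bytearray()]

    def append(self, record: bytes) -> None:
        offset = 0
        while offset < len(record):
            room = self.chunk_size - len(self.chunks[-1])
            if room == 0:
                # Current chunk is full: link a new one instead of growing it.
                self.chunks.append(bytearray())
                room = self.chunk_size
            n = min(room, len(record) - offset)
            self.chunks[-1] += record[offset:offset + n]
            offset += n

    def remaining_in_last(self) -> int:
        return self.chunk_size - len(self.chunks[-1])


buf = ChainedBuffer(chunk_size=4096)
buf.append(b"a" * 3072)  # 3KB already in the 1st buffer
buf.append(b"b" * 2048)  # 2KB record: 1KB fills the 1st buffer, 1KB goes to a new one
print([len(c) for c in buf.chunks], buf.remaining_in_last())  # [4096, 1024] 3072
```
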
> > > > > > >
> > > > > > > 2. I think we should also consider tweaking the semantics of
> > > > > > > batch.size so that the sent batches can be larger if the batch is
> > > > > > > not ready to be sent (while still respecting max.request.size and
> > > > > > > perhaps a new max.batch.size).
> > > > > > >
> > > > > > > --> In the KIP, I was trying to make "batch.size" the upper
> > > > > > > bound of the batch size, and introduce "batch.initial.size" as the
> > > > > > > initial batch size.
> > > > > > > So are you saying that we can keep "batch.size" as the initial
> > > > > > > batch size and introduce "max.batch.size" as the upper bound?
> > > > > > > That's a good suggestion, but it would change the semantics of
> > > > > > > "batch.size", which might surprise some users. I think my original
> > > > > > > proposal ("batch.initial.size") is safer for users. What do you
> > > > > > > think?
> > > > > > >
> > > > > > > Thank you.
> > > > > > > Luke
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > >
> > > > > > >> I think we should also consider tweaking the semantics of
> > > > > > >> batch.size so that the sent batches can be larger if the batch is
> > > > > > >> not ready to be sent (while still respecting max.request.size and
> > > > > > >> perhaps a new max.batch.size).
> > > > > > >>
> > > > > > >> Ismael
> > > > > > >>
> > > > > > >> On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > >>
> > > > > > >> > Hi Luke,
> > > > > > >> >
> > > > > > >> > Thanks for the KIP. Why do we have to reallocate the buffer? We
> > > > > > >> > can keep a list of buffers instead and avoid reallocation.
> > > > > > >> >
> > > > > > >> > Ismael
> > > > > > >> >
> > > > > > >> > On Sun, Oct 17, 2021, 2:02 AM Luke Chen <show...@gmail.com> wrote:
> > > > > > >> >
> > > > > > >> >> Hi Kafka dev,
> > > > > > >> >> I'd like to start the discussion for the proposal: KIP-782:
> > > > > > >> >> Expandable batch size in producer.
> > > > > > >> >>
> > > > > > >> >> The main purpose of this KIP is to have better memory usage in
> > > > > > >> >> the producer, and also to save users from the dilemma of setting
> > > > > > >> >> the batch size configuration. After this KIP, users can set a
> > > > > > >> >> higher batch.size without worries, given an appropriate
> > > > > > >> >> "batch.initial.size" and "batch.reallocation.factor".
> > > > > > >> >>
> > > > > > >> >> Detailed description can be found here:
> > > > > > >> >>
> > > > > > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > > > > > >> >>
> > > > > > >> >> Any comments and feedback are welcome.
> > > > > > >> >>
> > > > > > >> >> Thank you.
> > > > > > >> >> Luke
> > > > > > >> >>
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
