Hello,

During implementation it turned out that the existing Partitioner.partition
method doesn't have enough arguments to accurately estimate record size in
bytes (e.g. it doesn't have headers, cannot take compression into account,
etc.).  So I'm proposing to add a new Partitioner.partition method that
takes a callback that can be used to estimate record size.
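
To make the idea concrete, here is a rough sketch of what a size-aware
partition call could look like. Everything in it is an assumption made for
illustration: the SizeAwarePartitioner interface, the IntSupplier callback
type, the ByteThresholdStickyPartitioner class, and the round-robin choice
of the next partition are hypothetical and are not the signature or the
logic from the KIP.

```java
import java.util.function.IntSupplier;

// Hypothetical interface for illustration only; NOT the signature proposed in the KIP.
interface SizeAwarePartitioner {
    // estimatedSizeInBytes is a callback, so the producer can account for
    // headers, compression, etc. without the partitioner seeing those details.
    int partition(String topic, int numPartitions, IntSupplier estimatedSizeInBytes);
}

// Sticks to one partition until roughly stickyBatchSize bytes were produced to it.
final class ByteThresholdStickyPartitioner implements SizeAwarePartitioner {
    private final int stickyBatchSize;
    private int currentPartition = 0;
    private int bytesOnCurrent = 0;

    ByteThresholdStickyPartitioner(int stickyBatchSize) {
        this.stickyBatchSize = stickyBatchSize;
    }

    @Override
    public synchronized int partition(String topic, int numPartitions,
                                      IntSupplier estimatedSizeInBytes) {
        if (bytesOnCurrent >= stickyBatchSize) {
            // Assumption: simple round-robin switch; a real partitioner may choose differently.
            currentPartition = (currentPartition + 1) % numPartitions;
            bytesOnCurrent = 0;
        }
        bytesOnCurrent += estimatedSizeInBytes.getAsInt();
        return currentPartition % numPartitions;
    }
}
```

In this sketch the switch depends only on the estimated bytes produced to
the current partition, not on when a batch happens to be created.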

I've updated the KIP accordingly:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner

-Artem

On Mon, Nov 8, 2021 at 5:42 PM Artem Livshits <alivsh...@confluent.io>
wrote:

> Hi Luke, Justine,
>
> Thank you for the feedback and questions. I've added clarifications to the KIP.
>
> > there will be some period of time the distribution is not even.
>
> That's correct.  There would be a small temporary imbalance, but over time
> the distribution should be uniform.
>
> > 1. This paragraph is a little confusing, because there's no "batch mode"
> > or "non-batch mode", right?
>
> Updated the wording to not use "batch mode"
>
> > 2. In motivation, you mentioned 1 drawback of current
> > UniformStickyPartitioner is
>
> The problem with the current implementation is that it switches once a new
> batch is created which may happen after the first record when linger.ms=0.
> The new implementation won't switch after the batch, so even if the first
> record got sent out in a batch, the second record would be produced to the
> same partition.  Once we have 5 batches in-flight, the new records will
> pile up in the accumulator.
>
> > I was curious about how the logic automatically switches here.
>
> Added some clarifications to the KIP.  Basically, because we can only have
> 5 in-flight batches, as soon as the first 5 are in-flight, the records
> start piling up in the accumulator.  If the rate is low, records get sent
> quickly (e.g. if we have latency 50ms, and produce less than 20 rec / sec,
> then each record will often get sent in its own batch, because a batch
> would often complete before a new record arrives).  If the rate is high,
> then the first few records get sent quickly, but then records will batch
> together until one of the in-flight batches completes.  The higher the rate
> (or the latency), the larger the batches.
>
> This is not new logic, btw; this is how it works now.  The new logic just
> helps to utilize this better by giving the partition an opportunity to hit
> 5 in-flight batches and start accumulating sooner.  KIP-782 will make this even
> better, so batches could also grow beyond 16KB if the production rate is high.
>
> -Artem
>
>
> On Mon, Nov 8, 2021 at 11:56 AM Justine Olshan
> <jols...@confluent.io.invalid> wrote:
>
>> Hi Artem,
>> Thanks for working on improving the Sticky Partitioner!
>>
>> I had a few questions about this portion:
>>
>> *The batching will continue until either an in-flight batch completes or we
>> hit the N bytes and move to the next partition.  This way it takes just 5
>> records to get to batching mode, not 5 x number of partition records, and
>> the batching mode will stay longer as we'll be batching while waiting for a
>> request to be completed.  As the production rate accelerates, the logic
>> will automatically switch to use larger batches to sustain higher
>> throughput.*
>>
>> *If one of the brokers has higher latency the records for the partitions
>> hosted on that broker are going to form larger batches, but it's still
>> going to be the same *amount of records* sent less frequently in larger
>> batches, the logic automatically adapts to that.*
>>
>> I was curious about how the logic automatically switches here. It seems
>> like we are just adding *partitioner.sticky.batch.size *which seems like a
>> static value. Can you go into more detail about this logic? Or clarify
>> something I may have missed.
>>
>> On Mon, Nov 8, 2021 at 1:34 AM Luke Chen <show...@gmail.com> wrote:
>>
>> > Thanks Artem,
>> > It's much better now.
>> > I've got your idea. In KIP-480: Sticky Partitioner, we change the partition
>> > (call the partitioner) when either of the conditions below is met:
>> > 1. the batch is full
>> > 2. linger.ms is up
>> > But you are changing the definition, so that the switch happens once
>> > "partitioner.sticky.batch.size" bytes are reached.
>> >
>> > It'll fix the uneven distribution issue, because the sent-out size is
>> > calculated on the producer side.
>> > But it might have another issue: when the producer rate is low, there
>> > will be some period of time when the distribution is not even. Ex:
>> > tp-1: 12KB
>> > tp-2: 0KB
>> > tp-3: 0KB
>> > tp-4: 0KB
>> > while the producer keeps sending records to tp-1 (because we
>> > haven't reached the 16KB threshold).
>> > Maybe the user should set a good value for "partitioner.sticky.batch.size"
>> > to fix this issue?
>> >
>> > Some comments on the KIP:
>> > 1. This paragraph is a little confusing, because there's no "batch mode"
>> > or "non-batch mode", right?
>> >
>> > > The batching will continue until either an in-flight batch completes or
>> > > we hit the N bytes and move to the next partition.  This way it takes just
>> > > 5 records to get to batching mode, not 5 x number of partition records, and
>> > > the batching mode will stay longer as we'll be batching while waiting for a
>> > > request to be completed.
>> >
>> > Even with linger.ms=0, before the sender thread is ready, we're always
>> > batching (accumulating) records into batches. So I think the "batch mode"
>> > description is confusing. And that's why I asked you if you have some kind
>> > of "batch switch" here.
>> >
>> > 2. In motivation, you mentioned 1 drawback of the current
>> > UniformStickyPartitioner is "the sticky partitioner doesn't create batches
>> > as efficiently", because it sends out a batch with only 1 record (under
>> > linger.ms=0). But I can't tell how you fix this inefficiency in the
>> > proposed solution. I still see us sending 1 record within 1 batch. Could you
>> > explain more here?
>> >
>> > Thank you.
>> > Luke
>> >
>> > On Sat, Nov 6, 2021 at 6:41 AM Artem Livshits
>> > <alivsh...@confluent.io.invalid> wrote:
>> >
>> > > Hi Luke,
>> > >
>> > > Thank you for your feedback.  I've updated the KIP with your
>> > > suggestions.
>> > >
>> > > 1. Updated with a better example.
>> > > 2. I removed the reference to ClassicDefaultPartitioner, it was probably
>> > > confusing.
>> > > 3. The logic doesn't rely on checking batches, I've updated the proposal
>> > > to make it more explicit.
>> > > 4. The primary issue (uneven distribution) is described in the linked
>> > > jira; I copied an example from the jira into the KIP as well.
>> > >
>> > > -Artem
>> > >
>> > >
>> > > On Thu, Nov 4, 2021 at 8:34 PM Luke Chen <show...@gmail.com> wrote:
>> > >
>> > > > Hi Artem,
>> > > > Thanks for the KIP! And thanks for reminding me to complete KIP-782
>> > > > soon. :)
>> > > >
>> > > > Back to the KIP, I have some comments:
>> > > > 1. You proposed a new config, "partitioner.sticky.batch.size", but
>> > > > I can't see how we're going to use it to make the partitioner better.
>> > > > Please explain more in the KIP (an example would be better, as in
>> > > > suggestion (4)).
>> > > > 2. In the "Proposed change" section, your example uses
>> > > > "ClassicDefaultPartitioner". Is that referring to the current default
>> > > > sticky partitioner? I think it'd be better to give your proposed
>> > > > partitioner a different name, to distinguish between the default one and
>> > > > the new one. (Although after implementation, we are going to just use the
>> > > > same name.)
>> > > > 3. So, if my understanding is correct, you're going to have a "batch"
>> > > > switch, and before the in-flight is full, it's disabled. Otherwise,
>> > > > we'll enable it. Is that right? Sorry, I don't see any advantage of having
>> > > > this batch switch. Could you explain more?
>> > > > 4. I think it would be clearer if you had a real example in
>> > > > the motivation section describing the issue we face with the current
>> > > > sticky partitioner, and then, in the proposed changes section, used the
>> > > > same example to describe in more detail how your approach fixes it.
>> > > >
>> > > > Thank you.
>> > > > Luke
>> > > >
>> > > > On Fri, Nov 5, 2021 at 1:38 AM Artem Livshits
>> > > > <alivsh...@confluent.io.invalid> wrote:
>> > > >
>> > > > > Hello,
>> > > > >
>> > > > > This is the discussion thread for
>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
>> > > > > .
>> > > > >
>> > > > > The proposal is a bug fix for
>> > > > > https://issues.apache.org/jira/browse/KAFKA-10888, but it does
>> > > > > include a client config change, therefore we have a KIP to discuss.
>> > > > >
>> > > > > -Artem
>> > > > >
>> > > >
>> > >
>> >
>>
>
