Hi Artem,

Thanks for the KIP! I have a few comments:

1. In the preamble of the Proposed Changes section, there is still a mention
of the -1 approach. My understanding is that we have moved away from it now.

2. I am a bit concerned by the trick suggested for the DefaultPartitioner and
the UniformStickyPartitioner. I do agree that implementing the logic in the
producer itself is a good thing. However, it is weird from a user's perspective
that they can set a class as the partitioner that is not used in the end. I
think that this will be confusing for our users. Have we considered changing
the default value of partitioner.class to null to indicate that the new
built-in partitioner must be used? By default, the built-in partitioner would
be used unless the user explicitly specifies one. The downside is that the new
default behavior would not apply if the user explicitly specifies the
partitioner, but we could mitigate this with my next point.

3. Related to the previous point, I think that we could deprecate both the
DefaultPartitioner and the UniformStickyPartitioner. I would also log a
warning if one of them is explicitly provided by the user to inform them
that they should switch to the new built-in one. I am pretty sure that most
folks use the default configuration anyway.

4. It would be great if we could explain why the -1 way was rejected. At
the moment, the rejected alternative only explains the idea but does not
say why we rejected it.
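
To make points 2 and 3 a bit more concrete, here is a rough sketch of the
selection logic I have in mind. It is only an illustration under my own
assumptions: the names PartitionerSelectionSketch, Selection and select are
made up and are not existing producer code. Only the partitioner.class key
and the two partitioner class names are real.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; not the actual KafkaProducer code.
class PartitionerSelectionSketch {

    // The two existing partitioners that point 3 proposes to deprecate.
    static final Set<String> DEPRECATED = Set.of(
        "org.apache.kafka.clients.producer.internals.DefaultPartitioner",
        "org.apache.kafka.clients.producer.UniformStickyPartitioner");

    // Outcome of inspecting the configuration (hypothetical holder type).
    static final class Selection {
        final boolean useBuiltIn; // true: the producer runs its built-in logic
        final String warning;     // null when there is nothing to warn about

        Selection(boolean useBuiltIn, String warning) {
            this.useBuiltIn = useBuiltIn;
            this.warning = warning;
        }
    }

    // Decide what to do based on the configured partitioner.class value.
    static Selection select(Map<String, String> config) {
        String cls = config.get("partitioner.class");
        if (cls == null) {
            // Proposed new default: no class configured, use built-in logic.
            return new Selection(true, null);
        }
        if (DEPRECATED.contains(cls)) {
            // Explicitly configured legacy class: honor it, but warn.
            return new Selection(false, cls
                + " is deprecated; remove partitioner.class to use the"
                + " built-in partitioning logic");
        }
        // Any other user-provided partitioner is used as-is, no warning.
        return new Selection(false, null);
    }
}
```

With this shape, a configured class is always the one that runs, so a user
never sets a partitioner that is silently bypassed.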

Best,
David

On Fri, Mar 4, 2022 at 6:03 AM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:
>
> Hi Jun,
>
> 2. Removed the option from the KIP.  Now the sticky partitioning threshold
> is hardcoded to batch.size.
>
> 20. Added the corresponding wording to the KIP.
>
> -Artem
>
> On Thu, Mar 3, 2022 at 10:52 AM Jun Rao <j...@confluent.io.invalid> wrote:
>
> > Hi, Artem,
> >
> > Thanks for the reply.
> >
> > 1. Sounds good.
> >
> > 2. If we don't expect users to change it, we probably could just leave out
> > the new config. In general, it's easy to add a new config, but hard to
> > remove an existing config.
> >
> > 20. The two new configs enable.adaptive.partitioning and
> > partition.availability.timeout.ms only apply to the two built-in
> > partitioners DefaultPartitioner and UniformStickyPartitioner, right? It
> > would be useful to document that in the KIP.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Mar 3, 2022 at 9:47 AM Artem Livshits
> > <alivsh...@confluent.io.invalid> wrote:
> >
> > > Hi Jun,
> > >
> > > Thank you for the suggestions.
> > >
> > > 1. As we discussed offline, we can hardcode the logic for
> > > DefaultPartitioner and UniformStickyPartitioner in the KafkaProducer
> > > (i.e. the DefaultPartitioner.partition won't get called, instead
> > > KafkaProducer would check if the partitioner is an instance of
> > > DefaultPartitioner and then run the actual partitioning logic itself).
> > > Then the change to the Partitioner wouldn't be required.  I'll update
> > > the KIP to reflect that.
> > >
> > > 2. I don't expect users to change this too often, as changing it would
> > > require a bit of studying of the production patterns.  As a general
> > > principle, if I can think of a model that requires a deviation from
> > > default, I tend to add a configuration option.  It could be that it'll
> > > never get used in practice, but I cannot prove that.  I'm ok with
> > > removing the option, let me know what you think.
> > >
> > > -Artem
> > >
> > > On Mon, Feb 28, 2022 at 2:06 PM Jun Rao <j...@confluent.io.invalid>
> > > wrote:
> > >
> > > > Hi, Artem,
> > > >
> > > > Thanks for the reply. A few more comments.
> > > >
> > > > 1. Since we control the implementation and the usage of
> > > > DefaultPartitioner, another way is to instantiate the
> > > > DefaultPartitioner with a special constructor, which allows it to have
> > > > more access to internal information. Then we could just change the
> > > > behavior of DefaultPartitioner such that it can use the internal
> > > > information when choosing the partition. This seems more intuitive
> > > > than having DefaultPartitioner return -1 partition.
> > > >
> > > > 2. I guess partitioner.sticky.batch.size is introduced because the
> > > > effective batch size could be less than batch.size and we want to align
> > > > partition switching with the effective batch size. How would a user
> > > > know the effective batch size to set partitioner.sticky.batch.size
> > > > properly? If the user somehow knows the effective batch size, does
> > > > setting batch.size to the effective batch size achieve the same result?
> > > >
> > > > 4. Thanks for the explanation. Makes sense to me.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Fri, Feb 25, 2022 at 8:26 PM Artem Livshits
> > > > <alivsh...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > 1. Updated the KIP to add a couple paragraphs about implementation
> > > > > necessities in the Proposed Changes section.
> > > > >
> > > > > 2. Sorry if my reply was confusing, what I meant to say (and I
> > > > > elaborated on that in point #3) is that there could be patterns for
> > > > > which 16KB wouldn't be the most effective setting, thus it would be
> > > > > good to make it configurable.
> > > > >
> > > > > 4. We could use broker readiness timeout.  But I'm not sure it would
> > > > > correctly model the broker load.  The problem is that latency is not
> > > > > an accurate measure of throughput: we could have 2 brokers that have
> > > > > equal throughput but one has higher latency (so it takes larger
> > > > > batches less frequently, but still takes the same load).
> > > > > Latency-based logic is likely to send less data to the broker with
> > > > > higher latency.  Using the queue size would adapt to throughput,
> > > > > regardless of latency (which could be just a result of deployment
> > > > > topology), so that's the model chosen in the proposal.  The
> > > > > partition.availability.timeout.ms logic approaches the model from a
> > > > > slightly different angle: say we have a requirement to deliver
> > > > > messages via brokers that have a certain latency, then
> > > > > partition.availability.timeout.ms could be used to tune that.
> > > > > Latency is a much more volatile metric than throughput (latency
> > > > > depends on external load, on capacity, on deployment topology, on
> > > > > jitter in network, on jitter in disk, etc.) and I think it would be
> > > > > best to leave latency-based thresholds configurable to tune for the
> > > > > environment.
> > > > >
> > > > > -Artem
> > > > >
> > > > > On Wed, Feb 23, 2022 at 11:14 AM Jun Rao <j...@confluent.io.invalid>
> > > > > wrote:
> > > > >
> > > > > > Hi, Artem,
> > > > > >
> > > > > > Thanks for the reply. A few more comments.
> > > > > >
> > > > > > 1. Perhaps you could elaborate a bit more on how the producer
> > > > > > determines the partition if the partitioner returns -1. This will
> > > > > > help understand why encapsulating that logic as a partitioner is
> > > > > > not clean.
> > > > > >
> > > > > > 2. I am not sure that I understand this part. If 15.5KB is more
> > > > > > efficient, could we just set batch.size to 15.5KB?
> > > > > >
> > > > > > 4. Yes, we could add a switch (or a variant of the partitioner) for
> > > > > > enabling this behavior. Also, choosing partitions based on broker
> > > > > > readiness can be made in a smoother way. For example, we could track
> > > > > > the last time a broker has drained any batches from the accumulator.
> > > > > > We can then select partitions from brokers proportionally to the
> > > > > > inverse of that time. This seems smoother than a cutoff based on a
> > > > > > partition.availability.timeout.ms threshold.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Fri, Feb 18, 2022 at 5:14 PM Artem Livshits
> > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hello Luke, Jun,
> > > > > > >
> > > > > > > Thank you for your feedback.  I've added the Rejected Alternative
> > > > > > > section that may clarify some of the questions w.r.t. returning -1.
> > > > > > >
> > > > > > > 1. I've elaborated on the -1 in the KIP.  The problem is that a
> > > > > > > significant part of the logic needs to be in the producer (because
> > > > > > > it now uses information about brokers that only the producer
> > > > > > > knows), so encapsulation of the logic within the default
> > > > > > > partitioner isn't as clean.  I've added the Rejected Alternative
> > > > > > > section that documents an attempt to keep the encapsulation by
> > > > > > > providing new callbacks to the partitioner.
> > > > > > >
> > > > > > > 2. The meaning of the partitioner.sticky.batch.size is explained
> > > > > > > in the Uniform Sticky Batch Size section.  Basically, we track the
> > > > > > > amount of bytes produced to the partition and if it exceeds
> > > > > > > partitioner.sticky.batch.size then we switch to the next
> > > > > > > partition.  As far as the reason to make it different from
> > > > > > > batch.size, I think Luke answered this with the question #3 --
> > > > > > > what if the load pattern is such that 15.5KB would be more
> > > > > > > efficient than 16KB?
> > > > > > >
> > > > > > > 3. I think it's hard to have one size that would fit all
> > > > > > > patterns.  E.g. if the load pattern is such that there is linger
> > > > > > > and the app fills the batch before linger expires, then having
> > > > > > > 16KB would most likely synchronize batching and partition
> > > > > > > switching, so each partition would get a full batch.  If the load
> > > > > > > pattern is such that a few non-complete batches go out before a
> > > > > > > larger batch starts to fill, then it may actually be beneficial to
> > > > > > > make it slightly larger (e.g. linger=0, first few records go in
> > > > > > > the first batch, then next few records go to second batch, and so
> > > > > > > on, until 5 in-flight, then larger batch would form while waiting
> > > > > > > for broker to respond, but the partition switch would happen
> > > > > > > before the larger batch is full).
> > > > > > >
> > > > > > > 4. There are a couple of reasons for introducing
> > > > > > > partition.availability.timeout.ms.  Luke's and Jun's questions are
> > > > > > > slightly different, so I'm going to separate the replies.
> > > > > > > (Luke) Is the queue size a good enough signal?  I think it's a
> > > > > > > good default signal as it tries to preserve general fairness and
> > > > > > > not overreact on the broker's state at each moment in time.  But
> > > > > > > because it's smooth, it may not be reactive enough to
> > > > > > > instantaneous latency jumps.  For latency-sensitive workloads, it
> > > > > > > may be desirable to react faster when a broker becomes
> > > > > > > unresponsive (but that may make the distribution really choppy),
> > > > > > > so partition.availability.timeout.ms provides an opportunity to
> > > > > > > tune adaptiveness.
> > > > > > >
> > > > > > > (Jun) Can we just not assign partitions to brokers that are not
> > > > > > > ready?  Switching partitions purely based on current broker
> > > > > > > readiness information can really skew workload I think (or at
> > > > > > > least I couldn't build a model that proves that over time it's
> > > > > > > going to be generally fair).  I feel that the algorithm should try
> > > > > > > to be fair in general and use smoother signals by default (e.g. a
> > > > > > > broker with choppier latency may get much less load even though it
> > > > > > > can handle throughput, it then may potentially skew consumption).
> > > > > > > Note that the queue-size-based logic uses probabilities (so we
> > > > > > > don't fully remove brokers, just make it less likely) and relative
> > > > > > > info rather than a threshold (so if all brokers are heavily, but
> > > > > > > equally loaded, they will get equal distribution, rather than get
> > > > > > > removed because they exceed some threshold).  So at the very
> > > > > > > least, I would like this logic to be turned off by default as it's
> > > > > > > hard to predict what it could do with different patterns (which
> > > > > > > means that there would need to be some configuration).  We could
> > > > > > > just not use brokers that are not ready, but again, I think that
> > > > > > > it's good to try to be fair under normal circumstances, so if
> > > > > > > normally brokers can respond under some
> > > > > > > partition.availability.timeout.ms threshold and the application
> > > > > > > works well with those latencies, then we could distribute data
> > > > > > > equally between brokers that don't exceed the latencies.  The
> > > > > > > value, of course, would depend on the environment and app
> > > > > > > requirements, hence it's configurable.
> > > > > > >
> > > > > > > 10. Added a statement at the beginning of the proposed changes.
> > > > > > >
> > > > > > > -Artem
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Feb 17, 2022 at 3:46 PM Jun Rao <j...@confluent.io.invalid>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi, Artem,
> > > > > > > >
> > > > > > > > Thanks for the KIP. A few comments below.
> > > > > > > >
> > > > > > > > 1. I agree with Luke that having the partitioner returning -1
> > > > > > > > is kind of weird. Could we just change the implementation of
> > > > > > > > DefaultPartitioner to the new behavior?
> > > > > > > >
> > > > > > > > 2. partitioner.sticky.batch.size: Similar question to Luke. I
> > > > > > > > am not sure why we want to introduce this new configuration.
> > > > > > > > Could we just use the existing batch.size?
> > > > > > > >
> > > > > > > > 4. I also agree with Luke that it's not clear why we need
> > > > > > > > partition.availability.timeout.ms. The KIP says the broker
> > > > > > > > "would not be chosen until the broker is able to accept the next
> > > > > > > > ready batch from the partition". If we are keeping track of
> > > > > > > > this, could we just avoid assigning records to partitions whose
> > > > > > > > leader is not able to accept the next batch? If we do that,
> > > > > > > > perhaps we don't need partition.availability.timeout.ms.
> > > > > > > >
> > > > > > > > 10. Currently, partitioner.class defaults to DefaultPartitioner,
> > > > > > > > which uses StickyPartitioner when the key is not specified. Since
> > > > > > > > this KIP improves upon StickyPartitioner, it would be useful to
> > > > > > > > make the new behavior the default and document that in the KIP.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Feb 16, 2022 at 7:30 PM Luke Chen <show...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Artem,
> > > > > > > > >
> > > > > > > > > Also, one more thing I think you need to know.
> > > > > > > > > As the bug KAFKA-7572
> > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-7572> mentions,
> > > > > > > > > a custom partitioner sometimes returns a negative partition
> > > > > > > > > id accidentally.
> > > > > > > > > If it returned -1, how could you know whether it is expected
> > > > > > > > > or not?
> > > > > > > > >
> > > > > > > > > Thanks.
> > > > > > > > > Luke
> > > > > > > > >
> > > > > > > > > On Wed, Feb 16, 2022 at 3:28 PM Luke Chen <show...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Artem,
> > > > > > > > > >
> > > > > > > > > > Thanks for the update. I have some questions about it:
> > > > > > > > > >
> > > > > > > > > > 1. Could you explain why you need the `partitioner` to
> > > > > > > > > > return -1? In which case do we need it? And how is it used
> > > > > > > > > > in your KIP?
> > > > > > > > > > 2. What does the "partitioner.sticky.batch.size" mean? In
> > > > > > > > > > the "Configuration" part, you didn't explain it. And it
> > > > > > > > > > defaults to 0, which I guess is the same as the current
> > > > > > > > > > behavior for backward compatibility, right? You should
> > > > > > > > > > mention it.
> > > > > > > > > > 3. I'm thinking we can have a threshold for the
> > > > > > > > > > "partitioner.sticky.batch.size". Let's say we already
> > > > > > > > > > accumulated 15.5KB in partition1 and sent it. So when the
> > > > > > > > > > next batch is created, in your current design, we still
> > > > > > > > > > stick to partition1 until 16KB is reached, and then we
> > > > > > > > > > create a new batch to change to the next partition, e.g.
> > > > > > > > > > partition2. But I think if we set a threshold of 95% (for
> > > > > > > > > > example), we can know the previous 15.5KB already exceeds
> > > > > > > > > > the threshold, so we can directly create a new batch for
> > > > > > > > > > the next records. This way should be able to make it more
> > > > > > > > > > efficient. WDYT?
> > > > > > > > > > 4. I think the improved queuing logic should be good
> > > > > > > > > > enough. I can't see the benefit of having the
> > > > > > > > > > `partition.availability.timeout.ms` config. In short, you
> > > > > > > > > > want to make the partitioner take the broker load into
> > > > > > > > > > consideration. We can just improve that in the queuing
> > > > > > > > > > logic (and you already did it). Why should we add the
> > > > > > > > > > config? Could you use some examples to explain why we need
> > > > > > > > > > it?
> > > > > > > > > >
> > > > > > > > > > Thank you.
> > > > > > > > > > Luke
> > > > > > > > > >
> > > > > > > > > > On Wed, Feb 16, 2022 at 8:57 AM Artem Livshits
> > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > >> Hello,
> > > > > > > > > >>
> > > > > > > > > > >> Please add your comments about the KIP.  If there are no
> > > > > > > > > > >> considerations, I'll put it up for vote in the next few days.
> > > > > > > > > >>
> > > > > > > > > >> -Artem
> > > > > > > > > >>
> > > > > > > > > > >> On Mon, Feb 7, 2022 at 6:01 PM Artem Livshits
> > > > > > > > > > >> <alivsh...@confluent.io> wrote:
> > > > > > > > > >>
> > > > > > > > > >> > Hello,
> > > > > > > > > >> >
> > > > > > > > > > >> > After trying a few prototypes, I've made some changes to
> > > > > > > > > > >> > the public interface.  Please see the updated document
> > > > > > > > > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > > > > > > > > >> > .
> > > > > > > > > >> >
> > > > > > > > > >> > -Artem
> > > > > > > > > >> >
> > > > > > > > > > >> > On Thu, Nov 4, 2021 at 10:37 AM Artem Livshits
> > > > > > > > > > >> > <alivsh...@confluent.io> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> >> Hello,
> > > > > > > > > >> >>
> > > > > > > > > > >> >> This is the discussion thread for
> > > > > > > > > > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > > > > > > > > >> >> .
> > > > > > > > > > >> >>
> > > > > > > > > > >> >> The proposal is a bug fix for
> > > > > > > > > > >> >> https://issues.apache.org/jira/browse/KAFKA-10888, but it
> > > > > > > > > > >> >> does include a client config change, therefore we have a KIP
> > > > > > > > > > >> >> to discuss.
> > > > > > > > > >> >>
> > > > > > > > > >> >> -Artem
> > > > > > > > > >> >>
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
