Hello Luke, Jun,

Thank you for your feedback.  I've added the Rejected Alternative section
that may clarify some of the questions w.r.t. returning -1.

1. I've elaborated on the -1 in the KIP.  The problem is that a significant
part of the logic needs to be in the producer (because it now uses
information about brokers that only the producer knows), so encapsulation
of the logic within the default partitioner isn't as clean.   I've added
the Rejected Alternative section that documents an attempt to keep the
encapsulation by providing new callbacks to the partitioner.

2. The meaning of partitioner.sticky.batch.size is explained in the
Uniform Sticky Batch Size section.  Basically, we track the number of bytes
produced to the partition, and once it exceeds partitioner.sticky.batch.size
we switch to the next partition.  As for the reason to make it different
from batch.size, I think Luke answered this with question #3 -- what if the
load pattern is such that 15.5KB would be more efficient than 16KB?
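
To illustrate the accounting, here is only a rough sketch of the idea, not
the actual producer code (the real proposal also weighs the choice of the
next partition rather than going round-robin):

    // Sketch only: byte accounting behind partitioner.sticky.batch.size.
    class StickyByteTracker {
        private final long stickyBatchSize;  // partitioner.sticky.batch.size
        private long bytesInCurrentPartition = 0;
        private int currentPartition;

        StickyByteTracker(long stickyBatchSize, int initialPartition) {
            this.stickyBatchSize = stickyBatchSize;
            this.currentPartition = initialPartition;
        }

        // Called for each keyless record; returns the partition to use.
        int partitionFor(int recordSizeBytes, int numPartitions) {
            int partition = currentPartition;
            bytesInCurrentPartition += recordSizeBytes;
            if (bytesInCurrentPartition >= stickyBatchSize) {
                // Enough bytes went to this partition; switch for the next
                // record (round-robin here purely for simplicity).
                currentPartition = (currentPartition + 1) % numPartitions;
                bytesInCurrentPartition = 0;
            }
            return partition;
        }
    }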

3. I think it's hard to have one size that fits all patterns.  E.g. if the
load pattern is such that there is linger and the app fills the batch before
linger expires, then 16KB would most likely keep batching and partition
switching in sync, so each partition would get a full batch.  If the load
pattern is such that a few incomplete batches go out before a larger batch
starts to fill, then it may actually be beneficial to make it slightly
larger (e.g. with linger=0, the first few records go into the first batch,
the next few records go into the second batch, and so on until there are 5
requests in flight; then a larger batch forms while waiting for the broker
to respond, but the partition switch would happen before that larger batch
is full).
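
As a concrete example of decoupling the two settings (config names as
proposed in the KIP, values made up purely for illustration):

    import java.util.Properties;

    Properties props = new Properties();
    props.put("batch.size", "16384");   // 16KB batches
    props.put("linger.ms", "0");
    // Hypothetical tuning: switch partitions a bit later than one full
    // batch, so the larger batch that forms while requests are in flight
    // can still fill up.
    props.put("partitioner.sticky.batch.size", "20480");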

4. There are a couple of reasons for introducing
partition.availability.timeout.ms.  Luke's and Jun's questions are slightly
different, so I'm going to reply to them separately.
(Luke) Is the queue size a good enough signal?  I think it's a good default
signal, as it tries to preserve general fairness and not overreact to the
broker's state at each moment in time.  But because it's smooth, it may not
be reactive enough to instantaneous latency jumps.  For latency-sensitive
workloads, it may be desirable to react faster when a broker becomes
unresponsive (but that may make the distribution really choppy), so
partition.availability.timeout.ms provides an opportunity to tune the
adaptiveness.
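
For example, a latency-sensitive application could opt in like this (the
200ms value is purely illustrative):

    Properties props = new Properties();
    // Stop choosing partitions whose leader hasn't accepted a ready batch
    // within 200ms (illustrative value); the KIP proposes keeping this
    // logic off by default.
    props.put("partition.availability.timeout.ms", "200");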

(Jun) Can we just not assign partitions to brokers that are not ready?
Switching partitions purely based on the broker's current readiness could
really skew the workload, I think (or at least I couldn't build a model that
proves it would be generally fair over time).  I feel that the algorithm
should try to be fair in general and use smoother signals by default (e.g. a
broker with choppier latency may get much less load even though it can
handle the throughput, which in turn could skew consumption).  Note that the
queue-size-based logic uses probabilities (so we don't fully remove brokers,
we just make them less likely to be chosen) and relative information rather
than a threshold (so if all brokers are heavily but equally loaded, they
still get an equal share of the distribution rather than being removed for
exceeding some threshold).  So at the very least, I would like this logic to
be turned off by default, as it's hard to predict what it could do with
different patterns (which means there would need to be some configuration).
We could just not use brokers that are not ready, but again, I think it's
good to try to be fair under normal circumstances: if brokers normally
respond under some partition.availability.timeout.ms threshold and the
application works well with those latencies, then we can distribute data
equally between the brokers that don't exceed those latencies.  The value,
of course, would depend on the environment and app requirements, hence it's
configurable.
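
To make the "probabilities rather than a threshold" point concrete, here is
a rough sketch of the idea (not the actual algorithm or code from the KIP):
weight each partition's leader inversely to its queue size and pick at
random.

    import java.util.concurrent.ThreadLocalRandom;

    // Sketch only: loaded brokers become less likely to be chosen, but are
    // never fully excluded, and equally loaded brokers get an equal share.
    static int choosePartition(int[] leaderQueueSizes) {
        double[] weights = new double[leaderQueueSizes.length];
        double total = 0;
        for (int i = 0; i < leaderQueueSizes.length; i++) {
            weights[i] = 1.0 / (leaderQueueSizes[i] + 1);  // +1 avoids div by zero
            total += weights[i];
        }
        double r = ThreadLocalRandom.current().nextDouble(total);
        for (int i = 0; i < weights.length; i++) {
            r -= weights[i];
            if (r <= 0) {
                return i;
            }
        }
        return weights.length - 1;  // guard against rounding
    }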

10. Added a statement at the beginning of the proposed changes.

-Artem


On Thu, Feb 17, 2022 at 3:46 PM Jun Rao <j...@confluent.io.invalid> wrote:

> Hi, Artem,
>
> Thanks for the KIP. A few comments below.
>
> 1. I agree with Luke that having the partitioner returning -1 is kind of
> weird. Could we just change the implementation of DefaultPartitioner to the
> new behavior?
>
> 2. partitioner.sticky.batch.size: Similar question to Luke. I am not sure
> why we want to introduce this new configuration. Could we just use the
> existing batch.size?
>
> 4. I also agree with Luke that it's not clear why we need
> partition.availability.timeout.ms. The KIP says the broker "would not be
> chosen until the broker is able to accept the next ready batch from the
> partition". If we are keeping track of this, could we just avoid assigning
> records to partitions whose leader is not able to accept the next batch? If
> we do that, perhaps we don't need partition.availability.timeout.ms.
>
> 10. Currently, partitioner.class defaults to DefaultPartitioner, which uses
> StickyPartitioner when the key is specified. Since this KIP improves upon
> StickyPartitioner, it would be useful to make the new behavior the default
> and document that in the KIP.
>
> Thanks,
>
> Jun
>
>
> On Wed, Feb 16, 2022 at 7:30 PM Luke Chen <show...@gmail.com> wrote:
>
> > Hi Artem,
> >
> > Also, one more thing I think you need to know.
> > As this bug KAFKA-7572 <https://issues.apache.org/jira/browse/KAFKA-7572
> >
> > mentioned, sometimes the custom partitioner would return negative
> partition
> > id accidentally.
> > If it returned -1, how could you know if it is expected or not expected?
> >
> > Thanks.
> > Luke
> >
> > On Wed, Feb 16, 2022 at 3:28 PM Luke Chen <show...@gmail.com> wrote:
> >
> > > Hi Artem,
> > >
> > > Thanks for the update. I have some questions about it:
> > >
> > > 1. Could you explain why you need the `partitioner` return -1? In which
> > > case we need it? And how it is used in your KIP?
> > > 2. What does the "partitioner.sticky.batch.size" mean? In the
> > > "Configuration" part, you didn't explain it. And default to 0, I guess
> > it's
> > > the same as current behavior for backward compatibility, right? You
> > should
> > > mention it.
> > > 3. I'm thinking we can have a threshold to the
> > > "partitioner.sticky.batch.size". Let's say, we already accumulate
> 15.5KB
> > in
> > > partition1, and sent. So when next batch created, in your current
> design,
> > > we still stick to partition1, until 16KB reached, and then we create a
> > new
> > > batch to change to next partition, ex: partition2. But I think if we
> set
> > a
> > > threshold to 95% (for example), we can know previous 15.5KB already
> > exceeds
> > > the threshold so that we can directly create new batch for next
> records.
> > > This way should be able to make it more efficient. WDYT?
> > > 4. I think the improved queuing logic should be good enough. I can't
> get
> > > the benefit of having `partition.availability.timeout.ms` config. In
> > > short, you want to make the partitioner take the broker load into
> > > consideration. We can just improve that in the queuing logic (and you
> > > already did it). Why should we add the config? Could you use some
> > examples
> > > to explain why we need it.
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Wed, Feb 16, 2022 at 8:57 AM Artem Livshits
> > > <alivsh...@confluent.io.invalid> wrote:
> > >
> > >> Hello,
> > >>
> > >> Please add your comments about the KIP.  If there are no
> considerations,
> > >> I'll put it up for vote in the next few days.
> > >>
> > >> -Artem
> > >>
> > >> On Mon, Feb 7, 2022 at 6:01 PM Artem Livshits <alivsh...@confluent.io
> >
> > >> wrote:
> > >>
> > >> > Hello,
> > >> >
> > >> > After trying a few prototypes, I've made some changes to the public
> > >> > interface.  Please see the updated document
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > >> > .
> > >> >
> > >> > -Artem
> > >> >
> > >> > On Thu, Nov 4, 2021 at 10:37 AM Artem Livshits <
> > alivsh...@confluent.io>
> > >> > wrote:
> > >> >
> > >> >> Hello,
> > >> >>
> > >> >> This is the discussion thread for
> > >> >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > >> >> .
> > >> >>
> > >> >> The proposal is a bug fix for
> > >> >> https://issues.apache.org/jira/browse/KAFKA-10888, but it does
> > >> include a
> > >> >> client config change, therefore we have a KIP to discuss.
> > >> >>
> > >> >> -Artem
> > >> >>
> > >> >
> > >>
> > >
> >
>
