Hi, Artem, Thanks for the reply. A couple of more comments.
32. We could defer the metrics until we know what to add. 33. Since we are deprecating DefaultPartitioner and UniformStickyPartitioner, should we depreciate Partitioner.onNewBatch() too given its unexpected side effect? Thanks, Jun On Thu, Mar 10, 2022 at 5:20 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote: > Hi Jun, > > 32. Good point. Do you think it's ok to defer the metrics until we run > some benchmarks so that we get a better idea of what metrics we need? > > -Artem > > On Thu, Mar 10, 2022 at 3:12 PM Jun Rao <j...@confluent.io.invalid> wrote: > > > Hi, Artem. > > > > Thanks for the reply. One more comment. > > > > 32. Do we need to add any new metric on the producer? For example, if > > partitioner.availability.timeout.ms is > 0, it might be useful to know > the > > number of unavailable partitions. > > > > Thanks, > > > > Jun > > > > On Thu, Mar 10, 2022 at 12:46 PM Artem Livshits > > <alivsh...@confluent.io.invalid> wrote: > > > > > Hi Jun, > > > > > > 30. Clarified. > > > > > > 31. I plan to do some benchmarking once implementation is finished, > I'll > > > update the KIP with the results once I have them. The reason to make > it > > > default is that it won't be used otherwise and we won't know if it's > good > > > or not in practical workloads. > > > > > > -Artem > > > > > > On Thu, Mar 10, 2022 at 11:42 AM Jun Rao <j...@confluent.io.invalid> > > wrote: > > > > > > > Hi, Artem, > > > > > > > > Thanks for the updated KIP. A couple of more comments. > > > > > > > > 30. For the 3 new configs, it would be useful to make it clear that > > they > > > > are only relevant when the partitioner class is null. > > > > > > > > 31. partitioner.adaptive.partitioning.enable : I am wondering whether > > it > > > > should default to true. This is a more complex behavior than "uniform > > > > sticky" and may take some time to get right. If we do want to enable > it > > > by > > > > default, it would be useful to validate it with some test results. > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > On Wed, Mar 9, 2022 at 10:05 PM Artem Livshits > > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > > > Thank you for feedback, I've discussed this offline with some of > the > > > > folks > > > > > and updated the KIP. The main change is that now instead of using > > > > > DefaultPartitioner and UniformStickyPartitioners as flags, in the > new > > > > > proposal the default partitioner is null, so if no custom > partitioner > > > is > > > > > specified then the partitioning logic is implemented in > > KafkaProducer. > > > > > Compatibility section is updated as well. Also the configuration > > > options > > > > > are renamed to be more consistent. > > > > > > > > > > -Artem > > > > > > > > > > On Fri, Mar 4, 2022 at 10:38 PM Luke Chen <show...@gmail.com> > wrote: > > > > > > > > > > > Hi Artem, > > > > > > > > > > > > Thanks for your explanation and update to the KIP. > > > > > > Some comments: > > > > > > > > > > > > 5. In the description for `enable.adaptive.partitioning`, the > > `false` > > > > > case, > > > > > > you said: > > > > > > > the producer will try to distribute messages uniformly. > > > > > > I think we should describe the possible skewing distribution. > > > > Otherwise, > > > > > > user might be confused about why adaptive partitioning is > > important. > > > > > > > > > > > > 6. In the description for `partition.availability.timeout.ms`, I > > > think > > > > > we > > > > > > should mention in the last sentence about if > > > > > `enable.adaptive.partitioning` > > > > > > is disabled this logic is also disabled. > > > > > > > > > > > > 7. Similar thoughts as Ismael, I think we should have a POC and > > test > > > to > > > > > > prove that this adaptive partitioning algorithm can have better > > > uniform > > > > > > partitioning, compared with original sticky one. > > > > > > > > > > > > Thank you. > > > > > > Luke > > > > > > > > > > > > On Fri, Mar 4, 2022 at 9:22 PM Ismael Juma <ism...@juma.me.uk> > > > wrote: > > > > > > > > > > > > > Regarding `3`, we should only deprecate it if we're sure the > new > > > > > approach > > > > > > > handles all cases better. Are we confident about that for both > of > > > the > > > > > > > previous partitioners? > > > > > > > > > > > > > > Ismael > > > > > > > > > > > > > > On Fri, Mar 4, 2022 at 1:37 AM David Jacot > > > > <dja...@confluent.io.invalid > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > Hi Artem, > > > > > > > > > > > > > > > > Thanks for the KIP! I have a few comments: > > > > > > > > > > > > > > > > 1. In the preamble of the proposed change section, there is > > > still a > > > > > > > > mention of the > > > > > > > > -1 approach. My understanding is that we have moved away from > > it > > > > now. > > > > > > > > > > > > > > > > 2. I am a bit concerned by the trick suggested about the > > > > > > > > DefaultPartitioner and > > > > > > > > the UniformStickyPartitioner. I do agree that implementing > the > > > > logic > > > > > in > > > > > > > the > > > > > > > > producer itself is a good thing. However, it is weird from a > > user > > > > > > > > perspective > > > > > > > > that he can set a class as partitioner that is not used in > the > > > > end. I > > > > > > > > think that > > > > > > > > this will be confusing for our users. Have we considered > > changing > > > > the > > > > > > > > default > > > > > > > > value of partitioner.class to null to indicate that the new > > > > built-in > > > > > > > > partitioner > > > > > > > > must be used? By default, the built-in partitioner would be > > used > > > > > unless > > > > > > > the > > > > > > > > user explicitly specify one. The downside is that the new > > default > > > > > > > behavior > > > > > > > > would not work if the user explicitly specify the partitioner > > but > > > > we > > > > > > > could > > > > > > > > mitigate this with my next point. > > > > > > > > > > > > > > > > 3. Related to the previous point, I think that we could > > deprecate > > > > > both > > > > > > > the > > > > > > > > DefaultPartitioner and the UniformStickyPartitioner. I would > > also > > > > > add a > > > > > > > > warning if one of them is explicitly provided by the user to > > > inform > > > > > > them > > > > > > > > that they should switch to the new built-in one. I am pretty > > sure > > > > > that > > > > > > > most > > > > > > > > of the folks use the default configuration anyway. > > > > > > > > > > > > > > > > 4. It would be great if we could explain why the -1 way was > > > > rejected. > > > > > > At > > > > > > > > the moment, the rejected alternative only explain the idea > but > > > does > > > > > not > > > > > > > > say why we rejected it. > > > > > > > > > > > > > > > > Best, > > > > > > > > David > > > > > > > > > > > > > > > > On Fri, Mar 4, 2022 at 6:03 AM Artem Livshits > > > > > > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > > > > > > > 2. Removed the option from the KIP. Now the sticky > > > partitioning > > > > > > > > threshold > > > > > > > > > is hardcoded to batch.size. > > > > > > > > > > > > > > > > > > 20. Added the corresponding wording to the KIP. > > > > > > > > > > > > > > > > > > -Artem > > > > > > > > > > > > > > > > > > On Thu, Mar 3, 2022 at 10:52 AM Jun Rao > > > <j...@confluent.io.invalid > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi, Artem, > > > > > > > > > > > > > > > > > > > > Thanks for the reply. > > > > > > > > > > > > > > > > > > > > 1. Sounds good. > > > > > > > > > > > > > > > > > > > > 2. If we don't expect users to change it, we probably > could > > > > just > > > > > > > leave > > > > > > > > out > > > > > > > > > > the new config. In general, it's easy to add a new > config, > > > but > > > > > hard > > > > > > > to > > > > > > > > > > remove an existing config. > > > > > > > > > > > > > > > > > > > > 20. The two new configs enable.adaptive.partitioning and > > > > > > > > > > partition.availability.timeout.ms only apply to the two > > > > built-in > > > > > > > > > > partitioners DefaultPartitioner and > > UniformStickyPartitioner, > > > > > > right? > > > > > > > It > > > > > > > > > > would be useful to document that in the KIP. > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > On Thu, Mar 3, 2022 at 9:47 AM Artem Livshits > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > > > > > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > > > > > > > > > > > Thank you for the suggestions. > > > > > > > > > > > > > > > > > > > > > > 1. As we discussed offline, we can hardcode the logic > for > > > > > > > > > > > DefaultPartitioner and UniformStickyPartitioner in the > > > > > > > KafkaProducer > > > > > > > > > > (i.e. > > > > > > > > > > > the DefaultPartitioner.partition won't get called, > > instead > > > > > > > > KafkaProducer > > > > > > > > > > > would check if the partitioner is an instance of > > > > > > DefaultPartitioner > > > > > > > > and > > > > > > > > > > > then run the actual partitioning logic itself). Then > the > > > > > change > > > > > > to > > > > > > > > the > > > > > > > > > > > Partitioner wouldn't be required. I'll update the KIP > to > > > > > reflect > > > > > > > > that. > > > > > > > > > > > > > > > > > > > > > > 2. I don't expect users to change this too often, as > > > changing > > > > > it > > > > > > > > would > > > > > > > > > > > require a bit of studying of the production patterns. > > As a > > > > > > general > > > > > > > > > > > principle, if I can think of a model that requires a > > > > deviation > > > > > > from > > > > > > > > > > > default, I tend to add a configuration option. It > could > > be > > > > > that > > > > > > > > it'll > > > > > > > > > > > never get used in practice, but I cannot prove that. > I'm > > > ok > > > > > with > > > > > > > > > > removing > > > > > > > > > > > the option, let me know what you think. > > > > > > > > > > > > > > > > > > > > > > -Artem > > > > > > > > > > > > > > > > > > > > > > On Mon, Feb 28, 2022 at 2:06 PM Jun Rao > > > > > <j...@confluent.io.invalid > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > Hi, Artem, > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the reply. A few more comments. > > > > > > > > > > > > > > > > > > > > > > > > 1. Since we control the implementation and the usage > of > > > > > > > > > > > DefaultPartitioner, > > > > > > > > > > > > another way is to instantiate the DefaultPartitioner > > > with a > > > > > > > special > > > > > > > > > > > > constructor, which allows it to have more access to > > > > internal > > > > > > > > > > information. > > > > > > > > > > > > Then we could just change the behavior of > > > > DefaultPartitioner > > > > > > > such > > > > > > > > that > > > > > > > > > > > it > > > > > > > > > > > > can use the internal infoamtion when choosing the > > > > partition. > > > > > > This > > > > > > > > seems > > > > > > > > > > > > more intuitive than having DefaultPartitioner return > -1 > > > > > > > partition. > > > > > > > > > > > > > > > > > > > > > > > > 2. I guess partitioner.sticky.batch.size is > introduced > > > > > because > > > > > > > the > > > > > > > > > > > > effective batch size could be less than batch.size > and > > we > > > > > want > > > > > > to > > > > > > > > align > > > > > > > > > > > > partition switching with the effective batch size. > How > > > > would > > > > > a > > > > > > > user > > > > > > > > > > know > > > > > > > > > > > > the effective batch size to set > > > > partitioner.sticky.batch.size > > > > > > > > properly? > > > > > > > > > > > If > > > > > > > > > > > > the user somehow knows the effective batch size, does > > > > setting > > > > > > > > > > batch.size > > > > > > > > > > > to > > > > > > > > > > > > the effective batch size achieve the same result? > > > > > > > > > > > > > > > > > > > > > > > > 4. Thanks for the explanation. Makes sense to me. > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Feb 25, 2022 at 8:26 PM Artem Livshits > > > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > > > > > > > > > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > > > > > > > > > > > > > > > 1. Updated the KIP to add a couple paragraphs about > > > > > > > > implementation > > > > > > > > > > > > > necessities in the Proposed Changes section. > > > > > > > > > > > > > > > > > > > > > > > > > > 2. Sorry if my reply was confusing, what I meant to > > say > > > > > (and > > > > > > I > > > > > > > > > > > elaborated > > > > > > > > > > > > > on that in point #3) is that there could be > patterns > > > for > > > > > > which > > > > > > > > 16KB > > > > > > > > > > > > > wouldn't be the most effective setting, thus it > would > > > be > > > > > good > > > > > > > to > > > > > > > > make > > > > > > > > > > > it > > > > > > > > > > > > > configurable. > > > > > > > > > > > > > > > > > > > > > > > > > > 4. We could use broker readiness timeout. But I'm > > not > > > > sure > > > > > > it > > > > > > > > would > > > > > > > > > > > > > correctly model the broker load. The problem is > that > > > > > latency > > > > > > > is > > > > > > > > not > > > > > > > > > > an > > > > > > > > > > > > > accurate measure of throughput, we could have 2 > > brokers > > > > > that > > > > > > > have > > > > > > > > > > equal > > > > > > > > > > > > > throughput but one has higher latency (so it takes > > > larger > > > > > > > batches > > > > > > > > > > less > > > > > > > > > > > > > frequently, but still takes the same load). > > > > Latency-based > > > > > > > logic > > > > > > > > is > > > > > > > > > > > > likely > > > > > > > > > > > > > to send less data to the broker with higher > latency. > > > > Using > > > > > > the > > > > > > > > queue > > > > > > > > > > > > size > > > > > > > > > > > > > would adapt to throughput, regardless of latency > > (which > > > > > could > > > > > > > be > > > > > > > > > > just a > > > > > > > > > > > > > result of deployment topology), so that's the model > > > > chosen > > > > > in > > > > > > > the > > > > > > > > > > > > > proposal. The partition.availability.timeout.ms > > logic > > > > > > > > approaches > > > > > > > > > > the > > > > > > > > > > > > > model > > > > > > > > > > > > > from a slightly different angle, say we have a > > > > requirement > > > > > to > > > > > > > > deliver > > > > > > > > > > > > > messages via brokers that have a certain latency, > > then > > > > > > > > > > > > > partition.availability.timeout.ms could be used to > > > tune > > > > > > that. > > > > > > > > > > Latency > > > > > > > > > > > > is > > > > > > > > > > > > > a > > > > > > > > > > > > > much more volatile metric than throughput (latency > > > > depends > > > > > on > > > > > > > > > > external > > > > > > > > > > > > > load, on capacity, on deployment topology, on > jitter > > in > > > > > > > network, > > > > > > > > on > > > > > > > > > > > > jitter > > > > > > > > > > > > > in disk, etc.) and I think it would be best to > leave > > > > > > > > latency-based > > > > > > > > > > > > > thresholds configurable to tune for the > environment. > > > > > > > > > > > > > > > > > > > > > > > > > > -Artem > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Feb 23, 2022 at 11:14 AM Jun Rao > > > > > > > > <j...@confluent.io.invalid> > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi, Artem, > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the reply. A few more comments. > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. Perhaps you could elaborate a bit more on how > > the > > > > > > producer > > > > > > > > > > > > determines > > > > > > > > > > > > > > the partition if the partitioner returns -1. This > > > will > > > > > help > > > > > > > > > > > understand > > > > > > > > > > > > > why > > > > > > > > > > > > > > encapsulating that logic as a partitioner is not > > > clean. > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2. I am not sure that I understand this part. If > > > 15.5KB > > > > > is > > > > > > > more > > > > > > > > > > > > > efficient, > > > > > > > > > > > > > > could we just set batch.size to 15.5KB? > > > > > > > > > > > > > > > > > > > > > > > > > > > > 4. Yes, we could add a switch (or a variant of > the > > > > > > > > partitioner) for > > > > > > > > > > > > > > enabling this behavior. Also, choosing partitions > > > based > > > > > on > > > > > > > > broker > > > > > > > > > > > > > readiness > > > > > > > > > > > > > > can be made in a smoother way. For example, we > > could > > > > > track > > > > > > > the > > > > > > > > last > > > > > > > > > > > > time > > > > > > > > > > > > > a > > > > > > > > > > > > > > broker has drained any batches from the > > accumulator. > > > We > > > > > can > > > > > > > > then > > > > > > > > > > > select > > > > > > > > > > > > > > partitions from brokers proportionally to the > > inverse > > > > of > > > > > > that > > > > > > > > time. > > > > > > > > > > > > This > > > > > > > > > > > > > > seems smoother than a cutoff based on a > > > > > > > > > > > > > partition.availability.timeout.ms > > > > > > > > > > > > > > threshold. > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Feb 18, 2022 at 5:14 PM Artem Livshits > > > > > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hello Luke, Jun, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thank you for your feedback. I've added the > > > Rejected > > > > > > > > Alternative > > > > > > > > > > > > > section > > > > > > > > > > > > > > > that may clarify some of the questions w.r.t. > > > > returning > > > > > > -1. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. I've elaborated on the -1 in the KIP. The > > > problem > > > > > is > > > > > > > > that a > > > > > > > > > > > > > > significant > > > > > > > > > > > > > > > part of the logic needs to be in the producer > > > > (because > > > > > it > > > > > > > now > > > > > > > > > > uses > > > > > > > > > > > > > > > information about brokers that only the > producer > > > > > knows), > > > > > > so > > > > > > > > > > > > > encapsulation > > > > > > > > > > > > > > > of the logic within the default partitioner > isn't > > > as > > > > > > clean. > > > > > > > > > > I've > > > > > > > > > > > > > added > > > > > > > > > > > > > > > the Rejected Alternative section that documents > > an > > > > > > attempt > > > > > > > to > > > > > > > > > > keep > > > > > > > > > > > > the > > > > > > > > > > > > > > > encapsulation by providing new callbacks to the > > > > > > > partitioner. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2. The meaning of the > > partitioner.sticky.batch.size > > > > is > > > > > > > > explained > > > > > > > > > > in > > > > > > > > > > > > the > > > > > > > > > > > > > > > Uniform Sticky Batch Size section. Basically, > we > > > > track > > > > > > the > > > > > > > > > > amount > > > > > > > > > > > of > > > > > > > > > > > > > > bytes > > > > > > > > > > > > > > > produced to the partition and if it exceeds > > > > > > > > > > > > > partitioner.sticky.batch.size > > > > > > > > > > > > > > > then we switch to the next partition. As far > as > > > the > > > > > > reason > > > > > > > > to > > > > > > > > > > make > > > > > > > > > > > > it > > > > > > > > > > > > > > > different from batch.size, I think Luke > answered > > > this > > > > > > with > > > > > > > > the > > > > > > > > > > > > question > > > > > > > > > > > > > > #3 > > > > > > > > > > > > > > > -- what if the load pattern is such that 15.5KB > > > would > > > > > be > > > > > > > more > > > > > > > > > > > > efficient > > > > > > > > > > > > > > > than 16KB? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 3. I think it's hard to have one size that > would > > > fit > > > > > all > > > > > > > > > > patterns. > > > > > > > > > > > > > E.g. > > > > > > > > > > > > > > if > > > > > > > > > > > > > > > the load pattern is such that there is linger > and > > > the > > > > > app > > > > > > > > fills > > > > > > > > > > the > > > > > > > > > > > > > batch > > > > > > > > > > > > > > > before linger expires, then having 16KB would > > most > > > > > likely > > > > > > > > > > > synchronize > > > > > > > > > > > > > > > batching and partition switching, so each > > partition > > > > > would > > > > > > > > get a > > > > > > > > > > > full > > > > > > > > > > > > > > > batch. If load pattern is such that there are > a > > > few > > > > > > > > non-complete > > > > > > > > > > > > > batches > > > > > > > > > > > > > > > go out before a larger batch starts to fill, > then > > > it > > > > > may > > > > > > > > actually > > > > > > > > > > > be > > > > > > > > > > > > > > > beneficial to make slightly larger (e.g. > > linger=0, > > > > > first > > > > > > > few > > > > > > > > > > > records > > > > > > > > > > > > go > > > > > > > > > > > > > > in > > > > > > > > > > > > > > > the first batch, then next few records go to > > second > > > > > > batch, > > > > > > > > and so > > > > > > > > > > > on, > > > > > > > > > > > > > > until > > > > > > > > > > > > > > > 5 in-flight, then larger batch would form while > > > > waiting > > > > > > for > > > > > > > > > > broker > > > > > > > > > > > to > > > > > > > > > > > > > > > respond, but the partition switch would happen > > > before > > > > > the > > > > > > > > larger > > > > > > > > > > > > batch > > > > > > > > > > > > > is > > > > > > > > > > > > > > > full). > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 4. There are a couple of reasons for > introducing > > > > > > > > > > > > > > > partition.availability.timeout.ms. Luke's an > > > Jun's > > > > > > > > questions > > > > > > > > > > are > > > > > > > > > > > > > > slightly > > > > > > > > > > > > > > > different, so I'm going to separate replies. > > > > > > > > > > > > > > > (Luke) Is the queue size a good enough > signal? I > > > > think > > > > > > > it's > > > > > > > > a > > > > > > > > > > good > > > > > > > > > > > > > > default > > > > > > > > > > > > > > > signal as it tries to preserve general fairness > > and > > > > not > > > > > > > > overreact > > > > > > > > > > > on > > > > > > > > > > > > > the > > > > > > > > > > > > > > > broker's state at each moment in time. But > > because > > > > > it's > > > > > > > > smooth, > > > > > > > > > > it > > > > > > > > > > > > may > > > > > > > > > > > > > > not > > > > > > > > > > > > > > > be reactive enough to instantaneous latency > > jumps. > > > > For > > > > > > > > > > > > > latency-sensitive > > > > > > > > > > > > > > > workloads, it may be desirable to react faster > > > when a > > > > > > > broker > > > > > > > > > > > becomes > > > > > > > > > > > > > > > unresponsive (but that may make the > distribution > > > > really > > > > > > > > choppy), > > > > > > > > > > so > > > > > > > > > > > > > > > partition.availability.timeout.ms provides an > > > > > > opportunity > > > > > > > to > > > > > > > > > > tune > > > > > > > > > > > > > > > adaptiveness. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > (Jun) Can we just not assign partitions to > > brokers > > > > that > > > > > > are > > > > > > > > not > > > > > > > > > > > > ready? > > > > > > > > > > > > > > > Switching partitions purely based on current > > broker > > > > > > > readiness > > > > > > > > > > > > > information > > > > > > > > > > > > > > > can really skew workload I think (or at least I > > > > > couldn't > > > > > > > > build a > > > > > > > > > > > > model > > > > > > > > > > > > > > that > > > > > > > > > > > > > > > proves that over time it's going to be > generally > > > > > fair), I > > > > > > > > feel > > > > > > > > > > that > > > > > > > > > > > > the > > > > > > > > > > > > > > > algorithm should try to be fair in general and > > use > > > > > > smoother > > > > > > > > > > signals > > > > > > > > > > > > by > > > > > > > > > > > > > > > default (e.g. a broker with choppier latency > may > > > get > > > > > much > > > > > > > > less > > > > > > > > > > load > > > > > > > > > > > > > even > > > > > > > > > > > > > > > though it can handle throughput, it then may > > > > > potentially > > > > > > > skew > > > > > > > > > > > > > > consumption), > > > > > > > > > > > > > > > note that the queue-size-based logic uses > > > > probabilities > > > > > > (so > > > > > > > > we > > > > > > > > > > > don't > > > > > > > > > > > > > > fully > > > > > > > > > > > > > > > remove brokers, just make it less likely) and > > > > relative > > > > > > info > > > > > > > > > > rather > > > > > > > > > > > > > than a > > > > > > > > > > > > > > > threshold (so if all brokers are heavily, but > > > equally > > > > > > > loaded, > > > > > > > > > > they > > > > > > > > > > > > will > > > > > > > > > > > > > > get > > > > > > > > > > > > > > > equal distribution, rather than get removed > > because > > > > > they > > > > > > > > exceed > > > > > > > > > > > some > > > > > > > > > > > > > > > threshold). So at the very least, I would like > > > this > > > > > > logic > > > > > > > > to be > > > > > > > > > > > > turned > > > > > > > > > > > > > > off > > > > > > > > > > > > > > > by default as it's hard to predict what it > could > > do > > > > > with > > > > > > > > > > different > > > > > > > > > > > > > > patterns > > > > > > > > > > > > > > > (which means that there would need to be some > > > > > > > > configuration). We > > > > > > > > > > > > could > > > > > > > > > > > > > > > just not use brokers that are not ready, but > > > again, I > > > > > > think > > > > > > > > that > > > > > > > > > > > it's > > > > > > > > > > > > > > good > > > > > > > > > > > > > > > to try to be fair under normal circumstances, > so > > if > > > > > > > normally > > > > > > > > > > > brokers > > > > > > > > > > > > > can > > > > > > > > > > > > > > > respond under some > > > partition.availability.timeout.ms > > > > > > > > threshold > > > > > > > > > > and > > > > > > > > > > > > the > > > > > > > > > > > > > > > application works well with those latencies, > then > > > we > > > > > > could > > > > > > > > > > > distribute > > > > > > > > > > > > > > data > > > > > > > > > > > > > > > equally between brokers that don't exceed the > > > > > latencies. > > > > > > > The > > > > > > > > > > > value, > > > > > > > > > > > > of > > > > > > > > > > > > > > > course, would depend on the environment and app > > > > > > > requirements, > > > > > > > > > > hence > > > > > > > > > > > > > it's > > > > > > > > > > > > > > > configurable. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 10. Added a statement at the beginning of the > > > > proposed > > > > > > > > changes. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -Artem > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Feb 17, 2022 at 3:46 PM Jun Rao > > > > > > > > <j...@confluent.io.invalid > > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi, Artem, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the KIP. A few comments below. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. I agree with Luke that having the > > partitioner > > > > > > > returning > > > > > > > > -1 > > > > > > > > > > is > > > > > > > > > > > > kind > > > > > > > > > > > > > > of > > > > > > > > > > > > > > > > weird. Could we just change the > implementation > > of > > > > > > > > > > > > DefaultPartitioner > > > > > > > > > > > > > to > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > new behavior? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2. partitioner.sticky.batch.size: Similar > > > question > > > > to > > > > > > > > Luke. I > > > > > > > > > > am > > > > > > > > > > > > not > > > > > > > > > > > > > > sure > > > > > > > > > > > > > > > > why we want to introduce this new > > configuration. > > > > > Could > > > > > > we > > > > > > > > just > > > > > > > > > > > use > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > existing batch.size? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 4. I also agree with Luke that it's not clear > > why > > > > we > > > > > > need > > > > > > > > > > > > > > > > partition.availability.timeout.ms. The KIP > > says > > > > the > > > > > > > broker > > > > > > > > > > > "would > > > > > > > > > > > > > not > > > > > > > > > > > > > > be > > > > > > > > > > > > > > > > chosen until the broker is able to accept the > > > next > > > > > > ready > > > > > > > > batch > > > > > > > > > > > from > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > partition". If we are keeping track of this, > > > could > > > > we > > > > > > > just > > > > > > > > > > avoid > > > > > > > > > > > > > > > assigning > > > > > > > > > > > > > > > > records to partitions whose leader is not > able > > to > > > > > > accept > > > > > > > > the > > > > > > > > > > next > > > > > > > > > > > > > > batch? > > > > > > > > > > > > > > > If > > > > > > > > > > > > > > > > we do that, perhaps we don't need > > > > > > > > > > > > partition.availability.timeout.ms. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 10. Currently, partitioner.class defaults to > > > > > > > > > > DefaultPartitioner, > > > > > > > > > > > > > which > > > > > > > > > > > > > > > uses > > > > > > > > > > > > > > > > StickyPartitioner when the key is specified. > > > Since > > > > > this > > > > > > > KIP > > > > > > > > > > > > improves > > > > > > > > > > > > > > upon > > > > > > > > > > > > > > > > StickyPartitioner, it would be useful to make > > the > > > > new > > > > > > > > behavior > > > > > > > > > > > the > > > > > > > > > > > > > > > default > > > > > > > > > > > > > > > > and document that in the KIP. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Feb 16, 2022 at 7:30 PM Luke Chen < > > > > > > > > show...@gmail.com> > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Artem, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Also, one more thing I think you need to > > know. > > > > > > > > > > > > > > > > > As this bug KAFKA-7572 < > > > > > > > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-7572 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > mentioned, sometimes the custom partitioner > > > would > > > > > > > return > > > > > > > > > > > negative > > > > > > > > > > > > > > > > partition > > > > > > > > > > > > > > > > > id accidentally. > > > > > > > > > > > > > > > > > If it returned -1, how could you know if it > > is > > > > > > expected > > > > > > > > or > > > > > > > > > > not > > > > > > > > > > > > > > > expected? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks. > > > > > > > > > > > > > > > > > Luke > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Feb 16, 2022 at 3:28 PM Luke Chen < > > > > > > > > show...@gmail.com > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Artem, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the update. I have some > > questions > > > > > about > > > > > > > it: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. Could you explain why you need the > > > > > `partitioner` > > > > > > > > return > > > > > > > > > > > -1? > > > > > > > > > > > > In > > > > > > > > > > > > > > > which > > > > > > > > > > > > > > > > > > case we need it? And how it is used in > your > > > > KIP? > > > > > > > > > > > > > > > > > > 2. What does the > > > > "partitioner.sticky.batch.size" > > > > > > > mean? > > > > > > > > In > > > > > > > > > > the > > > > > > > > > > > > > > > > > > "Configuration" part, you didn't explain > > it. > > > > And > > > > > > > > default to > > > > > > > > > > > 0, > > > > > > > > > > > > I > > > > > > > > > > > > > > > guess > > > > > > > > > > > > > > > > > it's > > > > > > > > > > > > > > > > > > the same as current behavior for backward > > > > > > > > compatibility, > > > > > > > > > > > right? > > > > > > > > > > > > > You > > > > > > > > > > > > > > > > > should > > > > > > > > > > > > > > > > > > mention it. > > > > > > > > > > > > > > > > > > 3. I'm thinking we can have a threshold > to > > > the > > > > > > > > > > > > > > > > > > "partitioner.sticky.batch.size". Let's > say, > > > we > > > > > > > already > > > > > > > > > > > > accumulate > > > > > > > > > > > > > > > > 15.5KB > > > > > > > > > > > > > > > > > in > > > > > > > > > > > > > > > > > > partition1, and sent. So when next batch > > > > created, > > > > > > in > > > > > > > > your > > > > > > > > > > > > current > > > > > > > > > > > > > > > > design, > > > > > > > > > > > > > > > > > > we still stick to partition1, until 16KB > > > > reached, > > > > > > and > > > > > > > > then > > > > > > > > > > we > > > > > > > > > > > > > > create > > > > > > > > > > > > > > > a > > > > > > > > > > > > > > > > > new > > > > > > > > > > > > > > > > > > batch to change to next partition, ex: > > > > > partition2. > > > > > > > But > > > > > > > > I > > > > > > > > > > > think > > > > > > > > > > > > if > > > > > > > > > > > > > > we > > > > > > > > > > > > > > > > set > > > > > > > > > > > > > > > > > a > > > > > > > > > > > > > > > > > > threshold to 95% (for example), we can > know > > > > > > previous > > > > > > > > 15.5KB > > > > > > > > > > > > > already > > > > > > > > > > > > > > > > > exceeds > > > > > > > > > > > > > > > > > > the threshold so that we can directly > > create > > > > new > > > > > > > batch > > > > > > > > for > > > > > > > > > > > next > > > > > > > > > > > > > > > > records. > > > > > > > > > > > > > > > > > > This way should be able to make it more > > > > > efficient. > > > > > > > > WDYT? > > > > > > > > > > > > > > > > > > 4. I think the improved queuing logic > > should > > > be > > > > > > good > > > > > > > > > > enough. > > > > > > > > > > > I > > > > > > > > > > > > > > can't > > > > > > > > > > > > > > > > get > > > > > > > > > > > > > > > > > > the benefit of having ` > > > > > > > > partition.availability.timeout.ms` > > > > > > > > > > > > > config. > > > > > > > > > > > > > > In > > > > > > > > > > > > > > > > > > short, you want to make the partitioner > > take > > > > the > > > > > > > broker > > > > > > > > > > load > > > > > > > > > > > > into > > > > > > > > > > > > > > > > > > consideration. We can just improve that > in > > > the > > > > > > > queuing > > > > > > > > > > logic > > > > > > > > > > > > (and > > > > > > > > > > > > > > you > > > > > > > > > > > > > > > > > > already did it). Why should we add the > > > config? > > > > > > Could > > > > > > > > you > > > > > > > > > > use > > > > > > > > > > > > some > > > > > > > > > > > > > > > > > examples > > > > > > > > > > > > > > > > > > to explain why we need it. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thank you. > > > > > > > > > > > > > > > > > > Luke > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Feb 16, 2022 at 8:57 AM Artem > > > Livshits > > > > > > > > > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >> Hello, > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> Please add your comments about the KIP. > > If > > > > > there > > > > > > > are > > > > > > > > no > > > > > > > > > > > > > > > > considerations, > > > > > > > > > > > > > > > > > >> I'll put it up for vote in the next few > > > days. > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> -Artem > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> On Mon, Feb 7, 2022 at 6:01 PM Artem > > > Livshits > > > > < > > > > > > > > > > > > > > > alivsh...@confluent.io > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >> wrote: > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > Hello, > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > After trying a few prototypes, I've > made > > > > some > > > > > > > > changes to > > > > > > > > > > > the > > > > > > > > > > > > > > > public > > > > > > > > > > > > > > > > > >> > interface. Please see the updated > > > document > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner > > > > > > > > > > > > > > > > > >> > . > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > -Artem > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > On Thu, Nov 4, 2021 at 10:37 AM Artem > > > > > Livshits < > > > > > > > > > > > > > > > > > alivsh...@confluent.io> > > > > > > > > > > > > > > > > > >> > wrote: > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> >> Hello, > > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > > > > > > >> >> This is the discussion thread for > > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner > > > > > > > > > > > > > > > > > >> >> . > > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > > > > > > >> >> The proposal is a bug fix for > > > > > > > > > > > > > > > > > >> >> > > > > > > > https://issues.apache.org/jira/browse/KAFKA-10888, > > > > > > > > but > > > > > > > > > > > it > > > > > > > > > > > > > does > > > > > > > > > > > > > > > > > >> include a > > > > > > > > > > > > > > > > > >> >> client config change, therefore we > > have a > > > > KIP > > > > > > to > > > > > > > > > > discuss. > > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > > > > > > >> >> -Artem > > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >