Thanks for the KIP. There was some discussion about adding a metric on the thread, but the KIP does not contain anything about it. Did we drop this suggestion or was the KIP not updated accordingly?
Nit: > This would be a global config applicable per processing topology Can we change this to `per Kafka Streams instance.` Atm, a Stream instance executes a single topology, so it does not make any effective difference right now. However, it seems better (more logical) to bind the config to the instance (not the topology the instance executes). -Matthias On 9/2/21 6:08 AM, Sagar wrote: > Thanks Guozhang and Luke. > > I have updated the KIP with all the suggested changes. > > Do you think we could start voting for this? > > Thanks! > Sagar. > > On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <show...@gmail.com> wrote: > >> Thanks for the KIP. Overall LGTM. >> >> Just one thought, if we "rename" the config directly as mentioned in the >> KIP, would that break existing applications? >> Should we deprecate the old one first, and make the old/new names co-exist >> for some period of time? >> >> Public Interfaces >> >> - Adding a new config *input.buffer.max.bytes *applicable at a topology >> level. The importance of this config would be *Medium*. >> - Renaming *cache.max.bytes.buffering* to *statestore.cache.max.bytes*. >> >> >> >> Thank you. >> Luke >> >> On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wangg...@gmail.com> wrote: >> >>> Currently the state store cache size default value is 10MB today, which >>> arguably is rather small. So I'm thinking maybe for this config default >> to >>> 512MB. >>> >>> Other than that, LGTM. >>> >>> On Sat, Aug 28, 2021 at 11:34 AM Sagar <sagarmeansoc...@gmail.com> >> wrote: >>> >>>> Thanks Guozhang and Sophie. >>>> >>>> Yeah a small default value would lower the throughput. I didn't quite >>>> realise it earlier. It's slightly hard to predict this value so I would >>>> guess around 1/2 GB to 1 GB? WDYT? >>>> >>>> Regarding the renaming of the config and the new metric, sure would >>> include >>>> it in the KIP. >>>> >>>> Lastly, importance would also. be added. I guess Medium should be ok. >>>> >>>> Thanks! >>>> Sagar. >>>> >>>> >>>> On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman >>>> <sop...@confluent.io.invalid> wrote: >>>> >>>>> 1) I agree that we should just distribute the bytes evenly, at least >>> for >>>>> now. It's simpler to understand and >>>>> we can always change it later, plus it makes sense to keep this >> aligned >>>>> with how the cache works today >>>>> >>>>> 2) +1 to being conservative in the generous sense, it's just not >>>> something >>>>> we can predict with any degree >>>>> of accuracy and even if we could, the appropriate value is going to >>>> differ >>>>> wildly across applications and use >>>>> cases. We might want to just pick some multiple of the default cache >>>> size, >>>>> and maybe do some research on >>>>> other relevant defaults or sizes (default JVM heap, size of available >>>>> memory in common hosts eg EC2 >>>>> instances, etc). We don't need to worry as much about erring on the >>> side >>>> of >>>>> too big, since other configs like >>>>> the max.poll.records will help somewhat to keep it from exploding. >>>>> >>>>> 4) 100%, I always found the *cache.max.bytes.buffering* config name >> to >>> be >>>>> incredibly confusing. Deprecating this in >>>>> favor of "*statestore.cache.max.bytes*" and aligning it to the new >>> input >>>>> buffer config sounds good to me to include here. >>>>> >>>>> 5) The KIP should list all relevant public-facing changes, including >>>>> metadata like the config's "Importance". Personally >>>>> I would recommend Medium, or even High if we're really worried about >>> the >>>>> default being wrong for a lot of users >>>>> >>>>> Thanks for the KIP, besides those few things that Guozhang brought up >>> and >>>>> the config importance, everything SGTM >>>>> >>>>> -Sophie >>>>> >>>>> On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wangg...@gmail.com> >>>> wrote: >>>>> >>>>>> 1) I meant for your proposed solution. I.e. to distribute the >>>> configured >>>>>> bytes among threads evenly. >>>>>> >>>>>> 2) I was actually thinking about making the default a large enough >>>> value >>>>> so >>>>>> that we would not introduce performance regression: thinking about >> a >>>> use >>>>>> case with many partitions and each record may be large, then >>>> effectively >>>>> we >>>>>> would only start pausing when the total bytes buffered is pretty >>> large. >>>>> If >>>>>> we set the default value to small, we would be "more aggressive" on >>>>> pausing >>>>>> which may impact throughput. >>>>>> >>>>>> 3) Yes exactly, this would naturally be at the "partition-group" >>> class >>>>>> since that represents the task's all input partitions. >>>>>> >>>>>> 4) This is just a bold thought, I'm interested to see other's >>> thoughts. >>>>>> >>>>>> >>>>>> Guozhang >>>>>> >>>>>> On Mon, Aug 23, 2021 at 4:10 AM Sagar <sagarmeansoc...@gmail.com> >>>> wrote: >>>>>> >>>>>>> Thanks Guozhang. >>>>>>> >>>>>>> 1) Just for my confirmation, when you say we should proceed with >>> the >>>>> even >>>>>>> distribution of bytes, are you referring to the Proposed Solution >>> in >>>>> the >>>>>>> KIP or the option you had considered in the JIRA? >>>>>>> 2) Default value for the config is something that I missed. I >> agree >>>> we >>>>>>> can't have really large values as it might be detrimental to the >>>>>>> performance. Maybe, as a starting point, we assume that only 1 >>> Stream >>>>>> Task >>>>>>> is running so what could be the ideal value in such a scenario? >>>>> Somewhere >>>>>>> around 10MB similar to the caching config? >>>>>>> 3) When you say, *a task level metric indicating the current >>> totally >>>>>>> aggregated metrics, * you mean the bytes aggregated at a task >>> level? >>>>>>> 4) I am ok with the name change, but would like to know others' >>>>> thoughts. >>>>>>> >>>>>>> Thanks! >>>>>>> Sagar. >>>>>>> >>>>>>> On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang < >> wangg...@gmail.com >>>> >>>>>> wrote: >>>>>>> >>>>>>>> Thanks Sagar for writing this PR. >>>>>>>> >>>>>>>> I think twice about the options that have been proposed in >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13152, and feel >> that >>>> at >>>>>> the >>>>>>>> moment it's simpler to just do the even distribution of the >>>>> configured >>>>>>>> total bytes. My rationale is that right now we have a static >>> tasks >>>> -> >>>>>>>> threads mapping, and hence each partition would only be fetched >>> by >>>> a >>>>>>> single >>>>>>>> thread / consumer at a given time. If in the future we break >> that >>>>>> static >>>>>>>> mapping into dynamic mapping, then we would not be able to do >>> this >>>>> even >>>>>>>> distribution. Instead we would have other threads polling from >>>>> consumer >>>>>>>> only, and those threads would be responsible for checking the >>>> config >>>>>> and >>>>>>>> pause non-empty partitions if it goes beyond the threshold. But >>>> since >>>>>> at >>>>>>>> that time we would not change the config but just how it would >> be >>>>>>>> implemented behind the scenes we would not need another KIP to >>>> change >>>>>> it. >>>>>>>> >>>>>>>> Some more comments: >>>>>>>> >>>>>>>> 1. We need to discuss a bit about the default value of this new >>>>> config. >>>>>>>> Personally I think we need to be a bit conservative with large >>>> values >>>>>> so >>>>>>>> that it would not have any perf regression compared with old >>>> configs >>>>>>>> especially with large topology and large number of partitions. >>>>>>>> 2. I looked at the existing metrics, and do not have >>> corresponding >>>>>>> sensors. >>>>>>>> How about also adding a task level metric indicating the >> current >>>>>> totally >>>>>>>> aggregated metrics. The reason I do not suggest this metric on >>> the >>>>>>>> per-thread level is that in the future we may break the static >>>>> mapping >>>>>> of >>>>>>>> tasks -> threads. >>>>>>>> >>>>>>>> [optional] As an orthogonal thought, I'm thinking maybe we can >>>> rename >>>>>> the >>>>>>>> other "*cache.max.bytes.buffering*" as >>> "statestore.cache.max.bytes" >>>>>> (via >>>>>>>> deprecation of course), piggy-backed in this KIP? Would like to >>>> hear >>>>>>>> others' thoughts. >>>>>>>> >>>>>>>> >>>>>>>> Guozhang >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Sun, Aug 22, 2021 at 9:29 AM Sagar < >> sagarmeansoc...@gmail.com >>>> >>>>>> wrote: >>>>>>>> >>>>>>>>> Hi All, >>>>>>>>> >>>>>>>>> I would like to start a discussion on the following KIP: >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390 >>>>>>>>> >>>>>>>>> Thanks! >>>>>>>>> Sagar. >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> -- Guozhang >>>>>>>> >>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> -- Guozhang >>>>>> >>>>> >>>> >>> >>> >>> -- >>> -- Guozhang >>> >> >