Hi Matthias, I missed out on the metrics part.
I have added the new metric in the proposed changes section along with the small re-wording that you talked about. Let me know if that makes sense.

Thanks!
Sagar.

On Fri, Sep 10, 2021 at 3:45 AM Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for the KIP.
>
> There was some discussion about adding a metric on the thread, but the KIP does not contain anything about it. Did we drop this suggestion or was the KIP not updated accordingly?
>
> Nit:
>
> > This would be a global config applicable per processing topology
>
> Can we change this to `per Kafka Streams instance.`
>
> Atm, a Streams instance executes a single topology, so it does not make any effective difference right now. However, it seems better (more logical) to bind the config to the instance (not the topology the instance executes).
>
> -Matthias
>
> On 9/2/21 6:08 AM, Sagar wrote:
> > Thanks Guozhang and Luke.
> >
> > I have updated the KIP with all the suggested changes.
> >
> > Do you think we could start voting for this?
> >
> > Thanks!
> > Sagar.
> >
> > On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <show...@gmail.com> wrote:
> >
> >> Thanks for the KIP. Overall LGTM.
> >>
> >> Just one thought: if we "rename" the config directly as mentioned in the KIP, would that break existing applications? Should we deprecate the old one first, and make the old/new names co-exist for some period of time?
> >>
> >> Public Interfaces
> >>
> >> - Adding a new config *input.buffer.max.bytes* applicable at a topology level. The importance of this config would be *Medium*.
> >> - Renaming *cache.max.bytes.buffering* to *statestore.cache.max.bytes*.
> >>
> >> Thank you.
> >> Luke
> >>
> >> On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >>> Currently the state store cache size default is 10MB, which arguably is rather small. So I'm thinking maybe we default this config to 512MB.
> >>>
> >>> Other than that, LGTM.
> >>>
> >>> On Sat, Aug 28, 2021 at 11:34 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> >>>
> >>>> Thanks Guozhang and Sophie.
> >>>>
> >>>> Yeah, a small default value would lower the throughput. I didn't quite realise it earlier. It's slightly hard to predict this value, so I would guess around 1/2 GB to 1 GB? WDYT?
> >>>>
> >>>> Regarding the renaming of the config and the new metric, sure, I would include them in the KIP.
> >>>>
> >>>> Lastly, importance would also be added. I guess Medium should be ok.
> >>>>
> >>>> Thanks!
> >>>> Sagar.
> >>>>
> >>>> On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:
> >>>>
> >>>>> 1) I agree that we should just distribute the bytes evenly, at least for now. It's simpler to understand and we can always change it later; plus, it makes sense to keep this aligned with how the cache works today.
> >>>>>
> >>>>> 2) +1 to being conservative in the generous sense. It's just not something we can predict with any degree of accuracy, and even if we could, the appropriate value is going to differ wildly across applications and use cases. We might want to just pick some multiple of the default cache size, and maybe do some research on other relevant defaults or sizes (default JVM heap, size of available memory in common hosts, e.g. EC2 instances, etc.).
> >>>>> We don't need to worry as much about erring on the side of too big, since other configs like max.poll.records will help somewhat to keep it from exploding.
> >>>>>
> >>>>> 4) 100%, I always found the *cache.max.bytes.buffering* config name to be incredibly confusing. Deprecating this in favor of "*statestore.cache.max.bytes*" and aligning it with the new input buffer config sounds good to me to include here.
> >>>>>
> >>>>> 5) The KIP should list all relevant public-facing changes, including metadata like the config's "Importance". Personally I would recommend Medium, or even High if we're really worried about the default being wrong for a lot of users.
> >>>>>
> >>>>> Thanks for the KIP; besides those few things that Guozhang brought up and the config importance, everything SGTM.
> >>>>>
> >>>>> -Sophie
> >>>>>
> >>>>> On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>>> 1) I meant your proposed solution, i.e. to distribute the configured bytes among threads evenly.
> >>>>>>
> >>>>>> 2) I was actually thinking about making the default a large enough value so that we would not introduce a performance regression: thinking about a use case with many partitions where each record may be large, we would effectively only start pausing when the total bytes buffered is pretty large. If we set the default value too small, we would be "more aggressive" on pausing, which may impact throughput.
> >>>>>>
> >>>>>> 3) Yes, exactly; this would naturally be at the "partition-group" class since that represents all of the task's input partitions.
> >>>>>>
> >>>>>> 4) This is just a bold thought; I'm interested to see others' thoughts.
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Mon, Aug 23, 2021 at 4:10 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Thanks Guozhang.
> >>>>>>>
> >>>>>>> 1) Just for my confirmation, when you say we should proceed with the even distribution of bytes, are you referring to the Proposed Solution in the KIP or the option you had considered in the JIRA?
> >>>>>>> 2) Default value for the config is something that I missed. I agree we can't have really large values as it might be detrimental to the performance. Maybe, as a starting point, we assume that only 1 Stream Task is running, so what could be the ideal value in such a scenario? Somewhere around 10MB, similar to the caching config?
> >>>>>>> 3) When you say *a task level metric indicating the current totally aggregated metrics*, you mean the bytes aggregated at a task level?
> >>>>>>> 4) I am ok with the name change, but would like to know others' thoughts.
> >>>>>>>
> >>>>>>> Thanks!
> >>>>>>> Sagar.
> >>>>>>>
> >>>>>>> On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Thanks Sagar for writing this PR.
> >>>>>>>>
> >>>>>>>> I thought twice about the options that have been proposed in https://issues.apache.org/jira/browse/KAFKA-13152, and feel that at the moment it's simpler to just do the even distribution of the configured total bytes. My rationale is that right now we have a static tasks -> threads mapping, and hence each partition would only be fetched by a single thread / consumer at a given time. If in the future we break that static mapping into a dynamic mapping, then we would not be able to do this even distribution. Instead we would have other threads polling from the consumer only, and those threads would be responsible for checking the config and pausing non-empty partitions if the buffered bytes go beyond the threshold. But since at that time we would not change the config, just how it is implemented behind the scenes, we would not need another KIP to change it.
> >>>>>>>>
> >>>>>>>> Some more comments:
> >>>>>>>>
> >>>>>>>> 1. We need to discuss a bit about the default value of this new config. Personally I think we need to be a bit conservative with large values so that it would not cause any perf regression compared with the old configs, especially with a large topology and a large number of partitions.
> >>>>>>>> 2. I looked at the existing metrics, and we do not have corresponding sensors. How about also adding a task level metric indicating the current totally aggregated metrics? The reason I do not suggest this metric on the per-thread level is that in the future we may break the static mapping of tasks -> threads.
> >>>>>>>>
> >>>>>>>> [optional] As an orthogonal thought, I'm thinking maybe we can rename the other "*cache.max.bytes.buffering*" as "statestore.cache.max.bytes" (via deprecation of course), piggy-backed in this KIP? Would like to hear others' thoughts.
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>> On Sun, Aug 22, 2021 at 9:29 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi All,
> >>>>>>>>>
> >>>>>>>>> I would like to start a discussion on the following KIP:
> >>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> >>>>>>>>>
> >>>>>>>>> Thanks!
> >>>>>>>>> Sagar.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >
>
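As an illustration of the proposal under discussion, here is a minimal sketch of what the new settings would look like from an application's side, assuming the config names proposed in the KIP (input.buffer.max.bytes and the statestore.cache.max.bytes rename). Neither is a released StreamsConfig constant as of this thread, so the string literals and the 512 MB / 10 MB values below are illustrative only, taken from the defaults discussed above:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class InputBufferConfigSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "input-buffer-demo");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // Proposed in the KIP: cap on the total bytes buffered across all input
            // partitions of the Kafka Streams instance (512 MB default discussed above).
            props.put("input.buffer.max.bytes", 512L * 1024 * 1024);

            // Proposed rename of cache.max.bytes.buffering; the old name would be
            // deprecated first so both names co-exist for a while, per Luke's comment.
            props.put("statestore.cache.max.bytes", 10L * 1024 * 1024);

            // With the even-distribution approach discussed in the thread, an instance
            // running N stream threads would give each thread roughly
            // input.buffer.max.bytes / N of buffering headroom.
            System.out.println(props);
        }
    }

And a rough sketch of the pausing behaviour Guozhang describes, written against the plain consumer API rather than the actual StreamThread / partition-group internals; the maybePause helper and the bufferedBytesPerPartition bookkeeping are hypothetical names for illustration, not Kafka code:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    public final class BufferPauseSketch {

        private BufferPauseSketch() { }

        // Pause every partition that still has buffered records once the total buffered
        // bytes exceed the budget; poll() keeps the consumer in the group but returns no
        // records for paused partitions until they are resumed.
        public static void maybePause(final Consumer<?, ?> consumer,
                                      final Map<TopicPartition, Long> bufferedBytesPerPartition,
                                      final long maxBufferedBytes) {
            final long totalBuffered = bufferedBytesPerPartition.values().stream()
                .mapToLong(Long::longValue)
                .sum();
            if (totalBuffered > maxBufferedBytes) {
                consumer.pause(bufferedBytesPerPartition.keySet());
            } else {
                consumer.resume(consumer.paused());
            }
        }
    }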