Hi Sagar,

Thanks for the added metric. Regarding its name: if it is proposed as a task-level metric, we do not need to prefix its name with `task-`. On the other hand, it would be better to give the full description of the metric, i.e. its type name, tag map, recording level, etc. An example is here:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Report+Properties+of+RocksDB
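In code, such a full definition might boil down to something like the sketch below, against the plain org.apache.kafka.common.metrics API. Note that the metric name `input-buffer-bytes-total`, the group, the tag map, and the INFO recording level are all placeholder assumptions on my side, not what the KIP has to specify:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Value;

    public class InputBufferMetricSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();

            // Task-level tag map, mirroring the stream-task-metrics convention.
            Map<String, String> tags = new HashMap<>();
            tags.put("thread-id", "stream-thread-1");
            tags.put("task-id", "0_0");

            // Recording level INFO, so the metric is always captured.
            Sensor sensor = metrics.sensor("input-buffer-bytes-total", Sensor.RecordingLevel.INFO);
            MetricName name = metrics.metricName(
                "input-buffer-bytes-total",
                "stream-task-metrics",
                "The total bytes currently buffered across all input partitions of the task",
                tags);
            sensor.add(name, new Value()); // Value simply reports the last recorded measurement

            sensor.record(1048576); // e.g. 1 MiB currently buffered
            metrics.close();
        }
    }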
Guozhang

On Mon, Sep 20, 2021 at 10:04 AM Sagar <sagarmeansoc...@gmail.com> wrote:

> Hi All,
>
> Bumping this thread again.
>
> Thanks!
> Sagar.
>
> On Sat, Sep 11, 2021 at 2:04 PM Sagar <sagarmeansoc...@gmail.com> wrote:
>
>> Hi Matthias,
>>
>> I missed out on the metrics part. I have added the new metric in the
>> proposed changes section, along with the small re-wording that you
>> talked about.
>>
>> Let me know if that makes sense.
>>
>> Thanks!
>> Sagar.
>>
>> On Fri, Sep 10, 2021 at 3:45 AM Matthias J. Sax <mj...@apache.org> wrote:
>>
>>> Thanks for the KIP.
>>>
>>> There was some discussion about adding a metric on the thread, but the
>>> KIP does not contain anything about it. Did we drop this suggestion, or
>>> was the KIP not updated accordingly?
>>>
>>> Nit:
>>>
>>>> This would be a global config applicable per processing topology
>>>
>>> Can we change this to `per Kafka Streams instance`?
>>>
>>> At the moment, a Streams instance executes a single topology, so it does
>>> not make any effective difference right now. However, it seems better
>>> (more logical) to bind the config to the instance, not to the topology
>>> the instance executes.
>>>
>>> -Matthias
>>>
>>> On 9/2/21 6:08 AM, Sagar wrote:
>>>
>>>> Thanks Guozhang and Luke.
>>>>
>>>> I have updated the KIP with all the suggested changes.
>>>>
>>>> Do you think we could start voting for this?
>>>>
>>>> Thanks!
>>>> Sagar.
>>>>
>>>> On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <show...@gmail.com> wrote:
>>>>
>>>>> Thanks for the KIP. Overall LGTM.
>>>>>
>>>>> Just one thought: if we "rename" the config directly as mentioned in
>>>>> the KIP, would that break existing applications? Should we deprecate
>>>>> the old one first, and make the old and new names co-exist for some
>>>>> period of time?
>>>>>
>>>>> Public Interfaces
>>>>>
>>>>> - Adding a new config *input.buffer.max.bytes*, applicable at a
>>>>>   topology level. The importance of this config would be *Medium*.
>>>>> - Renaming *cache.max.bytes.buffering* to *statestore.cache.max.bytes*.
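>>>>>
>>>>> To make the co-existence concrete: during a deprecation period,
>>>>> configuring an application might look roughly like the sketch below.
>>>>> The two new names are only the KIP's proposals (there are no
>>>>> StreamsConfig constants for them yet), and the values are arbitrary:
>>>>>
>>>>>     import java.util.Properties;
>>>>>     import org.apache.kafka.streams.StreamsConfig;
>>>>>
>>>>>     public class BufferConfigSketch {
>>>>>         public static void main(String[] args) {
>>>>>             Properties props = new Properties();
>>>>>             props.put(StreamsConfig.APPLICATION_ID_CONFIG, "buffer-sizing-app");
>>>>>             props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>>>>>
>>>>>             // Proposed new names, set as plain strings for now:
>>>>>             props.put("input.buffer.max.bytes", 536870912L);     // 512 MB input buffer cap
>>>>>             props.put("statestore.cache.max.bytes", 10485760L);  // renamed cache config, 10 MB
>>>>>
>>>>>             // The old name would still be honored until it is removed:
>>>>>             // props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760L);
>>>>>         }
>>>>>     }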
>>>>>
>>>>> Thank you.
>>>>> Luke
>>>>>
>>>>> On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>>> Currently the state store cache size default is 10MB, which is
>>>>>> arguably rather small. So I'm thinking maybe this config should
>>>>>> default to 512MB.
>>>>>>
>>>>>> Other than that, LGTM.
>>>>>>
>>>>>> On Sat, Aug 28, 2021 at 11:34 AM Sagar <sagarmeansoc...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Guozhang and Sophie.
>>>>>>>
>>>>>>> Yeah, a small default value would lower the throughput. I didn't
>>>>>>> quite realise it earlier. It's slightly hard to predict this value,
>>>>>>> so I would guess around 1/2 GB to 1 GB? WDYT?
>>>>>>>
>>>>>>> Regarding the renaming of the config and the new metric, sure, I
>>>>>>> would include them in the KIP.
>>>>>>>
>>>>>>> Lastly, the importance would also be added. I guess Medium should
>>>>>>> be ok.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Sagar.
>>>>>>>
>>>>>>> On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
>>>>>>> <sop...@confluent.io.invalid> wrote:
>>>>>>>
>>>>>>>> 1) I agree that we should just distribute the bytes evenly, at
>>>>>>>> least for now. It's simpler to understand, and we can always
>>>>>>>> change it later; plus, it makes sense to keep this aligned with
>>>>>>>> how the cache works today.
>>>>>>>>
>>>>>>>> 2) +1 to being conservative in the generous sense. It's just not
>>>>>>>> something we can predict with any degree of accuracy, and even if
>>>>>>>> we could, the appropriate value is going to differ wildly across
>>>>>>>> applications and use cases. We might want to just pick some
>>>>>>>> multiple of the default cache size, and maybe do some research on
>>>>>>>> other relevant defaults or sizes (default JVM heap, size of
>>>>>>>> available memory in common hosts, e.g. EC2 instances, etc.). We
>>>>>>>> don't need to worry as much about erring on the side of too big,
>>>>>>>> since other configs like max.poll.records will help somewhat to
>>>>>>>> keep it from exploding.
>>>>>>>>
>>>>>>>> 4) 100%, I always found the *cache.max.bytes.buffering* config
>>>>>>>> name to be incredibly confusing. Deprecating it in favor of
>>>>>>>> *statestore.cache.max.bytes* and aligning it to the new input
>>>>>>>> buffer config sounds good to me to include here.
>>>>>>>>
>>>>>>>> 5) The KIP should list all relevant public-facing changes,
>>>>>>>> including metadata like the config's "Importance". Personally I
>>>>>>>> would recommend Medium, or even High if we're really worried
>>>>>>>> about the default being wrong for a lot of users.
>>>>>>>>
>>>>>>>> Thanks for the KIP. Besides those few things that Guozhang brought
>>>>>>>> up and the config importance, everything SGTM.
>>>>>>>>
>>>>>>>> -Sophie
>>>>>>>>
>>>>>>>> On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> 1) I meant for your proposed solution, i.e. to distribute the
>>>>>>>>> configured bytes among threads evenly (see the sketch at the end
>>>>>>>>> of this message).
>>>>>>>>>
>>>>>>>>> 2) I was actually thinking about making the default a large
>>>>>>>>> enough value so that we would not introduce a performance
>>>>>>>>> regression: think about a use case with many partitions where
>>>>>>>>> each record may be large; then effectively we would only start
>>>>>>>>> pausing when the total bytes buffered is pretty large. If we set
>>>>>>>>> the default value too small, we would be "more aggressive" on
>>>>>>>>> pausing, which may impact throughput.
>>>>>>>>>
>>>>>>>>> 3) Yes, exactly. This would naturally be at the "partition-group"
>>>>>>>>> class, since that represents all of the task's input partitions.
>>>>>>>>>
>>>>>>>>> 4) This is just a bold thought; I'm interested to see others'
>>>>>>>>> thoughts.
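>>>>>>>>>
>>>>>>>>> For 1), a minimal sketch of the even split. The config value and
>>>>>>>>> thread count are made-up numbers, and the names are illustrative
>>>>>>>>> only:
>>>>>>>>>
>>>>>>>>>     public class EvenSplitSketch {
>>>>>>>>>         public static void main(String[] args) {
>>>>>>>>>             // Each thread gets an equal share of the configured total.
>>>>>>>>>             long maxInputBufferBytes = 536870912L; // assumed input.buffer.max.bytes (512 MB)
>>>>>>>>>             int numStreamThreads = 4;              // assumed num.stream.threads
>>>>>>>>>             long perThreadBudget = maxInputBufferBytes / numStreamThreads;
>>>>>>>>>             System.out.println(perThreadBudget + " bytes per thread"); // 134217728 (128 MB)
>>>>>>>>>         }
>>>>>>>>>     }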
I > >> >> agree > >> >>>> we > >> >>>>>>> can't have really large values as it might be detrimental to the > >> >>>>>>> performance. Maybe, as a starting point, we assume that only 1 > >> >>> Stream > >> >>>>>> Task > >> >>>>>>> is running so what could be the ideal value in such a scenario? > >> >>>>> Somewhere > >> >>>>>>> around 10MB similar to the caching config? > >> >>>>>>> 3) When you say, *a task level metric indicating the current > >> >>> totally > >> >>>>>>> aggregated metrics, * you mean the bytes aggregated at a task > >> >>> level? > >> >>>>>>> 4) I am ok with the name change, but would like to know others' > >> >>>>> thoughts. > >> >>>>>>> > >> >>>>>>> Thanks! > >> >>>>>>> Sagar. > >> >>>>>>> > >> >>>>>>> On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang < > >> >> wangg...@gmail.com > >> >>>> > >> >>>>>> wrote: > >> >>>>>>> > >> >>>>>>>> Thanks Sagar for writing this PR. > >> >>>>>>>> > >> >>>>>>>> I think twice about the options that have been proposed in > >> >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13152, and feel > >> >> that > >> >>>> at > >> >>>>>> the > >> >>>>>>>> moment it's simpler to just do the even distribution of the > >> >>>>> configured > >> >>>>>>>> total bytes. My rationale is that right now we have a static > >> >>> tasks > >> >>>> -> > >> >>>>>>>> threads mapping, and hence each partition would only be fetched > >> >>> by > >> >>>> a > >> >>>>>>> single > >> >>>>>>>> thread / consumer at a given time. If in the future we break > >> >> that > >> >>>>>> static > >> >>>>>>>> mapping into dynamic mapping, then we would not be able to do > >> >>> this > >> >>>>> even > >> >>>>>>>> distribution. Instead we would have other threads polling from > >> >>>>> consumer > >> >>>>>>>> only, and those threads would be responsible for checking the > >> >>>> config > >> >>>>>> and > >> >>>>>>>> pause non-empty partitions if it goes beyond the threshold. But > >> >>>> since > >> >>>>>> at > >> >>>>>>>> that time we would not change the config but just how it would > >> >> be > >> >>>>>>>> implemented behind the scenes we would not need another KIP to > >> >>>> change > >> >>>>>> it. > >> >>>>>>>> > >> >>>>>>>> Some more comments: > >> >>>>>>>> > >> >>>>>>>> 1. We need to discuss a bit about the default value of this new > >> >>>>> config. > >> >>>>>>>> Personally I think we need to be a bit conservative with large > >> >>>> values > >> >>>>>> so > >> >>>>>>>> that it would not have any perf regression compared with old > >> >>>> configs > >> >>>>>>>> especially with large topology and large number of partitions. > >> >>>>>>>> 2. I looked at the existing metrics, and do not have > >> >>> corresponding > >> >>>>>>> sensors. > >> >>>>>>>> How about also adding a task level metric indicating the > >> >> current > >> >>>>>> totally > >> >>>>>>>> aggregated metrics. The reason I do not suggest this metric on > >> >>> the > >> >>>>>>>> per-thread level is that in the future we may break the static > >> >>>>> mapping > >> >>>>>> of > >> >>>>>>>> tasks -> threads. > >> >>>>>>>> > >> >>>>>>>> [optional] As an orthogonal thought, I'm thinking maybe we can > >> >>>> rename > >> >>>>>> the > >> >>>>>>>> other "*cache.max.bytes.buffering*" as > >> >>> "statestore.cache.max.bytes" > >> >>>>>> (via > >> >>>>>>>> deprecation of course), piggy-backed in this KIP? Would like to > >> >>>> hear > >> >>>>>>>> others' thoughts. 
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Aug 22, 2021 at 9:29 AM Sagar <sagarmeansoc...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>
>>>>>>>>>>>> I would like to start a discussion on the following KIP:
>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>> Sagar.
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>
>>>>>> --
>>>>>> -- Guozhang

--
-- Guozhang