Hi Sagar,

Thanks for the added metric. About its name: if it is proposed as a
task-level metric, then we do not need to prefix its name with `task-`.
On the other hand, it's better to give the full description of the
metric, i.e. its type name / tag maps / recording level etc.; an example
is here:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Report+Properties+of+RocksDB
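
For example, something like the following (the concrete name / tags here
are just placeholders to illustrate the level of detail, not a proposal):

    metric name:     input-buffer-bytes-total
    type:            stream-task-metrics
    tags:            thread-id, task-id
    recording level: INFO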

Guozhang

On Mon, Sep 20, 2021 at 10:04 AM Sagar <sagarmeansoc...@gmail.com> wrote:

> Hi All,
>
> Bumping this thread again.
>
> Thanks!
> Sagar.
>
> On Sat, Sep 11, 2021 at 2:04 PM Sagar <sagarmeansoc...@gmail.com> wrote:
>
> > Hi Mathias,
> >
> > I missed out on the metrics part.
> >
> > I have added the new metric in the proposed changes section along with
> > the small re-wording that you talked about.
> >
> > Let me know if that makes sense.
> >
> > Thanks!
> > Sagar.
> >
> > On Fri, Sep 10, 2021 at 3:45 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Thanks for the KIP.
> >>
> >> There was some discussion about adding a metric on the thread, but the
> >> KIP does not contain anything about it. Did we drop this suggestion or
> >> was the KIP not updated accordingly?
> >>
> >>
> >> Nit:
> >>
> >> > This would be a global config applicable per processing topology
> >>
> >> Can we change this to `per Kafka Streams instance.`
> >>
> >> Atm, a Streams instance executes a single topology, so it does not make
> >> any effective difference right now. However, it seems better (more
> >> logical) to bind the config to the instance (not the topology the
> >> instance executes).
> >>
> >>
> >> -Matthias
> >>
> >> On 9/2/21 6:08 AM, Sagar wrote:
> >> > Thanks Guozhang and Luke.
> >> >
> >> > I have updated the KIP with all the suggested changes.
> >> >
> >> > Do you think we could start voting for this?
> >> >
> >> > Thanks!
> >> > Sagar.
> >> >
> >> > On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <show...@gmail.com> wrote:
> >> >
> >> >> Thanks for the KIP. Overall LGTM.
> >> >>
> >> >> Just one thought: if we "rename" the config directly as mentioned in
> >> >> the KIP, would that break existing applications?
> >> >> Should we deprecate the old one first, and make the old/new names
> >> >> co-exist for some period of time?
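> >> >>
> >> >> For example, the usual pattern (a sketch only; the default values and
> >> >> doc strings here are made up) would keep both keys in StreamsConfig's
> >> >> ConfigDef for a while:
> >> >>
> >> >>     // new config
> >> >>     .define("statestore.cache.max.bytes", Type.LONG,
> >> >>             10 * 1024 * 1024L, Importance.MEDIUM,
> >> >>             "Maximum number of memory bytes to be used for caching")
> >> >>     // old config kept so existing applications keep working
> >> >>     .define("cache.max.bytes.buffering", Type.LONG,
> >> >>             10 * 1024 * 1024L, Importance.MEDIUM,
> >> >>             "(Deprecated) Use statestore.cache.max.bytes instead.")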
> >> >>
> >> >> Public Interfaces
> >> >>
> >> >>    - Adding a new config *input.buffer.max.bytes* applicable at a
> >> >>    topology level. The importance of this config would be *Medium*.
> >> >>    - Renaming *cache.max.bytes.buffering* to
> >> >>    *statestore.cache.max.bytes*.
> >> >>
> >> >>
> >> >>
> >> >> Thank you.
> >> >> Luke
> >> >>
> >> >> On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wangg...@gmail.com>
> >> >> wrote:
> >> >>
> >> >>> Currently the state store cache size default value is 10MB, which
> >> >>> arguably is rather small. So I'm thinking maybe we default this
> >> >>> config to 512MB.
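> >> >>>
> >> >>> Just to illustrate (assuming the proposed config name), overriding
> >> >>> it in an application would look like:
> >> >>>
> >> >>>     // hypothetical override; any byte value works
> >> >>>     Properties props = new Properties();
> >> >>>     props.put("input.buffer.max.bytes", 1024L * 1024 * 1024); // 1 GB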
> >> >>>
> >> >>> Other than that, LGTM.
> >> >>>
> >> >>> On Sat, Aug 28, 2021 at 11:34 AM Sagar <sagarmeansoc...@gmail.com>
> >> >>> wrote:
> >> >>>
> >> >>>> Thanks Guozhang and Sophie.
> >> >>>>
> >> >>>> Yeah, a small default value would lower the throughput. I didn't
> >> >>>> quite realise it earlier. It's slightly hard to predict this value,
> >> >>>> so I would guess around 1/2 GB to 1 GB? WDYT?
> >> >>>>
> >> >>>> Regarding the renaming of the config and the new metric, sure, I
> >> >>>> would include it in the KIP.
> >> >>>>
> >> >>>> Lastly, importance would also be added. I guess Medium should be
> >> >>>> ok.
> >> >>>>
> >> >>>> Thanks!
> >> >>>> Sagar.
> >> >>>>
> >> >>>>
> >> >>>> On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
> >> >>>> <sop...@confluent.io.invalid> wrote:
> >> >>>>
> >> >>>>> 1) I agree that we should just distribute the bytes evenly, at
> >> >>>>> least for now. It's simpler to understand, and we can always
> >> >>>>> change it later; plus it makes sense to keep this aligned with how
> >> >>>>> the cache works today.
> >> >>>>>
> >> >>>>> 2) +1 to being conservative in the generous sense; it's just not
> >> >>>>> something we can predict with any degree of accuracy, and even if
> >> >>>>> we could, the appropriate value is going to differ wildly across
> >> >>>>> applications and use cases. We might want to just pick some
> >> >>>>> multiple of the default cache size, and maybe do some research on
> >> >>>>> other relevant defaults or sizes (default JVM heap, size of
> >> >>>>> available memory in common hosts, e.g. EC2 instances, etc). We
> >> >>>>> don't need to worry as much about erring on the side of too big,
> >> >>>>> since other configs like max.poll.records will help somewhat to
> >> >>>>> keep it from exploding.
> >> >>>>>
> >> >>>>> 4) 100%! I always found the *cache.max.bytes.buffering* config
> >> >>>>> name to be incredibly confusing. Deprecating this in favor of
> >> >>>>> "*statestore.cache.max.bytes*" and aligning it with the new input
> >> >>>>> buffer config sounds good to me to include here.
> >> >>>>>
> >> >>>>> 5) The KIP should list all relevant public-facing changes,
> >> >>>>> including metadata like the config's "Importance". Personally I
> >> >>>>> would recommend Medium, or even High if we're really worried about
> >> >>>>> the default being wrong for a lot of users.
> >> >>>>>
> >> >>>>> Thanks for the KIP! Besides those few things that Guozhang brought
> >> >>>>> up and the config importance, everything SGTM.
> >> >>>>>
> >> >>>>> -Sophie
> >> >>>>>
> >> >>>>> On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wangg...@gmail.com>
> >> >>>>> wrote:
> >> >>>>>
> >> >>>>>> 1) I meant your proposed solution, i.e. to distribute the
> >> >>>>>> configured bytes among threads evenly.
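> >> >>>>>> (E.g., if the total were 512MB and an instance ran 8 stream
> >> >>>>>> threads, each thread would get a 64MB budget; the numbers are
> >> >>>>>> just for illustration.)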
> >> >>>>>>
> >> >>>>>> 2) I was actually thinking about making the default a large
> >> >>>>>> enough value so that we would not introduce a performance
> >> >>>>>> regression: thinking about a use case with many partitions where
> >> >>>>>> each record may be large, effectively we would only start pausing
> >> >>>>>> when the total bytes buffered is pretty large. If we set the
> >> >>>>>> default value too small, we would be "more aggressive" on
> >> >>>>>> pausing, which may impact throughput.
> >> >>>>>>
> >> >>>>>> 3) Yes, exactly. This would naturally be at the "partition-group"
> >> >>>>>> class, since that represents all of the task's input partitions.
> >> >>>>>>
> >> >>>>>> 4) This is just a bold thought; I'm interested to see others'
> >> >>>>>> thoughts.
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> Guozhang
> >> >>>>>>
> >> >>>>>> On Mon, Aug 23, 2021 at 4:10 AM Sagar <sagarmeansoc...@gmail.com>
> >> >>>>>> wrote:
> >> >>>>>>
> >> >>>>>>> Thanks Guozhang.
> >> >>>>>>>
> >> >>>>>>> 1) Just for my confirmation: when you say we should proceed with
> >> >>>>>>> the even distribution of bytes, are you referring to the
> >> >>>>>>> Proposed Solution in the KIP or the option you had considered in
> >> >>>>>>> the JIRA?
> >> >>>>>>> 2) The default value for the config is something that I missed.
> >> >>>>>>> I agree we can't have really large values, as it might be
> >> >>>>>>> detrimental to performance. Maybe, as a starting point, we
> >> >>>>>>> assume that only 1 Stream Task is running; what could be the
> >> >>>>>>> ideal value in such a scenario? Somewhere around 10MB, similar
> >> >>>>>>> to the caching config?
> >> >>>>>>> 3) When you say *a task level metric indicating the current
> >> >>>>>>> totally aggregated metrics*, do you mean the bytes aggregated at
> >> >>>>>>> a task level?
> >> >>>>>>> 4) I am ok with the name change, but would like to know others'
> >> >>>>>>> thoughts.
> >> >>>>>>>
> >> >>>>>>> Thanks!
> >> >>>>>>> Sagar.
> >> >>>>>>>
> >> >>>>>>> On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <wangg...@gmail.com>
> >> >>>>>>> wrote:
> >> >>>>>>>
> >> >>>>>>>> Thanks Sagar for writing this KIP.
> >> >>>>>>>>
> >> >>>>>>>> I thought twice about the options that have been proposed in
> >> >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13152, and feel
> >> >>>>>>>> that at the moment it's simpler to just do the even
> >> >>>>>>>> distribution of the configured total bytes. My rationale is
> >> >>>>>>>> that right now we have a static tasks -> threads mapping, and
> >> >>>>>>>> hence each partition would only be fetched by a single thread /
> >> >>>>>>>> consumer at a given time. If in the future we break that static
> >> >>>>>>>> mapping into a dynamic mapping, then we would not be able to do
> >> >>>>>>>> this even distribution. Instead we would have other threads
> >> >>>>>>>> polling from the consumer only, and those threads would be
> >> >>>>>>>> responsible for checking the config and pausing non-empty
> >> >>>>>>>> partitions if it goes beyond the threshold. But since at that
> >> >>>>>>>> time we would not change the config, but just how it would be
> >> >>>>>>>> implemented behind the scenes, we would not need another KIP to
> >> >>>>>>>> change it.
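> >> >>>>>>>>
> >> >>>>>>>> Just to illustrate the idea (a rough sketch, not the actual
> >> >>>>>>>> implementation; the variable names are made up):
> >> >>>>>>>>
> >> >>>>>>>>     // hypothetical threshold check in the polling thread
> >> >>>>>>>>     if (totalBufferedBytes >= maxBufferedBytes) {
> >> >>>>>>>>         // stop fetching more, but keep calling poll() so the
> >> >>>>>>>>         // consumer stays in the group
> >> >>>>>>>>         consumer.pause(nonEmptyPartitions);
> >> >>>>>>>>     } else {
> >> >>>>>>>>         consumer.resume(pausedPartitions);
> >> >>>>>>>>     }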
> >> >>>>>>>>
> >> >>>>>>>> Some more comments:
> >> >>>>>>>>
> >> >>>>>>>> 1. We need to discuss a bit about the default value of this new
> >> >>>>>>>> config. Personally I think we need to be a bit conservative
> >> >>>>>>>> with large values so that it would not have any perf regression
> >> >>>>>>>> compared with the old configs, especially with a large topology
> >> >>>>>>>> and a large number of partitions.
> >> >>>>>>>> 2. I looked at the existing metrics, and we do not have
> >> >>>>>>>> corresponding sensors. How about also adding a task level
> >> >>>>>>>> metric indicating the current totally aggregated metrics? The
> >> >>>>>>>> reason I do not suggest this metric on the per-thread level is
> >> >>>>>>>> that in the future we may break the static mapping of tasks ->
> >> >>>>>>>> threads.
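> >> >>>>>>>>
> >> >>>>>>>> As a rough sketch of what such a sensor could look like with
> >> >>>>>>>> the plain metrics API (names are illustrative only; "taskTags"
> >> >>>>>>>> would be the thread-id / task-id tag map):
> >> >>>>>>>>
> >> >>>>>>>>     Sensor sensor = metrics.sensor("input-buffer-bytes",
> >> >>>>>>>>         Sensor.RecordingLevel.INFO);
> >> >>>>>>>>     sensor.add(metrics.metricName("input-buffer-bytes-total",
> >> >>>>>>>>         "stream-task-metrics", taskTags), new Value());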
> >> >>>>>>>>
> >> >>>>>>>> [optional] As an orthogonal thought, I'm thinking maybe we can
> >> >>>>>>>> rename the other config, "*cache.max.bytes.buffering*", to
> >> >>>>>>>> "statestore.cache.max.bytes" (via deprecation of course),
> >> >>>>>>>> piggy-backed in this KIP? I would like to hear others' thoughts.
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> Guozhang
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> On Sun, Aug 22, 2021 at 9:29 AM Sagar <sagarmeansoc...@gmail.com>
> >> >>>>>>>> wrote:
> >> >>>>>>>>
> >> >>>>>>>>> Hi All,
> >> >>>>>>>>>
> >> >>>>>>>>> I would like to start a discussion on the following KIP:
> >> >>>>>>>>>
> >> >>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> >> >>>>>>>>>
> >> >>>>>>>>> Thanks!
> >> >>>>>>>>> Sagar.
> >> >>>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> --
> >> >>>>>>>> -- Guozhang
> >> >>>>>>>>
> >> >>>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> --
> >> >>>>>> -- Guozhang
> >> >>>>>>
> >> >>>>>
> >> >>>>
> >> >>>
> >> >>>
> >> >>> --
> >> >>> -- Guozhang
> >> >>>
> >> >>
> >> >
> >>
> >
>


-- 
-- Guozhang
