Hi Mathias,

I missed out on the metrics part.

I have added the new metric in the proposed changes section along with the
small re-wording that you talked about.

Let me know if that makes sense.

Thanks!
Sagar.

On Fri, Sep 10, 2021 at 3:45 AM Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for the KIP.
>
> There was some discussion about adding a metric on the thread, but the
> KIP does not contain anything about it. Did we drop this suggestion or
> was the KIP not updated accordingly?
>
>
> Nit:
>
> > This would be a global config applicable per processing topology
>
> Can we change this to `per Kafka Streams instance.`
>
> Atm, a Stream instance executes a single topology, so it does not make
> any effective difference right now. However, it seems better (more
> logical) to bind the config to the instance (not the topology the
> instance executes).
>
>
> -Matthias
>
> On 9/2/21 6:08 AM, Sagar wrote:
> > Thanks Guozhang and Luke.
> >
> > I have updated the KIP with all the suggested changes.
> >
> > Do you think we could start voting for this?
> >
> > Thanks!
> > Sagar.
> >
> > On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <show...@gmail.com> wrote:
> >
> >> Thanks for the KIP. Overall LGTM.
> >>
> >> Just one thought, if we "rename" the config directly as mentioned in the
> >> KIP, would that break existing applications?
> >> Should we deprecate the old one first, and make the old/new names
> co-exist
> >> for some period of time?
> >>
> >> Public Interfaces
> >>
> >>    - Adding a new config *input.buffer.max.bytes *applicable at a
> topology
> >>    level. The importance of this config would be *Medium*.
> >>    - Renaming *cache.max.bytes.buffering* to
> *statestore.cache.max.bytes*.
> >>
> >>
> >>
> >> Thank you.
> >> Luke
> >>
> >> On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>
> >>> Currently the state store cache size default value is 10MB today, which
> >>> arguably is rather small. So I'm thinking maybe for this config default
> >> to
> >>> 512MB.
> >>>
> >>> Other than that, LGTM.
> >>>
> >>> On Sat, Aug 28, 2021 at 11:34 AM Sagar <sagarmeansoc...@gmail.com>
> >> wrote:
> >>>
> >>>> Thanks Guozhang and Sophie.
> >>>>
> >>>> Yeah a small default value would lower the throughput. I didn't quite
> >>>> realise it earlier. It's slightly hard to predict this value so I
> would
> >>>> guess around 1/2 GB to 1 GB? WDYT?
> >>>>
> >>>> Regarding the renaming of the config and the new metric, sure would
> >>> include
> >>>> it in the KIP.
> >>>>
> >>>> Lastly, importance would also. be added. I guess Medium should be ok.
> >>>>
> >>>> Thanks!
> >>>> Sagar.
> >>>>
> >>>>
> >>>> On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
> >>>> <sop...@confluent.io.invalid> wrote:
> >>>>
> >>>>> 1) I agree that we should just distribute the bytes evenly, at least
> >>> for
> >>>>> now. It's simpler to understand and
> >>>>> we can always change it later, plus it makes sense to keep this
> >> aligned
> >>>>> with how the cache works today
> >>>>>
> >>>>> 2) +1 to being conservative in the generous sense, it's just not
> >>>> something
> >>>>> we can predict with any degree
> >>>>> of accuracy and even if we could, the appropriate value is going to
> >>>> differ
> >>>>> wildly across applications and use
> >>>>> cases. We might want to just pick some multiple of the default cache
> >>>> size,
> >>>>> and maybe do some research on
> >>>>> other relevant defaults or sizes (default JVM heap, size of available
> >>>>> memory in common hosts eg EC2
> >>>>> instances, etc). We don't need to worry as much about erring on the
> >>> side
> >>>> of
> >>>>> too big, since other configs like
> >>>>> the max.poll.records will help somewhat to keep it from exploding.
> >>>>>
> >>>>> 4) 100%, I always found the *cache.max.bytes.buffering* config name
> >> to
> >>> be
> >>>>> incredibly confusing. Deprecating this in
> >>>>> favor of "*statestore.cache.max.bytes*" and aligning it to the new
> >>> input
> >>>>> buffer config sounds good to me to include here.
> >>>>>
> >>>>> 5) The KIP should list all relevant public-facing changes, including
> >>>>> metadata like the config's "Importance". Personally
> >>>>> I would recommend Medium, or even High if we're really worried about
> >>> the
> >>>>> default being wrong for a lot of users
> >>>>>
> >>>>> Thanks for the KIP, besides those few things that Guozhang brought up
> >>> and
> >>>>> the config importance, everything SGTM
> >>>>>
> >>>>> -Sophie
> >>>>>
> >>>>> On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wangg...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>>> 1) I meant for your proposed solution. I.e. to distribute the
> >>>> configured
> >>>>>> bytes among threads evenly.
> >>>>>>
> >>>>>> 2) I was actually thinking about making the default a large enough
> >>>> value
> >>>>> so
> >>>>>> that we would not introduce performance regression: thinking about
> >> a
> >>>> use
> >>>>>> case with many partitions and each record may be large, then
> >>>> effectively
> >>>>> we
> >>>>>> would only start pausing when the total bytes buffered is pretty
> >>> large.
> >>>>> If
> >>>>>> we set the default value to small, we would be "more aggressive" on
> >>>>> pausing
> >>>>>> which may impact throughput.
> >>>>>>
> >>>>>> 3) Yes exactly, this would naturally be at the "partition-group"
> >>> class
> >>>>>> since that represents the task's all input partitions.
> >>>>>>
> >>>>>> 4) This is just a bold thought, I'm interested to see other's
> >>> thoughts.
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Mon, Aug 23, 2021 at 4:10 AM Sagar <sagarmeansoc...@gmail.com>
> >>>> wrote:
> >>>>>>
> >>>>>>> Thanks Guozhang.
> >>>>>>>
> >>>>>>> 1) Just for my confirmation, when you say we should proceed with
> >>> the
> >>>>> even
> >>>>>>> distribution of bytes, are you referring to the Proposed Solution
> >>> in
> >>>>> the
> >>>>>>> KIP or the option you had considered in the JIRA?
> >>>>>>> 2) Default value for the config is something that I missed. I
> >> agree
> >>>> we
> >>>>>>> can't have really large values as it might be detrimental to the
> >>>>>>> performance. Maybe, as a starting point, we assume that only 1
> >>> Stream
> >>>>>> Task
> >>>>>>> is running so what could be the ideal value in such a scenario?
> >>>>> Somewhere
> >>>>>>> around 10MB similar to the caching config?
> >>>>>>> 3) When you say,  *a task level metric indicating the current
> >>> totally
> >>>>>>> aggregated metrics, * you mean the bytes aggregated at a task
> >>> level?
> >>>>>>> 4) I am ok with the name change, but would like to know others'
> >>>>> thoughts.
> >>>>>>>
> >>>>>>> Thanks!
> >>>>>>> Sagar.
> >>>>>>>
> >>>>>>> On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <
> >> wangg...@gmail.com
> >>>>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Thanks Sagar for writing this PR.
> >>>>>>>>
> >>>>>>>> I think twice about the options that have been proposed in
> >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13152, and feel
> >> that
> >>>> at
> >>>>>> the
> >>>>>>>> moment it's simpler to just do the even distribution of the
> >>>>> configured
> >>>>>>>> total bytes. My rationale is that right now we have a static
> >>> tasks
> >>>> ->
> >>>>>>>> threads mapping, and hence each partition would only be fetched
> >>> by
> >>>> a
> >>>>>>> single
> >>>>>>>> thread / consumer at a given time. If in the future we break
> >> that
> >>>>>> static
> >>>>>>>> mapping into dynamic mapping, then we would not be able to do
> >>> this
> >>>>> even
> >>>>>>>> distribution. Instead we would have other threads polling from
> >>>>> consumer
> >>>>>>>> only, and those threads would be responsible for checking the
> >>>> config
> >>>>>> and
> >>>>>>>> pause non-empty partitions if it goes beyond the threshold. But
> >>>> since
> >>>>>> at
> >>>>>>>> that time we would not change the config but just how it would
> >> be
> >>>>>>>> implemented behind the scenes we would not need another KIP to
> >>>> change
> >>>>>> it.
> >>>>>>>>
> >>>>>>>> Some more comments:
> >>>>>>>>
> >>>>>>>> 1. We need to discuss a bit about the default value of this new
> >>>>> config.
> >>>>>>>> Personally I think we need to be a bit conservative with large
> >>>> values
> >>>>>> so
> >>>>>>>> that it would not have any perf regression compared with old
> >>>> configs
> >>>>>>>> especially with large topology and large number of partitions.
> >>>>>>>> 2. I looked at the existing metrics, and do not have
> >>> corresponding
> >>>>>>> sensors.
> >>>>>>>> How about also adding a task level metric indicating the
> >> current
> >>>>>> totally
> >>>>>>>> aggregated metrics. The reason I do not suggest this metric on
> >>> the
> >>>>>>>> per-thread level is that in the future we may break the static
> >>>>> mapping
> >>>>>> of
> >>>>>>>> tasks -> threads.
> >>>>>>>>
> >>>>>>>> [optional] As an orthogonal thought, I'm thinking maybe we can
> >>>> rename
> >>>>>> the
> >>>>>>>> other "*cache.max.bytes.buffering*" as
> >>> "statestore.cache.max.bytes"
> >>>>>> (via
> >>>>>>>> deprecation of course), piggy-backed in this KIP? Would like to
> >>>> hear
> >>>>>>>> others' thoughts.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sun, Aug 22, 2021 at 9:29 AM Sagar <
> >> sagarmeansoc...@gmail.com
> >>>>
> >>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi All,
> >>>>>>>>>
> >>>>>>>>> I would like to start a discussion on the following KIP:
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> >>>>>>>>>
> >>>>>>>>> Thanks!
> >>>>>>>>> Sagar.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >
>

Reply via email to