There are some details that need to be figured out if we go global: a KafkaStreams instance could have M threads, each thread could have a varying number of tasks (let's say N, but in practice it may differ from thread to thread), and each task contains a sub-topology with P caches (again, in practice P may differ depending on which sub-topology the task contains).
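To make the thread / task / cache hierarchy concrete, here is a minimal sketch of what an even split of one global budget would compute. The class and method names are made up for illustration and are not part of Kafka Streams, and it assumes every thread has the same N and every task the same P, which will not hold in practice.

```java
// Hypothetical illustration only; nothing here is Kafka Streams API.
public class CacheBudgetSketch {

    /**
     * Evenly divides a global cache budget across M threads, N tasks per
     * thread, and P caches per task. Assumes N and P are uniform.
     */
    public static long evenSplitPerCache(long totalBytes, int threads,
                                         int tasksPerThread, int cachesPerTask) {
        if (threads <= 0 || tasksPerThread <= 0 || cachesPerTask <= 0) {
            throw new IllegalArgumentException("counts must be positive");
        }
        return totalBytes / threads / tasksPerThread / cachesPerTask;
    }

    public static void main(String[] args) {
        long budget = 100L * 1024 * 1024; // 100 MB for the whole instance

        // Before a rebalance: 4 threads, 5 tasks each, 2 caches per task.
        System.out.println(evenSplitPerCache(budget, 4, 5, 2));

        // After a rebalance doubles the tasks per thread, every cache's
        // share is halved even though the configured budget is unchanged.
        System.out.println(evenSplitPerCache(budget, 4, 10, 2));
    }
}
```

The second call shows the difficulty: the per-cache share moves whenever the task assignment moves, so it cannot be fixed once at startup.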

Say the user specifies X GB for this KafkaStreams instance; then each cache will get X / M / N / P GB. But remember that N and P can change from rebalance to rebalance, and threads do not communicate with each other during their lifetime, so it is hard to determine M and N dynamically.

Plus, different caches may have different hit rates, so distributing the memory evenly among them may not be optimal (some caches may be flushed much more frequently than others). Also, since we are considering using instrumentation.getObjectSize, which is approximate, it may exaggerate the imbalance.

Guozhang

On Sat, Jun 4, 2016 at 11:54 PM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Jay,
>
> We can make it global instead of per-processor, sounds good.
>
> Thanks
> Eno
>
>
> > On 3 Jun 2016, at 23:15, Jay Kreps <j...@confluent.io> wrote:
> >
> > Hey Eno,
> >
> > Should the config be the global memory use rather than the per-processor?
> > That is, let’s say I know I have fixed a 1GB heap because that is what I
> > set for Java, and want to use 100MB for caching, it seems like right now
> > I’d have to do some math that depends on my knowing a bit about how caching
> > works to figure out how to set that parameter so I don't run out of memory.
> > Does it also depend on the number of partitions assigned (and hence the
> > number of task), if so that makes it even harder to set since each time
> > rebalancing happens that changes so it is then pretty hard to set safely.
> >
> > You could theoretically argue for either bottom up (you know how much cache
> > you need per processor as you have it and you want to get exactly that) or
> > top down (you know how much memory you have to spare but can't be bothered
> > to work out what that amounts to per-processor). I think our experience has
> > been that 99% of people never change the default and if it runs out of
> > memory they really struggle to fix it and kind of blame us, so I think top
> > down and a global config might be better. :-)
> >
> > Example: https://issues.apache.org/jira/browse/KAFKA-3775
> >
> > -Jay
> >
> > On Fri, Jun 3, 2016 at 2:39 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> >
> >> Hi Gwen,
> >>
> >> Yes. As an example, if cache.max.bytes.buffering set to X, and if users
> >> have A aggregation operators and T KTable.to() operators, then X*(A + T)
> >> total bytes will be allocated for caching.
> >>
> >> Eno
> >>
> >>> On 3 Jun 2016, at 21:37, Gwen Shapira <g...@confluent.io> wrote:
> >>>
> >>> Just to clarify: "cache.max.bytes.buffering" is per processor?
> >>>
> >>>
> >>> On Thu, Jun 2, 2016 at 11:30 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> >>>> Hi there,
> >>>>
> >>>> I have created KIP-63: Unify store and downstream caching in streams
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
> >>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams>
> >>>>
> >>>>
> >>>> Feedback is appreciated.
> >>>>
> >>>> Thank you
> >>>> Eno
> >>
> >>
> >

--
-- Guozhang