Hi Guozhang,

About your first point: the alternative is not knowing how much memory a KafkaStreams instance will consume, since, as you mention, M and N can change. I agree the implementation is somewhat harder, since each cache can now change size dynamically (and the Kafka Streams instance needs to coordinate that).
About the different cache hit rates argument, I agree that a more sophisticated solution would provide variable-sized caches. But precisely this argument leads me to think a global configuration parameter is better, since the Kafka Streams instance would get a total memory budget and do what's best with it. Note I am not suggesting we attempt this for the V1 implementation, just pointing out that variable-sized caches are possible with the global config. So overall we have a tradeoff between a more complex implementation with a guarantee on total memory usage (global setting), and a simpler implementation with variable memory usage (per-cache setting).

Eno

> On 5 Jun 2016, at 20:44, Guozhang Wang <wangg...@gmail.com> wrote:
>
> There are some details that need to be figured out if we go global:
>
> A KafkaStreams instance could have M threads, and each thread could own a
> varying number of tasks (let's say N, but in practice it may differ from
> thread to thread), and each task contains a sub-topology with P caches
> (again, in practice it may differ depending on which sub-topology it
> contains).
>
> Say a user specifies X GB for this KafkaStreams instance; then each cache
> will get X / M / N / P GB. But remember N and P can change from rebalance
> to rebalance, and threads do not communicate with each other during their
> lifetime. So it is hard to determine M and N dynamically.
>
> Plus, different caches may have different cache hit rates, so distributing
> the memory evenly among them may not be optimal (some caches may be
> flushed much more frequently than others), and since we are considering
> using Instrumentation.getObjectSize, which is approximate, it may
> exaggerate the imbalance.
>
> Guozhang
>
> On Sat, Jun 4, 2016 at 11:54 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>
>> Hi Jay,
>>
>> We can make it global instead of per-processor, sounds good.
>>
>> Thanks
>> Eno
>>
>>> On 3 Jun 2016, at 23:15, Jay Kreps <j...@confluent.io> wrote:
>>>
>>> Hey Eno,
>>>
>>> Should the config be the global memory use rather than per-processor?
>>> That is, say I know I have fixed a 1GB heap because that is what I set
>>> for Java, and want to use 100MB for caching; it seems like right now I'd
>>> have to do some math that depends on my knowing a bit about how caching
>>> works to figure out how to set that parameter so I don't run out of
>>> memory. Does it also depend on the number of partitions assigned (and
>>> hence the number of tasks)? If so, that makes it even harder to set,
>>> since rebalancing changes it each time it happens, so it is then pretty
>>> hard to set safely.
>>>
>>> You could theoretically argue for either bottom-up (you know how much
>>> cache you need per processor, and you want to get exactly that) or
>>> top-down (you know how much memory you have to spare but can't be
>>> bothered to work out what that amounts to per processor). I think our
>>> experience has been that 99% of people never change the default, and if
>>> it runs out of memory they really struggle to fix it and kind of blame
>>> us, so I think top-down and a global config might be better. :-)
>>>
>>> Example: https://issues.apache.org/jira/browse/KAFKA-3775
>>>
>>> -Jay
>>>
>>> On Fri, Jun 3, 2016 at 2:39 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>
>>>> Hi Gwen,
>>>>
>>>> Yes. As an example, if cache.max.bytes.buffering is set to X, and users
>>>> have A aggregation operators and T KTable.to() operators, then X*(A + T)
>>>> total bytes will be allocated for caching.
>>>>
>>>> Eno
>>>>
>>>>> On 3 Jun 2016, at 21:37, Gwen Shapira <g...@confluent.io> wrote:
>>>>>
>>>>> Just to clarify: "cache.max.bytes.buffering" is per processor?
>>>>>
>>>>> On Thu, Jun 2, 2016 at 11:30 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>> Hi there,
>>>>>>
>>>>>> I have created KIP-63: Unify store and downstream caching in streams
>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
>>>>>>
>>>>>> Feedback is appreciated.
>>>>>>
>>>>>> Thank you
>>>>>> Eno
>
> --
> -- Guozhang
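P.S. The budget arithmetic discussed in this thread can be sketched in a few lines of Java. This is a hypothetical illustration, not actual Kafka Streams code: `perCacheBytes` mirrors Guozhang's X / M / N / P division under a hypothetical global setting, and `totalCacheBytes` mirrors my X*(A + T) total under the current per-processor `cache.max.bytes.buffering` setting. The class and method names are made up for this sketch.

```java
// Hypothetical sketch of the two cache-sizing schemes discussed above.
public class CacheBudget {

    // Global setting: a total budget X split evenly across M threads,
    // N tasks per thread, and P caches per task (Guozhang's X / M / N / P).
    // Note N and P can change on every rebalance, which is the hard part.
    static long perCacheBytes(long totalBytes, int threads, int tasksPerThread, int cachesPerTask) {
        return totalBytes / threads / tasksPerThread / cachesPerTask;
    }

    // Per-processor setting: total memory consumed when each of the
    // A aggregation operators and T KTable.to() operators gets X bytes
    // (Eno's X * (A + T)).
    static long totalCacheBytes(long perCacheBytes, int aggregations, int toOperators) {
        return perCacheBytes * (aggregations + toOperators);
    }

    public static void main(String[] args) {
        // e.g. a 100 MB global budget over 4 threads, 8 tasks each, 2 caches per task
        System.out.println(perCacheBytes(100L * 1024 * 1024, 4, 8, 2));  // 1638400

        // e.g. 10 MB per cache with 3 aggregations and 2 KTable.to() operators
        System.out.println(totalCacheBytes(10L * 1024 * 1024, 3, 2));    // 52428800
    }
}
```

Note how the per-processor total grows with the topology (A + T), while the global scheme keeps the total fixed but makes each cache's share depend on M, N, and P, which rebalancing can change.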