So you were listing the difficulties in order to provide context for the upcoming design change and discussion? It's all good then :)
On Sun, Jun 5, 2016 at 11:26 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Don't get me wrong Gwen :) I'm definitely for removing as much burden as
> possible from users. All I'm saying is that it is not straightforward to do
> so, and we'd better at least have a concrete implementation design on the KIP
> page rather than just a one-line change of the config semantics.
>
> On Sun, Jun 5, 2016 at 1:09 PM, Gwen Shapira <g...@confluent.io> wrote:
>
>> If it is hard for you, just imagine how much fun the users will have.
>>
>> Machines have X GB available RAM. Someone has to figure out how to
>> divide it for processors and rocksDB. The user doesn't have any
>> special knowledge that you don't in this case, so there's no point in
>> pushing the decision to the user - he won't be able to make a better
>> decision.
>>
>> Gwen
>>
>> On Sun, Jun 5, 2016 at 10:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > There are some details needed to be figured out if we go global:
>> >
>> > A KafkaStreams instance could have M threads, and each thread could have
>> > a varying number of tasks (let's say N, but in practice it may be
>> > different from thread to thread), and each task contains a sub-topology
>> > with P caches (again, in practice it may be different depending on which
>> > sub-topology it contains).
>> >
>> > Say the user specified X GB for this KafkaStreams instance; then each
>> > cache will get X / M / N / P GB. But remember N and P can change from
>> > rebalance to rebalance, and threads do not communicate with each other
>> > during their lifetime. So it is hard to determine M and N dynamically.
>> >
>> > Plus, different caches may have different cache hit rates, so distributing
>> > the memory evenly to them may not be an optimal solution (some caches may
>> > be flushed much more frequently than others), and also since we are
>> > considering using instrumentation.getObjectSize, which is approximate, it
>> > may exaggerate the imbalance.
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Sat, Jun 4, 2016 at 11:54 PM, Eno Thereska <eno.there...@gmail.com>
>> > wrote:
>> >
>> >> Hi Jay,
>> >>
>> >> We can make it global instead of per-processor, sounds good.
>> >>
>> >> Thanks
>> >> Eno
>> >>
>> >>
>> >> > On 3 Jun 2016, at 23:15, Jay Kreps <j...@confluent.io> wrote:
>> >> >
>> >> > Hey Eno,
>> >> >
>> >> > Should the config be the global memory use rather than the
>> >> > per-processor? That is, let’s say I know I have fixed a 1GB heap
>> >> > because that is what I set for Java, and want to use 100MB for
>> >> > caching, it seems like right now I’d have to do some math that depends
>> >> > on my knowing a bit about how caching works to figure out how to set
>> >> > that parameter so I don't run out of memory. Does it also depend on
>> >> > the number of partitions assigned (and hence the number of tasks)? If
>> >> > so, that makes it even harder to set, since each time rebalancing
>> >> > happens that changes, so it is then pretty hard to set safely.
>> >> >
>> >> > You could theoretically argue for either bottom up (you know how much
>> >> > cache you need per processor as you have it and you want to get
>> >> > exactly that) or top down (you know how much memory you have to spare
>> >> > but can't be bothered to work out what that amounts to per-processor).
>> >> > I think our experience has been that 99% of people never change the
>> >> > default and if it runs out of memory they really struggle to fix it
>> >> > and kind of blame us, so I think top down and a global config might be
>> >> > better. :-)
>> >> >
>> >> > Example: https://issues.apache.org/jira/browse/KAFKA-3775
>> >> >
>> >> > -Jay
>> >> >
>> >> > On Fri, Jun 3, 2016 at 2:39 PM, Eno Thereska <eno.there...@gmail.com>
>> >> > wrote:
>> >> >
>> >> >> Hi Gwen,
>> >> >>
>> >> >> Yes. As an example, if cache.max.bytes.buffering is set to X, and if
>> >> >> users have A aggregation operators and T KTable.to() operators, then
>> >> >> X*(A + T) total bytes will be allocated for caching.
>> >> >>
>> >> >> Eno
>> >> >>
>> >> >>> On 3 Jun 2016, at 21:37, Gwen Shapira <g...@confluent.io> wrote:
>> >> >>>
>> >> >>> Just to clarify: "cache.max.bytes.buffering" is per processor?
>> >> >>>
>> >> >>>
>> >> >>> On Thu, Jun 2, 2016 at 11:30 AM, Eno Thereska
>> >> >>> <eno.there...@gmail.com> wrote:
>> >> >>>> Hi there,
>> >> >>>>
>> >> >>>> I have created KIP-63: Unify store and downstream caching in streams
>> >> >>>>
>> >> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
>> >> >>>>
>> >> >>>>
>> >> >>>> Feedback is appreciated.
>> >> >>>>
>> >> >>>> Thank you
>> >> >>>> Eno
>> >
>> > --
>> > -- Guozhang
>
> --
> -- Guozhang
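
For reference, the two sizing formulas quoted in this thread can be worked through with a little arithmetic. This is only an illustrative sketch: the function names and the sample values for A, T, M, N and P are made up for the example and are not part of KIP-63 or of any Kafka Streams API.

```python
MB = 1024 * 1024


def per_processor_total(x_bytes: int, aggregations: int, ktable_tos: int) -> int:
    """Per-processor semantics: every caching operator gets X, so the
    total is X * (A + T), as in Eno's example."""
    return x_bytes * (aggregations + ktable_tos)


def global_even_share(x_bytes: int, threads: int, tasks_per_thread: int,
                      caches_per_task: int) -> int:
    """Global semantics with a naive even split: each cache gets
    X / M / N / P, as in Guozhang's example (integer bytes)."""
    return x_bytes // (threads * tasks_per_thread * caches_per_task)


if __name__ == "__main__":
    x = 100 * MB  # hypothetical setting of 100MB

    # Per-processor: 3 aggregations + 2 KTable.to() operators (made-up counts).
    print(per_processor_total(x, aggregations=3, ktable_tos=2) // MB)  # 500

    # Global: 4 threads, 5 tasks per thread, 5 caches per task (made-up counts).
    print(global_even_share(x, 4, 5, 5))   # 1048576

    # After a hypothetical rebalance doubles the tasks per thread, each share halves.
    print(global_even_share(x, 4, 10, 5))  # 524288
```

With per-processor semantics the total grows with the topology (here 100MB turns into 500MB), while with a global budget the total stays fixed but each cache's share moves whenever N or P changes.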