If we are going with a global setting, can the cache just be one gigantic
LRU cache instead of many smaller ones?  (partition_name, task_name) can be
part of the cache key.

If there are many smaller caches, it's hard to achieve efficient global
resource utilization (some caches are busy while others sit mostly idle,
and the number of caches can change because of rebalancing).
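
To make the idea concrete, here is a rough sketch of what a single shared
LRU could look like (the names GlobalCache and CacheKey are invented for
illustration, not proposed API, and this version bounds entry count rather
than bytes):

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: one instance-wide LRU where the owning partition/task is
// folded into the key, so eviction pressure is shared across all tasks
// instead of being split up per cache.
public class GlobalCache<V> {

    // Composite key: the owning (partition, task) plus the record key.
    record CacheKey(String partitionName, String taskName, Object key) {}

    private final Map<CacheKey, V> lru;

    public GlobalCache(final int maxEntries) {
        // accessOrder=true keeps LinkedHashMap in LRU order, and
        // removeEldestEntry evicts once the single global bound is hit.
        // (A byte-based bound would need per-entry size estimates.)
        this.lru = new LinkedHashMap<CacheKey, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<CacheKey, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    public V get(String partitionName, String taskName, Object key) {
        return lru.get(new CacheKey(partitionName, taskName, key));
    }

    public void put(String partitionName, String taskName, Object key, V value) {
        lru.put(new CacheKey(partitionName, taskName, key), value);
    }
}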

On Mon, Jun 6, 2016 at 2:59 AM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Guozhang,
>
> About your first point: the alternative is not knowing how much memory a
> KafkaStreams instance will consume, since, as you mention, M and N can
> change. I agree the implementation is slightly harder, since each cache
> can now change size dynamically (and the Kafka Streams instance needs to
> coordinate that).
>
> About the different cache hit rates argument, I agree, a more
> sophisticated solution would provide variable-sized caches. But precisely
> this argument makes a global configuration parameter the better choice in
> my opinion, since the Kafka Streams instance would get a total memory
> budget and do what's best with it. Note I am not suggesting we attempt
> this for the V1 implementation, just pointing out that it is possible to
> do variable-sized caches with the global config.
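>
> To make that concrete, here is a rough sketch of one possible policy
> (purely illustrative, with invented names, and not something I am
> proposing for V1): divide the global budget among caches in proportion to
> their observed hit rates.
>
> import java.util.HashMap;
> import java.util.Map;
>
> public class CacheBudgets {
>     // Hypothetical sketch: hot caches get a larger share of the global
>     // byte budget than mostly-idle ones.
>     public static Map<String, Long> allocate(long totalBytes,
>                                               Map<String, Double> hitRates) {
>         double sum = hitRates.values().stream()
>                              .mapToDouble(Double::doubleValue).sum();
>         Map<String, Long> budgets = new HashMap<>();
>         for (Map.Entry<String, Double> e : hitRates.entrySet()) {
>             // Proportional share; a real policy would also enforce a
>             // minimum floor per cache and handle sum == 0.
>             budgets.put(e.getKey(), (long) (totalBytes * e.getValue() / sum));
>         }
>         return budgets;
>     }
> }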
>
> So overall we have a tradeoff between a more complex implementation with a
> guarantee on total memory usage (global setting), and a simpler
> implementation with no bound on total memory usage (per-cache setting).
>
> Eno
>
> > On 5 Jun 2016, at 20:44, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > There are some details that need to be figured out if we go global:
> >
> > A KafkaStreams instance could have M threads, and each thread could own
> > a varying number of tasks (let's say N, though in practice it may differ
> > from thread to thread), and each task contains a sub-topology with P
> > caches (again, in practice this may differ depending on which
> > sub-topology it contains).
> >
> > Say a user specifies X GB for this KafkaStreams instance; then each
> > cache will get X / M / N / P GB. But remember N and P can change from
> > rebalance to rebalance, and threads do not communicate with each other
> > during their lifetime. So it is hard to determine N and P dynamically.
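> >
> > To make the arithmetic concrete (numbers purely invented for
> > illustration): with X = 1 GB, M = 4 threads, N = 8 tasks per thread, and
> > P = 4 caches per task, each cache gets 1 GB / 4 / 8 / 4 = 8 MB. If a
> > rebalance instead leaves one thread with 16 tasks, its fair per-cache
> > share drops to 4 MB, but that thread cannot learn the new N without
> > coordination.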
> >
> > Plus, different caches may have different cache hit rates, so
> > distributing the memory evenly among them may not be an optimal solution
> > (some caches may be flushed much more frequently than others). Also,
> > since we are considering using Instrumentation.getObjectSize, which is
> > only an approximation, it may exaggerate the imbalance.
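> >
> > For reference, a rough sketch of how getObjectSize gets wired up (the
> > agent class name here is invented); note that it returns only a shallow,
> > implementation-specific approximation:
> >
> > import java.lang.instrument.Instrumentation;
> >
> > public class SizeOfAgent {
> >     private static volatile Instrumentation inst;
> >
> >     // Called by the JVM before main() when started with
> >     // -javaagent:sizeof.jar (hypothetical jar name).
> >     public static void premain(String args, Instrumentation instrumentation) {
> >         inst = instrumentation;
> >     }
> >
> >     // Approximate shallow footprint of one object; it does not follow
> >     // references, so a cache entry's true cost can be underestimated.
> >     public static long approximateSizeOf(Object o) {
> >         return inst.getObjectSize(o);
> >     }
> > }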
> >
> >
> > Guozhang
> >
> >
> > On Sat, Jun 4, 2016 at 11:54 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> >
> >> Hi Jay,
> >>
> >> We can make it global instead of per-processor; that sounds good.
> >>
> >> Thanks
> >> Eno
> >>
> >>
> >>> On 3 Jun 2016, at 23:15, Jay Kreps <j...@confluent.io> wrote:
> >>>
> >>> Hey Eno,
> >>>
> >>> Should the config be the global memory use rather than per-processor?
> >>> That is, let's say I know I have a fixed 1GB heap because that is what
> >>> I set for Java, and I want to use 100MB for caching. It seems like
> >>> right now I'd have to do some math, which depends on my knowing a bit
> >>> about how caching works, to figure out how to set that parameter so I
> >>> don't run out of memory. Does it also depend on the number of
> >>> partitions assigned (and hence the number of tasks)? If so, that makes
> >>> it even harder to set, since that number changes each time rebalancing
> >>> happens, so it is then pretty hard to set safely.
> >>>
> >>> You could theoretically argue for either bottom-up (you know how much
> >>> cache you need per processor and you want to get exactly that) or
> >>> top-down (you know how much memory you have to spare but can't be
> >>> bothered to work out what that amounts to per processor). I think our
> >>> experience has been that 99% of people never change the default, and
> >>> if it runs out of memory they really struggle to fix it and kind of
> >>> blame us, so I think top-down and a global config might be better. :-)
> >>>
> >>> Example: https://issues.apache.org/jira/browse/KAFKA-3775
> >>>
> >>> -Jay
> >>>
> >>> On Fri, Jun 3, 2016 at 2:39 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> >>>
> >>>> Hi Gwen,
> >>>>
> >>>> Yes. As an example, if cache.max.bytes.buffering is set to X, and
> >>>> users have A aggregation operators and T KTable.to() operators, then
> >>>> X * (A + T) total bytes will be allocated for caching.
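> >>>>
> >>>> To pick illustrative numbers: with cache.max.bytes.buffering = 10 MB,
> >>>> A = 3 aggregations, and T = 2 KTable.to() calls, the instance would
> >>>> allocate 10 MB * (3 + 2) = 50 MB for caching in total.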
> >>>>
> >>>> Eno
> >>>>
> >>>>> On 3 Jun 2016, at 21:37, Gwen Shapira <g...@confluent.io> wrote:
> >>>>>
> >>>>> Just to clarify: "cache.max.bytes.buffering" is per processor?
> >>>>>
> >>>>>
> >>>>> On Thu, Jun 2, 2016 at 11:30 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> >>>>>> Hi there,
> >>>>>>
> >>>>>> I have created KIP-63: Unify store and downstream caching in streams
> >>>>>>
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
> >>>>>>
> >>>>>> Feedback is appreciated.
> >>>>>>
> >>>>>> Thank you
> >>>>>> Eno
> >>>>
> >>>>
> >>
> >>
> >
> >
> > --
> > -- Guozhang
>
>
