About the distribution of memory: I think it is good to evenly distribute
the memory across threads; what I was calling out for consideration is the
distribution of memory within a thread, to its tasks and to the caches of
each task. The number of tasks and the number of caches per task can vary
from rebalance to rebalance, so the per-cache size (if we again distribute
evenly) will also differ from time to time, and it may be sub-optimal to
still do an even distribution within a thread.
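
Just to make the arithmetic concrete, here is a minimal sketch (all names
and numbers are made up, this is not proposed code) of the even split being
discussed; note how the per-cache share changes across rebalances while the
per-thread share stays fixed:

// Hypothetical illustration of the even-split arithmetic, not KIP-63 code.
public class CacheBudgetSketch {

    // The per-thread share is stable: it only depends on the configured
    // total and the fixed number of stream threads.
    static long perThreadBytes(long totalCacheBytes, int numThreads) {
        return totalCacheBytes / numThreads;
    }

    // The per-cache share is NOT stable: the task count and the number of
    // caches per task can change on every rebalance.
    static long perCacheBytes(long perThreadBytes, int tasksForThread, int cachesPerTask) {
        return perThreadBytes / (tasksForThread * cachesPerTask);
    }

    public static void main(String[] args) {
        long perThread = perThreadBytes(100L * 1024 * 1024, 4); // 100 MB across 4 threads
        System.out.println("before rebalance: " + perCacheBytes(perThread, 2, 3) + " bytes per cache");
        System.out.println("after rebalance:  " + perCacheBytes(perThread, 5, 3) + " bytes per cache");
    }
}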

Henry's suggestion of having a global cache per thread (i.e. we still
partition the cache across threads, but within a thread we use a single
cache) sounds interesting; we would still need to figure out how to avoid
conflicts when different operators use the same key objects (assuming this
global cache is a map from Object to Object).
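
One hypothetical way to sidestep that conflict (just a sketch, not something
the KIP proposes) would be to namespace each entry with the name of the
operator that owns it, so identical keys from different operators map to
different cache entries:

import java.util.Objects;

// Sketch only: composite key for a single per-thread cache, namespaced by
// the owning operator so the same user key from two operators cannot
// collide.
final class NamespacedCacheKey {
    private final String operatorName; // hypothetical namespace, e.g. processor node name
    private final Object key;

    NamespacedCacheKey(String operatorName, Object key) {
        this.operatorName = operatorName;
        this.key = key;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof NamespacedCacheKey)) return false;
        NamespacedCacheKey other = (NamespacedCacheKey) o;
        return operatorName.equals(other.operatorName) && Objects.equals(key, other.key);
    }

    @Override
    public int hashCode() {
        return 31 * operatorName.hashCode() + Objects.hashCode(key);
    }
}

The trade-off is an extra wrapper object per lookup, which would itself
count against whatever size estimate we use.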

About using Instrumentation.getObjectSize: yeah, we were quite worried
about its efficiency as well as its accuracy when discussing this
internally, but no better solution was proposed. So if people have better
ideas, please throw them out here; that is also one of the purposes of
calling out such KIP discussion threads.
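
For reference, here is roughly how Instrumentation.getObjectSize would be
wired up (a sketch, assuming a separate agent jar with a Premain-Class
manifest entry attached via -javaagent); note that it returns an
implementation-specific approximation of the shallow size of the given
object, not of everything it references, which is part of the accuracy
concern:

import java.lang.instrument.Instrumentation;

// Sketch of a size-estimation agent; the Instrumentation handle is only
// handed to a java agent's premain/agentmain, so this class must live in an
// agent jar attached with -javaagent.
public class ObjectSizeAgent {
    private static volatile Instrumentation instrumentation;

    public static void premain(String agentArgs, Instrumentation inst) {
        instrumentation = inst;
    }

    // Returns an implementation-specific approximation of the shallow size
    // of obj, in bytes.
    public static long sizeOf(Object obj) {
        if (instrumentation == null) {
            throw new IllegalStateException("ObjectSizeAgent not attached via -javaagent");
        }
        return instrumentation.getObjectSize(obj);
    }
}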


Guozhang



On Mon, Jun 6, 2016 at 8:49 AM, Jay Kreps <j...@confluent.io> wrote:

> For the threads, I think it might be reasonable to just give each thread
> cache_bytes/num_threads for simplicity? Presumably you wouldn't want to
> have to make the cache thread-safe, so you'd want to avoid sharing? Giving
> each thread the same cache isn't quite right, as you point out, but
> depending on how random the task assignment is, it might be okay? This
> type of randomization actually seems important from a CPU load balancing
> point of view too (i.e. you don't want all the tasks for some sub-topology
> going to the same CPU core).
>
> For Instrumentation.getObjectSize, does it really give the size of the
> referenced object graph, or is it more like sizeof in C? Is it fast enough
> to use for this kind of thing (it seems like computing the "deep" object
> size could easily overwhelm the cost of the hash table lookup)?
>
> -Jay
>
>
>
> On Sun, Jun 5, 2016 at 12:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > There are some details that need to be figured out if we go global:
> >
> > A KafkaStreams instance could have M threads, and each thread could have
> > a varying number of tasks (let's say N, though in practice it may differ
> > from thread to thread), and each task contains a sub-topology with P
> > caches (again, in practice this may differ depending on which
> > sub-topology it contains).
> >
> > Say the user specified X GB for this KafkaStreams instance; then each
> > cache will get X / M / N / P GB. But remember that N and P can change
> > from rebalance to rebalance, and threads do not communicate with each
> > other during their lifetime. So it is hard to determine N and P
> > dynamically.
> >
> > Plus, different caches may have different cache hit rate, so distributing
> > the memory evenly to them may not be an optimal solution (some caches may
> > be flushed much more frequently than others), and also since we are
> > considering to use instrumentation.getObjectSize which is approximate, it
> > may exaggerate the imbalance.
> >
> >
> > Guozhang
> >
> >
> > On Sat, Jun 4, 2016 at 11:54 PM, Eno Thereska <eno.there...@gmail.com>
> > wrote:
> >
> > > Hi Jay,
> > >
> > > We can make it global instead of per-processor, sounds good.
> > >
> > > Thanks
> > > Eno
> > >
> > >
> > > > On 3 Jun 2016, at 23:15, Jay Kreps <j...@confluent.io> wrote:
> > > >
> > > > Hey Eno,
> > > >
> > > > Should the config be the global memory use rather than the
> > > > per-processor one? That is, let's say I know I have fixed a 1GB heap
> > > > because that is what I set for Java, and I want to use 100MB for
> > > > caching; it seems like right now I'd have to do some math, which
> > > > depends on my knowing a bit about how the caching works, to figure
> > > > out how to set that parameter so I don't run out of memory. Does it
> > > > also depend on the number of partitions assigned (and hence the
> > > > number of tasks)? If so, that makes it even harder to set, since
> > > > each time rebalancing happens that number changes, so it is then
> > > > pretty hard to set safely.
> > > >
> > > > You could theoretically argue for either bottom-up (you know how
> > > > much cache you need per processor, as you have it, and you want to
> > > > get exactly that) or top-down (you know how much memory you have to
> > > > spare but can't be bothered to work out what that amounts to per
> > > > processor). I think our experience has been that 99% of people never
> > > > change the default, and if it runs out of memory they really
> > > > struggle to fix it and kind of blame us, so I think top-down and a
> > > > global config might be better. :-)
> > > >
> > > > Example: https://issues.apache.org/jira/browse/KAFKA-3775
> > > >
> > > > -Jay
> > > >
> > > > On Fri, Jun 3, 2016 at 2:39 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> > > >
> > > >> Hi Gwen,
> > > >>
> > > >> Yes. As an example, if cache.max.bytes.buffering is set to X, and
> > > >> if users have A aggregation operators and T KTable.to() operators,
> > > >> then X*(A + T) total bytes will be allocated for caching.
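
To make that accounting concrete, here is a tiny worked illustration with
made-up numbers, using X, A and T as defined above:

// Hypothetical numbers only; illustrates the X * (A + T) total.
public class CacheTotalExample {
    public static void main(String[] args) {
        long x = 10L * 1024 * 1024; // X: cache.max.bytes.buffering per operator (made-up value)
        int a = 3;                  // A: aggregation operators in the topology
        int t = 2;                  // T: KTable.to() operators in the topology
        long totalBytes = x * (a + t); // X * (A + T) = 50 MB buffered in total
        System.out.println("total cache bytes: " + totalBytes);
    }
}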
> > > >>
> > > >> Eno
> > > >>
> > > >>> On 3 Jun 2016, at 21:37, Gwen Shapira <g...@confluent.io> wrote:
> > > >>>
> > > >>> Just to clarify: "cache.max.bytes.buffering" is per processor?
> > > >>>
> > > >>>
> > > >>> On Thu, Jun 2, 2016 at 11:30 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> > > >>>> Hi there,
> > > >>>>
> > > >>>> I have created KIP-63: Unify store and downstream caching in streams
> > > >>>>
> > > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
> > > >>>>
> > > >>>>
> > > >>>> Feedback is appreciated.
> > > >>>>
> > > >>>> Thank you
> > > >>>> Eno
> > > >>
> > > >>
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
