I'm in favor of a global config that is then evenly divided amongst the threads of a Kafka Streams instance.
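To make the division concrete, here is a rough sketch of what I have in mind. The class and method names are made up for illustration and are not existing Kafka Streams API; it only assumes the global budget is given in bytes and the number of stream threads is known up front.

```java
// Hypothetical helper, NOT part of Kafka Streams: splits one global cache
// budget (in bytes) evenly across the stream threads of a single instance.
final class CacheBudget {

    private CacheBudget() {
        // static helper only
    }

    /**
     * Returns the byte budget for each thread. Integer division drops any
     * remainder, so the threads together never exceed totalBytes.
     */
    static long perThreadBytes(long totalBytes, int numThreads) {
        if (totalBytes < 0) {
            throw new IllegalArgumentException("totalBytes must be >= 0");
        }
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be > 0");
        }
        return totalBytes / numThreads;
    }
}
```

For example, CacheBudget.perThreadBytes(100L * 1024 * 1024, 4) gives each of 4 threads 25 MB, and within a thread that amount would cap the total size of all caches rather than being split further per cache.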

On Mon, Jun 13, 2016 at 6:23 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Although this KIP is not mainly for memory management of Kafka Streams,
> since it touches on quite some part of it I think it is good to first think
> of what we would REALLY want as an end goal for memory usage in order to
> make sure that whatever we proposed in this KIP aligns with that long-term
> plan. So I wrote up this discussion page that summarized my current
> thoughts:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams
>
> As for its implication on this KIP, my personal take is that:
>
> 1. we should use a global config in terms of bytes, which will then be
> evenly divided among the threads within the Kafka Streams instance, but
> within a thread that config can be used to control the total size of all
> caches instead of further dividing that among all caches.
>
> 2. instead of caching in terms of deserialized objects we may need to
> consider just caching in terms of serialized bytes; admittedly it will
> incur costs of doing serdes for caching, but without doing so I honestly
> have no concrete clue how we can measure the current memory usage
> accurately AND efficiently (after reading the links Ismael sent me I feel
> the accurate estimates for collection types / composite types like String
> will do some serialization with sun.misc.Unsafe anyways when it uses
> reflection to crawl the object graph) although we may need to do some
> benchmarking with https://github.com/jbellis/jamm, for example to validate
> this claim, or someone tells me that there is actually a better way that
> I'm not aware of.
>
> Guozhang
>
> On Mon, Jun 13, 2016 at 3:17 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > I am just catching up on this thread.
> >
> > From my point of view, easy tuning for the user is the most important
> > thing, because Kafka Streams is a library. Thus, a global cache size
> > parameter should be the best.
> >
> > About dividing the memory vs a single global cache. I would argue that
> > in the first place dividing the memory would be good, as synchronization
> > might kill the performance. About the cache sizes, I was thinking about
> > starting with an even distribution and adjusting the individual cache
> > sizes during runtime.
> >
> > The dynamic adjustment can also be added later on. We need to figure out
> > a good internal monitoring and "cost function" to determine which task
> > needs more memory and which less. Some metrics to do this might be
> > number-of-assigned-keys, size-of-key-value-pairs, update-frequency etc.
> >
> > I have to confess that I have no idea right now how to design the
> > "cost function" to compute the memory size for each task. But if we want
> > to add dynamic memory management later on, it might be a good idea to
> > keep it in mind and align this KIP already for future improvements.
> >
> > -Matthias
> >
> > On 06/09/2016 05:24 AM, Henry Cai wrote:
> > > One more thing for this KIP:
> > >
> > > Currently RocksDBWindowStore serializes the key/value before it puts
> > > them into the in-memory cache. I think we should delay this
> > > serialization/deserialization unless it needs a flush to db. For a
> > > simple countByKey for 100 records, this would trigger 100
> > > serializations/deserializations even if everything is in-memory.
> > >
> > > If we move this internal cache from RocksDBStore to a global place, I
> > > hope we can reduce the time it needs to do the serialization.
> > >
> > > On Mon, Jun 6, 2016 at 11:07 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> > >
> > >> On Mon, Jun 6, 2016 at 6:48 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>
> > >>> About using Instrumentation.getObjectSize, yeah we were worried a lot
> > >>> about its efficiency as well as accuracy when discussing internally,
> > >>> but no better solution was proposed.
> > >>> So if people have better ideas, please throw
> > >>> them here, as it is also the purpose for us to call out such KIP
> > >>> discussion threads.
> > >>>
> > >>
> > >> Note that this requires a Java agent to be configured. A few links:
> > >>
> > >> https://github.com/apache/spark/blob/b0ce0d13127431fa7cd4c11064762eb0b12e3436/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
> > >> https://github.com/apache/cassandra/blob/3dcbe90e02440e6ee534f643c7603d50ca08482b/src/java/org/apache/cassandra/utils/ObjectSizes.java
> > >> https://github.com/jbellis/jamm
> > >> http://openjdk.java.net/projects/code-tools/jol/
> > >> https://github.com/DimitrisAndreou/memory-measurer
> > >>
> > >> OK, maybe that's more than what you wanted. :)
> > >>
> > >> Ismael
> > >>
> > >
> >
> --
> -- Guozhang

--
Thanks,
Neha