Tianji and Sachin (also cc'ing people who I remember reporting similar RocksDB memory issues),
Sharing my experience with RocksDB tuning, and from chatting with the RocksDB community:

1. If you flush the state stores frequently (e.g. with a high commit frequency), you will end up with a huge number of very small memtable files, and hence very high compaction pressure on RocksDB. With the default number of compaction threads (1), RocksDB cannot keep up with the write throughput and compaction rate, and performance gradually degrades. We have changed the default num.compaction.threads in trunk, but if you are on release 0.10.2 or older, check your store's flush-rate metrics and consider increasing the number of compaction threads.

2. The most common memory leaks from the RocksDB JNI are iterator leaks. Make sure to close the iterators returned from your range queries / fetches on the stores once you are done with them. Otherwise the scanned memory stays pinned in memory and cannot be compacted.

Guozhang

On Fri, Mar 17, 2017 at 8:56 AM, Eno Thereska <eno.there...@gmail.com> wrote:

> Sachin, you also have a PR for this that could help, right?:
> https://github.com/apache/kafka/pull/2642#issuecomment-287372367
>
> Thanks
> Eno
>
>
> > On 17 Mar 2017, at 15:19, Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > We also face the same issues.
> > What we have found is that rocksdb is the issue. With many instances of
> > rocksdb per machine, over time it slows down due to i/o operations,
> > resulting in threads getting evicted because max.poll.interval exceeds
> > the set limit.
> >
> > Try running rocksdb in memory:
> > https://github.com/facebook/rocksdb/wiki/How-to-persist-in-memory-RocksDB-database%3F
> >
> > Thanks
> > Sachin
> >
> >
> > On Fri, Mar 17, 2017 at 8:34 PM, Tianji Li <skyah...@gmail.com> wrote:
> >
> >> Hi Eno,
> >>
> >> I used 150, 50, and 20 threads, and the probability of crashing decreased
> >> with this number.
> >> When using 1 thread, no crash!
> >>
> >> My max.poll.interval is 5 minutes and none of the processing lasts that
> >> long, so that parameter does not help.
> >>
> >> Thanks
> >> Tianji
> >>
> >> On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska <eno.there...@gmail.com>
> >> wrote:
> >>
> >>> Hi Tianji,
> >>>
> >>> How many threads does your app use?
> >>>
> >>> One reason is explained here:
> >>> https://groups.google.com/forum/#!topic/confluent-platform/wgCSuwIJo5g
> >>> You might want to increase the max.poll.interval config value.
> >>> If that doesn't work, could you revert to using one thread for now?
> >>> Also let us know either way, since we might need to open a bug report.
> >>>
> >>> Thanks
> >>> Eno
> >>>
> >>>> On 16 Mar 2017, at 20:47, Tianji Li <skyah...@gmail.com> wrote:
> >>>>
> >>>> Hi there,
> >>>>
> >>>> I keep getting these crashes and wonder if anyone knows why. Please let
> >>>> me know what information I should provide to help with troubleshooting.
> >>>>
> >>>> I am using 0.10.2.0. My application reads one topic and then does
> >>>> groupBy().aggregate() 50 times on different keys.
> >>>>
> >>>> I use an in-memory store, without backing to kafka.
> >>>>
> >>>> Thanks
> >>>> Tianji
> >>>>
> >>>>
> >>>> 2017-03-16 16:37:14.060 WARN 26139 --- [StreamThread-14] o.a.k.s.p.internals.StreamThread : Could not create task 0_4. Will retry.
> >>>>
> >>>> org.apache.kafka.streams.errors.LockException: task [0_4] Failed to lock the state directory: /tmp/kafka-streams/xxx-test28/0_4
> >>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> >>>>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> >>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
> >>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
> >>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> >>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> >>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> >>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

--
-- Guozhang
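
P.S. To illustrate the first point above (raising the number of compaction threads on releases that still default to 1): Kafka Streams lets you pass a custom `RocksDBConfigSetter` via the `rocksdb.config.setter` config. A minimal sketch — the class name and thread count here are my own placeholders, not anything from the thread; tune the count to your host:

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

// Applied once per state store when the store is opened.
public class CompactionTuningConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // Let RocksDB run background flushes/compactions in parallel instead
        // of the single default background thread. "4" is an assumption;
        // size it against your store count and disk bandwidth.
        options.setIncreaseParallelism(4);
    }
}
```

It is registered in the Streams config, e.g. `props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CompactionTuningConfigSetter.class);`.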
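
P.P.S. On the iterator-leak point: `KeyValueIterator` implements `Closeable`, so try-with-resources guarantees the underlying RocksDB iterator is released even if processing throws. A sketch — the store, key range, and helper name are hypothetical:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public final class RangeScans {
    // Counts entries in [from, to]; "store", "from", "to" stand in for your own.
    static long countRange(final ReadOnlyKeyValueStore<String, Long> store,
                           final String from, final String to) {
        long count = 0;
        try (final KeyValueIterator<String, Long> iter = store.range(from, to)) {
            while (iter.hasNext()) {
                final KeyValue<String, Long> entry = iter.next();
                count++; // ...process entry...
            }
        } // iterator closed here; skipping this keeps scanned blocks pinned in memory
        return count;
    }
}
```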