Hi guys,

Great information as usual, very helpful!
Much appreciated, thanks so much!

Tianji

PS: The Kafka community is simply great!

On Fri, Mar 17, 2017 at 3:00 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Tianji and Sachin (and also cc'ing people who I remember have reported
> similar RocksDB memory issues),
>
> Sharing my experience with RocksDB tuning, and also what I learned
> chatting with the RocksDB community:
>
> 1. If you are frequently flushing the state stores (e.g. with a high
> commit frequency), you will end up with a huge number of very small
> memtable files, and hence very high compaction pressure on RocksDB. If
> you use the default number of compaction threads (1), it will not be
> able to keep up with the write throughput and compaction rate, hence
> the gradual degradation of performance. We have changed the default
> num.compaction.threads in trunk, but if you are on released version
> 0.10.2 or older, check your store's flush rate metrics and consider
> increasing the number of compaction threads.
>
> 2. The most common memory leaks from the RocksDB JNI are iterator
> leaks. Make sure to close the iterators returned by your range queries
> / fetches from the stores when you are done. If not, the corresponding
> scanned memory will be pinned in memory and cannot be compacted.
>
>
> Guozhang
>
>
> On Fri, Mar 17, 2017 at 8:56 AM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
> > Sachin, you also have a PR for this that could help, right?:
> > https://github.com/apache/kafka/pull/2642#issuecomment-287372367
> >
> > Thanks
> > Eno
> >
> >
> > > On 17 Mar 2017, at 15:19, Sachin Mittal <sjmit...@gmail.com> wrote:
> > >
> > > We also face the same issues.
> > > What we have found is that RocksDB is the issue. With many
> > > instances of RocksDB per machine, it slows down over time due to
> > > I/O operations, resulting in threads getting evicted because
> > > max.poll.interval exceeds the set limit.
> > >
> > > Try running RocksDB in memory:
> > > https://github.com/facebook/rocksdb/wiki/How-to-persist-in-memory-RocksDB-database%3F
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Fri, Mar 17, 2017 at 8:34 PM, Tianji Li <skyah...@gmail.com> wrote:
> > >
> > >> Hi Eno,
> > >>
> > >> I tried 150, 50, and 20 threads, and the probability of crashing
> > >> decreased with the thread count. When using 1 thread, no crash!
> > >>
> > >> My max.poll.interval is 5 minutes and none of the processing takes
> > >> that long, so that parameter does not help.
> > >>
> > >> Thanks
> > >> Tianji
> > >>
> > >> On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska <eno.there...@gmail.com>
> > >> wrote:
> > >>
> > >>> Hi Tianji,
> > >>>
> > >>> How many threads does your app use?
> > >>>
> > >>> One reason is explained here:
> > >>> https://groups.google.com/forum/#!topic/confluent-platform/wgCSuwIJo5g
> > >>> You might want to increase the max.poll.interval config value.
> > >>> If that doesn't work, could you revert to using one thread for
> > >>> now? Also let us know either way, since we might need to open a
> > >>> bug report.
> > >>>
> > >>> Thanks
> > >>> Eno
> > >>>
> > >>>> On 16 Mar 2017, at 20:47, Tianji Li <skyah...@gmail.com> wrote:
> > >>>>
> > >>>> Hi there,
> > >>>>
> > >>>> I keep getting these crashes and wonder if anyone knows why.
> > >>>> Please let me know what information I should provide to help
> > >>>> with troubleshooting.
> > >>>>
> > >>>> I am using 0.10.2.0.
> > >>>> My application reads one topic and then does
> > >>>> groupBy().aggregate() 50 times on different keys.
> > >>>>
> > >>>> I use an in-memory store, without backing to Kafka.
> > >>>>
> > >>>> Thanks
> > >>>> Tianji
> > >>>>
> > >>>>
> > >>>> 2017-03-16 16:37:14.060  WARN 26139 --- [StreamThread-14]
> > >>>> o.a.k.s.p.internals.StreamThread : Could not create task 0_4. Will retry.
> > >>>>
> > >>>> org.apache.kafka.streams.errors.LockException: task [0_4] Failed to lock
> > >>>> the state directory: /tmp/kafka-streams/xxx-test28/0_4
> > >>>>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> > >>>>   at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> > >>>>   at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> > >>>>   at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> > >>>>   at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> > >>>>   at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> > >>>>   at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> > >>>>   at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> > >>>>   at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> > >>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
> > >>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
> > >>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> > >>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> > >>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> > >>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> > >>>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> > >>>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>
> --
> -- Guozhang
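
For reference, here are minimal sketches of what the advice above looks like in code. They are illustrative only, not taken from the thread: the class names CompactionConfigSetter, RangeSum, and StreamsProps and all numeric values are made up for the example.

Guozhang's point 1 (more compaction threads): Kafka Streams exposes the rocksdb.config.setter hook (StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG) for tuning the embedded RocksDB instances. A sketch, assuming setIncreaseParallelism is an acceptable way to size RocksDB's background thread pool:

    import java.util.Map;

    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.Options;

    // Hypothetical example class; register it with
    //   props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
    //             CompactionConfigSetter.class);
    public class CompactionConfigSetter implements RocksDBConfigSetter {
        @Override
        public void setConfig(final String storeName, final Options options,
                              final Map<String, Object> configs) {
            // Grow the background thread pool used for compactions; per the
            // thread, the default is a single compaction thread. 4 is an
            // arbitrary example value, not a recommendation.
            options.setIncreaseParallelism(4);
        }
    }

Guozhang's point 2 (iterator leaks): KeyValueIterator implements Closeable, so a try-with-resources block guarantees close() runs even if the loop throws, releasing the RocksDB memory the iterator pins. A sketch against a hypothetical KeyValueStore<String, Long>:

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    public final class RangeSum {
        // Sums all values with keys in [from, to]; the iterator is closed
        // on every exit path, normal or exceptional.
        public static long sumRange(final KeyValueStore<String, Long> store,
                                    final String from, final String to) {
            long total = 0;
            try (KeyValueIterator<String, Long> iter = store.range(from, to)) {
                while (iter.hasNext()) {
                    KeyValue<String, Long> entry = iter.next();
                    total += entry.value;
                }
            }
            return total;
        }
    }

And Eno's suggestion of raising max.poll.interval.ms can be forwarded to the embedded consumer via the consumer prefix, e.g.:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public final class StreamsProps {
        public static Properties build() {
            final Properties props = new Properties();
            // 600000 ms (10 minutes) is an example value only; size it to
            // comfortably exceed your worst-case processing time per poll.
            props.put(
                StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                600000);
            return props;
        }
    }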