Hi Guys,

Great information again as usual, very helpful!

Much appreciated, thanks so much!

Tianji

PS: The Kafka Community is simply great!



On Fri, Mar 17, 2017 at 3:00 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Tianji and Sachin (and also cc'ing people who I remember have reported
> similar RocksDB memory issues),
>
> Sharing my experience with RocksDB tuning, plus what I learned from
> chatting with the RocksDB community:
>
> 1. If you flush the state stores frequently (e.g. with a high commit
> frequency) you will end up with a huge number of very small memtable
> files, which results in very high compaction pressure on RocksDB; with
> the default number of compaction threads (1), RocksDB will not be able
> to keep up with the write throughput and compaction rate, hence the
> gradual degradation of performance. We have changed the default
> num.compaction.threads in trunk, but if you are on released version
> 0.10.2 or older, check your store's flush rate metrics and consider
> increasing the number of compaction threads.
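>
> For example, a minimal sketch of such a config setter, assuming the
> 0.10.x RocksDBConfigSetter API and the bundled RocksDB JNI options (the
> thread count of 4 is just an illustration):
>
> import java.util.Map;
> import org.rocksdb.Options;
> import org.apache.kafka.streams.state.RocksDBConfigSetter;
>
> // Raises RocksDB's background compaction parallelism for every store.
> public class CompactionConfigSetter implements RocksDBConfigSetter {
>     @Override
>     public void setConfig(final String storeName, final Options options,
>                           final Map<String, Object> configs) {
>         options.setIncreaseParallelism(4);      // background thread pool size
>         options.setMaxBackgroundCompactions(4); // allow 4 concurrent compactions
>     }
> }
>
> You can then register it via StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
> ("rocksdb.config.setter").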
>
> 2. The most common memory leaks from the RocksDB JNI are iterator
> leaks. Make sure to close the iterators returned by your range queries
> / fetches from the stores once you are done with them. Otherwise the
> corresponding scanned memory stays pinned and cannot be compacted away.
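>
> To make the close automatic, a sketch using try-with-resources (the
> store contents and key range here are hypothetical; KeyValueIterator
> implements Closeable):
>
> import org.apache.kafka.streams.KeyValue;
> import org.apache.kafka.streams.state.KeyValueIterator;
> import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
>
> // Iterators returned by range()/fetch()/all() pin resources until
> // closed; try-with-resources closes them even if iteration throws.
> static long sumRange(final ReadOnlyKeyValueStore<String, Long> store) {
>     long sum = 0;
>     try (final KeyValueIterator<String, Long> iter = store.range("a", "z")) {
>         while (iter.hasNext()) {
>             final KeyValue<String, Long> entry = iter.next();
>             sum += entry.value;
>         }
>     }
>     return sum;
> }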
>
>
> Guozhang
>
>
> On Fri, Mar 17, 2017 at 8:56 AM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
> > Sachin, you also have a PR for this that could help, right?
> > https://github.com/apache/kafka/pull/2642#issuecomment-287372367
> >
> > Thanks
> > Eno
> >
> >
> > > On 17 Mar 2017, at 15:19, Sachin Mittal <sjmit...@gmail.com> wrote:
> > >
> > > We also face the same issues.
> > > What we have found is that RocksDB is the issue. With many instances
> > > of RocksDB per machine, over time it slows down due to i/o
> > > operations, resulting in threads getting evicted because the time
> > > between polls exceeds the max.poll.interval limit.
> > >
> > > Try running RocksDB in memory:
> > > https://github.com/facebook/rocksdb/wiki/How-to-persist-in-memory-RocksDB-database%3F
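> > >
> > > Alternatively, within Streams itself you can back an aggregate with
> > > the built-in in-memory store instead of RocksDB. A sketch, assuming
> > > the 0.10.x Stores API (store name and serde choices are made up):
> > >
> > > import org.apache.kafka.streams.processor.StateStoreSupplier;
> > > import org.apache.kafka.streams.state.Stores;
> > >
> > > // An in-memory (non-RocksDB) key-value store supplier.
> > > final StateStoreSupplier supplier = Stores.create("counts")
> > >         .withStringKeys()
> > >         .withLongValues()
> > >         .inMemory()
> > >         .build();
> > >
> > > You can then pass the supplier to the aggregate() overload that
> > > accepts a StateStoreSupplier.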
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > > On Fri, Mar 17, 2017 at 8:34 PM, Tianji Li <skyah...@gmail.com> wrote:
> > >
> > >> Hi Eno,
> > >>
> > >> I used 150, 50, and 20 threads, and the probability of crashing
> > >> decreased with the thread count. When using 1 thread, no crash!
> > >>
> > >> My max.poll.interval is 5 minutes and none of the processing lasts
> > >> that long, so that parameter does not help.
> > >>
> > >>
> > >> Thanks
> > >> Tianji
> > >>
> > >> On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska
> > >> <eno.there...@gmail.com> wrote:
> > >>
> > >>> Hi Tianji,
> > >>>
> > >>> How many threads does your app use?
> > >>>
> > >>> One reason is explained here:
> > >>> https://groups.google.com/forum/#!topic/confluent-platform/wgCSuwIJo5g
> > >>> You might want to increase the max.poll.interval.ms config value.
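> > >>>
> > >>> For instance, a sketch of bumping it in the Streams properties
> > >>> (the 10-minute value is only an illustration; Streams passes the
> > >>> setting through to its internal consumers):
> > >>>
> > >>> import java.util.Properties;
> > >>> import org.apache.kafka.clients.consumer.ConsumerConfig;
> > >>>
> > >>> // Allow up to 10 minutes between poll() calls before the thread
> > >>> // is kicked out of the consumer group.
> > >>> final Properties props = new Properties();
> > >>> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);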
> > >>> If that doesn't work, could you revert to using one thread for
> > >>> now? Also let us know either way, since we might need to open a
> > >>> bug report.
> > >>>
> > >>> Thanks
> > >>> Eno
> > >>>
> > >>>> On 16 Mar 2017, at 20:47, Tianji Li <skyah...@gmail.com> wrote:
> > >>>>
> > >>>> Hi there,
> > >>>>
> > >>>> I keep getting these crashes and wonder if anyone knows why.
> > >>>> Please let me know what information I should provide to help with
> > >>>> troubleshooting.
> > >>>>
> > >>>> I am using 0.10.2.0. My application reads one topic and then does
> > >>>> groupBy().aggregate() 50 times on different keys.
> > >>>>
> > >>>> I use in-memory stores, without backing them up to Kafka.
> > >>>>
> > >>>> Thanks
> > >>>> Tianji
> > >>>>
> > >>>>
> > >>>> 2017-03-16 16:37:14.060  WARN 26139 --- [StreamThread-14] o.a.k.s.p.internals.StreamThread : Could not create task 0_4. Will retry.
> > >>>>
> > >>>> org.apache.kafka.streams.errors.LockException: task [0_4] Failed to lock the state directory: /tmp/kafka-streams/xxx-test28/0_4
> > >>>>        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> > >>>>        at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
> > >>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
> > >>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> > >>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> > >>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > >>>
> > >>>
> > >>
> >
> >
>
>
> --
> -- Guozhang
>
