We are facing the same issue.
What we have found is that RocksDB is the cause. With many RocksDB instances
per machine, it slows down over time due to I/O operations, and threads get
evicted from the consumer group because the time between polls exceeds the
max.poll.interval.ms limit.

Try running RocksDB in memory:
https://github.com/facebook/rocksdb/wiki/How-to-persist-in-memory-RocksDB-database%3F
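
For reference, here is a minimal sketch of the settings discussed in this thread, written with plain java.util.Properties so it runs without the Kafka jars. The keys num.stream.threads, max.poll.interval.ms and state.dir are real Kafka Streams / consumer config names. The class name, the helper method, the 10 minute value and the /dev/shm path are only assumptions for illustration (the path assumes a memory-backed filesystem is mounted there); none of them come from the original posts.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: collects the overrides discussed in this thread.
class StreamsTuningSketch {

    static Properties tuningOverrides(int streamThreads, int maxPollIntervalMs, String stateDir) {
        Properties props = new Properties();
        // Fewer stream threads per instance (Eno suggested falling back to one).
        props.setProperty("num.stream.threads", Integer.toString(streamThreads));
        // Allow more time between poll() calls before the thread is evicted from the group.
        props.setProperty("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
        // Where the local state (RocksDB) directories are created; the trace below shows /tmp/kafka-streams.
        props.setProperty("state.dir", stateDir);
        return props;
    }

    public static void main(String[] args) {
        // Assumed example values: 1 thread, 10 minutes, a memory-backed mount.
        Properties props = tuningOverrides(1, 600_000, "/dev/shm/kafka-streams");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These overrides would be merged into the Properties that the application already passes to KafkaStreams, next to application.id and bootstrap.servers. Whether they help will depend on the workload, so treat the values as a starting point.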

Thanks
Sachin



On Fri, Mar 17, 2017 at 8:34 PM, Tianji Li <skyah...@gmail.com> wrote:

> Hi Eno,
>
> I tried 150, 50, and 20 threads, and the probability of crashing decreased
> as the thread count went down. With 1 thread, there was no crash!
>
> My max.poll.interval.ms is 5 minutes and none of the processing takes that
> long, so that parameter does not help.
>
>
> Thanks
> Tianji
>
> On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
> > Hi Tianji,
> >
> > How many threads does your app use?
> >
> > One reason is explained here:
> > https://groups.google.com/forum/#!topic/confluent-platform/wgCSuwIJo5g
> > You might want to increase the max.poll.interval.ms config value.
> > If that doesn't work, could you revert to using one thread for now? Also,
> > let us know either way, since we might need to open a bug report.
> >
> > Thanks
> > Eno
> >
> > > On 16 Mar 2017, at 20:47, Tianji Li <skyah...@gmail.com> wrote:
> > >
> > > Hi there,
> > >
> > > I keep getting the crash below and wonder if anyone knows why. Please
> > > let me know what information I should provide to help with
> > > troubleshooting.
> > >
> > > I am using 0.10.2.0. My application reads one topic and then calls
> > > groupBy().aggregate() 50 times on different keys.
> > >
> > > I use in-memory stores, without backing them up to Kafka.
> > >
> > > Thanks
> > > Tianji
> > >
> > >
> > > 2017-03-16 16:37:14.060  WARN 26139 --- [StreamThread-14] o.a.k.s.p.internals.StreamThread         : Could not create task 0_4. Will retry.
> > >
> > > org.apache.kafka.streams.errors.LockException: task [0_4] Failed to lock the state directory: /tmp/kafka-streams/xxx-test28/0_4
> > >        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> > >        at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> > >        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> > >        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> > >        at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> > >        at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> > >        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> > >        at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> > >        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> > >        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
> > >        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
> > >        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> > >        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> > >        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> > >        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> > >        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> > >        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> >
> >
>
