Hi Damian - I also thought so. Yes, I will add a handler via `KafkaStreams#setUncaughtExceptionHandler(...)` that terminates the process, and Mesos should then restart it. Thanks for your prompt response.
regards.

On Tue, Jul 4, 2017 at 9:30 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi Debasish,
>
> It looks like it is possibly a bug in the Kafka Consumer code.
> In your streams app you probably want to add an UncaughtExceptionHandler,
> i.e, via `KafkaStreams#setUncaughtExceptionHandler(...)` and terminate the
> process when you receive an uncaught exception. I guess Mesos should
> automatically restart it for you, then?
>
> Thanks,
> Damian
>
> On Tue, 4 Jul 2017 at 16:40 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
> > Hi -
> >
> > I have been running a streaming application on some data set. Things
> > usually run ok. Today I was trying to run the same application on Kafka
> > (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> > running for quite some time, I got the following exception ..
> >
> > > Exception in thread "StreamThread-1" java.lang.IllegalStateException: Attempt to retrieve exception from future which hasn't failed
> > >     at org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > >     at org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > >     at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > >     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >
> > Looks like some internal processing failed and control went to an
> > unexpected path. I have 2 questions ..
> >
> >    1. any idea why this could happen ? I don't think it's related to Mesos
> >    DC/OS though - may be some concurrency issue ?
> >    2. how do I recover from such errors ? The stream processor has stopped
> >    and the only way out is to restart the application.
> >
> > regards.
> >
> > --
> > Debasish Ghosh
> > http://manning.com/ghosh2
> > http://manning.com/ghosh
> >
> > Twttr: @debasishg
> > Blog: http://debasishg.blogspot.com
> > Code: http://github.com/debasishg
> >
>

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
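
For reference, here is a minimal sketch of the fail-fast handler discussed in this thread. It assumes the 0.10.2.x form of `KafkaStreams#setUncaughtExceptionHandler(...)`, which takes a plain `java.lang.Thread.UncaughtExceptionHandler`, so the handler itself needs nothing from Kafka. The class name `FailFastHandler` and the injectable exit action are hypothetical, chosen only so the behaviour can be tried without a broker; this is not code from the application in question.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

// Hypothetical helper: logs the failure of a thread and then asks the
// process to terminate, so that a supervisor (Mesos/Marathon in this
// thread) can restart it.
final class FailFastHandler implements Thread.UncaughtExceptionHandler {
    // The exit action is injected so it can be swapped out in a test.
    // In a real deployment this would typically be System::exit.
    private final IntConsumer exitAction;

    FailFastHandler(IntConsumer exitAction) {
        this.exitAction = exitAction;
    }

    @Override
    public void uncaughtException(Thread thread, Throwable error) {
        System.err.println("Uncaught exception in " + thread.getName() + ": " + error);
        exitAction.accept(1); // non-zero status signals an abnormal exit
    }

    // Stand-alone demo: a plain thread plays the role of "StreamThread-1".
    // Assumed wiring in the real app (not executed here):
    //   streams.setUncaughtExceptionHandler(new FailFastHandler(System::exit));
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger requestedExit = new AtomicInteger(-1);
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("simulated failure");
        }, "StreamThread-1");
        worker.setUncaughtExceptionHandler(new FailFastHandler(requestedExit::set));
        worker.start();
        worker.join(); // the handler runs on the dying thread before join returns
        System.out.println("exit code requested: " + requestedExit.get());
    }
}
```

One caveat on the design: the handler runs on the failing stream thread, so if the application also registers a JVM shutdown hook that calls `KafkaStreams#close()`, it is worth checking that the hook cannot block waiting on that same thread while `System.exit` is in progress.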