Hi Damien - Just 1 question .. by "terminate the process" do you mean System.exit(..) ? Because streams.close() will not terminate the process - right ?
regards. On Tue, Jul 4, 2017 at 9:36 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote: > Hi Damian - > > I also thought so .. yes, I will add `KafkaStreams#setUncaughtE > xceptionHandler(...)` and Mesos should restart the process .. Thanks for > your prompt response .. > > regards. > > On Tue, Jul 4, 2017 at 9:30 PM, Damian Guy <damian....@gmail.com> wrote: > >> Hi Debasish, >> >> It looks like it is possibly a bug in the Kafka Consumer code. >> In your streams app you probably want to add an UncaughtExceptionHandler, >> i.e, via `KafkaStreams#setUncaughtExceptionHandler(...)` and terminate >> the >> process when you receive an uncaught exception. I guess Mesos should >> automatically restart it for you, then? >> >> Thanks, >> Damian >> >> On Tue, 4 Jul 2017 at 16:40 Debasish Ghosh <ghosh.debas...@gmail.com> >> wrote: >> >> > Hi - >> > >> > I have been running a streaming application on some data set. Things >> > usually run ok. Today I was trying to run the same application on Kafka >> > (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After >> > running for quite some time, I got the following exception .. >> > >> > Exception in thread "StreamThread-1" java.lang.IllegalStateException: >> > > Attempt to retrieve exception from future which hasn't failed >> > > at >> > > >> > org.apache.kafka.clients.consumer.internals.RequestFuture. >> exception(RequestFuture.java:99) >> > > at >> > > >> > org.apache.kafka.clients.consumer.internals.RequestFuture. >> isRetriable(RequestFuture.java:89) >> > > at >> > > >> > org.apache.kafka.clients.consumer.internals.ConsumerCoordina >> tor.commitOffsetsSync(ConsumerCoordinator.java:590) >> > > at >> > > >> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync( >> KafkaConsumer.java:1124) >> > > at >> > > >> > org.apache.kafka.streams.processor.internals.StreamTask. >> commitOffsets(StreamTask.java:296) >> > > at >> > > >> > org.apache.kafka.streams.processor.internals.StreamTask$1. >> run(StreamTask.java:79) >> > > at >> > > >> > org.apache.kafka.streams.processor.internals.StreamsMetricsI >> mpl.measureLatencyNs(StreamsMetricsImpl.java:188) >> > > at >> > > >> > org.apache.kafka.streams.processor.internals.StreamTask. >> commit(StreamTask.java:280) >> > > at >> > > >> > org.apache.kafka.streams.processor.internals.StreamThread. >> commitOne(StreamThread.java:807) >> > > at >> > > >> > org.apache.kafka.streams.processor.internals.StreamThread. >> commitAll(StreamThread.java:794) >> > > at >> > > >> > org.apache.kafka.streams.processor.internals.StreamThread. >> maybeCommit(StreamThread.java:769) >> > > at >> > > >> > org.apache.kafka.streams.processor.internals.StreamThread. >> runLoop(StreamThread.java:647) >> > > at >> > > >> > org.apache.kafka.streams.processor.internals.StreamThread. >> run(StreamThread.java:361) >> > >> > >> > Looks like some internal processing failed and control went to an >> > unexpected path. I have 2 questions .. >> > >> > 1. any idea why this could happen ? I don't think it's related to >> Mesos >> > DC/OS though - may be some concurrency issue ? >> > 2. how do I recover from such errors ? The stream processor has >> stopped >> > and the only way out is to restart the application. >> > >> > regards. >> > >> > -- >> > Debasish Ghosh >> > http://manning.com/ghosh2 >> > http://manning.com/ghosh >> > >> > Twttr: @debasishg >> > Blog: http://debasishg.blogspot.com >> > Code: http://github.com/debasishg >> > >> > > > > -- > Debasish Ghosh > http://manning.com/ghosh2 > http://manning.com/ghosh > > Twttr: @debasishg > Blog: http://debasishg.blogspot.com > Code: http://github.com/debasishg > -- Debasish Ghosh http://manning.com/ghosh2 http://manning.com/ghosh Twttr: @debasishg Blog: http://debasishg.blogspot.com Code: http://github.com/debasishg