I think Damian's finding is correct regarding the consumer bug, and a PR is already being worked on: https://github.com/apache/kafka/pull/3489/files
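Until that fix is released, Damian's workaround from the quoted thread still applies. Below is a minimal sketch in Java. It assumes a supervisor such as Mesos/Marathon restarts the process after it exits. `ExitOnUncaughtException` is a hypothetical helper and is not part of Kafka. The only Kafka API it relies on is `KafkaStreams#setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler)`, which appears only in a comment so the demo runs without Kafka on the classpath:

```java
import java.util.function.IntConsumer;

/**
 * Hypothetical helper (not part of Kafka): an UncaughtExceptionHandler that
 * terminates the JVM when a stream thread dies, so an external supervisor
 * (assumed here: Mesos/Marathon) can restart the whole application.
 */
public class ExitOnUncaughtException implements Thread.UncaughtExceptionHandler {

    private final IntConsumer exiter; // how to terminate; System::exit in production
    private final int exitCode;       // non-zero so the supervisor sees a failure

    public ExitOnUncaughtException(IntConsumer exiter, int exitCode) {
        this.exiter = exiter;
        this.exitCode = exitCode;
    }

    /** Production constructor: really terminates the JVM with status 1. */
    public ExitOnUncaughtException() {
        this(System::exit, 1);
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        System.err.println("Thread " + t.getName() + " died: " + e);
        // Stopping the stream threads alone would leave the JVM running;
        // exiting lets the supervisor restart the process.
        exiter.accept(exitCode);
    }

    public static void main(String[] args) throws InterruptedException {
        // Wiring in a real app (Kafka Streams 0.10.x API, shown for reference):
        //   KafkaStreams streams = new KafkaStreams(builder, props);
        //   streams.setUncaughtExceptionHandler(new ExitOnUncaughtException());
        //   streams.start();
        // Here a dying thread is simulated, and the exit code is recorded instead.
        int[] recorded = {-1};
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("simulated failure");
        }, "StreamThread-1");
        worker.setUncaughtExceptionHandler(
                new ExitOnUncaughtException(code -> recorded[0] = code, 1));
        worker.start();
        worker.join(); // the handler runs on the dying thread before it terminates
        System.out.println("exit code requested: " + recorded[0]);
    }
}
```

The exit action is injected so the handler can be exercised without killing the JVM. Only the no-argument constructor calls `System.exit`.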
Guozhang

On Tue, Jul 4, 2017 at 10:04 AM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Thanks!
>
> On Tue, Jul 4, 2017 at 10:28 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > Yes, System.exit(..)
> > streams.close(..) just attempts to stop any running stream threads.
> >
> > On Tue, 4 Jul 2017 at 17:49 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
> >
> >> Hi Damian -
> >>
> >> Just one question .. by "terminate the process" do you mean System.exit(..)?
> >> Because streams.close() will not terminate the process - right?
> >>
> >> regards.
> >>
> >> On Tue, Jul 4, 2017 at 9:36 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
> >>
> >> > Hi Damian -
> >> >
> >> > I also thought so .. yes, I will add `KafkaStreams#setUncaughtExceptionHandler(...)`
> >> > and Mesos should restart the process .. Thanks for your prompt response ..
> >> >
> >> > regards.
> >> >
> >> > On Tue, Jul 4, 2017 at 9:30 PM, Damian Guy <damian....@gmail.com> wrote:
> >> >
> >> >> Hi Debasish,
> >> >>
> >> >> It looks like it is possibly a bug in the Kafka Consumer code.
> >> >> In your streams app you probably want to add an UncaughtExceptionHandler,
> >> >> i.e., via `KafkaStreams#setUncaughtExceptionHandler(...)`, and terminate the
> >> >> process when you receive an uncaught exception. I guess Mesos should
> >> >> automatically restart it for you, then?
> >> >>
> >> >> Thanks,
> >> >> Damian
> >> >>
> >> >> On Tue, 4 Jul 2017 at 16:40 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
> >> >>
> >> >> > Hi -
> >> >> >
> >> >> > I have been running a streaming application on some data set. Things
> >> >> > usually run ok. Today I was trying to run the same application on Kafka
> >> >> > (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> >> >> > running for quite some time, I got the following exception ..
> >> >> >
> >> >> > > Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> >> >> > > Attempt to retrieve exception from future which hasn't failed
> >> >> > >     at org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> >> >> > >     at org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> >> >> > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> >> >> > >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> >> >> > >     at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> >> >> > >     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> >> >> > >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >> >> > >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> >> >> > >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> >> >> > >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> >> >> > >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> >> >> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> >> >> > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >> >> >
> >> >> > Looks like some internal processing failed and control went to an
> >> >> > unexpected path. I have 2 questions ..
> >> >> >
> >> >> > 1. Any idea why this could happen? I don't think it's related to Mesos
> >> >> >    DC/OS though - maybe some concurrency issue?
> >> >> > 2. How do I recover from such errors? The stream processor has stopped
> >> >> >    and the only way out is to restart the application.
> >> >> >
> >> >> > regards.
> >> >> >
> >> >> > --
> >> >> > Debasish Ghosh
> >> >> > http://manning.com/ghosh2
> >> >> > http://manning.com/ghosh
> >> >> >
> >> >> > Twttr: @debasishg
> >> >> > Blog: http://debasishg.blogspot.com
> >> >> > Code: http://github.com/debasishg