Hi Damian -

I also thought so .. yes, I will add `KafkaStreams#
setUncaughtExceptionHandler(...)` and Mesos should restart the process ..
Thanks for your prompt response ..

regards.

On Tue, Jul 4, 2017 at 9:30 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi Debasish,
>
> It looks like it is possibly a bug in the Kafka Consumer code.
> In your streams app you probably want to add an UncaughtExceptionHandler,
> i.e, via `KafkaStreams#setUncaughtExceptionHandler(...)` and terminate the
> process when you receive an uncaught exception. I guess Mesos should
> automatically restart it for you, then?
>
> Thanks,
> Damian
>
> On Tue, 4 Jul 2017 at 16:40 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
> > Hi -
> >
> > I have been running a streaming application on some data set. Things
> > usually run ok. Today I was trying to run the same application on Kafka
> > (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> > running for quite some time, I got the following exception ..
> >
> > Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > > Attempt to retrieve exception from future which hasn't failed
> > > at
> > >
> > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(
> RequestFuture.java:99)
> > > at
> > >
> > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(
> RequestFuture.java:89)
> > > at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> commitOffsetsSync(ConsumerCoordinator.java:590)
> > > at
> > >
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> commitSync(KafkaConsumer.java:1124)
> > > at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(
> StreamTask.java:296)
> > > at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamTask$1.run(StreamTask.java:79)
> > > at
> > >
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:188)
> > > at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:280)
> > > at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> StreamThread.java:807)
> > > at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> StreamThread.java:794)
> > > at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
> StreamThread.java:769)
> > > at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:647)
> > > at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> >
> >
> > Looks like some internal processing failed and control went to an
> > unexpected path. I have 2 questions ..
> >
> >    1. any idea why this could happen ? I don't think it's related to
> Mesos
> >    DC/OS though - may be some concurrency issue ?
> >    2. how do I recover from such errors ? The stream processor has
> stopped
> >    and the only way out is to restart the application.
> >
> > regards.
> >
> > --
> > Debasish Ghosh
> > http://manning.com/ghosh2
> > http://manning.com/ghosh
> >
> > Twttr: @debasishg
> > Blog: http://debasishg.blogspot.com
> > Code: http://github.com/debasishg
> >
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Reply via email to