Hi Debasish,

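It might be that it is blocked in `streams.close()`. Your log shows the
handler running on StreamThread-1, and the no-argument `close()` waits for
the stream threads to stop, so it may be waiting on its own thread.
You might want to try the overload that takes a long and a TimeUnit as
params, i.e., `streams.close(1, TimeUnit.MINUTES)`.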
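
As a rough sketch of what I mean (untested against your setup), the handler
could bound the wait like this. `BoundedShutdown`, `handler`, and the three
function parameters are hypothetical names I made up so that the snippet
compiles without Kafka on the classpath; `closeStreams` stands in for the
timed `close` overload mentioned above.

```scala
import java.util.concurrent.TimeUnit

object BoundedShutdown {
  // Builds an uncaught-exception handler that waits a bounded time on close.
  // All three parameters are hypothetical hooks for illustration:
  //   stopHttp     - stops the HTTP service (restService.stop() in your code)
  //   closeStreams - stands in for the timed close overload on KafkaStreams
  //   exit         - terminates the process (System.exit in production)
  def handler(
      stopHttp: () => Unit,
      closeStreams: (Long, TimeUnit) => Unit,
      exit: Int => Unit
  ): Thread.UncaughtExceptionHandler =
    new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit =
        try {
          stopHttp()
          // Give shutdown at most one minute instead of waiting indefinitely.
          closeStreams(1L, TimeUnit.MINUTES)
        } catch {
          case _: Exception => // ignored: the process exits in finally anyway
        } finally {
          exit(-1)
        }
    }
}
```

In your application this would presumably be wired up along the lines of
`streams.setUncaughtExceptionHandler(BoundedShutdown.handler(() => restService.stop(), (n, u) => streams.close(n, u), code => System.exit(code)))`.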

Thanks,
Damian

On Wed, 26 Jul 2017 at 09:11 Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Hi -
>
> I have a Kafka Streams application deployed on a Mesos DC/OS cluster. While
> the application was running, Kafka was suddenly reported as unhealthy and
> the application got an exception ..
>
> > 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stream terminated because of uncaught exception .. Shutting down app
> > org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
> >         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
> >         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
> >         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
> >         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 205 record(s) for kstream-log-processing-windowed-access-count-per-host-repartition-0: 30020 ms has passed since last attempt plus backoff time
> > 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping http service ..
> > 07:45:16.606 TKD [StreamThread-1] INFO  c.l.f.s.k.http.WeblogDSLHttpService - Stopping the http server
> > 07:45:16.607 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping streams service ..
> > 07:45:16.608 TKD [StreamThread-1] INFO  o.apache.kafka.streams.KafkaStreams - stream-client [kstream-log-processing-39b51b2b-e8da-4db8-b782-bec4fb030999] State transition from RUNNING to PENDING_SHUTDOWN.
> > 07:45:16.608 TKD [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down
> > 07:45:16.609 TKD [kafka-streams-close-thread] WARN  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN.
> > 07:45:16.627 TKD [default-akka.kafka.default-dispatcher-43] INFO  o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.
>
>
> The streams application stopped and I have the following exception handler
> registered ..
>
>     // need to exit for any stream exception
>     // mesos will restart the application
>     streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
>       override def uncaughtException(t: Thread, e: Throwable): Unit = try {
>         logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
>         logger.error(s"Stopping http service ..")
>         restService.stop()
>         logger.error(s"Stopping streams service ..")
>         streams.close()
>       } catch {
>         case _: Exception =>
>       } finally {
>         System.exit(-1)
>       }
>     })
>
> Ideally the application should terminate and Mesos should restart it.
> But I see that the application doesn't terminate even though I have a
> System.exit(-1) in the finally clause. Any idea what's happening, or how
> I can make the application terminate?
>
> Any help will be appreciated ..
>
> regards.
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
