Hi Debasish,

It might be that the handler is blocked in `streams.close()`, in which case the `finally` clause with `System.exit(-1)` is never reached. You might want to try the overload that takes a long and a TimeUnit as parameters, e.g. `streams.close(1, TimeUnit.MINUTES)`.
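In case it helps, here is a minimal sketch of the general idea: bound the time spent in a blocking close so that the code after it (your `finally` with `System.exit(-1)`) is always reached. It uses only the JDK. `BoundedShutdown` and `closeWithin` are names made up for this illustration and are not part of Kafka Streams; with the timed overload of `close` you should not need it, since the timeout is built in.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical helper, not part of Kafka Streams.
object BoundedShutdown {

  /** Runs `close` on a separate daemon thread and waits at most `timeout`
    * for it to finish. Returns true if `close` completed in time, false if
    * the wait timed out (the close thread is then left running).
    */
  def closeWithin(timeout: Long, unit: TimeUnit)(close: () => Unit): Boolean = {
    val done = new CountDownLatch(1)
    val closer = new Thread(new Runnable {
      override def run(): Unit =
        try close() finally done.countDown() // signal even if close() throws
    }, "bounded-close")
    closer.setDaemon(true) // a stuck close must not keep the JVM alive
    closer.start()
    done.await(timeout, unit) // false when the timeout elapses first
  }
}
```

In your handler this would replace the plain call, for example `BoundedShutdown.closeWithin(1, TimeUnit.MINUTES)(() => streams.close())`, with `System.exit(-1)` staying in the `finally` clause.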
Thanks,
Damian

On Wed, 26 Jul 2017 at 09:11 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Hi -
>
> I have a Kafka streams application deployed on a Mesos DC/OS cluster. While
> the application was running, Kafka suddenly reported to be unhealthy and
> the application got an exception ..
>
> > 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stream terminated because of uncaught exception .. Shutting down app
> > org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
> >     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 205 record(s) for kstream-log-processing-windowed-access-count-per-host-repartition-0: 30020 ms has passed since last attempt plus backoff time
> > 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping http service ..
> > 07:45:16.606 TKD [StreamThread-1] INFO c.l.f.s.k.http.WeblogDSLHttpService - Stopping the http server
> > 07:45:16.607 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping streams service ..
> > 07:45:16.608 TKD [StreamThread-1] INFO o.apache.kafka.streams.KafkaStreams - stream-client [kstream-log-processing-39b51b2b-e8da-4db8-b782-bec4fb030999] State transition from RUNNING to PENDING_SHUTDOWN.
> > 07:45:16.608 TKD [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down
> > 07:45:16.609 TKD [kafka-streams-close-thread] WARN o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN.
> > 07:45:16.627 TKD [default-akka.kafka.default-dispatcher-43] INFO o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.
>
> The streams application stopped and I have the following exception handler
> registered ..
>
> // need to exit for any stream exception
> // mesos will restart the application
> streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
>   override def uncaughtException(t: Thread, e: Throwable): Unit = try {
>     logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
>     logger.error(s"Stopping http service ..")
>     restService.stop()
>     logger.error(s"Stopping streams service ..")
>     streams.close()
>   } catch {
>     case _: Exception =>
>   } finally {
>     System.exit(-1)
>   }
> })
>
> Ideally the application should terminate and Mesos should have restarted
> it. But I see that the application doesn't terminate though I have a
> System.exit(-1) in the finally clause. Any idea what's happening or how can
> I make the application terminate ..
>
> Any help will be appreciated ..
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg