Hi - I have a Kafka Streams application deployed on a Mesos DC/OS cluster. While the application was running, Kafka was suddenly reported as unhealthy and the application got the following exception:
07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stream terminated because of uncaught exception .. Shutting down app
org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 205 record(s) for kstream-log-processing-windowed-access-count-per-host-repartition-0: 30020 ms has passed since last attempt plus backoff time
07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping http service ..
07:45:16.606 TKD [StreamThread-1] INFO c.l.f.s.k.http.WeblogDSLHttpService - Stopping the http server
07:45:16.607 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping streams service ..
07:45:16.608 TKD [StreamThread-1] INFO o.apache.kafka.streams.KafkaStreams - stream-client [kstream-log-processing-39b51b2b-e8da-4db8-b782-bec4fb030999] State transition from RUNNING to PENDING_SHUTDOWN.
07:45:16.608 TKD [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down
07:45:16.609 TKD [kafka-streams-close-thread] WARN o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN.
07:45:16.627 TKD [default-akka.kafka.default-dispatcher-43] INFO o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.

The streams application stopped. I have the following exception handler registered:

    // need to exit for any stream exception
    // mesos will restart the application
    streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
      override def uncaughtException(t: Thread, e: Throwable): Unit = try {
        logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
        logger.error(s"Stopping http service ..")
        restService.stop()
        logger.error(s"Stopping streams service ..")
        streams.close()
      } catch {
        case _: Exception =>
      } finally {
        System.exit(-1)
      }
    })

Ideally the application should terminate and Mesos should restart it. But I see that the application doesn't terminate, even though I have a System.exit(-1) in the finally clause. Note that the handler runs on StreamThread-1 itself, as the log lines above show.

Any idea what's happening, or how I can make the application terminate?

Any help will be appreciated.

regards.

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh
Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg