Thanks Damian .. this worked. But now, after the application restarts, I see the following exception ..

09:41:26.516 TKD [StreamThread-1] ERROR c.l.fdp.sample.kstream.WeblogDriver$ - Stream terminated because of uncaught exception .. Shutting down app
org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Store log-counts's change log (kstream-log-count-log-counts-changelog) does not contain partition 0
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:188)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)

I found this thread ..

https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition

but unlike that use case, I don't change the partitions of any topic between restarts.

BTW my application uses stateful streaming, and hence Kafka creates internal topics. I'm not sure whether that is related to this exception, but the store named in the exception (log-counts) is one used for stateful streaming.

regards.

On Wed, Jul 26, 2017 at 2:20 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi Debasish,
>
> It might be that it is blocked in `streams.close()`.
> You might want to try the overload that takes a long and a TimeUnit as
> params, i.e., `streams.close(1, TimeUnit.MINUTES)`
>
> Thanks,
> Damian
>
> On Wed, 26 Jul 2017 at 09:11 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Hi -
>>
>> I have a Kafka streams application deployed on a Mesos DC/OS cluster.
>> While the application was running, Kafka was suddenly reported to be
>> unhealthy and the application got an exception ..
>>
>> > 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stream terminated because of uncaught exception .. Shutting down app
>> > org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
>> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
>> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
>> >     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
>> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>> >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>> >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 205 record(s) for kstream-log-processing-windowed-access-count-per-host-repartition-0: 30020 ms has passed since last attempt plus backoff time
>> > 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping http service ..
>> > 07:45:16.606 TKD [StreamThread-1] INFO c.l.f.s.k.http.WeblogDSLHttpService - Stopping the http server
>> > 07:45:16.607 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping streams service ..
>> > 07:45:16.608 TKD [StreamThread-1] INFO o.apache.kafka.streams.KafkaStreams - stream-client [kstream-log-processing-39b51b2b-e8da-4db8-b782-bec4fb030999] State transition from RUNNING to PENDING_SHUTDOWN.
>> > 07:45:16.608 TKD [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down
>> > 07:45:16.609 TKD [kafka-streams-close-thread] WARN o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN.
>> > 07:45:16.627 TKD [default-akka.kafka.default-dispatcher-43] INFO o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.
>>
>>
>> The streams application stopped and I have the following exception handler
>> registered ..
>>
>> // need to exit for any stream exception
>> // mesos will restart the application
>> streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
>>   override def uncaughtException(t: Thread, e: Throwable): Unit = try {
>>     logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
>>     logger.error(s"Stopping http service ..")
>>     restService.stop()
>>     logger.error(s"Stopping streams service ..")
>>     streams.close()
>>   } catch {
>>     case _: Exception =>
>>   } finally {
>>     System.exit(-1)
>>   }
>> })
>>
>> Ideally the application should terminate and Mesos should have restarted
>> it. But I see that the application doesn't terminate though I have a
>> System.exit(-1) in the finally clause. Any idea what's happening or how
>> can I make the application terminate ..
>>
>> Any help will be appreciated ..
>>
>> regards.
>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
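
P.S. For anyone finding this thread later, here is a rough sketch of the handler with the timed close that Damian suggested. It assumes the 0.11-era client, where `KafkaStreams.close(long, TimeUnit)` returns false if the timeout elapses. The object name `ExitOnStreamError` and the `stopHttp` parameter (standing in for `restService.stop()`) are made up for illustration, so treat this as one possible shape rather than the exact code I run.

```scala
import java.util.concurrent.TimeUnit
import scala.util.control.NonFatal
import org.apache.kafka.streams.KafkaStreams

object ExitOnStreamError {

  // Runs the shutdown steps in order and always calls `exit` at the end,
  // even if a step throws. `closeStreams` should return false on timeout.
  def shutdown(stopHttp: () => Unit,
               closeStreams: () => Boolean,
               exit: Int => Unit): Unit =
    try {
      stopHttp()
      if (!closeStreams())
        System.err.println("streams.close timed out; exiting anyway")
    } catch {
      case NonFatal(_) => // ignored: the process is about to exit
    } finally {
      exit(-1)
    }

  // Hypothetical wiring: `stopHttp` stands in for restService.stop().
  def install(streams: KafkaStreams, stopHttp: () => Unit): Unit =
    streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit =
        shutdown(
          stopHttp,
          // Bounded close: gives up after one minute instead of blocking forever.
          () => streams.close(1, TimeUnit.MINUTES),
          code => System.exit(code)
        )
    })
}
```

With the no-argument `streams.close()` the handler never got as far as the finally clause, which would explain why `System.exit(-1)` appeared to have no effect. Passing `exit` in as a function is only there so the ordering can be checked without killing the JVM.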