The exception indicates that Streams was unable to find that topic-partition on the Kafka brokers. Can you verify that it exists? Also, I'm assuming you are on 0.10.2.x?
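One way to verify that the changelog partition exists is to ask the brokers which partitions they report for the topic. The following is only a minimal sketch, assuming `kafka-clients` is on the classpath (as it is for any Streams application) and Scala 2.11/2.12 (on 2.13 the converters live in `scala.jdk.CollectionConverters`). The object name `ChangelogCheck`, its two methods, and the broker address in the usage note are hypothetical; `KafkaConsumer.listTopics()` and `PartitionInfo.partition()` are the only Kafka calls used.

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Hypothetical helper, not part of Kafka or of the application in this thread.
object ChangelogCheck {

  // Pure helper: which of the expected partition ids are absent from the reported ones?
  def missingPartitions(reported: Set[Int], expected: Set[Int]): Set[Int] =
    expected -- reported

  // Ask the brokers which partitions they currently report for `topic`.
  // An empty set means the topic does not appear in the cluster metadata.
  def reportedPartitions(bootstrapServers: String, topic: String): Set[Int] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.setProperty("value.deserializer", classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      // listTopics() returns a java.util.Map from topic name to its partitions;
      // a topic the brokers do not know is simply missing from the map.
      Option(consumer.listTopics().get(topic))
        .map(_.asScala.map(_.partition()).toSet)
        .getOrElse(Set.empty[Int])
    } finally consumer.close()
  }
}
```

For the exception quoted below, the call would look something like `ChangelogCheck.missingPartitions(ChangelogCheck.reportedPartitions("localhost:9092", "kstream-log-count-log-counts-changelog"), Set(0))`, where `localhost:9092` stands in for the real broker list. A non-empty result would match what the exception reports.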
On Wed, 26 Jul 2017 at 10:54 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Thanks Damian .. this worked. But now, after the application restarts, I
> see the following exception ..
>
>> 09:41:26.516 TKD [StreamThread-1] ERROR c.l.fdp.sample.kstream.WeblogDriver$ - Stream terminated because of uncaught exception .. Shutting down app
>> org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Store log-counts's change log (kstream-log-count-log-counts-changelog) does not contain partition 0
>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:188)
>>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>
> I found this thread ..
> https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
> but unlike that use case, I don't change the partitions of any topic
> between restarts. BTW my application uses stateful streaming, and hence
> Kafka creates internal topics. Not sure if that's related to this
> exception though. But the store named in the exception (log-counts) is
> one used for stateful streaming.
>
> regards.
>
> On Wed, Jul 26, 2017 at 2:20 PM, Damian Guy <damian....@gmail.com> wrote:
>
>> Hi Debasish,
>>
>> It might be that it is blocked in `streams.close()`.
>> You might want to try the overload that takes a long and a TimeUnit as
>> params, i.e., `streams.close(1, TimeUnit.MINUTES)`
>>
>> Thanks,
>> Damian
>>
>> On Wed, 26 Jul 2017 at 09:11 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>>> Hi -
>>>
>>> I have a Kafka Streams application deployed on a Mesos DC/OS cluster. While
>>> the application was running, Kafka was suddenly reported to be unhealthy and
>>> the application got an exception ..
>>>
>>> > 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stream terminated because of uncaught exception .. Shutting down app
>>> > org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
>>> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
>>> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
>>> >     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
>>> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>> >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 205 record(s) for kstream-log-processing-windowed-access-count-per-host-repartition-0: 30020 ms has passed since last attempt plus backoff time
>>> > 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping http service ..
>>> > 07:45:16.606 TKD [StreamThread-1] INFO c.l.f.s.k.http.WeblogDSLHttpService - Stopping the http server
>>> > 07:45:16.607 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping streams service ..
>>> > 07:45:16.608 TKD [StreamThread-1] INFO o.apache.kafka.streams.KafkaStreams - stream-client [kstream-log-processing-39b51b2b-e8da-4db8-b782-bec4fb030999] State transition from RUNNING to PENDING_SHUTDOWN.
>>> > 07:45:16.608 TKD [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down
>>> > 07:45:16.609 TKD [kafka-streams-close-thread] WARN o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN.
>>> > 07:45:16.627 TKD [default-akka.kafka.default-dispatcher-43] INFO o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.
>>>
>>> The streams application stopped, and I have the following exception handler
>>> registered ..
>>>
>>> // need to exit for any stream exception
>>> // mesos will restart the application
>>> streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
>>>   override def uncaughtException(t: Thread, e: Throwable): Unit =
>>>     try {
>>>       logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
>>>       logger.error(s"Stopping http service ..")
>>>       restService.stop()
>>>       logger.error(s"Stopping streams service ..")
>>>       streams.close()
>>>     } catch {
>>>       case _: Exception =>
>>>     } finally {
>>>       System.exit(-1)
>>>     }
>>> })
>>>
>>> Ideally the application should terminate and Mesos should restart it.
>>> But I see that the application doesn't terminate even though I have a
>>> System.exit(-1) in the finally clause. Any idea what's happening, or how
>>> I can make the application terminate?
>>>
>>> Any help will be appreciated ..
>>>
>>> regards.
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
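
If the handler is indeed stuck inside `streams.close()`, as Damian suggests, the `finally` block with `System.exit(-1)` is never reached, because `finally` only runs once the `try` body returns or throws. Besides the timed `close(1, TimeUnit.MINUTES)` overload he mentions, a more general option is to bound the whole cleanup step. The sketch below uses only the JDK; `BoundedShutdown` and `runWithin` are hypothetical names and are not part of Kafka Streams.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical helper, not part of Kafka Streams.
object BoundedShutdown {

  /** Runs `action` on a daemon thread and waits at most `timeout` for it.
    * Returns true if the action finished (normally or by throwing) in time,
    * false if the wait timed out and the action is still running. */
  def runWithin(timeout: Long, unit: TimeUnit)(action: => Unit): Boolean = {
    val done = new CountDownLatch(1)
    val worker = new Thread(new Runnable {
      override def run(): Unit =
        try action finally done.countDown()
    }, "bounded-shutdown")
    worker.setDaemon(true) // a stuck cleanup must not keep the JVM alive
    worker.start()
    done.await(timeout, unit)
  }
}
```

Inside `uncaughtException`, the cleanup could then be wrapped roughly as `try BoundedShutdown.runWithin(1, TimeUnit.MINUTES) { restService.stop(); streams.close() } finally System.exit(-1)`, using the `restService` and `streams` values from the handler quoted above. The one-minute bound is only an example; the point is that the exit call is reached even when the cleanup never returns.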