Hi,

It looks to me like there is currently no leader for the partition, i.e., the leader is -1. There also seem to be no replicas in sync (the ISR list is empty). Is something wrong with your brokers?
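To make that check concrete, here is a minimal sketch that flags the two symptoms visible in the describe output in the quoted message (`"leader": -1` and `"isr": []`). `PartitionState` and `PartitionHealth` are hypothetical names: the case class only mirrors the JSON fields and is not part of the DC/OS CLI or the Kafka API.

```scala
// Hypothetical model of one partition entry from `dcos confluent-kafka topic describe`.
final case class PartitionState(partition: Int, leader: Int, isr: List[Int])

object PartitionHealth {
  // Kafka reports a leader of -1 when no broker currently leads the partition.
  val NoLeader: Int = -1

  // Returns one message per detected problem; an empty result means the
  // partitions look servable by at least one in-sync broker.
  def problems(states: Seq[PartitionState]): Seq[String] =
    states.flatMap { s =>
      val noLeader =
        if (s.leader == NoLeader) Seq(s"partition ${s.partition}: no leader") else Nil
      val noIsr =
        if (s.isr.isEmpty) Seq(s"partition ${s.partition}: ISR is empty") else Nil
      noLeader ++ noIsr
    }

  def main(args: Array[String]): Unit = {
    // Values from the describe output of the changelog topic: leader -1, isr [].
    val changelog = Seq(PartitionState(partition = 0, leader = -1, isr = Nil))
    problems(changelog).foreach(println)
  }
}
```

Applied to partition 0 of the changelog topic, it reports both problems, matching the describe output.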
Thanks,
Damian

On Wed, 26 Jul 2017 at 12:34 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Hi Damian -
>
> Yes, it exists. It's actually the changelog topic corresponding to the
> state store log-counts:
>
>     $ dcos confluent-kafka topic describe kstream-log-count-log-counts-changelog
>     {
>       "partitions": [
>         {
>           "0": {
>             "leader": -1,
>             "controller_epoch": 3,
>             "isr": [],
>             "leader_epoch": 3,
>             "version": 1
>           }
>         }
>       ]
>     }
>
> One more point to note: when Mesos restarts the process, it starts on a
> different node, so the local state store will not exist there. But I expect
> Kafka Streams to rebuild it from the corresponding changelog topic. Hence the
> exception looks a bit confusing to me.
>
> Thoughts?
>
> regards.
>
> On Wed, Jul 26, 2017 at 3:43 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > The exception indicates that Streams was unable to find that
> > topic-partition on the Kafka brokers. Can you verify that it exists?
> > Also, I'm assuming you are on 0.10.2.x?
> >
> > On Wed, 26 Jul 2017 at 10:54 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
> >
> > > Thanks Damian .. this worked. But now, after the application restarts, I
> > > see the following exception:
> > >
> > >     09:41:26.516 TKD [StreamThread-1] ERROR c.l.fdp.sample.kstream.WeblogDriver$ - Stream terminated because of uncaught exception .. Shutting down app
> > >     org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
> > >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
> > >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > >     Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Store log-counts's change log (kstream-log-count-log-counts-changelog) does not contain partition 0
> > >         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:188)
> > >         at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> > >
> > > I found this thread:
> > > https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
> > > but unlike that use case, I don't change the partitioning of any
> > > topic between restarts. BTW, my application uses stateful streaming,
> > > so Kafka Streams creates internal topics. Not sure if that's related to
> > > this exception, though. But the store named in the exception
> > > (log-counts) is the one used for stateful streaming.
> > >
> > > regards.
> > >
> > > On Wed, Jul 26, 2017 at 2:20 PM, Damian Guy <damian....@gmail.com> wrote:
> > >
> > >> Hi Debasish,
> > >>
> > >> It might be that it is blocked in `streams.close()`.
> > >> You might want to try the overload that takes a long and a TimeUnit as
> > >> params, i.e., `streams.close(1, TimeUnit.MINUTES)`.
> > >>
> > >> Thanks,
> > >> Damian
> > >>
> > >> On Wed, 26 Jul 2017 at 09:11 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
> > >>
> > >>> Hi -
> > >>>
> > >>> I have a Kafka Streams application deployed on a Mesos DC/OS cluster.
> > >>> While the application was running, Kafka was suddenly reported as
> > >>> unhealthy and the application got an exception:
> > >>>
> > >>>     07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stream terminated because of uncaught exception .. Shutting down app
> > >>>     org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
> > >>>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
> > >>>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
> > >>>         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
> > >>>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > >>>         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > >>>         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > >>>         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > >>>         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > >>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > >>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > >>>     Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 205 record(s) for kstream-log-processing-windowed-access-count-per-host-repartition-0: 30020 ms has passed since last attempt plus backoff time
> > >>>     07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping http service ..
> > >>>     07:45:16.606 TKD [StreamThread-1] INFO c.l.f.s.k.http.WeblogDSLHttpService - Stopping the http server
> > >>>     07:45:16.607 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping streams service ..
> > >>>     07:45:16.608 TKD [StreamThread-1] INFO o.apache.kafka.streams.KafkaStreams - stream-client [kstream-log-processing-39b51b2b-e8da-4db8-b782-bec4fb030999] State transition from RUNNING to PENDING_SHUTDOWN.
> > >>>     07:45:16.608 TKD [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down
> > >>>     07:45:16.609 TKD [kafka-streams-close-thread] WARN o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN.
> > >>>     07:45:16.627 TKD [default-akka.kafka.default-dispatcher-43] INFO o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.
> > >>>
> > >>> The streams application stopped, and I have the following exception
> > >>> handler registered:
> > >>>
> > >>>     // need to exit for any stream exception
> > >>>     // mesos will restart the application
> > >>>     streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
> > >>>       override def uncaughtException(t: Thread, e: Throwable): Unit =
> > >>>         try {
> > >>>           logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
> > >>>           logger.error(s"Stopping http service ..")
> > >>>           restService.stop()
> > >>>           logger.error(s"Stopping streams service ..")
> > >>>           streams.close()
> > >>>         } catch {
> > >>>           case _: Exception =>
> > >>>         } finally {
> > >>>           System.exit(-1)
> > >>>         }
> > >>>     })
> > >>>
> > >>> Ideally the application should terminate and Mesos should restart it.
> > >>> But the application doesn't terminate, even though I have a
> > >>> System.exit(-1) in the finally clause. Any idea what's happening, or how
> > >>> I can make the application terminate?
> > >>>
> > >>> Any help will be appreciated.
> > >>>
> > >>> regards.
> > >>>
> > >>> --
> > >>> Debasish Ghosh
> > >>> http://manning.com/ghosh2
> > >>> http://manning.com/ghosh
> > >>>
> > >>> Twttr: @debasishg
> > >>> Blog: http://debasishg.blogspot.com
> > >>> Code: http://github.com/debasishg
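Damian's suggestion can be sketched as follows. Assuming, as his reply suggests, that the handler is stuck in the no-argument `streams.close()` (per the log, the handler runs on `StreamThread-1` itself, and that call appears to wait for the stream threads to stop), switching to the `close(long, TimeUnit)` overload bounds the wait so the `finally` block can reach the exit. The object `ExitingHandler`, the factory `exitingHandler` and its injected functions are hypothetical, introduced only so the handler can be exercised without a Kafka cluster.

```scala
import java.util.concurrent.TimeUnit

object ExitingHandler {
  // Hypothetical factory: every side effect is injected so the handler can be
  // tested without Kafka. In the application, closeStreams would wrap
  // streams.close(timeout, unit) and exit would call System.exit.
  def exitingHandler(
      log: (String, Throwable) => Unit,
      stopRest: () => Unit,
      closeStreams: (Long, TimeUnit) => Unit,
      exit: Int => Unit): Thread.UncaughtExceptionHandler =
    new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit =
        try {
          log("Stream terminated because of uncaught exception .. Shutting down app", e)
          stopRest()
          // Bounded wait: give up after one minute instead of blocking forever.
          closeStreams(1L, TimeUnit.MINUTES)
        } catch {
          // Best effort: a failure while shutting down must not prevent the exit.
          case _: Exception => ()
        } finally {
          // Exit so that Mesos restarts the application, as in the original handler.
          exit(-1)
        }
    }
}
```

Wiring it into the application from the original message would look roughly like `streams.setUncaughtExceptionHandler(ExitingHandler.exitingHandler((msg, e) => logger.error(msg, e), () => restService.stop(), (timeout, unit) => streams.close(timeout, unit), code => System.exit(code)))`. Because the handler itself runs on a stream thread, the bounded close may simply wait out the full timeout before `exit` runs.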