Hi,

It looks like the partition currently has no leader (leader is -1), and the
ISR list is empty, so there are no in-sync replicas either. Is something
wrong with your brokers?
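
In case it helps to check this from the client side, here is a rough Scala
sketch that prints the leader and ISR of each partition of the changelog topic
using `KafkaConsumer.partitionsFor`. The broker address `localhost:9092` and
the object name `ChangelogLeaderCheck` are placeholders made up for
illustration, and it assumes the kafka-clients jar is on the classpath. It has
not been run against your cluster.

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Hypothetical helper object, not part of the application in this thread.
object ChangelogLeaderCheck {
  def main(args: Array[String]): Unit = {
    // Assumption: replace with the real broker list of the cluster.
    val bootstrapServers = "localhost:9092"
    val topic = "kstream-log-count-log-counts-changelog"

    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      // partitionsFor may return null or an empty list for an unknown topic.
      val infos = Option(consumer.partitionsFor(topic))
        .map(_.asScala.toList)
        .getOrElse(Nil)
      if (infos.isEmpty) println(s"No partition metadata returned for $topic")
      infos.foreach { p =>
        // A missing leader may show up as null or as a node with a negative id.
        val leader = Option(p.leader())
          .filter(node => node.id() >= 0)
          .map(node => node.id().toString)
          .getOrElse("none")
        val isr = Option(p.inSyncReplicas())
          .map(nodes => nodes.map(node => node.id()).mkString("[", ",", "]"))
          .getOrElse("[]")
        println(s"partition ${p.partition()} leader=$leader isr=$isr")
      }
    } finally {
      consumer.close()
    }
  }
}
```

A partition printed with `leader=none` and an empty ISR would match the
`"leader": -1` and `"isr": []` in the describe output quoted below.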

Thanks,
Damian

On Wed, 26 Jul 2017 at 12:34 Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Hi Damian -
>
> Yes, it exists. It's actually the changelog topic corresponding to the
> state store log-counts
>
> $ dcos confluent-kafka topic describe kstream-log-count-log-counts-changelog
> {
>   "partitions": [
>     {
>       "0": {
>         "leader": -1,
>         "controller_epoch": 3,
>         "isr": [],
>         "leader_epoch": 3,
>         "version": 1
>       }
>     }
>   ]
> }
>
> Also, one point to note: when Mesos restarts the process, it starts on a
> different node, so the local state store will not exist there. But I expect
> Kafka to recreate it from the corresponding backing changelog topic. Hence
> the exception looks a bit confusing to me.
>
> Thoughts ?
>
> regards.
>
> On Wed, Jul 26, 2017 at 3:43 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > The exception indicates that Streams was unable to find that
> > topic-partition on the Kafka brokers. Can you verify that it exists?
> > Also, I'm assuming you are on 0.10.2.x?
> >
> > On Wed, 26 Jul 2017 at 10:54 Debasish Ghosh <ghosh.debas...@gmail.com>
> > wrote:
> >
> > > > Thanks Damian .. this worked. But now, after the application restarts,
> > > > I see the following exception ..
> > >
> > > 09:41:26.516 TKD [StreamThread-1] ERROR
> > >> c.l.fdp.sample.kstream.WeblogDriver$ - Stream terminated because of
> > >> uncaught exception .. Shutting down app
> > >> org.apache.kafka.streams.errors.StreamsException: stream-thread
> > >> [StreamThread-1] Failed to rebalance
> > >>         at
> > >> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:598)
> > >>         at
> > >> org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:361)
> > >> Caused by: org.apache.kafka.streams.errors.StreamsException: task
> [0_0]
> > >> Store log-counts's change log (kstream-log-count-log-counts-changelog)
> > does
> > >> not contain partition 0
> > >>         at
> > >> org.apache.kafka.streams.processor.internals.ProcessorStateManager.
> > register(ProcessorStateManager.java:188)
> > >>         at
> > >> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.
> > register(AbstractProcessorContext.java:99)
> > >
> > >
> > > > I found this thread ..
> > > > https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
> > > > but unlike that use case I don't change the partitions of any topic
> > > > between restarts. BTW my application uses stateful streaming, and
> > > > hence Kafka creates some internal topics. Not sure if that's related to
> > > > this exception though. But the store name mentioned in the exception
> > > > (log-counts) is one used for stateful streaming.
> > >
> > > regards.
> > >
> > > > On Wed, Jul 26, 2017 at 2:20 PM, Damian Guy <damian....@gmail.com>
> > > > wrote:
> > >
> > >> Hi Debasish,
> > >>
> > >> It might be that it is blocked in `streams.close()`.
> > >> You might want to try the overload that takes a long and a TimeUnit as
> > >> params, i.e., `streams.close(1, TimeUnit.MINUTES)`.
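
A rough sketch of how that overload could slot into the handler quoted further
down, in Scala to match it. `StreamsShutdown`, `exitOnStreamFailure` and
`stopHttp` are names invented for illustration (`stopHttp` stands in for
`restService.stop()`), and it assumes kafka-streams 0.10.2.x on the classpath.

```scala
import java.util.concurrent.TimeUnit
import org.apache.kafka.streams.KafkaStreams

// Hypothetical wrapper; only the close(1, TimeUnit.MINUTES) call comes
// from the suggestion above.
object StreamsShutdown {
  def exitOnStreamFailure(streams: KafkaStreams, stopHttp: () => Unit): Unit =
    streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit =
        try {
          stopHttp()
          // Bounded wait: the handler runs on the stream thread itself, so an
          // unbounded close() may wait on that same thread and never return.
          streams.close(1, TimeUnit.MINUTES)
          ()
        } catch {
          case _: Exception => // ignore, the process is exiting anyway
        } finally {
          // Non-zero exit code so the scheduler treats the task as failed.
          System.exit(-1)
        }
    })
}
```

With the bounded close, the `finally` block should be reached after at most
the timeout, so `System.exit(-1)` gets a chance to run and Mesos can restart
the task.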
> > >>
> > >> Thanks,
> > >> Damian
> > >>
> > >> On Wed, 26 Jul 2017 at 09:11 Debasish Ghosh <ghosh.debas...@gmail.com>
> > >> wrote:
> > >>
> > >>> Hi -
> > >>>
> > >>> I have a Kafka Streams application deployed on a Mesos DC/OS cluster.
> > >>> While the application was running, Kafka was suddenly reported as
> > >>> unhealthy and the application got an exception ..
> > >>>
> > >>> 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stream terminated because of uncaught exception .. Shutting down app
> > >>> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
> > >>>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
> > >>>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
> > >>>         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
> > >>>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > >>>         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > >>>         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > >>>         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > >>>         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > >>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > >>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > >>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 205 record(s) for kstream-log-processing-windowed-access-count-per-host-repartition-0: 30020 ms has passed since last attempt plus backoff time
> > >>> 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping http service ..
> > >>> 07:45:16.606 TKD [StreamThread-1] INFO  c.l.f.s.k.http.WeblogDSLHttpService - Stopping the http server
> > >>> 07:45:16.607 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping streams service ..
> > >>> 07:45:16.608 TKD [StreamThread-1] INFO  o.apache.kafka.streams.KafkaStreams - stream-client [kstream-log-processing-39b51b2b-e8da-4db8-b782-bec4fb030999] State transition from RUNNING to PENDING_SHUTDOWN.
> > >>> 07:45:16.608 TKD [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down
> > >>> 07:45:16.609 TKD [kafka-streams-close-thread] WARN  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN.
> > >>> 07:45:16.627 TKD [default-akka.kafka.default-dispatcher-43] INFO  o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.
> > >>>
> > >>>
> > >>> The streams application stopped. I have the following exception handler
> > >>> registered ..
> > >>>
> > >>>     // need to exit for any stream exception
> > >>>     // mesos will restart the application
> > >>>     streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
> > >>>       override def uncaughtException(t: Thread, e: Throwable): Unit = try {
> > >>>         logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
> > >>>         logger.error(s"Stopping http service ..")
> > >>>         restService.stop()
> > >>>         logger.error(s"Stopping streams service ..")
> > >>>         streams.close()
> > >>>       } catch {
> > >>>         case _: Exception =>
> > >>>       } finally {
> > >>>         System.exit(-1)
> > >>>       }
> > >>>     })
> > >>>
> > >>> Ideally the application should terminate and Mesos should restart it.
> > >>> But I see that the application doesn't terminate even though I have a
> > >>> System.exit(-1) in the finally clause. Any idea what's happening, or how
> > >>> I can make the application terminate?
> > >>>
> > >>> Any help will be appreciated ..
> > >>>
> > >>> regards.
> > >>>
> > >>>
> > >>> --
> > >>> Debasish Ghosh
> > >>> http://manning.com/ghosh2
> > >>> http://manning.com/ghosh
> > >>>
> > >>> Twttr: @debasishg
> > >>> Blog: http://debasishg.blogspot.com
> > >>> Code: http://github.com/debasishg
> > >>>
> > >>
> > >
> > >
> > > --
> > > Debasish Ghosh
> > > http://manning.com/ghosh2
> > > http://manning.com/ghosh
> > >
> > > Twttr: @debasishg
> > > Blog: http://debasishg.blogspot.com
> > > Code: http://github.com/debasishg
> > >
> >
>
>
>
>
