On Wed, 26 Jul 2017 at 15:53 Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> One of the brokers died. The good thing is that it's not a production
> cluster, it's just a demo cluster. I have no replicas. But I can knock off
> the current Kafka instance and have a new one.
>
>
That explains it.


> Just for my understanding, if I don't have a replica, how should such
> situations be handled? And if I have replicas, is there any documentation
> that discusses how the leader for the partition will be decided in such
> situations, so that I can take care of things when I move to production?
>
>
If you don't have any replicas, and the broker with that partition goes
offline, then you won't be able to access that partition until the broker
comes back online. There are some docs on replication here:
https://kafka.apache.org/documentation/#replication
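For the production case: Kafka Streams creates its internal changelog and repartition topics with the replication factor given by its `replication.factor` setting (the constant `StreamsConfig.REPLICATION_FACTOR_CONFIG`). That setting defaults to 1 in the 0.10.x releases discussed here, so a changelog such as `kstream-log-count-log-counts-changelog` ends up with a single copy unless the value is raised. The sketch below only builds the configuration. It uses the literal key strings so it runs without Kafka on the classpath. The object name, application id and broker address are hypothetical.

```scala
import java.util.Properties

object StreamsReplicationConfig {
  // Builds a (hypothetical) Streams configuration whose internal topics,
  // i.e. changelogs and repartition topics, get `replicationFactor` copies.
  // The keys are the literal strings behind StreamsConfig.APPLICATION_ID_CONFIG,
  // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG and StreamsConfig.REPLICATION_FACTOR_CONFIG.
  def props(appId: String, bootstrap: String, replicationFactor: Int): Properties = {
    require(replicationFactor >= 1, "replication factor must be at least 1")
    val p = new Properties()
    p.put("application.id", appId)
    p.put("bootstrap.servers", bootstrap)
    // The cluster needs at least this many live brokers when the topics are created.
    p.put("replication.factor", replicationFactor.toString)
    p
  }
}
```

The resulting `Properties` would then be passed to the `KafkaStreams` constructor as usual. The setting takes effect when Streams creates an internal topic, so an existing single-replica changelog would have to be recreated to benefit from it.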

Thanks,
Damian


> regards.
>
> On Wed, Jul 26, 2017 at 7:51 PM, Damian Guy <damian....@gmail.com> wrote:
>
>> Hi,
>>
>> It looks to me like there is currently no leader for the partition, i.e.,
>> leader -1. Also, there are no replicas? Is something up with your brokers?
>>
>> Thanks,
>> Damian
>>
>> On Wed, 26 Jul 2017 at 12:34 Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> Hi Damian -
>>>
>>> Yes, it exists. It's actually the changelog topic corresponding to the
>>> state store log-counts:
>>>
>>> $ dcos confluent-kafka topic describe kstream-log-count-log-counts-changelog
>>> {
>>>   "partitions": [
>>>     {
>>>       "0": {
>>>         "leader": -1,
>>>         "controller_epoch": 3,
>>>         "isr": [],
>>>         "leader_epoch": 3,
>>>         "version": 1
>>>       }
>>>     }
>>>   ]
>>> }
>>>
>>> One point to note is that when Mesos restarts the process, it starts on a
>>> different node, so the local state store will not exist there. But I expect
>>> Kafka Streams to recreate it from the corresponding changelog topic. Hence
>>> the exception looks a bit confusing to me.
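The expectation is right in principle. When a task lands on a node with no local state, Kafka Streams rebuilds the store by replaying its changelog before processing resumes. That replay needs a readable changelog partition, and that partition is what is missing here. The toy model below illustrates the replay only. It assumes the changelog is an in-memory sequence of records in offset order, where the last write for a key wins and a `None` value acts as a tombstone that deletes the key. `ChangelogRestore` and `ChangelogRecord` are hypothetical names, not Kafka classes.

```scala
object ChangelogRestore {
  // One record of a hypothetical in-memory changelog: a key plus an optional
  // value, where None plays the role of a tombstone.
  final case class ChangelogRecord(key: String, value: Option[Long])

  // Replays the records in order into an empty store: later writes overwrite
  // earlier ones for the same key, and tombstones remove the key.
  def restore(changelog: Seq[ChangelogRecord]): Map[String, Long] =
    changelog.foldLeft(Map.empty[String, Long]) { (store, rec) =>
      rec.value match {
        case Some(v) => store.updated(rec.key, v)
        case None    => store - rec.key
      }
    }
}
```

Because only the latest value per key matters, the replay result is the same whether or not older records have already been removed by log compaction.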
>>>
>>> Thoughts ?
>>>
>>> regards.
>>>
>>> On Wed, Jul 26, 2017 at 3:43 PM, Damian Guy <damian....@gmail.com>
>>> wrote:
>>>
>>> > The exception indicates that Streams was unable to find that
>>> > topic-partition on the Kafka brokers. Can you verify that it exists?
>>> > Also, I'm assuming you are on 0.10.2.x?
>>> >
>>> > On Wed, 26 Jul 2017 at 10:54 Debasish Ghosh <ghosh.debas...@gmail.com>
>>> > wrote:
>>> >
>>> > > Thanks Damian .. this worked. But now after the application restarts, I
>>> > > see the following exception:
>>> > >
>>> > >> 09:41:26.516 TKD [StreamThread-1] ERROR c.l.fdp.sample.kstream.WeblogDriver$ - Stream terminated because of uncaught exception .. Shutting down app
>>> > >> org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
>>> > >>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
>>> > >>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>> > >> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Store log-counts's change log (kstream-log-count-log-counts-changelog) does not contain partition 0
>>> > >>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:188)
>>> > >>         at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>>> > >
>>> > >
>>> > > I found this thread:
>>> > > https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
>>> > > but unlike that use case, I don't change the partitioning of any topic
>>> > > between restarts. BTW, my application uses stateful streaming, and hence
>>> > > Kafka Streams creates internal topics. I'm not sure if that's related to
>>> > > this exception, though the store named in the exception (log-counts) is
>>> > > the one used for the stateful processing.
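Kafka Streams names its internal topics after the application id. A store's changelog is `<application.id>-<store name>-changelog`, and repartition topics end in `-repartition`. The helper below is a hypothetical convenience for reproducing those names, not a Kafka API. It assumes the application ids `kstream-log-count` and `kstream-log-processing`, inferred from the topic names and the stream-client id in the logs rather than stated in the thread. Under those assumptions, the changelog in the exception belongs to the store `log-counts`.

```scala
object InternalTopicNames {
  // Changelog topic backing a state store: <application.id>-<store>-changelog
  def changelog(applicationId: String, storeName: String): String =
    s"$applicationId-$storeName-changelog"

  // Repartition topic created for an internal operation: <application.id>-<name>-repartition
  def repartition(applicationId: String, name: String): String =
    s"$applicationId-$name-repartition"
}
```

Knowing the pattern makes it easy to go from a store name in an exception to the topic to inspect with `topic describe`.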
>>> > >
>>> > > regards.
>>> > >
>>> > > On Wed, Jul 26, 2017 at 2:20 PM, Damian Guy <damian....@gmail.com>
>>> > wrote:
>>> > >
>>> > >> Hi Debasish,
>>> > >>
>>> > >> It might be that it is blocked in `streams.close()`.
>>> > >> You might want to try the overload that takes a long and a TimeUnit as
>>> > >> params, i.e., `streams.close(1, TimeUnit.MINUTES)`.
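The suggested overload `KafkaStreams.close(long, TimeUnit)` bounds how long the caller waits for shutdown instead of blocking indefinitely. In this era it returns a boolean indicating whether the threads stopped in time. The same idea can be sketched without Kafka on the classpath. The sketch runs a cleanup action on a separate daemon thread, waits a fixed time at most, and reports whether the action finished. `BoundedShutdown.boundedShutdown` is a hypothetical helper, not part of Kafka.

```scala
import java.util.concurrent.{ExecutionException, Executors, ThreadFactory, TimeUnit, TimeoutException}

object BoundedShutdown {
  // Daemon worker threads, so a stuck cleanup can never keep the JVM alive.
  private val daemonFactory: ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "bounded-shutdown")
      t.setDaemon(true)
      t
    }
  }

  // Runs `cleanup` on its own thread and waits at most `timeout`.
  // Returns true only if cleanup completed normally within the limit.
  def boundedShutdown(timeout: Long, unit: TimeUnit)(cleanup: => Unit): Boolean = {
    val executor = Executors.newSingleThreadExecutor(daemonFactory)
    try {
      val future = executor.submit(new Runnable { def run(): Unit = cleanup })
      future.get(timeout, unit)
      true
    } catch {
      case _: TimeoutException   => false // still running: stop waiting for it
      case _: ExecutionException => false // cleanup threw an exception
    } finally {
      executor.shutdownNow() // interrupt the worker if it is still busy
    }
  }
}
```

In the uncaught-exception handler, either calling the timed `streams.close(1, TimeUnit.MINUTES)` directly or wrapping the blocking `streams.close()` in a helper like this lets the handler reach its `System.exit(-1)` even if the close hangs.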
>>> > >>
>>> > >> Thanks,
>>> > >> Damian
>>> > >>
>>> > >> On Wed, 26 Jul 2017 at 09:11 Debasish Ghosh <ghosh.debas...@gmail.com>
>>> > >> wrote:
>>> > >>
>>> > >>> Hi -
>>> > >>>
>>> > >>> I have a Kafka Streams application deployed on a Mesos DC/OS cluster.
>>> > >>> While the application was running, Kafka was suddenly reported as
>>> > >>> unhealthy and the application got this exception:
>>> > >>>
>>> > >>> > 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stream terminated because of uncaught exception .. Shutting down app
>>> > >>> > org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
>>> > >>> >         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
>>> > >>> >         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
>>> > >>> >         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
>>> > >>> >         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>> > >>> >         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>> > >>> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>> > >>> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>> > >>> >         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>> > >>> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>> > >>> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>> > >>> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 205 record(s) for kstream-log-processing-windowed-access-count-per-host-repartition-0: 30020 ms has passed since last attempt plus backoff time
>>> > >>> > 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping http service ..
>>> > >>> > 07:45:16.606 TKD [StreamThread-1] INFO  c.l.f.s.k.http.WeblogDSLHttpService - Stopping the http server
>>> > >>> > 07:45:16.607 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping streams service ..
>>> > >>> > 07:45:16.608 TKD [StreamThread-1] INFO  o.apache.kafka.streams.KafkaStreams - stream-client [kstream-log-processing-39b51b2b-e8da-4db8-b782-bec4fb030999] State transition from RUNNING to PENDING_SHUTDOWN.
>>> > >>> > 07:45:16.608 TKD [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down
>>> > >>> > 07:45:16.609 TKD [kafka-streams-close-thread] WARN  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN.
>>> > >>> > 07:45:16.627 TKD [default-akka.kafka.default-dispatcher-43] INFO  o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.
>>> > >>>
>>> > >>>
>>> > >>> The streams application stopped, and I have the following exception
>>> > >>> handler registered:
>>> > >>>
>>> > >>>     // need to exit for any stream exception
>>> > >>>     // mesos will restart the application
>>> > >>>     streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
>>> > >>>       override def uncaughtException(t: Thread, e: Throwable): Unit = try {
>>> > >>>         logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
>>> > >>>         logger.error(s"Stopping http service ..")
>>> > >>>         restService.stop()
>>> > >>>         logger.error(s"Stopping streams service ..")
>>> > >>>         streams.close()
>>> > >>>       } catch {
>>> > >>>         case _: Exception =>
>>> > >>>       } finally {
>>> > >>>         System.exit(-1)
>>> > >>>       }
>>> > >>>     })
>>> > >>>
>>> > >>> Ideally the application should terminate and Mesos should restart it.
>>> > >>> But the application doesn't terminate, even though I have a
>>> > >>> System.exit(-1) in the finally clause. Any idea what's happening, or
>>> > >>> how I can make the application terminate?
>>> > >>>
>>> > >>> Any help will be appreciated ..
>>> > >>>
>>> > >>> regards.
>>> > >>>
>>> > >>>
>>> > >>> --
>>> > >>> Debasish Ghosh
>>> > >>> http://manning.com/ghosh2
>>> > >>> http://manning.com/ghosh
>>> > >>>
>>> > >>> Twttr: @debasishg
>>> > >>> Blog: http://debasishg.blogspot.com
>>> > >>> Code: http://github.com/debasishg
>>> > >>>
>>> > >>
>>> > >
>>> > >
>>> > >
>>> >
>>>
>>>
>>>
>>>
>>
>
>
>
