The exception indicates that Streams was unable to find that
topic-partition on the Kafka brokers. Can you verify that it exists?
Also, I'm assuming you are on 0.10.2.x?
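
One way to check, as a minimal sketch rather than a definitive tool: read the
topic metadata with a plain consumer and see whether partition 0 of the
changelog is listed. The broker address `localhost:9092`, the object name
`ChangelogCheck` and the helper `report` are assumptions for illustration;
the topic name is taken from the exception. It uses `listTopics()` instead of
asking for the one topic by name, since a metadata request for a missing topic
may auto-create it if the broker allows that.

```scala
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import scala.collection.JavaConverters._

object ChangelogCheck {
  // Pure helper (hypothetical name): summarise whether `required` is among
  // the partitions found. `partitions` is None when the metadata has no
  // entry for the topic at all.
  def report(topic: String, partitions: Option[Seq[Int]], required: Int): String =
    partitions match {
      case None =>
        s"$topic: topic not found in broker metadata"
      case Some(ps) if ps.contains(required) =>
        s"$topic: partition $required present (all: ${ps.sorted.mkString(",")})"
      case Some(ps) =>
        s"$topic: partition $required MISSING (found: ${ps.sorted.mkString(",")})"
    }

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Assumption: pass your broker list as the first argument.
    props.put("bootstrap.servers", args.headOption.getOrElse("localhost:9092"))
    props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

    // Changelog topic name as it appears in the exception message.
    val topic = "kstream-log-count-log-counts-changelog"
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      // listTopics() returns the partitions of every topic the brokers report.
      val found = consumer.listTopics().asScala
        .get(topic)
        .map(_.asScala.map(_.partition).toList)
      println(report(topic, found, required = 0))
    } finally consumer.close()
  }
}
```

If it reports the topic or partition as missing, the changelog was probably
lost or deleted on the broker side while the application was down.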

On Wed, 26 Jul 2017 at 10:54 Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Thanks Damian .. this worked. But now after the application restarts, I
> see the following exception ..
>
>> 09:41:26.516 TKD [StreamThread-1] ERROR c.l.fdp.sample.kstream.WeblogDriver$ - Stream terminated because of uncaught exception .. Shutting down app
>> org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Store log-counts's change log (kstream-log-count-log-counts-changelog) does not contain partition 0
>>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:188)
>>         at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>
>
> I found this thread ..
> https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
> but unlike that use case I don't change the partitions of any topic
> between restarts. BTW my application uses stateful streaming, so Kafka
> creates some internal topics. Not sure if that's related to this
> exception, though. But the store name mentioned in the exception
> (log-counts) is one used for stateful streaming.
>
> regards.
>
> On Wed, Jul 26, 2017 at 2:20 PM, Damian Guy <damian....@gmail.com> wrote:
>
>> Hi Debasish,
>>
>> It might be that it is blocked in `streams.close()`.
>> You might want to try the overload that takes a long and a TimeUnit as
>> params, i.e., `streams.close(1, TimeUnit.MINUTES)`
>>
>> Thanks,
>> Damian
>>
>> On Wed, 26 Jul 2017 at 09:11 Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> Hi -
>>>
>>> I have a Kafka Streams application deployed on a Mesos DC/OS cluster.
>>> While the application was running, Kafka was suddenly reported as
>>> unhealthy and the application got an exception ..
>>>
>>> > 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stream terminated because of uncaught exception .. Shutting down app
>>> > org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
>>> >         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
>>> >         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
>>> >         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
>>> >         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>> >         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>> >         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 205 record(s) for kstream-log-processing-windowed-access-count-per-host-repartition-0: 30020 ms has passed since last attempt plus backoff time
>>> > 07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping http service ..
>>> > 07:45:16.606 TKD [StreamThread-1] INFO  c.l.f.s.k.http.WeblogDSLHttpService - Stopping the http server
>>> > 07:45:16.607 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping streams service ..
>>> > 07:45:16.608 TKD [StreamThread-1] INFO  o.apache.kafka.streams.KafkaStreams - stream-client [kstream-log-processing-39b51b2b-e8da-4db8-b782-bec4fb030999] State transition from RUNNING to PENDING_SHUTDOWN.
>>> > 07:45:16.608 TKD [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down
>>> > 07:45:16.609 TKD [kafka-streams-close-thread] WARN  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN.
>>> > 07:45:16.627 TKD [default-akka.kafka.default-dispatcher-43] INFO  o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.
>>>
>>>
>>> The streams application stopped and I have the following exception
>>> handler registered ..
>>>
>>>     // need to exit for any stream exception
>>>     // mesos will restart the application
>>>     streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
>>>       override def uncaughtException(t: Thread, e: Throwable): Unit = try {
>>>         logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
>>>         logger.error(s"Stopping http service ..")
>>>         restService.stop()
>>>         logger.error(s"Stopping streams service ..")
>>>         streams.close()
>>>       } catch {
>>>         case _: Exception =>
>>>       } finally {
>>>         System.exit(-1)
>>>       }
>>>     })
>>>
>>> Ideally the application should have terminated and Mesos should have
>>> restarted it. But the application doesn't terminate, even though I have
>>> a System.exit(-1) in the finally clause. Any idea what's happening, or
>>> how I can make the application terminate?
>>>
>>> Any help will be appreciated ..
>>>
>>> regards.
>>>
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
>
