Hi -

I have a Kafka Streams application deployed on a Mesos DC/OS cluster. While
the application was running, Kafka suddenly reported itself as unhealthy and
the application got the following exception:

    07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stream terminated because of uncaught exception .. Shutting down app
    org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
        at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 205 record(s) for kstream-log-processing-windowed-access-count-per-host-repartition-0: 30020 ms has passed since last attempt plus backoff time
    07:45:16.606 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping http service ..
    07:45:16.606 TKD [StreamThread-1] INFO  c.l.f.s.k.http.WeblogDSLHttpService - Stopping the http server
    07:45:16.607 TKD [StreamThread-1] ERROR c.l.f.s.kstream.WeblogProcessing$ - Stopping streams service ..
    07:45:16.608 TKD [StreamThread-1] INFO  o.apache.kafka.streams.KafkaStreams - stream-client [kstream-log-processing-39b51b2b-e8da-4db8-b782-bec4fb030999] State transition from RUNNING to PENDING_SHUTDOWN.
    07:45:16.608 TKD [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down
    07:45:16.609 TKD [kafka-streams-close-thread] WARN  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN.
    07:45:16.627 TKD [default-akka.kafka.default-dispatcher-43] INFO  o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.


The streams application stopped. I have the following exception handler
registered:

    // Exit on any stream exception;
    // Mesos will restart the application
    streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
      override def uncaughtException(t: Thread, e: Throwable): Unit = try {
        logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
        logger.error(s"Stopping http service ..")
        restService.stop()
        logger.error(s"Stopping streams service ..")
        streams.close()
      } catch {
        case _: Exception =>
      } finally {
        System.exit(-1)
      }
    })

Ideally the application should terminate and Mesos should restart it.
However, the application does not terminate, even though I have a
System.exit(-1) in the finally clause. Any idea what's happening, or how I
can make the application terminate?
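One possible explanation rests on an assumption that the log hints at but does not prove: `streams.close()` may wait for `StreamThread-1` to finish. The handler above runs on `StreamThread-1` itself, so that wait could never complete, and the `finally` block with `System.exit(-1)` would never be reached. The sketch below reproduces this pattern with plain JVM threads instead of Kafka. It also shows one way to avoid the self-wait: hand the cleanup to a separate thread and bound how long that thread waits. All names in it are hypothetical: `Service` stands in for a client such as `KafkaStreams`, `shutdownOffThread` and `demo` are illustrative helpers, and the injected `exit` function stands in for `System.exit` so that the sketch can run without killing the JVM.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object ShutdownSketch {

  // Hypothetical stand-in for a client (e.g. KafkaStreams) whose close()
  // waits for its worker thread to terminate. Returns true if the worker
  // terminated within timeoutMs.
  final class Service(worker: Thread) {
    def close(timeoutMs: Long): Boolean = {
      worker.join(timeoutMs)
      !worker.isAlive
    }
  }

  // Hypothetical helper: runs cleanup on a fresh thread, so the uncaught
  // exception handler never waits on the thread it is running on.
  // `exit` is injected; in the real application it would be System.exit.
  def shutdownOffThread(cleanup: () => Unit, exit: Int => Unit): Thread = {
    val t = new Thread(new Runnable {
      override def run(): Unit =
        try cleanup()
        catch { case _: Exception => () }
        finally exit(-1)
    }, "app-shutdown")
    t.start()
    t
  }

  // Simulates a worker dying with an uncaught exception and returns
  // (closedCleanly, exitCode) as observed by the shutdown thread.
  def demo(): (Boolean, Int) = {
    val closedCleanly = new AtomicBoolean(false)
    val exitCode = new AtomicInteger(0)
    val exited = new CountDownLatch(1)

    val worker = new Thread(new Runnable {
      override def run(): Unit =
        throw new RuntimeException("simulated producer timeout")
    }, "StreamThread-1")
    val service = new Service(worker)

    worker.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = {
        // Calling service.close(...) directly here would make the worker
        // join itself: with no timeout it would block forever.
        shutdownOffThread(
          () => closedCleanly.set(service.close(5000L)),
          code => { exitCode.set(code); exited.countDown() }
        )
      }
    })

    worker.start()
    exited.await(10, TimeUnit.SECONDS)
    (closedCleanly.get, exitCode.get)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Running `main` should print `(true,-1)`. Once the handler returns, the worker dies, the shutdown thread's bounded `join` completes, and `exit(-1)` is called. The timeout matters because a close that hangs for any other reason still cannot keep the process alive forever.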

Any help will be appreciated.

Regards,


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
