Hi All,

We're running multiple Kafka Streams applications using Kafka client 0.10.2.0 against a 6-node broker cluster running 0.10.1.1. Additionally, we're running Kafka Connect 0.10.2.0 with the Elasticsearch connector by Confluent [1].
When an ISR change occurred on the brokers, all of the Streams applications and the Kafka Connect ES connector threw exceptions and never recovered. We've seen a correlation between broker ISR changes and Streams applications dying.

The Streams applications log the following and fail to recover:

07:01:23.323 stream-processor /var/log/application.log 2017-04-24 06:01:23,323 - [WARN] - [1.1.0-6] - [StreamThread-1] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
07:01:23.323 stream-processor /var/log/application.log 2017-04-24 06:01:23,324 - [ERROR] - [1.1.0-6] - [StreamThread-1] Application - Unexpected Exception caught in thread [StreamThread-1]:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_81, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=81, offset=479285
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_81] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
    ... 2 common frames omitted
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

07:01:23.558 stream-processor /var/log/application.log 2017-04-24 06:01:23,558 - [WARN] - [1.1.0-6] - [StreamThread-3] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
07:01:23.558 stream-processor /var/log/application.log 2017-04-24 06:01:23,559 - [ERROR] - [1.1.0-6] - [StreamThread-3] Application - Unexpected Exception caught in thread [StreamThread-3]:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_55, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=55, offset=479308
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
    ... 2 common frames omitted
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

Are we potentially doing something wrong with our Streams configuration/usage? Or does this look like a bug?

Thanks,
Ian.

[1] https://github.com/confluentinc/kafka-connect-elasticsearch