Hi Ian,

This is now fixed in 0.10.2.1. The default configuration needs tweaking. If you 
can't pick that up (the release is currently being voted on), make sure you 
have these two parameters set as follows in your streams config:

final Properties props = new Properties();
...
// increase to 10 from default of 0
props.put(ProducerConfig.RETRIES_CONFIG, 10);
// increase to (effectively) infinity from default of 300 s
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
    Integer.toString(Integer.MAX_VALUE));
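
For reference, here is a minimal standalone sketch of the same two overrides 
that compiles without the Kafka jars on the classpath. It assumes the literal 
keys "retries" and "max.poll.interval.ms", which are what the two constants 
above resolve to; the class and method names are hypothetical and only there 
for illustration. In a real application you would add these entries to the 
Properties you already pass to your streams app rather than build a new one.

```java
import java.util.Properties;

// Hypothetical helper class, for illustration only.
class StreamsConfigSketch {

    // Builds a Properties object carrying only the two overrides discussed above.
    static Properties buildProps() {
        final Properties props = new Properties();
        // Same key as ProducerConfig.RETRIES_CONFIG: retry failed sends
        // (e.g. after a leader change) instead of failing on the first error.
        props.put("retries", 10);
        // Same key as ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG:
        // Integer.MAX_VALUE ms is roughly 24.8 days, i.e. effectively no limit.
        props.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        final Properties props = buildProps();
        // Print the overrides so they can be checked against the running config.
        for (String key : props.stringPropertyNames()) {
            System.out.println(key + " = " + props.getProperty(key));
        }
    }
}
```

Note that "retries" is stored as an Integer here, as in the snippet above, so 
read it back with props.get("retries") rather than getProperty, which only 
returns String values.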

Thanks
Eno

> On 24 Apr 2017, at 10:38, Ian Duffy <i...@ianduffy.ie> wrote:
> 
> Hi All,
> 
> We're running multiple Kafka Stream applications using Kafka client
> 0.10.2.0 against a 6 node broker cluster running 0.10.1.1
> Additionally, we're running Kafka Connect 0.10.2.0 with the ElasticSearch
> connector by confluent [1]
> 
> On an ISR change occurring on the brokers, all of the streams applications
> and the Kafka connect ES connector threw exceptions and never recovered.
> 
> We've seen a correlation between Kafka Broker ISR change and stream
> applications dying.
> 
> The logs from the streams applications throw out the following and fail to
> recover:
> 
> 07:01:23.323 stream-processor /var/log/application.log  2017-04-24
> 06:01:23,323 - [WARN] - [1.1.0-6] - [StreamThread-1]
> o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING
> to NOT_RUNNING
> 07:01:23.323 stream-processor /var/log/application.log  2017-04-24
> 06:01:23,324 - [ERROR] - [1.1.0-6] - [StreamThread-1] Application -
> Unexpected Exception caught in thread [StreamThread-1]:
> org.apache.kafka.streams.errors.StreamsException: Exception caught in
> process. taskId=0_81, processor=KSTREAM-SOURCE-0000000000,
> topic=kafka-topic, partition=81, offset=479285
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_81]
> exception caught when producing
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
> at
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> at
> org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> ... 2 common frames omitted
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException:
> This server is not the leader for that topic-partition.
> 07:01:23.558 stream-processor /var/log/application.log  2017-04-24
> 06:01:23,558 - [WARN] - [1.1.0-6] - [StreamThread-3]
> o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING
> to NOT_RUNNING
> 07:01:23.558 stream-processor /var/log/application.log  2017-04-24
> 06:01:23,559 - [ERROR] - [1.1.0-6] - [StreamThread-3] Application -
> Unexpected Exception caught in thread [StreamThread-3]:
> org.apache.kafka.streams.errors.StreamsException: Exception caught in
> process. taskId=0_55, processor=KSTREAM-SOURCE-0000000000,
> topic=kafka-topic, partition=55, offset=479308
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55]
> exception caught when producing
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
> at
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> at
> org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> ... 2 common frames omitted
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException:
> This server is not the leader for that topic-partition.
> 
> Are we potentially doing something wrong with our streams
> configuration/usage? Or does this look like a bug?
> 
> Thanks,
> Ian.
> 
> [1] https://github.com/confluentinc/kafka-connect-elasticsearch
