I had a question about this setting:

ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)

How would the broker know if a thread has died, or, say, we simply stopped an instance and it needs to be booted out of the group?

Thanks
Sachin

On Mon, Apr 24, 2017 at 5:55 PM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Ian,
>
> This is now fixed in 0.10.2.1. The default configuration needs tweaking. If
> you can't pick that up (it's currently being voted), make sure you have
> these two parameters set as follows in your streams config:
>
> final Properties props = new Properties();
> ...
> props.put(ProducerConfig.RETRIES_CONFIG, 10); <---- increase to 10 from default of 0
> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); <--------- increase to infinity from default of 300 s
>
> Thanks
> Eno
>
> On 24 Apr 2017, at 10:38, Ian Duffy <i...@ianduffy.ie> wrote:
> >
> > Hi All,
> >
> > We're running multiple Kafka Stream applications using Kafka client
> > 0.10.2.0 against a 6 node broker cluster running 0.10.1.1.
> > Additionally, we're running Kafka Connect 0.10.2.0 with the ElasticSearch
> > connector by Confluent [1].
> >
> > On an ISR change occurring on the brokers, all of the streams applications
> > and the Kafka Connect ES connector threw exceptions and never recovered.
> >
> > We've seen a correlation between Kafka Broker ISR change and stream
> > applications dying.
> >
> > The logs from the streams applications throw out the following and fail to
> > recover:
> >
> > 07:01:23.323 stream-processor /var/log/application.log 2017-04-24 06:01:23,323 - [WARN] - [1.1.0-6] - [StreamThread-1] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> > 07:01:23.323 stream-processor /var/log/application.log 2017-04-24 06:01:23,324 - [ERROR] - [1.1.0-6] - [StreamThread-1] Application - Unexpected Exception caught in thread [StreamThread-1]:
> > org.apache.kafka.streams.errors.StreamsException: Exception caught in process.
> > taskId=0_81, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=81, offset=479285
> >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_81] exception caught when producing
> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
> >     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> >     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> >     ... 2 common frames omitted
> > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> > 07:01:23.558 stream-processor /var/log/application.log 2017-04-24 06:01:23,558 - [WARN] - [1.1.0-6] - [StreamThread-3] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> > 07:01:23.558 stream-processor /var/log/application.log 2017-04-24 06:01:23,559 - [ERROR] - [1.1.0-6] - [StreamThread-3] Application - Unexpected Exception caught in thread [StreamThread-3]:
> > org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_55, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=55, offset=479308
> >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55] exception caught when producing
> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
> >     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> >     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> >     ... 2 common frames omitted
> > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> >
> > Are we potentially doing something wrong with our streams
> > configuration/usage? Or does this look like a bug?
> >
> > Thanks,
> > Ian.
> >
> > [1] https://github.com/confluentinc/kafka-connect-elasticsearch
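
For reference, the two overrides quoted in this thread can be written out as a small self-contained sketch. It uses the plain string keys (`retries`, `max.poll.interval.ms`) in place of the `ProducerConfig`/`ConsumerConfig` constants so that it compiles without the Kafka jars. The class name `StreamsWorkaroundConfig` and the `session.timeout.ms` value are illustrative assumptions, not something stated in the thread. On the question of how a stopped instance gets detected: as far as I understand, since 0.10.1 the consumer sends heartbeats from a background thread, so an instance that is stopped also stops heartbeating and should be removed from the group once `session.timeout.ms` expires, independently of `max.poll.interval.ms`, which only bounds the time between `poll()` calls while the process is still alive.

```java
import java.util.Properties;

// Hypothetical helper class, used only to illustrate the settings discussed above.
public class StreamsWorkaroundConfig {

    /** Builds the two overrides from the thread plus an illustrative session timeout. */
    public static Properties build() {
        final Properties props = new Properties();

        // Producer side: retry transient send failures (for example a leader
        // change) instead of failing on the first error. Value 10 is from the thread.
        props.put("retries", 10);

        // Consumer side: effectively disable the poll-interval check, as
        // suggested in the thread. Integer.MAX_VALUE ms is a very large
        // finite value, not a true infinity.
        props.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));

        // Assumption (not from the thread): liveness of the process itself is
        // still governed by the heartbeat/session timeout. 10000 ms is only
        // an example value.
        props.put("session.timeout.ms", "10000");

        return props;
    }

    public static void main(String[] args) {
        // Print the resulting settings; a real application would merge these
        // into its full streams config (application.id, bootstrap.servers, ...).
        build().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

These entries would be added to the same `Properties` object that is passed to the streams application, alongside its required settings.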