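Hi Sachin,

In KIP-62 a background heartbeat thread was introduced to handle group membership, i.e. members joining and leaving the group. The session.timeout.ms setting specifies how long the broker waits for a heartbeat from that background thread before it considers the member dead. So if the stream thread has died, or the instance has been stopped, the background heartbeat thread dies too, the heartbeats stop, and the right thing happens: the broker removes the member from the group once session.timeout.ms expires.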
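As a rough sketch of how the two timeouts fit together, assuming plain string config keys rather than the ConsumerConfig constants (the class and method names are made up for illustration, and the session/heartbeat values are examples, not recommendations):

```java
import java.util.Properties;

public class HeartbeatConfigSketch {

    // Hypothetical helper: collects the consumer settings that affect group membership.
    public static Properties groupMembershipProps() {
        Properties props = new Properties();

        // Processing liveness: the maximum time allowed between poll() calls.
        // Integer.MAX_VALUE effectively disables this check, as suggested earlier in the thread.
        props.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));

        // Process liveness: if the broker sees no heartbeat from the background
        // thread for this long, it removes the member from the group.
        props.put("session.timeout.ms", "10000");    // example value

        // How often the background thread sends heartbeats; this should be
        // well below session.timeout.ms.
        props.put("heartbeat.interval.ms", "3000");  // example value

        return props;
    }

    public static void main(String[] args) {
        Properties props = groupMembershipProps();
        props.stringPropertyNames().stream()
             .sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

With settings like these, raising max.poll.interval.ms does not stop the broker from detecting a dead instance, because that detection is driven by session.timeout.ms.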
Eno

> On 24 Apr 2017, at 15:34, Sachin Mittal <sjmit...@gmail.com> wrote:
>
> I had a question about this setting
> ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)
>
> How would the broker know if a thread has died or say we simply stopped an
> instance and needs to be booted out of the group.
>
> Thanks
> Sachin
>
>
> On Mon, Apr 24, 2017 at 5:55 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
>> Hi Ian,
>>
>>
>> This is now fixed in 0.10.2.1. The default configuration need tweaking. If
>> you can't pick that up (it's currently being voted), make sure you have
>> these two parameters set as follows in your streams config:
>>
>> final Properties props = new Properties();
>> ...
>> props.put(ProducerConfig.RETRIES_CONFIG, 10); <---- increase to 10 from default of 0
>> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); <--------- increase to infinity from default of 300 s
>>
>> Thanks
>> Eno
>>
>>> On 24 Apr 2017, at 10:38, Ian Duffy <i...@ianduffy.ie> wrote:
>>>
>>> Hi All,
>>>
>>> We're running multiple Kafka Stream applications using Kafka client
>>> 0.10.2.0 against a 6 node broker cluster running 0.10.1.1
>>> Additionally, we're running Kafka Connect 0.10.2.0 with the ElasticSearch
>>> connector by confluent [1]
>>>
>>> On an ISR change occurring on the brokers, all of the streams applications
>>> and the Kafka connect ES connector threw exceptions and never recovered.
>>>
>>> We've seen a correlation between Kafka Broker ISR change and stream
>>> applications dying.
>>>
>>> The logs from the streams applications throw out the following and fail to
>>> recover:
>>>
>>> 07:01:23.323 stream-processor /var/log/application.log 2017-04-24 06:01:23,323 - [WARN] - [1.1.0-6] - [StreamThread-1] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
>>> 07:01:23.323 stream-processor /var/log/application.log 2017-04-24 06:01:23,324 - [ERROR] - [1.1.0-6] - [StreamThread-1] Application - Unexpected Exception caught in thread [StreamThread-1]:
>>> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_81, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=81, offset=479285
>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_81] exception caught when producing
>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
>>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>>>     ... 2 common frames omitted
>>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>> 07:01:23.558 stream-processor /var/log/application.log 2017-04-24 06:01:23,558 - [WARN] - [1.1.0-6] - [StreamThread-3] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
>>> 07:01:23.558 stream-processor /var/log/application.log 2017-04-24 06:01:23,559 - [ERROR] - [1.1.0-6] - [StreamThread-3] Application - Unexpected Exception caught in thread [StreamThread-3]:
>>> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_55, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=55, offset=479308
>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55] exception caught when producing
>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
>>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>>>     ... 2 common frames omitted
>>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>>
>>> Are we potentially doing something wrong with our streams
>>> configuration/usage? Or does this look like a bug?
>>>
>>> Thanks,
>>> Ian.
>>>
>>> [1] https://github.com/confluentinc/kafka-connect-elasticsearch