Hi Sachin,

KIP-62 introduced a background heartbeat thread to handle members joining and 
leaving the group. The session.timeout.ms setting specifies how long the broker 
waits for a heartbeat from that thread before it considers the member dead. So 
if the thread has died or the instance has been stopped, the heartbeat thread 
dies with it, the heartbeats stop, and the broker removes the member from the 
group once the session timeout expires.
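
To make the timeout rule concrete, here is a minimal sketch. It is a toy model 
of the expiry check, not the broker's actual code. The class and method names 
are made up for illustration, the 10000 ms session timeout is only an assumed 
example value, and the settings are given as plain config key strings so the 
snippet runs without the Kafka client on the classpath.

```java
import java.util.Properties;

public class SessionTimeoutSketch {

    // Builds the consumer-side settings discussed in this thread,
    // using the plain config key strings.
    static Properties buildProps() {
        Properties props = new Properties();
        // Effectively disables the poll-interval check, as suggested in this thread.
        props.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        // Assumed example value: how long the broker waits without a heartbeat.
        props.put("session.timeout.ms", "10000");
        return props;
    }

    // Toy model of the expiry rule (hypothetical helper, not Kafka code):
    // a member is considered dead once no heartbeat has arrived for longer
    // than the session timeout.
    static boolean isSessionExpired(long lastHeartbeatMs, long nowMs, long sessionTimeoutMs) {
        return nowMs - lastHeartbeatMs > sessionTimeoutMs;
    }

    public static void main(String[] args) {
        Properties props = buildProps();
        long timeout = Long.parseLong(props.getProperty("session.timeout.ms"));
        long lastHeartbeat = 0L; // the instance was stopped right after this heartbeat

        // 5 s without a heartbeat: still inside the session timeout.
        System.out.println(isSessionExpired(lastHeartbeat, 5_000L, timeout));
        // 15 s without a heartbeat: the member would be removed from the group.
        System.out.println(isSessionExpired(lastHeartbeat, 15_000L, timeout));
    }
}
```

The point of the sketch is that liveness is judged from heartbeats alone, so 
setting max.poll.interval.ms very high does not stop a stopped instance from 
being detected.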

Eno

> On 24 Apr 2017, at 15:34, Sachin Mittal <sjmit...@gmail.com> wrote:
> 
> I had a question about this setting
> ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)
> 
> How would the broker know that a thread has died, or that we simply stopped
> an instance, and that it needs to be booted out of the group?
> 
> Thanks
> Sachin
> 
> 
> On Mon, Apr 24, 2017 at 5:55 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
> 
>> Hi Ian,
>> 
>> 
>> This is now fixed in 0.10.2.1; the default configuration needed tweaking. If
>> you can't pick that up (it's currently being voted on), make sure you have
>> these two parameters set as follows in your streams config:
>> 
>> final Properties props = new Properties();
>> ...
>> props.put(ProducerConfig.RETRIES_CONFIG, 10);  // increase to 10 from default of 0
>> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
>>     Integer.toString(Integer.MAX_VALUE));  // increase to infinity from default of 300 s
>> 
>> Thanks
>> Eno
>> 
>>> On 24 Apr 2017, at 10:38, Ian Duffy <i...@ianduffy.ie> wrote:
>>> 
>>> Hi All,
>>> 
>>> We're running multiple Kafka Streams applications using Kafka client
>>> 0.10.2.0 against a 6-node broker cluster running 0.10.1.1.
>>> Additionally, we're running Kafka Connect 0.10.2.0 with the Elasticsearch
>>> connector by Confluent [1].
>>> 
>>> When an ISR change occurred on the brokers, all of the streams applications
>>> and the Kafka Connect ES connector threw exceptions and never recovered.
>>> 
>>> We've seen a correlation between Kafka Broker ISR change and stream
>>> applications dying.
>>> 
>>> The streams applications log the following and fail to recover:
>>> 
>>> 07:01:23.323 stream-processor /var/log/application.log  2017-04-24 06:01:23,323 - [WARN] - [1.1.0-6] - [StreamThread-1] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
>>> 07:01:23.323 stream-processor /var/log/application.log  2017-04-24 06:01:23,324 - [ERROR] - [1.1.0-6] - [StreamThread-1] Application - Unexpected Exception caught in thread [StreamThread-1]:
>>> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_81, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=81, offset=479285
>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_81] exception caught when producing
>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
>>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>>>     ... 2 common frames omitted
>>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>> 07:01:23.558 stream-processor /var/log/application.log  2017-04-24 06:01:23,558 - [WARN] - [1.1.0-6] - [StreamThread-3] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
>>> 07:01:23.558 stream-processor /var/log/application.log  2017-04-24 06:01:23,559 - [ERROR] - [1.1.0-6] - [StreamThread-3] Application - Unexpected Exception caught in thread [StreamThread-3]:
>>> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_55, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=55, offset=479308
>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55] exception caught when producing
>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
>>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>>>     ... 2 common frames omitted
>>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>> 
>>> Are we potentially doing something wrong with our streams
>>> configuration/usage? Or does this look like a bug?
>>> 
>>> Thanks,
>>> Ian.
>>> 
>>> [1] https://github.com/confluentinc/kafka-connect-elasticsearch
>> 
>> 
