I had a question about this setting:
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)

How would the broker know if a thread has died, or if we simply stopped an
instance and it needs to be booted out of the group?

Thanks
Sachin
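
A rough model of the two liveness checks behind this question may help. It assumes the consumer group protocol as changed by KIP-62 (Kafka 0.10.1 and later): heartbeats are sent from a background thread and bounded by session.timeout.ms on the group coordinator, while max.poll.interval.ms is enforced by the client, which leaves the group if poll() is not called in time. The class and method names are hypothetical, and 10000 ms is assumed as the session.timeout.ms default for 0.10.x consumers.

```java
// Hypothetical model of the two consumer-group liveness checks described
// in KIP-62 (Kafka 0.10.1+). This is NOT Kafka client or broker code.
public class LivenessModel {

    // Coordinator (broker) side: a member whose background heartbeat thread
    // has been silent for longer than session.timeout.ms is removed.
    static boolean coordinatorEvicts(long msSinceLastHeartbeat, long sessionTimeoutMs) {
        return msSinceLastHeartbeat > sessionTimeoutMs;
    }

    // Client side: the consumer leaves the group itself if the application
    // has not called poll() within max.poll.interval.ms.
    static boolean clientLeavesGroup(long msSinceLastPoll, long maxPollIntervalMs) {
        return msSinceLastPoll > maxPollIntervalMs;
    }

    // A member drops out of the group if either check fires.
    static boolean removedFromGroup(long msSinceHeartbeat, long msSincePoll,
                                    long sessionTimeoutMs, long maxPollIntervalMs) {
        return coordinatorEvicts(msSinceHeartbeat, sessionTimeoutMs)
                || clientLeavesGroup(msSincePoll, maxPollIntervalMs);
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 10_000L;             // assumed 0.10.x default
        long maxPollIntervalMs = Integer.MAX_VALUE;  // the setting in question

        // Integer.MAX_VALUE ms expressed in whole days (about 24).
        System.out.println("max.poll.interval.ms ~ "
                + maxPollIntervalMs / 86_400_000L + " days");

        // Instance killed: neither heartbeats nor polls for 15 s.
        boolean killed = removedFromGroup(15_000L, 15_000L,
                sessionTimeoutMs, maxPollIntervalMs);

        // Process alive, heartbeat thread alive, processing thread stuck for 1 h.
        boolean stuck = removedFromGroup(0L, 3_600_000L,
                sessionTimeoutMs, maxPollIntervalMs);

        System.out.println("killed instance removed: " + killed); // true
        System.out.println("stuck thread removed:    " + stuck);  // false
    }
}
```

Under this model, a killed instance is still removed once its heartbeats stop (and a clean KafkaStreams.close() should make its consumers leave the group explicitly), but a thread that hangs while its process stays alive would keep its partitions, because the poll-interval check never fires. As we understand KIP-62, max.poll.interval.ms is also used as the rebalance timeout, so a very large value may lengthen rebalances when a member is slow to rejoin.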


On Mon, Apr 24, 2017 at 5:55 PM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Ian,
>
>
> This is now fixed in 0.10.2.1. The default configuration needed tweaking. If
> you can't pick that up (it's currently being voted on), make sure you have
> these two parameters set as follows in your streams config:
>
> final Properties props = new Properties();
> // ...
> props.put(ProducerConfig.RETRIES_CONFIG, 10);  // increase to 10 from the default of 0
> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
>           Integer.toString(Integer.MAX_VALUE));  // effectively infinite; default is 300 s
>
> Thanks
> Eno
>
> > On 24 Apr 2017, at 10:38, Ian Duffy <i...@ianduffy.ie> wrote:
> >
> > Hi All,
> >
> > We're running multiple Kafka Streams applications using Kafka client
> > 0.10.2.0 against a 6-node broker cluster running 0.10.1.1.
> > Additionally, we're running Kafka Connect 0.10.2.0 with the Elasticsearch
> > connector by Confluent [1].
> >
> > When an ISR change occurred on the brokers, all of the Streams applications
> > and the Kafka Connect ES connector threw exceptions and never recovered.
> >
> > We've seen a correlation between ISR changes on the Kafka brokers and
> > Streams applications dying.
> >
> > The Streams applications log the following and fail to recover:
> >
> > 07:01:23.323 stream-processor /var/log/application.log  2017-04-24 06:01:23,323 - [WARN] - [1.1.0-6] - [StreamThread-1] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> > 07:01:23.323 stream-processor /var/log/application.log  2017-04-24 06:01:23,324 - [ERROR] - [1.1.0-6] - [StreamThread-1] Application - Unexpected Exception caught in thread [StreamThread-1]:
> > org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_81, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=81, offset=479285
> >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_81] exception caught when producing
> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
> >     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> >     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> >     ... 2 common frames omitted
> > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> > 07:01:23.558 stream-processor /var/log/application.log  2017-04-24 06:01:23,558 - [WARN] - [1.1.0-6] - [StreamThread-3] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> > 07:01:23.558 stream-processor /var/log/application.log  2017-04-24 06:01:23,559 - [ERROR] - [1.1.0-6] - [StreamThread-3] Application - Unexpected Exception caught in thread [StreamThread-3]:
> > org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_55, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=55, offset=479308
> >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55] exception caught when producing
> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
> >     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> >     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> >     ... 2 common frames omitted
> > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> >
> > Are we potentially doing something wrong with our streams
> > configuration/usage? Or does this look like a bug?
> >
> > Thanks,
> > Ian.
> >
> > [1] https://github.com/confluentinc/kafka-connect-elasticsearch
>
>
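
For reference, the two overrides from the quoted reply can be collected in a small self-contained sketch. To compile without Kafka on the classpath, it uses the literal keys "retries" and "max.poll.interval.ms", which are the values of ProducerConfig.RETRIES_CONFIG and ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG; the class name, the helper method, and the application.id and bootstrap.servers values are hypothetical.

```java
import java.util.Properties;

// Sketch of the two overrides suggested for Kafka Streams 0.10.2.0.
// Assumption: literal key strings stand in for ProducerConfig.RETRIES_CONFIG
// ("retries") and ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG ("max.poll.interval.ms").
public class StreamsOverrides {

    static Properties withSuggestedOverrides(Properties base) {
        Properties props = new Properties();
        props.putAll(base); // keep application.id, bootstrap.servers, etc.

        // Let the producer retry retriable send errors such as
        // NotLeaderForPartitionException instead of failing on the first attempt
        // (the producer default at this version is 0 retries).
        props.put("retries", 10);

        // Effectively disable the poll-interval check: Integer.MAX_VALUE ms is ~24 days.
        props.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "stream-processor"); // hypothetical
        base.put("bootstrap.servers", "broker1:9092");   // hypothetical

        Properties props = withSuggestedOverrides(base);
        System.out.println(props.get("retries"));              // 10
        System.out.println(props.get("max.poll.interval.ms")); // 2147483647
    }
}
```

In a real application the returned Properties would be passed to the KafkaStreams constructor as usual; Streams forwards unprefixed producer and consumer settings like these to its internal clients.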
