Hi Ian,

Any chance you could share the full log? Feel free to send it to me directly if 
you don't want to broadcast it everywhere.

Thanks
Eno


> On 25 Apr 2017, at 17:36, Ian Duffy <i...@ianduffy.ie> wrote:
> 
> Thanks again for the quick response Eno.
> 
> We just left the application running in the hope it would recover; after
> ~1 hour it is still continuously spitting out the same exception and not
> managing to continue processing.
> 
> On 25 April 2017 at 16:24, Eno Thereska <eno.there...@gmail.com> wrote:
> 
>> Hi Ian,
>> 
>> Retries are sometimes expected and don't always indicate a problem. We
>> should probably adjust the logging so that this warning is not printed so
>> frequently. Are you seeing a crash, or does the app proceed?
>> 
>> Thanks
>> Eno
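Eno's suggestion to print this warning less often can be sketched as a small per-key throttle. This is a minimal illustration, assuming the task id (e.g. `1_21`) as the key and a fixed 60 s interval. `ThrottledWarning` and `maybeLog` are hypothetical names, not part of Kafka or its logging setup:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: rate-limits a repeated warning per key (e.g. a task id).
// Not Kafka code; it only illustrates "log less often" for expected retries.
public class ThrottledWarning {
    private final long intervalMs;
    private final Map<String, Long> lastLoggedMs = new HashMap<>();
    private final Map<String, Integer> suppressed = new HashMap<>();

    public ThrottledWarning(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Returns the line to log, or null if this occurrence should be suppressed. */
    public String maybeLog(String key, String message, long nowMs) {
        Long last = lastLoggedMs.get(key);
        if (last != null && nowMs - last < intervalMs) {
            // Still inside the quiet window: just count the occurrence.
            suppressed.merge(key, 1, Integer::sum);
            return null;
        }
        int skipped = suppressed.getOrDefault(key, 0);
        suppressed.remove(key);
        lastLoggedMs.put(key, nowMs);
        return skipped == 0 ? message : message + " (suppressed " + skipped + " similar warnings)";
    }

    public static void main(String[] args) {
        ThrottledWarning w = new ThrottledWarning(60_000);
        // Simulate a retry loop hitting the same lock failure every 10 s for 2 minutes.
        for (long t = 0; t <= 120_000; t += 10_000) {
            String line = w.maybeLog("1_21", "Could not create task 1_21. Will retry.", t);
            if (line != null) {
                System.out.println(t + "ms: " + line);
            }
        }
    }
}
```

Run against a retry loop that fails every 10 s, this logs the first failure and then one summary line per minute instead of one line per attempt.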
>> 
>> On 25 Apr 2017 4:02 p.m., "Ian Duffy" <i...@ianduffy.ie> wrote:
>> 
>> We upgraded a handful of our streams applications to 0.10.2.1 as suggested
>> and are seeing far fewer issues and much smoother performance.
>> They withstood ISR changes.
>> 
>> We saw the following when more consumers were added to a consumer group:
>> 
>> 2017-04-25 14:57:37,200 - [WARN] - [1.1.0-11] - [StreamThread-2] o.a.k.s.p.internals.StreamThread - Could not create task 1_21. Will retry.
>> org.apache.kafka.streams.errors.LockException: task [1_21] Failed to lock the state directory for task 1_21
>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
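The LockException above means the task's state directory was still locked when StreamThread-2 tried to create the task. Below is a simplified, hypothetical model of that situation. It assumes the lock is held by another thread of the same instance (for example the task's previous owner that has not finished closing it). `TaskDirLocks` is illustrative only and is not Kafka Streams' own state-directory code:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified, hypothetical model of per-task state-directory locking inside one
// application instance. Not Kafka Streams' implementation.
public class TaskDirLocks {
    // task id -> name of the thread currently holding that task's directory
    private final ConcurrentMap<String, String> owners = new ConcurrentHashMap<>();

    /** Succeeds if the directory is free or already held by the same owner. */
    public boolean tryLock(String taskId, String owner) {
        String current = owners.putIfAbsent(taskId, owner);
        return current == null || current.equals(owner);
    }

    /** Releases the lock only if the caller actually holds it. */
    public void unlock(String taskId, String owner) {
        owners.remove(taskId, owner);
    }

    public static void main(String[] args) {
        TaskDirLocks locks = new TaskDirLocks();
        // Before the rebalance, StreamThread-1 owns task 1_21.
        locks.tryLock("1_21", "StreamThread-1");
        // After the rebalance the task moves to StreamThread-2, but thread 1 has
        // not released it yet, so the first attempt fails ("Will retry.").
        System.out.println(locks.tryLock("1_21", "StreamThread-2")); // false
        // Once the previous owner releases the directory, a retry succeeds.
        locks.unlock("1_21", "StreamThread-1");
        System.out.println(locks.tryLock("1_21", "StreamThread-2")); // true
    }
}
```

In this model the retry only succeeds if the previous holder eventually releases the lock; if it never does, the warning repeats indefinitely, which is one possible reading of the behaviour reported at the top of the thread.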
>> 
>> 
>> 
>> On 24 April 2017 at 16:02, Eno Thereska <eno.there...@gmail.com> wrote:
>> 
>>> Hi Sachin,
>>> 
>>> KIP-62 introduced a background heartbeat thread to handle group membership
>>> (members arriving and departing). The session.timeout.ms setting specifies
>>> how long the coordinator waits for heartbeats from that background thread.
>>> So if the thread has died, that background thread will also die, its
>>> heartbeats will stop, and the member will be removed from the group once
>>> session.timeout.ms expires.
>>> 
>>> Eno
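The two liveness checks discussed here can be modelled as two independent timeouts. This sketch is a simplification, assuming a session timeout of 10 s and the `Integer.MAX_VALUE` poll interval suggested in this thread. `GroupLiveness` and its methods are hypothetical and only mirror the semantics of `session.timeout.ms` and `max.poll.interval.ms`:

```java
// Simplified, hypothetical model of the two KIP-62 liveness checks (times in ms).
// The real checks live in the broker-side group coordinator and in the
// consumer's background heartbeat thread.
public class GroupLiveness {

    /** Coordinator side: evict a member whose heartbeats stopped for longer than session.timeout.ms. */
    public static boolean sessionExpired(long nowMs, long lastHeartbeatMs, long sessionTimeoutMs) {
        return nowMs - lastHeartbeatMs > sessionTimeoutMs;
    }

    /** Client side: leave the group if poll() was not called within max.poll.interval.ms. */
    public static boolean pollIntervalExceeded(long nowMs, long lastPollMs, long maxPollIntervalMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 10_000;             // assumed session.timeout.ms
        long maxPollIntervalMs = Integer.MAX_VALUE; // max.poll.interval.ms as suggested in the thread

        // Instance stopped at t=0: heartbeats stop, so the member is evicted after the session timeout.
        System.out.println(sessionExpired(15_000, 0, sessionTimeoutMs)); // true
        // Instance alive but not polling for an hour: heartbeats continue, and the
        // poll-interval check does not fire with Integer.MAX_VALUE.
        System.out.println(pollIntervalExceeded(3_600_000, 0, maxPollIntervalMs)); // false
    }
}
```

With `Integer.MAX_VALUE` (about 24.8 days) the poll-interval check effectively never fires, so in this model a dead or stopped instance is detected only through missing heartbeats.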
>>> 
>>>> On 24 Apr 2017, at 15:34, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>> 
>>>> I had a question about this setting:
>>>> ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)
>>>> 
>>>> How would the broker know if a thread has died, or if we simply stopped an
>>>> instance that needs to be booted out of the group?
>>>> 
>>>> Thanks
>>>> Sachin
>>>> 
>>>> 
>>>> On Mon, Apr 24, 2017 at 5:55 PM, Eno Thereska <eno.there...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Hi Ian,
>>>>> 
>>>>> 
>>>>> This is now fixed in 0.10.2.1. The default configuration needed tweaking.
>>>>> If you can't pick that up (it's currently being voted on), make sure you
>>>>> have these two parameters set as follows in your streams config:
>>>>> 
>>>>> final Properties props = new Properties();
>>>>> // ...
>>>>> // Increase to 10 from the default of 0.
>>>>> props.put(ProducerConfig.RETRIES_CONFIG, 10);
>>>>> // Increase to "infinity" from the default of 300 s.
>>>>> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
>>>>>     Integer.toString(Integer.MAX_VALUE));
>>>>> 
>>>>> Thanks
>>>>> Eno
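Below is a self-contained version of the config above. It uses the raw config key strings behind the constants (`retries` for `ProducerConfig.RETRIES_CONFIG`, `max.poll.interval.ms` for `ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG`) so it compiles without Kafka on the classpath. The application id and bootstrap servers are hypothetical placeholders:

```java
import java.util.Properties;

// Minimal sketch of the suggested streams config. "my-streams-app" and
// "broker1:9092" are hypothetical placeholders.
public class StreamsConfigSketch {

    public static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");  // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", "broker1:9092"); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        // ProducerConfig.RETRIES_CONFIG: retry retriable send errors such as
        // NotLeaderForPartitionException instead of failing the task (default 0 per the thread).
        props.put("retries", "10");
        // ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG: effectively disable the poll-interval
        // check (default 300 s per the thread); liveness then relies on session.timeout.ms.
        props.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        // In a real application these Properties would be passed to the KafkaStreams constructor.
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```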
>>>>> 
>>>>>> On 24 Apr 2017, at 10:38, Ian Duffy <i...@ianduffy.ie> wrote:
>>>>>> 
>>>>>> Hi All,
>>>>>> 
>>>>>> We're running multiple Kafka Streams applications using Kafka client
>>>>>> 0.10.2.0 against a 6-node broker cluster running 0.10.1.1.
>>>>>> Additionally, we're running Kafka Connect 0.10.2.0 with the Elasticsearch
>>>>>> connector by Confluent [1].
>>>>>> 
>>>>>> When an ISR change occurred on the brokers, all of the streams applications
>>>>>> and the Kafka Connect ES connector threw exceptions and never recovered.
>>>>>> 
>>>>>> We've seen a correlation between Kafka broker ISR changes and streams
>>>>>> applications dying.
>>>>>> 
>>>>>> The logs from the streams applications show the following, and the
>>>>>> applications fail to recover:
>>>>>> 
>>>>>> 07:01:23.323 stream-processor /var/log/application.log  2017-04-24 06:01:23,323 - [WARN] - [1.1.0-6] - [StreamThread-1] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
>>>>>> 07:01:23.323 stream-processor /var/log/application.log  2017-04-24 06:01:23,324 - [ERROR] - [1.1.0-6] - [StreamThread-1] Application - Unexpected Exception caught in thread [StreamThread-1]:
>>>>>> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_81, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=81, offset=479285
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_81] exception caught when producing
>>>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
>>>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
>>>>>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>>>>>>     ... 2 common frames omitted
>>>>>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>>>>> 07:01:23.558 stream-processor /var/log/application.log  2017-04-24 06:01:23,558 - [WARN] - [1.1.0-6] - [StreamThread-3] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
>>>>>> 07:01:23.558 stream-processor /var/log/application.log  2017-04-24 06:01:23,559 - [ERROR] - [1.1.0-6] - [StreamThread-3] Application - Unexpected Exception caught in thread [StreamThread-3]:
>>>>>> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_55, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=55, offset=479308
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55] exception caught when producing
>>>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
>>>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
>>>>>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>>>>>>     ... 2 common frames omitted
>>>>>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>>>>> 
>>>>>> Are we potentially doing something wrong with our streams
>>>>>> configuration/usage? Or does this look like a bug?
>>>>>> 
>>>>>> Thanks,
>>>>>> Ian.
>>>>>> 
>>>>>> [1] https://github.com/confluentinc/kafka-connect-elasticsearch
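Why `retries` matters for the failures above can be shown with a small simulation that involves no Kafka code. During an ISR change the partition leader moves, and sends to the old leader fail with a retriable error until the client learns about the new leader. `LeaderChangeRetry` and the number of failed sends are hypothetical assumptions chosen for illustration:

```java
// Hypothetical simulation (no Kafka involved): a partition leader moves, so the
// first few sends fail with a retriable "not leader" error.
public class LeaderChangeRetry {

    /** Pretend partition: the first `failuresBeforeNewLeader` sends hit the old leader. */
    static final class FlakyPartition {
        private int remainingFailures;

        FlakyPartition(int failuresBeforeNewLeader) {
            this.remainingFailures = failuresBeforeNewLeader;
        }

        boolean send() {
            if (remainingFailures > 0) {
                remainingFailures--;
                return false; // "This server is not the leader for that topic-partition."
            }
            return true;
        }
    }

    /** Returns true if the record was written within 1 + retries attempts. */
    public static boolean sendWithRetries(int failuresBeforeNewLeader, int retries) {
        FlakyPartition partition = new FlakyPartition(failuresBeforeNewLeader);
        for (int attempt = 0; attempt <= retries; attempt++) {
            if (partition.send()) {
                return true;
            }
            // A real producer would also wait retry.backoff.ms and refresh metadata here.
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("retries=0:  " + sendWithRetries(2, 0));  // false: error surfaces, task fails
        System.out.println("retries=10: " + sendWithRetries(2, 10)); // true: transient error absorbed
    }
}
```

With `retries=0` the first "not leader" error is surfaced immediately, which matches the `NotLeaderForPartitionException` that stopped the stream threads; with a few retries the same transient error is absorbed.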
>>>>> 
>>>>> 
>>> 
>>> 
>> 
