Hi Eno,

Looks like we just didn't wait long enough. It eventually recovered and
started processing again.

Thanks for all the fantastic work in the 0.10.2.1 client.

On 25 April 2017 at 18:12, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Ian,
>
> Any chance you could share the full log? Feel free to send it to me
> directly if you don't want to broadcast it everywhere.
>
> Thanks
> Eno
>
>
> > On 25 Apr 2017, at 17:36, Ian Duffy <i...@ianduffy.ie> wrote:
> >
> > Thanks again for the quick response Eno.
> >
> > We just left the application running in the hope it would recover. After
> > ~1 hour it's still continuously spitting out the same exception and not
> > managing to continue processing.
> >
> > On 25 April 2017 at 16:24, Eno Thereska <eno.there...@gmail.com> wrote:
> >
> >> Hi Ian,
> >>
> >> Retries are sometimes expected and don't always indicate a problem. We
> >> should probably adjust the printing of the messages to not print this
> >> warning frequently. Are you seeing any crash or does the app proceed?
> >>
> >> Thanks
> >> Eno
> >>
> >> On 25 Apr 2017 4:02 p.m., "Ian Duffy" <i...@ianduffy.ie> wrote:
> >>
> >> We upgraded a handful of our streams applications to 0.10.2.1 as
> >> suggested. We're seeing far fewer issues and much smoother performance.
> >> They withstood ISR changes.
> >>
> >> We saw the following when more consumers were added to a consumer group:
> >>
> >> 2017-04-25 14:57:37,200 - [WARN] - [1.1.0-11] - [StreamThread-2] o.a.k.s.p.internals.StreamThread - Could not create task 1_21. Will retry.
> >> org.apache.kafka.streams.errors.LockException: task [1_21] Failed to lock the state directory for task 1_21
> >> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
> >> at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> >> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
> >> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
> >> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
> >> at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> >> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> >> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> >> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> >> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >>
> >>
> >>
> >> On 24 April 2017 at 16:02, Eno Thereska <eno.there...@gmail.com> wrote:
> >>
> >>> Hi Sachin,
> >>>
> >>> In KIP-62 a background heartbeat thread was introduced to deal with the
> >>> group protocol arrivals and departures. There is a setting called
> >>> session.timeout.ms that specifies the timeout of that background thread.
> >>> So if the thread has died, that background thread will also die and the
> >>> right thing will happen.
> >>>
> >>> Eno
> >>>
> >>>> On 24 Apr 2017, at 15:34, Sachin Mittal <sjmit...@gmail.com> wrote:
> >>>>
> >>>> I had a question about this setting:
> >>>> ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)
> >>>>
> >>>> How would the broker know that a thread has died, or that we simply
> >>>> stopped an instance, and that it needs to be booted out of the group?
> >>>>
> >>>> Thanks
> >>>> Sachin
> >>>>
> >>>>
> >>>> On Mon, Apr 24, 2017 at 5:55 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> >>>>
> >>>>> Hi Ian,
> >>>>>
> >>>>>
> >>>>> This is now fixed in 0.10.2.1. The default configuration needed
> >>>>> tweaking. If you can't pick that up (it's currently being voted on),
> >>>>> make sure you have these two parameters set as follows in your streams
> >>>>> config:
> >>>>>
> >>>>> final Properties props = new Properties();
> >>>>> ...
> >>>>> // increase to 10 from the default of 0
> >>>>> props.put(ProducerConfig.RETRIES_CONFIG, 10);
> >>>>> // increase to effectively infinite from the default of 300 s
> >>>>> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
> >>>>>     Integer.toString(Integer.MAX_VALUE));
> >>>>>
> >>>>> Thanks
> >>>>> Eno
> >>>>>
> >>>>>> On 24 Apr 2017, at 10:38, Ian Duffy <i...@ianduffy.ie> wrote:
> >>>>>>
> >>>>>> Hi All,
> >>>>>>
> >>>>>> We're running multiple Kafka Streams applications using Kafka client
> >>>>>> 0.10.2.0 against a 6-node broker cluster running 0.10.1.1.
> >>>>>> Additionally, we're running Kafka Connect 0.10.2.0 with the
> >>>>>> Elasticsearch connector by Confluent [1].
> >>>>>>
> >>>>>> When an ISR change occurred on the brokers, all of the streams
> >>>>>> applications and the Kafka Connect ES connector threw exceptions and
> >>>>>> never recovered.
> >>>>>>
> >>>>>> We've seen a correlation between Kafka Broker ISR change and stream
> >>>>>> applications dying.
> >>>>>>
> >>>>>> The streams applications log the following and fail to recover:
> >>>>>>
> >>>>>> 07:01:23.323 stream-processor /var/log/application.log  2017-04-24 06:01:23,323 - [WARN] - [1.1.0-6] - [StreamThread-1] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> >>>>>> 07:01:23.323 stream-processor /var/log/application.log  2017-04-24 06:01:23,324 - [ERROR] - [1.1.0-6] - [StreamThread-1] Application - Unexpected Exception caught in thread [StreamThread-1]:
> >>>>>> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_81, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=81, offset=479285
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> >>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_81] exception caught when producing
> >>>>>> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
> >>>>>> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
> >>>>>> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> >>>>>> at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> >>>>>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> >>>>>> ... 2 common frames omitted
> >>>>>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> >>>>>> 07:01:23.558 stream-processor /var/log/application.log  2017-04-24 06:01:23,558 - [WARN] - [1.1.0-6] - [StreamThread-3] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> >>>>>> 07:01:23.558 stream-processor /var/log/application.log  2017-04-24 06:01:23,559 - [ERROR] - [1.1.0-6] - [StreamThread-3] Application - Unexpected Exception caught in thread [StreamThread-3]:
> >>>>>> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_55, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=55, offset=479308
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> >>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55] exception caught when producing
> >>>>>> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
> >>>>>> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
> >>>>>> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> >>>>>> at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> >>>>>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> >>>>>> ... 2 common frames omitted
> >>>>>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> >>>>>>
> >>>>>> Are we potentially doing something wrong with our streams
> >>>>>> configuration/usage? Or does this look like a bug?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Ian.
> >>>>>>
> >>>>>> [1] https://github.com/confluentinc/kafka-connect-elasticsearch
> >>>>>
> >>>>>
> >>>
> >>>
> >>
>
>
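For anyone who lands on this thread and cannot upgrade to 0.10.2.1 yet, the two overrides Eno suggests can be sketched as a small helper. This is only a sketch under some assumptions: the class and method names are made up for illustration, and the plain string keys "retries" and "max.poll.interval.ms" are assumed to be the values behind ProducerConfig.RETRIES_CONFIG and ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, used here so the snippet compiles without the Kafka jars. In a real application you would probably prefer the constants themselves.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: layers the two overrides from the
// thread on top of an existing streams configuration.
public class StreamsResilienceConfig {
    // Assumed to equal ProducerConfig.RETRIES_CONFIG.
    static final String RETRIES = "retries";
    // Assumed to equal ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG.
    static final String MAX_POLL_INTERVAL_MS = "max.poll.interval.ms";

    static Properties withWorkaround(Properties base) {
        Properties props = new Properties();
        props.putAll(base); // keep whatever the application already configured
        // Retry failed produce requests instead of failing on the first error.
        props.put(RETRIES, 10);
        // Effectively disable the poll-interval timeout (the thread quotes a
        // default of 300 s).
        props.put(MAX_POLL_INTERVAL_MS, Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        Properties props = withWorkaround(new Properties());
        System.out.println(RETRIES + "=" + props.get(RETRIES));
        System.out.println(MAX_POLL_INTERVAL_MS + "=" + props.get(MAX_POLL_INTERVAL_MS));
    }
}
```

The resulting Properties object would then be passed to the streams application the same way as the props object in Eno's snippet. As the thread notes, liveness of a stopped instance is still detected through the heartbeat thread and session.timeout.ms, so raising the poll interval does not remove that check.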
