Hi Ian,

Any chance you could share the full log? Feel free to send it to me directly if you don't want to broadcast it everywhere.

Thanks
Eno

> On 25 Apr 2017, at 17:36, Ian Duffy <i...@ianduffy.ie> wrote:
>
> Thanks again for the quick response Eno.
>
> We just left the application running in the hope it would recover; after
> ~1 hour it's still just continuously spilling out the same exception and not
> managing to continue processing.
>
> On 25 April 2017 at 16:24, Eno Thereska <eno.there...@gmail.com> wrote:
>
>> Hi Ian,
>>
>> Retries are sometimes expected and don't always indicate a problem. We
>> should probably adjust the logging so this warning is not printed so
>> frequently. Are you seeing any crash, or does the app proceed?
>>
>> Thanks
>> Eno
>>
>> On 25 Apr 2017 4:02 p.m., "Ian Duffy" <i...@ianduffy.ie> wrote:
>>
>> Upgraded a handful of our streams applications to 0.10.2.1 as suggested.
>> Seeing far fewer issues and much smoother performance.
>> They withstood ISR changes.
>>
>> We saw the following when more consumers were added to a consumer group:
>>
>> 2017-04-25 14:57:37,200 - [WARN] - [1.1.0-11] - [StreamThread-2] o.a.k.s.p.internals.StreamThread - Could not create task 1_21. Will retry.
>> org.apache.kafka.streams.errors.LockException: task [1_21] Failed to lock the state directory for task 1_21
>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>
>> On 24 April 2017 at 16:02, Eno Thereska <eno.there...@gmail.com> wrote:
>>
>>> Hi Sachin,
>>>
>>> In KIP-62 a background heartbeat thread was introduced to deal with
>>> group protocol arrivals and departures. There is a setting called
>>> session.timeout.ms that specifies the timeout of that background thread.
>>> So if the instance has died, that background thread will also die and the
>>> right thing will happen.
>>>
>>> Eno
>>>
>>>> On 24 Apr 2017, at 15:34, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>
>>>> I had a question about this setting:
>>>> ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)
>>>>
>>>> How would the broker know if a thread has died, or if we simply stopped an
>>>> instance and it needs to be booted out of the group?
>>>>
>>>> Thanks
>>>> Sachin
>>>>
>>>> On Mon, Apr 24, 2017 at 5:55 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>
>>>>> Hi Ian,
>>>>>
>>>>> This is now fixed in 0.10.2.1; the default configuration needed tweaking.
>>>>> If you can't pick that up yet (it's currently being voted on), make sure
>>>>> you have these two parameters set as follows in your streams config:
>>>>>
>>>>> final Properties props = new Properties();
>>>>> ...
>>>>> // increase to 10 from the default of 0
>>>>> props.put(ProducerConfig.RETRIES_CONFIG, 10);
>>>>> // increase to effectively infinite (Integer.MAX_VALUE ms) from the default of 300 s
>>>>> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
>>>>>
>>>>> Thanks
>>>>> Eno
>>>>>
>>>>>> On 24 Apr 2017, at 10:38, Ian Duffy <i...@ianduffy.ie> wrote:
>>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> We're running multiple Kafka Streams applications using Kafka client
>>>>>> 0.10.2.0 against a 6-node broker cluster running 0.10.1.1.
>>>>>> Additionally, we're running Kafka Connect 0.10.2.0 with the Elasticsearch
>>>>>> connector by Confluent [1].
>>>>>>
>>>>>> When an ISR change occurred on the brokers, all of the streams applications
>>>>>> and the Kafka Connect ES connector threw exceptions and never recovered.
>>>>>>
>>>>>> We've seen a correlation between Kafka broker ISR changes and streams
>>>>>> applications dying.
>>>>>>
>>>>>> The logs from the streams applications throw out the following and fail
>>>>>> to recover:
>>>>>>
>>>>>> 07:01:23.323 stream-processor /var/log/application.log 2017-04-24 06:01:23,323 - [WARN] - [1.1.0-6] - [StreamThread-1] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
>>>>>> 07:01:23.323 stream-processor /var/log/application.log 2017-04-24 06:01:23,324 - [ERROR] - [1.1.0-6] - [StreamThread-1] Application - Unexpected Exception caught in thread [StreamThread-1]:
>>>>>> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_81, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=81, offset=479285
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_81] exception caught when producing
>>>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
>>>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
>>>>>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>>>>>>     ... 2 common frames omitted
>>>>>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>>>>> 07:01:23.558 stream-processor /var/log/application.log 2017-04-24 06:01:23,558 - [WARN] - [1.1.0-6] - [StreamThread-3] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
>>>>>> 07:01:23.558 stream-processor /var/log/application.log 2017-04-24 06:01:23,559 - [ERROR] - [1.1.0-6] - [StreamThread-3] Application - Unexpected Exception caught in thread [StreamThread-3]:
>>>>>> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_55, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=55, offset=479308
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55] exception caught when producing
>>>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
>>>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
>>>>>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>>>>>>     ... 2 common frames omitted
>>>>>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>>>>>
>>>>>> Are we potentially doing something wrong with our streams
>>>>>> configuration/usage? Or does this look like a bug?
>>>>>>
>>>>>> Thanks,
>>>>>> Ian.
>>>>>>
>>>>>> [1] https://github.com/confluentinc/kafka-connect-elasticsearch
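
The two overrides Eno recommends for 0.10.2.0 can be gathered in one place. This is a minimal, stdlib-only sketch: it spells out the literal keys "retries" and "max.poll.interval.ms" (the values of ProducerConfig.RETRIES_CONFIG and ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG) so it compiles without the Kafka jars. The class name, method name, application id and broker address are hypothetical. In a real application the resulting Properties would be handed to the KafkaStreams constructor together with your topology.

```java
import java.util.Properties;

public class StreamsRetryConfig {
    // Literal values of ProducerConfig.RETRIES_CONFIG and
    // ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, spelled out so this sketch
    // compiles without the Kafka client jars on the classpath.
    static final String RETRIES = "retries";
    static final String MAX_POLL_INTERVAL_MS = "max.poll.interval.ms";

    /** Builds a streams config carrying the two overrides discussed in the thread. */
    static Properties buildProps(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);       // hypothetical app id
        props.put("bootstrap.servers", bootstrapServers); // hypothetical broker list
        // Let the embedded producer retry transient send errors such as
        // NotLeaderForPartitionException during leader/ISR changes
        // (default 0, per the thread).
        props.put(RETRIES, 10);
        // Effectively disable the poll-interval check (default 300000 ms);
        // Integer.MAX_VALUE ms is almost 25 days.
        props.put(MAX_POLL_INTERVAL_MS, Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps("my-streams-app", "localhost:9092");
        System.out.println(props.get(RETRIES));              // prints 10
        System.out.println(props.get(MAX_POLL_INTERVAL_MS)); // prints 2147483647
    }
}
```

Note that "retries" is stored as an Integer, as in Eno's snippet, so props.getProperty("retries") returns null; read it back with props.get(...) when checking the config.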
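
Sachin's question and Eno's answer turn on the fact that, since KIP-62, a group member has two separate liveness checks: session.timeout.ms bounds the gap between heartbeats sent by the background thread, while max.poll.interval.ms bounds the gap between poll() calls made by the processing thread. The sketch below is a deliberately simplified model, not Kafka's implementation; the class and method names and the 10 s session timeout are assumptions chosen for illustration.

```java
public class GroupLivenessModel {
    /**
     * Simplified model (not Kafka's implementation) of the two KIP-62 liveness
     * checks. Returns why a member would drop out of the group, or null if it
     * would stay. All times are in milliseconds.
     */
    static String failureReason(long nowMs, long lastHeartbeatMs, long lastPollMs,
                                long sessionTimeoutMs, long maxPollIntervalMs) {
        // Heartbeats come from the background thread; they stop when the
        // process dies or the instance is shut down, and the group coordinator
        // then expires the member after session.timeout.ms.
        if (nowMs - lastHeartbeatMs > sessionTimeoutMs) {
            return "session timeout";
        }
        // Roughly: the client notices that the processing thread has not
        // called poll() within max.poll.interval.ms and leaves the group.
        if (nowMs - lastPollMs > maxPollIntervalMs) {
            return "poll interval exceeded";
        }
        return null;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 10_000L;            // example value only
        long maxPollIntervalMs = Integer.MAX_VALUE; // value suggested in the thread
        // Instance stopped: no heartbeat for 15 s.
        System.out.println(failureReason(15_000L, 0L, 0L, sessionTimeoutMs, maxPollIntervalMs));
        // prints "session timeout"
        // Alive but slow: recent heartbeat, no poll() for 20 minutes.
        System.out.println(failureReason(1_200_000L, 1_199_000L, 0L, sessionTimeoutMs, maxPollIntervalMs));
        // prints "null"
    }
}
```

In this model, raising max.poll.interval.ms to Integer.MAX_VALUE means a slow or blocked processing thread is effectively never evicted, while a stopped instance is still removed because its heartbeats stop, which is the behaviour Eno describes.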