Thanks again for the quick response, Eno. We left the application running in the hope that it would recover; after ~1 hour it's still continuously spitting out the same exception and not managing to continue processing.
On 25 April 2017 at 16:24, Eno Thereska <eno.there...@gmail.com> wrote:
> Hi Ian,
>
> Retries are sometimes expected and don't always indicate a problem. We
> should probably adjust the logging so that this warning is not printed
> so frequently. Are you seeing any crash, or does the app proceed?
>
> Thanks
> Eno
>
> On 25 Apr 2017 4:02 p.m., "Ian Duffy" <i...@ianduffy.ie> wrote:
>
> Upgraded a handful of our streams applications to 0.10.2.1 as suggested.
> Seeing far fewer issues and much smoother performance.
> They withstood ISR changes.
>
> Seen the following when more consumers were added to a consumer group:
>
> 2017-04-25 14:57:37,200 - [WARN] - [1.1.0-11] - [StreamThread-2] o.a.k.s.p.internals.StreamThread - Could not create task 1_21. Will retry.
> org.apache.kafka.streams.errors.LockException: task [1_21] Failed to lock the state directory for task 1_21
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>
>
>
> On 24 April 2017 at 16:02, Eno Thereska <eno.there...@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > In KIP-62 a background heartbeat thread was introduced to deal with
> > group protocol arrivals and departures. There is a setting called
> > session.timeout.ms that specifies the timeout of that background thread.
> > So if the thread has died, that background thread will also die and the
> > right thing will happen.
> >
> > Eno
> >
> > > On 24 Apr 2017, at 15:34, Sachin Mittal <sjmit...@gmail.com> wrote:
> > >
> > > I had a question about this setting:
> > > ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)
> > >
> > > How would the broker know if a thread has died, or if we simply stopped
> > > an instance and it needs to be booted out of the group?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Mon, Apr 24, 2017 at 5:55 PM, Eno Thereska <eno.there...@gmail.com>
> > > wrote:
> > >
> > >> Hi Ian,
> > >>
> > >>
> > >> This is now fixed in 0.10.2.1. The default configuration needs tweaking.
> > >> If you can't pick that up (it's currently being voted on), make sure you
> > >> have these two parameters set as follows in your streams config:
> > >>
> > >> final Properties props = new Properties();
> > >> ...
> > >> props.put(ProducerConfig.RETRIES_CONFIG, 10); <---- increase to 10 from default of 0
> > >> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); <--------- increase to infinity from default of 300 s
> > >>
> > >> Thanks
> > >> Eno
> > >>
> > >>> On 24 Apr 2017, at 10:38, Ian Duffy <i...@ianduffy.ie> wrote:
> > >>>
> > >>> Hi All,
> > >>>
> > >>> We're running multiple Kafka Streams applications using Kafka client
> > >>> 0.10.2.0 against a 6-node broker cluster running 0.10.1.1.
> > >>> Additionally, we're running Kafka Connect 0.10.2.0 with the Elasticsearch
> > >>> connector by Confluent [1].
> > >>>
> > >>> When an ISR change occurred on the brokers, all of the streams applications
> > >>> and the Kafka Connect ES connector threw exceptions and never recovered.
> > >>>
> > >>> We've seen a correlation between Kafka broker ISR changes and streams
> > >>> applications dying.
> > >>>
> > >>> The streams applications log the following and fail to recover:
> > >>>
> > >>> 07:01:23.323 stream-processor /var/log/application.log 2017-04-24 06:01:23,323 - [WARN] - [1.1.0-6] - [StreamThread-1] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> > >>> 07:01:23.323 stream-processor /var/log/application.log 2017-04-24 06:01:23,324 - [ERROR] - [1.1.0-6] - [StreamThread-1] Application - Unexpected Exception caught in thread [StreamThread-1]:
> > >>> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_81, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=81, offset=479285
> > >>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > >>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_81] exception caught when producing
> > >>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
> > >>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
> > >>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> > >>>     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> > >>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> > >>>     ... 2 common frames omitted
> > >>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> > >>> 07:01:23.558 stream-processor /var/log/application.log 2017-04-24 06:01:23,558 - [WARN] - [1.1.0-6] - [StreamThread-3] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> > >>> 07:01:23.558 stream-processor /var/log/application.log 2017-04-24 06:01:23,559 - [ERROR] - [1.1.0-6] - [StreamThread-3] Application - Unexpected Exception caught in thread [StreamThread-3]:
> > >>> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_55, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=55, offset=479308
> > >>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > >>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55] exception caught when producing
> > >>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
> > >>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
> > >>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> > >>>     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> > >>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> > >>>     ... 2 common frames omitted
> > >>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> > >>>
> > >>> Are we potentially doing something wrong with our streams
> > >>> configuration/usage? Or does this look like a bug?
> > >>>
> > >>> Thanks,
> > >>> Ian.
> > >>>
> > >>> [1] https://github.com/confluentinc/kafka-connect-elasticsearch
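For anyone still on 0.10.2.0, here is a minimal, self-contained sketch of the two overrides Eno suggested. It is not the Kafka API itself: to compile with only `java.util` on the classpath it uses the literal key strings behind `ProducerConfig.RETRIES_CONFIG` ("retries") and `ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG` ("max.poll.interval.ms"). The class name `StreamsConfigSketch`, the method `buildConfig`, and the application id and broker address are hypothetical placeholders. As in Eno's snippet, the producer and consumer keys go straight into the streams config.

```java
import java.util.Properties;

// Hypothetical helper: a sketch of the two streams-config overrides discussed
// in this thread, using literal key strings instead of the Kafka constants so
// it compiles without kafka-clients on the classpath.
final class StreamsConfigSketch {

    // Literal value of ProducerConfig.RETRIES_CONFIG.
    static final String RETRIES = "retries";
    // Literal value of ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG.
    static final String MAX_POLL_INTERVAL_MS = "max.poll.interval.ms";

    static Properties buildConfig(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);

        // Let the embedded producer retry retriable send errors, such as the
        // NotLeaderForPartitionException seen during leader/ISR changes,
        // instead of failing on the first attempt (0 retries by default).
        props.setProperty(RETRIES, "10");

        // Effectively disable the poll-interval check, so a stream thread that
        // goes a long time between poll() calls is not removed from the group.
        // session.timeout.ms is deliberately left unset: per the thread, the
        // KIP-62 heartbeat thread still uses it to detect dead instances.
        props.setProperty(MAX_POLL_INTERVAL_MS, Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; substitute your own application id and brokers.
        Properties props = buildConfig("stream-processor", "localhost:9092");
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

The sketch uses `setProperty` with String values on purpose. `Properties.getProperty` only returns String values, so a value stored with `props.put(..., 10)` as an Integer reads back as `null` through `getProperty`, even though Kafka's own config parsing accepts either form.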