queued.max.message.chunks controls the consumer's fetcher queue.
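A minimal sketch of where that property is set, assuming the high-level
(ZooKeeper-based) consumer used in this thread; connection values and the
queue depth are illustrative, and queue depth times fetch size roughly
bounds the memory buffered per stream:

    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    // Sketch: bounding the high-level consumer's fetcher queue. Each stream
    // buffers up to queued.max.message.chunks chunks of messages, so this
    // setting also caps how much the consumer buffers ahead of the reader.
    public class FetcherQueueConfig {

        public static ConsumerConnector connect() {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // illustrative
            props.put("group.id", "example-group");           // illustrative
            props.put("queued.max.message.chunks", "4");      // fetcher queue depth
            return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        }
    }

On Mon, Oct 27, 2014 at 9:32 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote: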
> Hi Neha,
>
> I think if I solve problem number 1, problem number 2 will be solved as
> well (problem 1 is causing problem 2, the blocking). Can you please let me
> know what controls the queue size for the *ConsumerFetcherThread* thread?
>
> Please see the attached Java source code, which reproduces the problem.
> You may remove the recovery process... Please check. We have to do some
> work before we start reading from the Kafka stream iterator, and this
> seems to cause the *java.lang.IllegalStateException: Iterator is in
> failed state* issue.
>
> Please let me know your findings and recommendation.
>
> Thanks,
>
> Bhavesh
>
> On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede <neha.narkh...@gmail.com>
> wrote:
>
>> > Sometimes it gives the following exception.
>>
>> It would help to have a more specific test case that reproduces the
>> failed iterator state.
>>
>> Also, the consumer threads block if the fetcher queue is full. The queue
>> can fill up if your consumer thread dies or slows down. I'd recommend you
>> ensure that all your consumer threads are alive. You can take a thread
>> dump to verify this.
>>
>> Thanks,
>> Neha
>>
>> On Mon, Oct 27, 2014 at 2:14 PM, Bhavesh Mistry <
>> mistry.p.bhav...@gmail.com> wrote:
>>
>> > Hi Neha,
>> >
>> > I have two problems. Any help is greatly appreciated.
>> >
>> > 1) *java.lang.IllegalStateException: Iterator is in failed state*
>> >
>> >     ConsumerConnector consumerConnector = Consumer
>> >             .createJavaConsumerConnector(getConsumerConfig());
>> >     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>> >     topicCountMap.put(topic, 32);
>> >     Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamMap =
>> >             consumerConnector.createMessageStreams(topicCountMap);
>> >
>> >     List<KafkaStream<byte[], byte[]>> streams =
>> >             Collections.synchronizedList(topicStreamMap.get(topic));
>> >
>> >     AppStaticInfo info = Mupd8Main.STATICINFO();
>> >
>> >     Iterator<KafkaStream<byte[], byte[]>> iterator = streams.iterator();
>> >     // Remove the head of the list for this source... the rest are for
>> >     // the dynamic source...
>> >     mainIterator = iterator.next().iterator();
>> >
>> >     List<ConsumerIterator<byte[], byte[]>> iteratorList =
>> >             new ArrayList<ConsumerIterator<byte[], byte[]>>(streams.size());
>> >     // Now the rest of the iterators must be registered...
>> >     while (iterator.hasNext()) {
>> >         iteratorList.add(iterator.next().iterator());
>> >     }
>> >     KafkaStreamRegistory.registerStream(mainSourceName, iteratorList);
>> >
>> > Once the consumer iterators are created and registered, we use them in
>> > another thread to start reading from the consumer iterator. Sometimes
>> > it gives the following exception:
>> >
>> > 24 Oct 2014 16:03:25,923 ERROR
>> > [SourceReader:request_source:LogStreamKafkaSource1]
>> > (grizzled.slf4j.Logger.error:116) - SourceThread: exception during reads.
>> > Swallowed to continue next read.
>> > java.lang.IllegalStateException: Iterator is in failed state
>> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>> >
>> > I have tried to recover from this state by calling
>> > iterator.resetState(), but sometimes it does not recover.
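A minimal sketch of the reading side that Neha's advice implies: one
long-lived worker per registered stream, so no stream goes undrained.
StreamReader and handle() are illustrative assumptions, not the attached
source; the resetState() recovery mirrors the call attempted above and is
not guaranteed to work.

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.ConsumerTimeoutException;
    import kafka.consumer.KafkaStream;

    // Hypothetical worker: one instance per KafkaStream, so every stream's
    // fetcher queue keeps getting drained. handle() is a placeholder.
    public class StreamReader implements Runnable {

        private final KafkaStream<byte[], byte[]> stream;

        public StreamReader(KafkaStream<byte[], byte[]> stream) {
            this.stream = stream;
        }

        @Override
        public void run() {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    if (it.hasNext()) {  // blocks when consumer.timeout.ms = -1
                        handle(it.next().message());
                    }
                } catch (ConsumerTimeoutException e) {
                    // Only thrown when consumer.timeout.ms >= 0: no message
                    // arrived within the timeout; loop and try again.
                } catch (IllegalStateException e) {
                    // "Iterator is in failed state" -- attempt the same
                    // recovery tried above; it does not always succeed.
                    it.resetState();
                }
            }
        }

        private void handle(byte[] message) {
            // application logic goes here
        }
    }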
>> >
>> > 2) *ConsumerFetcherThreads are blocked on enqueue. What controls the
>> > size of that queue, and why are they blocked?* Because of this our lag
>> > is increasing, and our threads are blocked on hasNext()...
>> >
>> > "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1"
>> >     prio=5 tid=0x00007fb36292c800 nid=0xab03 waiting on condition [0x0000000116379000]
>> >     java.lang.Thread.State: WAITING (parking)
>> >     at sun.misc.Unsafe.park(Native Method)
>> >     - parking to wait for <0x0000000704019388> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>> >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>> >     at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
>> >     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>> >     at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
>> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
>> >     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>> >     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>> >     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
>> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
>> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
>> >     at kafka.utils.Utils$.inLock(Utils.scala:535)
>> >     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
>> >     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>> >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>> >
>> > "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-2"
>> >     prio=5 tid=0x00007fb36229e000 nid=0xa903 waiting on condition [0x0000000116276000]
>> >     java.lang.Thread.State: WAITING (parking)
>> >     - parking to wait for <0x0000000704064ce0>
>> >     (identical stack to the thread above: parked in
>> >     java.util.concurrent.LinkedBlockingQueue.put via
>> >     kafka.consumer.PartitionTopicInfo.enqueue)
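The dumps show both fetchers parked inside LinkedBlockingQueue.put(): each
stream's chunk queue (capacity queued.max.message.chunks) is full because
the corresponding consumer thread has stopped draining it. A toy
demonstration of that mechanism, not Kafka code:

    import java.util.concurrent.LinkedBlockingQueue;

    // Toy demonstration: put() on a bounded queue parks the producing
    // thread once the queue is full and nothing drains it, which is the
    // WAITING state the fetcher dumps above show at
    // kafka.consumer.PartitionTopicInfo.enqueue.
    public class FullQueueDemo {

        public static void main(String[] args) throws InterruptedException {
            final LinkedBlockingQueue<byte[]> chunks =
                    new LinkedBlockingQueue<byte[]>(2); // stands in for queued.max.message.chunks

            Thread fetcher = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (int i = 0; ; i++) {
                            chunks.put(new byte[1024]); // parks on the third put
                            System.out.println("enqueued chunk " + i);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, "toy-fetcher");

            fetcher.start();
            Thread.sleep(1000L);
            // With no consumer calling take(), the producer is now parked:
            System.out.println("toy-fetcher state: " + fetcher.getState()); // WAITING
            fetcher.interrupt();
        }
    }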
>> >
>> > Thanks,
>> >
>> > Bhavesh
>> >
>> > On Sun, Oct 26, 2014 at 3:14 PM, Neha Narkhede <neha.narkh...@gmail.com>
>> > wrote:
>> >
>> > > Can you provide the steps to reproduce this issue?
>> > >
>> > > On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry <
>> > > mistry.p.bhav...@gmail.com> wrote:
>> > >
>> > > > I am using one from the Kafka trunk branch.
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Bhavesh
>> > > >
>> > > > On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede <
>> > > > neha.narkh...@gmail.com> wrote:
>> > > >
>> > > > > Which version of Kafka are you using on the consumer?
>> > > > >
>> > > > > On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry <
>> > > > > mistry.p.bhav...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi Kafka Community,
>> > > > > >
>> > > > > > I am using the Kafka trunk source code and I get the following
>> > > > > > exception. What could cause the iterator to be in a FAILED
>> > > > > > state? Please let me know how I can fix this issue.
>> > > > > >
>> > > > > > *java.lang.IllegalStateException: Iterator is in failed state
>> > > > > >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
>> > > > > >
>> > > > > > Here are the consumer properties:
>> > > > > >
>> > > > > >     Properties props = new Properties();
>> > > > > >     props.put("zookeeper.connect", zkConnect);
>> > > > > >     props.put("group.id", groupId);
>> > > > > >     props.put("consumer.timeout.ms", "-1");
>> > > > > >     props.put("zookeeper.session.timeout.ms", "10000");
>> > > > > >     props.put("zookeeper.sync.time.ms", "6000");
>> > > > > >     props.put("auto.commit.interval.ms", "2000");
>> > > > > >     props.put("rebalance.max.retries", "8");
>> > > > > >     props.put("auto.offset.reset", "largest");
>> > > > > >     props.put("fetch.message.max.bytes", "2097152");
>> > > > > >     props.put("socket.receive.buffer.bytes", "2097152");
>> > > > > >     props.put("auto.commit.enable", "true");
>> > > > > >
>> > > > > > Thanks,
>> > > > > >
>> > > > > > Bhavesh
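Tying the thread together: a hedged sketch that wires properties like
these to one worker per stream (the StreamReader sketched earlier), so
every fetcher queue stays drained. The topic, group, and timeout values
are illustrative; note that a non-negative consumer.timeout.ms makes a
starved hasNext() throw ConsumerTimeoutException instead of blocking
forever, which makes a stuck reader visible.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    // Sketch: one StreamReader per stream so no fetcher queue fills up.
    public class ConsumerMain {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // illustrative
            props.put("group.id", "example-group");           // illustrative
            // -1 (as above) makes hasNext() block indefinitely; a positive
            // value surfaces starvation as ConsumerTimeoutException.
            props.put("consumer.timeout.ms", "30000");

            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            String topic = "example-topic"; // illustrative
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 32);   // 32 streams, as in the code above

            List<KafkaStream<byte[], byte[]>> streams =
                    connector.createMessageStreams(topicCountMap).get(topic);

            // One dedicated, long-lived worker per stream.
            ExecutorService pool = Executors.newFixedThreadPool(streams.size());
            for (KafkaStream<byte[], byte[]> stream : streams) {
                pool.submit(new StreamReader(stream));
            }
        }
    }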