Hi Neha,

I have two problems. Any help is greatly appreciated.

1) java.lang.IllegalStateException: Iterator is in failed state

    ConsumerConnector consumerConnector =
            Consumer.createJavaConsumerConnector(getConsumerConfig());
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 32);
    Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamMap =
            consumerConnector.createMessageStreams(topicCountMap);

    List<KafkaStream<byte[], byte[]>> streams =
            Collections.synchronizedList(topicStreamMap.get(topic));

    AppStaticInfo info = Mupd8Main.STATICINFO();

    Iterator<KafkaStream<byte[], byte[]>> iterator = streams.iterator();
    // The first stream is for this source; the rest are for the Dynamic Source.
    mainIterator = iterator.next().iterator();

    List<ConsumerIterator<byte[], byte[]>> iteratorList =
            new ArrayList<ConsumerIterator<byte[], byte[]>>(streams.size());
    // Register the remaining iterators.
    while (iterator.hasNext()) {
        iteratorList.add(iterator.next().iterator());
    }
    KafkaStreamRegistory.registerStream(mainSourceName, iteratorList);

Once the consumer iterators are created and registered, we use them in
another thread to start reading. Sometimes that gives the following
exception:

    24 Oct 2014 16:03:25,923 ERROR
    [SourceReader:request_source:LogStreamKafkaSource1]
    (grizzled.slf4j.Logger.error:116) - SourceThread: exception during reads.
    Swallowed to continue next read.
    java.lang.IllegalStateException: Iterator is in failed state
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)

I have tried to recover from this state by calling:

    iterator.resetState();

but sometimes it does not recover.

2) The ConsumerFetcherThreads are blocked on enqueue. What controls the size
of the queue? Why are they blocked? Because of this our lag is increasing,
and our threads are blocked on hasNext()...

"ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1" prio=5 tid=0x00007fb36292c800 nid=0xab03 waiting on condition [0x0000000116379000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x0000000704019388> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
        at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
        at kafka.utils.Utils$.inLock(Utils.scala:535)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-2" prio=5 tid=0x00007fb36229e000 nid=0xa903 waiting on condition [0x0000000116276000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x0000000704064ce0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
        at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
        at kafka.utils.Utils$.inLock(Utils.scala:535)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Thanks,

Bhavesh

On Sun, Oct 26, 2014 at 3:14 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Can you provide the steps to reproduce this issue?
>
> On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
> wrote:
>
> > I am using one from the Kafka Trunk branch.
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede <neha.narkh...@gmail.com>
> > wrote:
> >
> > > Which version of Kafka are you using on the consumer?
> > >
> > > On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
> > > wrote:
> > >
> > > > HI Kafka Community ,
> > > >
> > > > I am using kafka trunk source code and I get following exception. What
> > > > could cause the iterator to have FAILED state. Please let me know how I
> > > > can fix this issue.
> > > >
> > > > *java.lang.IllegalStateException: Iterator is in failed state at
> > > > kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
> > > >
> > > > Here is Properties:
> > > >
> > > >     Properties props = new Properties();
> > > >     props.put("zookeeper.connect", zkConnect);
> > > >     props.put("group.id", groupId);
> > > >     props.put("consumer.timeout.ms", "-1");
> > > >     props.put("zookeeper.session.timeout.ms", "10000");
> > > >     props.put("zookeeper.sync.time.ms", "6000");
> > > >     props.put("auto.commit.interval.ms", "2000");
> > > >     props.put("rebalance.max.retries", "8");
> > > >     props.put("auto.offset.reset", "largest");
> > > >     props.put("fetch.message.max.bytes","2097152");
> > > >     props.put("socket.receive.buffer.bytes","2097152");
> > > >     props.put("auto.commit.enable","true");
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
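
A note on the fetcher traces in problem 2: both threads are parked inside
java.util.concurrent.LinkedBlockingQueue.put, which is what a bounded queue
does to a producer while it is at capacity and nothing is draining it. The
sketch below reproduces that state with the JDK only. It is not Kafka code:
the class name BoundedQueueDemo, the capacity of 2, the thread name, and the
timeouts are all assumptions made for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {

    // Fills a bounded queue, starts a producer that calls put() on the full
    // queue, and reports whether that producer parked (WAITING) and then
    // finished once a single element was taken.
    public static boolean producerBlocksWhenFull(int capacity) throws InterruptedException {
        final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.put("chunk-" + i); // does not block: the queue still has room
        }

        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    queue.put("extra"); // parks here until a slot frees up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "demo-fetcher");
        producer.setDaemon(true);
        producer.start();

        // Poll for up to 2 seconds until the producer shows up as WAITING,
        // the same state the thread dump reports.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
        while (producer.getState() != Thread.State.WAITING && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }
        boolean parked = producer.getState() == Thread.State.WAITING;

        queue.take();        // the consumer side drains one element
        producer.join(2000); // the producer should now complete its put()
        return parked && !producer.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("producer parked until drained: " + producerBlocksWhenFull(2));
    }
}
```

If the same thing is happening in the consumer, the fetchers stay parked for
as long as the threads reading from the iterators are not taking from the
queue, which would fit the growing lag.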