Do you have a simple test that can reproduce this issue?

Thanks,

Jun
On Thu, Oct 30, 2014 at 8:34 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

Hi Jun,

The consumer connector is not closed, because I can see the ConsumerFetcherThread alive but blocked on *put*, while hasNext() is blocked on *take*. This is what I see after the recovery.

Thanks,

Bhavesh

On Thu, Oct 30, 2014 at 11:42 AM, Jun Rao <jun...@gmail.com> wrote:

Another possibility is that the consumer connector is already closed and then you call hasNext() on the iterator.

Thanks,

Jun

On Wed, Oct 29, 2014 at 9:06 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

Hi Jun,

hasNext() itself throws this error. I have to manually reset the state; sometimes it is able to recover and sometimes it is not. Any other clue?

    public boolean hasNext() {
        LOG.info("called of hasNext() :");
        int retry = 3;
        while (retry > 0) {
            try {
                // this hasNext() is a blocking call..
                boolean result = iterator.hasNext();
                return result;
            } catch (IllegalStateException exp) {
                iterator.resetState();
                LOG.error("GOT IllegalStateException arg trying to recover....", exp);
                retry--;
            }
        }
        return false;
    }

Thanks,

Bhavesh

On Wed, Oct 29, 2014 at 6:36 PM, Jun Rao <jun...@gmail.com> wrote:

The IllegalStateException typically happens if you call next() before hasNext() on the iterator.

Thanks,

Jun

On Tue, Oct 28, 2014 at 10:50 AM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

Hi Neha,

Thanks for your answer. Can you please let me know how I can resolve the iterator IllegalStateException? I would appreciate knowing whether this is a bug I should file, or whether it is specific to my use case.

Thanks,

Bhavesh

On Tue, Oct 28, 2014 at 9:30 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

queued.max.message.chunks controls the consumer's fetcher queue.

On Mon, Oct 27, 2014 at 9:32 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

Hi Neha,

If I solve problem number 1, I think number 2 will be solved as well (problem 1 is causing problem 2, the blocking). Can you please let me know what controls the queue size for the *ConsumerFetcherThread*?

Please see the attached Java source code, which will reproduce the problem. You may remove the recovery process... Please check. We have to do some work before we start reading from the Kafka stream iterator, and this seems to cause the *java.lang.IllegalStateException: Iterator is in failed state*.

Please let me know your findings and recommendation.

Thanks,

Bhavesh
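(For reference, a minimal sketch of how the fetcher-queue depth mentioned above could be set through the old high-level consumer's properties; the connection values and the chunk count of 2 are illustrative placeholders, not a recommendation from this thread:)

    // Illustrative only: queued.max.message.chunks bounds how many fetched chunks
    // each KafkaStream's internal queue holds before the ConsumerFetcherThread
    // blocks on put(). Requires java.util.Properties and kafka.consumer.ConsumerConfig.
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");      // placeholder
    props.put("group.id", "example-group");                // placeholder
    props.put("queued.max.message.chunks", "2");           // illustrative value
    ConsumerConfig config = new ConsumerConfig(props);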
On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

>> Sometimes it gives the following exception.

It will help to have a more specific test case that reproduces the failed iterator state.

Also, the consumer threads block if the fetcher queue is full. The queue can fill up if your consumer thread dies or slows down. I'd recommend you ensure that all your consumer threads are alive. You can take a thread dump to verify this.

Thanks,
Neha

On Mon, Oct 27, 2014 at 2:14 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

Hi Neha,

I have two problems; any help is greatly appreciated.

1) *java.lang.IllegalStateException: Iterator is in failed state*

    ConsumerConnector consumerConnector = Consumer
            .createJavaConsumerConnector(getConsumerConfig());
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 32);
    Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamMap = consumerConnector
            .createMessageStreams(topicCountMap);

    List<KafkaStream<byte[], byte[]>> streams =
            Collections.synchronizedList(topicStreamMap.get(topic));

    AppStaticInfo info = Mupd8Main.STATICINFO();

    Iterator<KafkaStream<byte[], byte[]>> iterator = streams.iterator();
    // remove the head of the list for this source... the rest are for the dynamic sources...
    mainIterator = iterator.next().iterator();

    List<ConsumerIterator<byte[], byte[]>> iteratorList =
            new ArrayList<ConsumerIterator<byte[], byte[]>>(streams.size());
    // now the rest of the iterators must be registered...
    while (iterator.hasNext()) {
        iteratorList.add(iterator.next().iterator());
    }
    KafkaStreamRegistory.registerStream(mainSourceName, iteratorList);

Once the consumer iterators are created and registered, we use them in another thread to start reading from the consumer iterator. Sometimes it gives the following exception:

    24 Oct 2014 16:03:25,923 ERROR [SourceReader:request_source:LogStreamKafkaSource1]
    (grizzled.slf4j.Logger.error:116) - SourceThread: exception during reads. Swallowed to continue next read.
    java.lang.IllegalStateException: Iterator is in failed state
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)

I have tried to recover from this state by using iterator.resetState(), but sometimes it does not recover.
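(As an illustration of the advice above about keeping every consumer thread alive: a minimal sketch that drains each of the 32 streams on its own thread, so no stream's queue fills up and blocks the fetcher. The executor, loop body, and placeholder handler are assumptions for the example, not code from the original post:)

    // Illustrative sketch only; requires java.util.concurrent.ExecutorService/Executors,
    // kafka.consumer.KafkaStream and kafka.consumer.ConsumerIterator.
    // Assumes the 'streams' list created above.
    ExecutorService pool = Executors.newFixedThreadPool(streams.size());
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        pool.submit(new Runnable() {
            public void run() {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {              // always check hasNext() before next()
                    byte[] message = it.next().message();
                    // process the message here (placeholder)
                }
            }
        });
    }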
2) *The ConsumerFetcherThreads are blocked on enqueue. What controls the size of the queue? Why are they blocked?* Due to this, our lag is increasing, and our threads are blocked on hasNext()...

    "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1"
            prio=5 tid=0x00007fb36292c800 nid=0xab03 waiting on condition [0x0000000116379000]
       java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x0000000704019388> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
        at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
        at kafka.utils.Utils$.inLock(Utils.scala:535)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
    "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-2"
            prio=5 tid=0x00007fb36229e000 nid=0xa903 waiting on condition [0x0000000116276000]
       java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x0000000704064ce0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
        at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
        at kafka.utils.Utils$.inLock(Utils.scala:535)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Thanks,

Bhavesh

On Sun, Oct 26, 2014 at 3:14 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Can you provide the steps to reproduce this issue?

On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

I am using one from the Kafka trunk branch.

Thanks,

Bhavesh

On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Which version of Kafka are you using on the consumer?

On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

Hi Kafka Community,

I am using the Kafka trunk source code and I get the following exception. What could cause the iterator to be in a FAILED state? Please let me know how I can fix this issue.

    java.lang.IllegalStateException: Iterator is in failed state
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)

Here are the consumer properties:

    Properties props = new Properties();
    props.put("zookeeper.connect", zkConnect);
    props.put("group.id", groupId);
    props.put("consumer.timeout.ms", "-1");
    props.put("zookeeper.session.timeout.ms", "10000");
    props.put("zookeeper.sync.time.ms", "6000");
    props.put("auto.commit.interval.ms", "2000");
    props.put("rebalance.max.retries", "8");
    props.put("auto.offset.reset", "largest");
    props.put("fetch.message.max.bytes", "2097152");
    props.put("socket.receive.buffer.bytes", "2097152");
    props.put("auto.commit.enable", "true");

Thanks,

Bhavesh
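(To close the loop on the two suggestions made above, namely calling hasNext() before next() and not touching the iterator after the connector is closed, here is a minimal sketch of a consume loop with an orderly shutdown. It assumes consumer.timeout.ms is set to a finite value instead of the -1 used above, and handleMessage() is a placeholder, not code from this thread:)

    // Illustrative sketch only; requires kafka.consumer.ConsumerIterator,
    // kafka.consumer.ConsumerTimeoutException and the stream/connector built above.
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    try {
        while (it.hasNext()) {                  // never call next() without hasNext()
            byte[] message = it.next().message();
            handleMessage(message);             // placeholder handler
        }
    } catch (ConsumerTimeoutException e) {
        // no message arrived within consumer.timeout.ms; retry or fall through to shut down
    } finally {
        consumerConnector.shutdown();           // once shut down, do not call hasNext()/next() again
    }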