The IllegalStateException typically happens if you call next() before
hasNext() on the iterator.
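
To illustrate how an iterator can report "Iterator is in failed state", here is a minimal sketch of a state-machine iterator modeled loosely on kafka.utils.IteratorTemplate. The names FailedStateDemo and TemplateIterator are hypothetical, and this is a simplified model for discussion, not Kafka's actual code. It assumes the state is marked FAILED before the next element is computed, so that an exception thrown while computing it leaves the iterator failed for every later hasNext() call.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

public class FailedStateDemo {
    enum State { READY, NOT_READY, DONE, FAILED }

    /** Hypothetical simplified iterator template; not Kafka's actual class. */
    abstract static class TemplateIterator<T> implements Iterator<T> {
        private State state = State.NOT_READY;
        private T nextItem;

        /** Subclasses produce the next item, or call allDone() at the end. */
        protected abstract T makeNext();

        protected T allDone() {
            state = State.DONE;
            return null;
        }

        @Override
        public boolean hasNext() {
            if (state == State.FAILED) {
                throw new IllegalStateException("Iterator is in failed state");
            }
            switch (state) {
                case DONE:
                    return false;
                case READY:
                    return true;
                default:
                    return maybeComputeNext();
            }
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            state = State.NOT_READY;
            return nextItem;
        }

        private boolean maybeComputeNext() {
            state = State.FAILED; // stays FAILED if makeNext() throws
            nextItem = makeNext();
            if (state == State.DONE) {
                return false;
            }
            state = State.READY;
            return true;
        }
    }

    /** Returns true if hasNext() keeps failing after makeNext() threw once. */
    static boolean staysFailedAfterException() {
        TemplateIterator<String> it = new TemplateIterator<String>() {
            @Override
            protected String makeNext() {
                throw new RuntimeException("simulated fetch error");
            }
        };
        try {
            it.hasNext();
        } catch (RuntimeException expected) {
            // first failure: the exception from makeNext() propagates
        }
        try {
            it.hasNext();
            return false;
        } catch (IllegalStateException e) {
            return true; // second call reports the failed state
        }
    }

    public static void main(String[] args) {
        System.out.println(staysFailedAfterException());
    }
}
```

Under these assumptions, any exception that escapes while the next element is being fetched (an interrupt or a timeout, for example) would be enough to produce the failed state on the following call.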

Thanks,

Jun

On Tue, Oct 28, 2014 at 10:50 AM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

> Hi Neha,
>
> Thanks for your answer.  Can you please let me know how I can resolve the
> Iterator IllegalStateException?  I would appreciate your input: if this is a
> bug I can file one, or let me know if it is specific to my use case.
>
> Thanks,
>
> Bhavesh
>
> On Tue, Oct 28, 2014 at 9:30 AM, Neha Narkhede <neha.narkh...@gmail.com>
> wrote:
>
> > queued.max.message.chunks controls the consumer's fetcher queue.
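
For illustration, a sketch of adding this setting to the consumer Properties. The class name FetcherQueueConfig, the helper withQueueSize, and the value 10 are assumptions made for the example, not recommendations. Per the Kafka 0.8 consumer configuration documentation, each buffered chunk can be up to fetch.message.max.bytes, so a larger queue raises the memory the consumer may hold.

```java
import java.util.Properties;

public class FetcherQueueConfig {
    /**
     * Returns a copy of the given consumer properties with
     * queued.max.message.chunks set to the requested number of chunks.
     */
    static Properties withQueueSize(Properties base, int chunks) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("queued.max.message.chunks", Integer.toString(chunks));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("fetch.message.max.bytes", "2097152");
        // 10 is an arbitrary illustrative value.
        Properties props = withQueueSize(base, 10);
        System.out.println(props.getProperty("queued.max.message.chunks"));
    }
}
```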
> >
> > On Mon, Oct 27, 2014 at 9:32 PM, Bhavesh Mistry <
> > mistry.p.bhav...@gmail.com>
> > wrote:
> >
> > > Hi Neha,
> > >
> > > If I solve problem number 1, I think problem number 2 will be solved too
> > > (problem 1 is causing problem 2, the blocked threads).  Can you please let
> > > me know what controls the queue size for the *ConsumerFetcherThread* thread?
> > >
> > >
> > > Please see the attached Java source code, which will reproduce the
> > > problem.  You may remove the recovery process...  Please check.  We have
> > > to do some work before we start reading from the Kafka stream iterator,
> > > and this seems to cause an issue with
> > > java.lang.IllegalStateException: Iterator is in failed state.
> > >
> > > Please let me know your findings and recommendations.
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > > On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > >
> > >> >> Sometime it give following exception.
> > >>
> > >> It will help to have a more specific test case that reproduces the
> > >> failed iterator state.
> > >>
> > >> Also, the consumer's fetcher threads block if the fetcher queue is full.
> > >> The queue can fill up if your consumer thread dies or slows down. I'd
> > >> recommend you ensure that all your consumer threads are alive. You can
> > >> take a thread dump to verify this.
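
The blocking behaviour described above can be sketched with a plain bounded java.util.concurrent.LinkedBlockingQueue, the queue type that appears in the thread dump quoted below. The class BoundedQueueDemo and the method extraOfferAccepted are hypothetical names for this example; this shows the general behaviour of a full bounded queue with no reader, not Kafka's fetcher code.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {
    /**
     * Fills a queue of the given capacity, then tries to add one more
     * element with a short timeout. Returns whether that extra element
     * was accepted.
     */
    static boolean extraOfferAccepted(int capacity) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(capacity);
        try {
            for (int i = 0; i < capacity; i++) {
                queue.put("chunk-" + i); // returns immediately while there is room
            }
            // Nothing is draining the queue, so put() would wait indefinitely
            // here. offer() with a timeout gives up instead of blocking forever.
            return queue.offer("one-too-many", 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(extraOfferAccepted(2));
    }
}
```

A producer that uses put() on such a queue parks until a reader takes an element, which is consistent with a fetcher thread waiting in LinkedBlockingQueue.put when the reading thread has stopped or fallen behind.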
> > >>
> > >> Thanks,
> > >> Neha
> > >>
> > >> On Mon, Oct 27, 2014 at 2:14 PM, Bhavesh Mistry <
> > >> mistry.p.bhav...@gmail.com>
> > >> wrote:
> > >>
> > >> > Hi Neha,
> > >> >
> > >> >
> > >> > I have two problems.  Any help is greatly appreciated.
> > >> >
> > >> >
> > >> > 1) *java.lang.IllegalStateException: Iterator is in failed state*
> > >> >
> > >> >         ConsumerConnector consumerConnector = Consumer
> > >> >                 .createJavaConsumerConnector(getConsumerConfig());
> > >> >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > >> >         // Request 32 streams for the topic.
> > >> >         topicCountMap.put(topic, 32);
> > >> >         Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamMap =
> > >> >                 consumerConnector.createMessageStreams(topicCountMap);
> > >> >
> > >> >         List<KafkaStream<byte[], byte[]>> streams =
> > >> >                 Collections.synchronizedList(topicStreamMap.get(topic));
> > >> >
> > >> >         AppStaticInfo info = Mupd8Main.STATICINFO();
> > >> >
> > >> >         Iterator<KafkaStream<byte[], byte[]>> iterator = streams.iterator();
> > >> >         // Take the first stream for this source; the rest are for the
> > >> >         // dynamic source.
> > >> >         mainIterator = iterator.next().iterator();
> > >> >
> > >> >         List<ConsumerIterator<byte[], byte[]>> iteratorList =
> > >> >                 new ArrayList<ConsumerIterator<byte[], byte[]>>(streams.size());
> > >> >         // The remaining iterators must be registered now.
> > >> >         while (iterator.hasNext()) {
> > >> >             iteratorList.add(iterator.next().iterator());
> > >> >         }
> > >> >         KafkaStreamRegistory.registerStream(mainSourceName, iteratorList);
> > >> >
> > >> > Once the consumer iterators are created and registered, we use them in
> > >> > another thread to start reading.  Sometimes it gives the following
> > >> > exception.
> > >> >
> > >> > 24 Oct 2014 16:03:25,923 ERROR [SourceReader:request_source:LogStreamKafkaSource1] (grizzled.slf4j.Logger.error:116)  - SourceThread: exception during reads. Swallowed to continue next read.
> > >> > java.lang.IllegalStateException: Iterator is in failed state
> > >> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
> > >> >
> > >> >
> > >> > I have tried to recover from this state by calling
> > >> > iterator.resetState(), but sometimes it does not recover.
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > *2) ConsumerFetcherThreads are blocked on enqueue.  What controls the
> > >> > size of the queue?  Why are they blocked?*  Due to this, our lag is
> > >> > increasing, and our threads are blocked on hasNext()...
> > >> >
> > >> >
> > >> >
> > >> > "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1" prio=5 tid=0x00007fb36292c800 nid=0xab03 waiting on condition [0x0000000116379000]
> > >> >    java.lang.Thread.State: WAITING (parking)
> > >> >         at sun.misc.Unsafe.park(Native Method)
> > >> >         - parking to wait for  <0x0000000704019388> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > >> >         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > >> >         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > >> >         at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
> > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
> > >> >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > >> >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > >> >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
> > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > >> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> > >> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
> > >> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> > >> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > >> >
> > >> >
> > >> >
> > >> > "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-2" prio=5 tid=0x00007fb36229e000 nid=0xa903 waiting on condition [0x0000000116276000]
> > >> >    java.lang.Thread.State: WAITING (parking)
> > >> >         at sun.misc.Unsafe.park(Native Method)
> > >> >         - parking to wait for  <0x0000000704064ce0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > >> >         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > >> >         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > >> >         at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
> > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
> > >> >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > >> >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
> > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > >> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> > >> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
> > >> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> > >> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Bhavesh
> > >> >
> > >> >
> > >> >
> > >> > On Sun, Oct 26, 2014 at 3:14 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > >> >
> > >> > > Can you provide the steps to reproduce this issue?
> > >> > >
> > >> > > On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry <
> > >> > > mistry.p.bhav...@gmail.com>
> > >> > > wrote:
> > >> > >
> > >> > > > I am using one from the Kafka Trunk branch.
> > >> > > >
> > >> > > > Thanks,
> > >> > > >
> > >> > > > Bhavesh
> > >> > > >
> > >> > > > On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > >> > > >
> > >> > > > > Which version of Kafka are you using on the consumer?
> > >> > > > >
> > >> > > > > On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry <
> > >> > > > > mistry.p.bhav...@gmail.com>
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > > > Hi Kafka Community,
> > >> > > > > >
> > >> > > > > > I am using the Kafka trunk source code and I get the following
> > >> > > > > > exception.  What could cause the iterator to be in the FAILED
> > >> > > > > > state?  Please let me know how I can fix this issue.
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > *java.lang.IllegalStateException: Iterator is in failed state
> > >> > > > > >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
> > >> > > > > >
> > >> > > > > > Here are the properties:
> > >> > > > > >
> > >> > > > > >         Properties props = new Properties();
> > >> > > > > >         props.put("zookeeper.connect", zkConnect);
> > >> > > > > >         props.put("group.id", groupId);
> > >> > > > > >         props.put("consumer.timeout.ms", "-1");
> > >> > > > > >         props.put("zookeeper.session.timeout.ms", "10000");
> > >> > > > > >         props.put("zookeeper.sync.time.ms", "6000");
> > >> > > > > >         props.put("auto.commit.interval.ms", "2000");
> > >> > > > > >         props.put("rebalance.max.retries", "8");
> > >> > > > > >         props.put("auto.offset.reset", "largest");
> > >> > > > > >         props.put("fetch.message.max.bytes","2097152");
> > >> > > > > >         props.put("socket.receive.buffer.bytes","2097152");
> > >> > > > > >         props.put("auto.commit.enable","true");
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > Thanks,
> > >> > > > > >
> > >> > > > > > Bhavesh
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>
