Hi Neha,

Thanks for your answer.  Can you please let me know how I can resolve the
Iterator IllegalStateException?  I would appreciate your input: if this is a
bug, I can file one; otherwise, please let me know if it is specific to my
use case.

Thanks,

Bhavesh

On Tue, Oct 28, 2014 at 9:30 AM, Neha Narkhede <neha.narkh...@gmail.com>
wrote:

> queued.max.message.chunks controls the consumer's fetcher queue.
>
> On Mon, Oct 27, 2014 at 9:32 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hi Neha,
> >
> > I think that if I solve problem number 1, problem number 2 will be solved
> > as well (problem 1 is causing problem 2, the blocked threads).  Can you
> > please let me know what controls the queue size for the
> > *ConsumerFetcherThread* thread?
> >
> >
> > Please see the attached Java source code, which will reproduce the
> > problem.  You may remove the recovery process...  Please check.  We have
> > to do some work before we start reading from the Kafka stream iterator,
> > and this seems to cause an issue with
> > *java.lang.IllegalStateException: Iterator is in failed state*.
> >
> > Please let me know your findings and recommendations.
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede <neha.narkh...@gmail.com>
> > wrote:
> >
> >> >> Sometime it give following exception.
> >>
> >> It will help to have a more specific test case that reproduces the
> >> failed iterator state.
> >>
> >> Also, the consumer threads block if the fetcher queue is full. The queue
> >> can fill up if your consumer thread dies or slows down. I'd recommend
> >> you ensure that all your consumer threads are alive. You can take a
> >> thread dump to verify this.
> >>
> >> Thanks,
> >> Neha
> >>
> >> On Mon, Oct 27, 2014 at 2:14 PM, Bhavesh Mistry <
> >> mistry.p.bhav...@gmail.com>
> >> wrote:
> >>
> >> > Hi Neha,
> >> >
> >> >
> >> > I have two problems.  Any help is greatly appreciated.
> >> >
> >> >
> >> > 1) *java.lang.IllegalStateException: Iterator is in failed state*
> >> >
> >> >         ConsumerConnector consumerConnector = Consumer
> >> >                 .createJavaConsumerConnector(getConsumerConfig());
> >> >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >> >         topicCountMap.put(topic, 32);
> >> >         Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamMap =
> >> >                 consumerConnector.createMessageStreams(topicCountMap);
> >> >
> >> >         List<KafkaStream<byte[], byte[]>> streams =
> >> >                 Collections.synchronizedList(topicStreamMap.get(topic));
> >> >
> >> >         AppStaticInfo info = Mupd8Main.STATICINFO();
> >> >
> >> >         Iterator<KafkaStream<byte[], byte[]>> iterator = streams.iterator();
> >> >         // Take the first stream for this source; the rest are for the
> >> >         // dynamic source.
> >> >         mainIterator = iterator.next().iterator();
> >> >
> >> >         List<ConsumerIterator<byte[], byte[]>> iteratorList =
> >> >                 new ArrayList<ConsumerIterator<byte[], byte[]>>(streams.size());
> >> >         // The remaining iterators must be registered now.
> >> >         while (iterator.hasNext()) {
> >> >             iteratorList.add(iterator.next().iterator());
> >> >         }
> >> >         KafkaStreamRegistory.registerStream(mainSourceName, iteratorList);
> >> >
> >> > Once the consumer iterators are created and registered, we use them in
> >> > another thread to start reading.  Sometimes it gives the following
> >> > exception.
> >> >
> >> > 24 Oct 2014 16:03:25,923 ERROR
> >> > [SourceReader:request_source:LogStreamKafkaSource1]
> >> > (grizzled.slf4j.Logger.error:116)  - SourceThread: exception during
> >> > reads. Swallowed to continue next read.
> >> > java.lang.IllegalStateException: Iterator is in failed state
> >> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
> >> >
> >> >
> >> > I have tried to recover from this state by calling
> >> > iterator.resetState(), but sometimes it does not recover.
> >> >
> >> >
> >> >
> >> >
> >> > *2) ConsumerFetcherThreads are blocked on enqueue.  What controls the
> >> > size of the queue?  Why are they blocked?*  Because of this, our lag is
> >> > increasing, and our threads are blocked on hasNext()...
> >> >
> >> >
> >> >
> >> > "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1" prio=5 tid=0x00007fb36292c800 nid=0xab03 waiting on condition [0x0000000116379000]
> >> >    java.lang.Thread.State: WAITING (parking)
> >> >         at sun.misc.Unsafe.park(Native Method)
> >> >         - parking to wait for  <0x0000000704019388> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> >> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> >> >         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> >> >         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> >> >         at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
> >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
> >> >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> >> >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> >> >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
> >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> >> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> >> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
> >> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> >> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> >> >
> >> >
> >> >
> >> > "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-2" prio=5 tid=0x00007fb36229e000 nid=0xa903 waiting on condition [0x0000000116276000]
> >> >    java.lang.Thread.State: WAITING (parking)
> >> >         at sun.misc.Unsafe.park(Native Method)
> >> >         - parking to wait for  <0x0000000704064ce0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> >> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> >> >         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> >> >         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> >> >         at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
> >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
> >> >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> >> >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
> >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> >> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> >> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
> >> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> >> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > Thanks,
> >> >
> >> > Bhavesh
> >> >
> >> >
> >> >
> >> > On Sun, Oct 26, 2014 at 3:14 PM, Neha Narkhede <neha.narkh...@gmail.com>
> >> > wrote:
> >> >
> >> > > Can you provide the steps to reproduce this issue?
> >> > >
> >> > > On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry <
> >> > > mistry.p.bhav...@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > I am using one from the Kafka Trunk branch.
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > > Bhavesh
> >> > > >
> >> > > > On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede <neha.narkh...@gmail.com>
> >> > > > wrote:
> >> > > >
> >> > > > > Which version of Kafka are you using on the consumer?
> >> > > > >
> >> > > > > On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry <
> >> > > > > mistry.p.bhav...@gmail.com>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Hi Kafka Community,
> >> > > > > >
> >> > > > > > I am using the Kafka trunk source code and I get the following
> >> > > > > > exception. What could cause the iterator to be in the FAILED
> >> > > > > > state? Please let me know how I can fix this issue.
> >> > > > > >
> >> > > > > > *java.lang.IllegalStateException: Iterator is in failed state
> >> > > > > >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
> >> > > > > >
> >> > > > > > Here are the properties:
> >> > > > > >
> >> > > > > >         Properties props = new Properties();
> >> > > > > >         props.put("zookeeper.connect", zkConnect);
> >> > > > > >         props.put("group.id", groupId);
> >> > > > > >         props.put("consumer.timeout.ms", "-1");
> >> > > > > >         props.put("zookeeper.session.timeout.ms", "10000");
> >> > > > > >         props.put("zookeeper.sync.time.ms", "6000");
> >> > > > > >         props.put("auto.commit.interval.ms", "2000");
> >> > > > > >         props.put("rebalance.max.retries", "8");
> >> > > > > >         props.put("auto.offset.reset", "largest");
> >> > > > > >         props.put("fetch.message.max.bytes","2097152");
> >> > > > > >         props.put("socket.receive.buffer.bytes","2097152");
> >> > > > > >         props.put("auto.commit.enable","true");
> >> > > > > >
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > >
> >> > > > > > Bhavesh
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>
