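queued.max.message.chunks controls the size of the consumer's fetcher queue. It
caps the number of message chunks buffered for consumption, and each chunk can
be up to fetch.message.max.bytes.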
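
As a rough sketch (not taken from the attached code), the setting can be added to the same kind of Properties object shown further down in this thread. The class name, method names, ZooKeeper address, group id and the value 4 are all made up for illustration. The second method is only a stand-in that uses a plain LinkedBlockingQueue to show why a fetcher parks in put() once nothing drains the queue; it does not touch any Kafka classes.

```java
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class FetcherQueueSketch {

    // Builds consumer properties with an explicit bound on the fetcher queue.
    // zkConnect, groupId and maxChunks are placeholders supplied by the caller.
    public static Properties consumerProps(String zkConnect, String groupId, int maxChunks) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        props.put("fetch.message.max.bytes", "2097152");
        // Each buffered chunk can be up to fetch.message.max.bytes, so the
        // memory used by the queue grows with both settings.
        props.put("queued.max.message.chunks", Integer.toString(maxChunks));
        return props;
    }

    // Stand-in for the bounded queue: counts how many chunks are accepted
    // when no consumer thread is draining the queue.
    public static int chunksAcceptedWithoutConsumer(int maxChunks, int attempts) {
        LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>(maxChunks);
        int accepted = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                // offer() with a timeout gives up, where put() would park forever.
                if (queue.offer(new byte[0], 10, TimeUnit.MILLISECONDS)) {
                    accepted++;
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report what was accepted so far.
            Thread.currentThread().interrupt();
        }
        return accepted;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:2181", "example-group", 4);
        System.out.println(props.getProperty("queued.max.message.chunks"));
        System.out.println(chunksAcceptedWithoutConsumer(4, 10));
    }
}
```

Raising the value only buys more buffering. If a consumer thread has died or stopped calling hasNext(), the queue will still fill up and the fetcher will block again.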

On Mon, Oct 27, 2014 at 9:32 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
wrote:

> Hi Neha,
>
> If I solve problem number 1, I think number 2 will be solved as well
> (problem 1 is causing problem 2, the blocked threads). Can you please let
> me know what controls the queue size for the *ConsumerFetcherThread* thread?
>
>
> Please see the attached Java source code, which reproduces the problem.
> You may remove the recovery process. Please check. We have to do some
> work before we start reading from the Kafka stream iterator, and this
> seems to cause the *java.lang.IllegalStateException: Iterator is in
> failed state*.
>
> Please let me know your findings and recommendations.
>
> Thanks,
>
> Bhavesh
>
> On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede <neha.narkh...@gmail.com>
> wrote:
>
>> >> Sometimes it gives the following exception.
>>
>> It will help to have a more specific test case that reproduces the failed
>> iterator state.
>>
>> Also, the consumer fetcher threads block if the fetcher queue is full. The
>> queue can fill up if your consumer thread dies or slows down. I'd recommend
>> you ensure that all your consumer threads are alive. You can take a thread
>> dump to verify this.
>>
>> Thanks,
>> Neha
>>
>> On Mon, Oct 27, 2014 at 2:14 PM, Bhavesh Mistry <
>> mistry.p.bhav...@gmail.com>
>> wrote:
>>
>> > Hi Neha,
>> >
>> >
>> > I have two problems. Any help is greatly appreciated.
>> >
>> >
>> > 1) *java.lang.IllegalStateException: Iterator is in failed state*
>> >
>> >         ConsumerConnector consumerConnector = Consumer
>> >                 .createJavaConsumerConnector(getConsumerConfig());
>> >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>> >         topicCountMap.put(topic, 32);
>> >         Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamMap =
>> >                 consumerConnector.createMessageStreams(topicCountMap);
>> >
>> >         List<KafkaStream<byte[], byte[]>> streams =
>> >                 Collections.synchronizedList(topicStreamMap.get(topic));
>> >
>> >         AppStaticInfo info = Mupd8Main.STATICINFO();
>> >
>> >         Iterator<KafkaStream<byte[], byte[]>> iterator = streams.iterator();
>> >         // Take the head of the list for this source; the rest are for the
>> >         // dynamic source.
>> >         mainIterator = iterator.next().iterator();
>> >
>> >         List<ConsumerIterator<byte[], byte[]>> iteratorList =
>> >                 new ArrayList<ConsumerIterator<byte[], byte[]>>(streams.size());
>> >         // The remaining iterators must be registered now.
>> >         while (iterator.hasNext()) {
>> >             iteratorList.add(iterator.next().iterator());
>> >         }
>> >         KafkaStreamRegistory.registerStream(mainSourceName, iteratorList);
>> >
>> > Once the consumer iterator is created and registered, we use it in
>> > another thread to start reading from the consumer iterator. Sometimes it
>> > gives the following exception.
>> >
>> > 24 Oct 2014 16:03:25,923 ERROR
>> > [SourceReader:request_source:LogStreamKafkaSource1]
>> > (grizzled.slf4j.Logger.error:116)  - SourceThread: exception during reads.
>> > Swallowed to continue next read.
>> > java.lang.IllegalStateException: Iterator is in failed state
>> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>> >
>> >
>> > I have tried to recover from this state by calling
>> > iterator.resetState(), but sometimes it does not recover.
>> >
>> >
>> >
>> >
>> > *2) ConsumerFetcherThread threads are blocked on enqueue. What controls the
>> > size of the queue? Why are they blocked?* Because of this, our lag is
>> > increasing. Our threads are blocked on hasNext()...
>> >
>> >
>> >
>> > "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1" prio=5 tid=0x00007fb36292c800 nid=0xab03 waiting on condition [0x0000000116379000]
>> >    java.lang.Thread.State: WAITING (parking)
>> >         at sun.misc.Unsafe.park(Native Method)
>> >         - parking to wait for  <0x0000000704019388> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>> >         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
>> >         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>> >         at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
>> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
>> >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>> >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>> >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
>> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
>> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
>> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
>> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
>> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>> >
>> >
>> >
>> > "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-2" prio=5 tid=0x00007fb36229e000 nid=0xa903 waiting on condition [0x0000000116276000]
>> >    java.lang.Thread.State: WAITING (parking)
>> >         at sun.misc.Unsafe.park(Native Method)
>> >         - parking to wait for  <0x0000000704064ce0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>> >         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
>> >         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>> >         at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
>> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
>> >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>> >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
>> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
>> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
>> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
>> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
>> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>> >
>> >
>> >
>> >
>> >
>> >
>> > Thanks,
>> >
>> > Bhavesh
>> >
>> >
>> >
>> > On Sun, Oct 26, 2014 at 3:14 PM, Neha Narkhede <neha.narkh...@gmail.com>
>> > wrote:
>> >
>> > > Can you provide the steps to reproduce this issue?
>> > >
>> > > On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry <
>> > > mistry.p.bhav...@gmail.com>
>> > > wrote:
>> > >
>> > > > I am using one from the Kafka Trunk branch.
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Bhavesh
>> > > >
>> > > > On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede <
>> > > > neha.narkh...@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > Which version of Kafka are you using on the consumer?
>> > > > >
>> > > > > On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry <
>> > > > > mistry.p.bhav...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi Kafka Community,
>> > > > > >
>> > > > > > I am using the Kafka trunk source code and I get the following
>> > > > > > exception. What could cause the iterator to be in the FAILED
>> > > > > > state? Please let me know how I can fix this issue.
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > *java.lang.IllegalStateException: Iterator is in failed state
>> > > > > >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
>> > > > > >
>> > > > > > Here are the properties:
>> > > > > >
>> > > > > >         Properties props = new Properties();
>> > > > > >         props.put("zookeeper.connect", zkConnect);
>> > > > > >         props.put("group.id", groupId);
>> > > > > >         props.put("consumer.timeout.ms", "-1");
>> > > > > >         props.put("zookeeper.session.timeout.ms", "10000");
>> > > > > >         props.put("zookeeper.sync.time.ms", "6000");
>> > > > > >         props.put("auto.commit.interval.ms", "2000");
>> > > > > >         props.put("rebalance.max.retries", "8");
>> > > > > >         props.put("auto.offset.reset", "largest");
>> > > > > >         props.put("fetch.message.max.bytes","2097152");
>> > > > > >         props.put("socket.receive.buffer.bytes","2097152");
>> > > > > >         props.put("auto.commit.enable","true");
>> > > > > >
>> > > > > >
>> > > > > > Thanks,
>> > > > > >
>> > > > > > Bhavesh
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
