Hi Jun,

Please see my comments inline again :)
On 10-Jul-2013, at 9:13 AM, Jun Rao <jun...@gmail.com> wrote:

> This indicates our in-memory queue is empty. So the consumer thread is
> blocked.

What should we do about this? As I mentioned in the previous mail, the events are there to be consumed. Killing one consumer makes the other consumer consume events again.

> What about the Kafka fetcher threads? Are they blocked on anything?

One of the fetcher threads is blocked on putting to a queue; the other is sleeping. Please look below:

"FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on condition [0x00007fcb833eb000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00000006809e8000> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
        at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
        at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)

"FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on condition [0x00007fcb836ee000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
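For reference, here is roughly how the dedicated event_queue consumer is wired up. This is only a minimal sketch against the 0.7.2 high-level consumer API, not our actual KafkaStreamRunnable; exact 0.7.2 class and method signatures may differ slightly, and the class name and process() helper are illustrative. The group, ZooKeeper connect string and overrides are the ones quoted further down in this thread.

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;

public class EventQueueConsumer {

    public static void main(String[] args) {
        // consumer overrides as listed in the original mail below
        Properties props = new Properties();
        props.put("zk.connect", "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka");
        props.put("groupid", "event_queue");
        props.put("fetch.size", String.valueOf(3 * 1024 * 1024));
        props.put("autooffset.reset", "largest");
        props.put("autocommit.interval.ms", "500");

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // 150 streams for event_queue, matching the per-broker partition count
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put("event_queue", 150);
        List<KafkaStream<Message>> streams =
            connector.createMessageStreams(topicCount).get("event_queue");

        // one worker per stream: a stream without a live worker stops draining its
        // in-memory chunk queue, and the broker fetcher then blocks on put()
        ExecutorService pool = Executors.newFixedThreadPool(streams.size());
        for (final KafkaStream<Message> stream : streams) {
            pool.submit(new Runnable() {
                public void run() {
                    ConsumerIterator<Message> it = stream.iterator();
                    while (it.hasNext()) {      // parks on LinkedBlockingQueue.take(), as in the dump above
                        ByteBuffer payload = it.next().payload();
                        process(payload);       // illustrative application hook
                    }
                }
            });
        }
    }

    private static void process(ByteBuffer payload) {
        // decode and handle one event
    }
}

Each of the 150 streams gets its own worker thread so every partition's in-memory chunk queue keeps being drained; if any stream loses its worker, that queue fills up and the fetcher blocks on put(), which matches the FetchRunnable-1 dump above.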
> Thanks,
>
> Jun
>
>
> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
>
>> Hello Jun,
>>
>> Please see my comments inline.
>>
>> On 09-Jul-2013, at 8:32 PM, Jun Rao <jun...@gmail.com> wrote:
>>
>>> I assume that each consumer instance consumes all 15 topics.
>>
>> No, we kept a dedicated consumer listening to the topic in question.
>> We did this because this queue processes huge amounts of data.
>>
>>> Are all your consumer threads alive? If one of your threads dies, it will
>>> eventually block the consumption in other threads.
>>
>> Yes. We can see all the threads in the thread dump.
>> We have ensured that the threads do not die due to an Exception.
>>
>> Please look at the stack trace below. We see all the threads waiting like this:
>>
>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting on condition [0x00007efedae6d000]
>>    java.lang.Thread.State: WAITING (parking)
>>         at sun.misc.Unsafe.park(Native Method)
>>         - parking to wait for <0x0000000640248618> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
>>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
>>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>         at com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>         at java.lang.Thread.run(Thread.java:662)
>>
>>> Thanks,
>>>
>>> Jun
>>>
>>>
>>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> We are using kafka-0.7.2 with zookeeper (3.4.5).
>>>>
>>>> Our cluster configuration:
>>>> 3 brokers on 3 different machines. Each broker machine has a zookeeper instance running as well.
>>>> We have 15 topics defined. We are trying to use them as queues (JMS-like) by defining the same group across different Kafka consumers.
>>>> On the consumer side, we are using the High Level Consumer.
>>>>
>>>> However, we are seeing some weird behaviour.
>>>> One of our heavily used queues (event_queue) has 2 dedicated consumers listening to that queue only.
>>>> This queue is defined with 150 partitions on each broker, and the number of streams defined on the 2 dedicated consumers is 150.
>>>> After a while we see that most of the consumer threads keep waiting for events while the lag keeps growing.
>>>> If we kill one of the dedicated consumers, then the other consumer starts getting messages in a hurry.
>>>>
>>>> The consumer had no Full GCs.
>>>>
>>>> How we measure lag:
>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group event_queue --zkconnect zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic event_queue
>>>>
>>>> Around the time the events stopped coming to the new consumer, this was printed in the logs:
>>>>
>>>> [INFO] zookeeper state changed (Disconnected)
>>>> [INFO] zookeeper state changed (Disconnected)
>>>> [INFO] zookeeper state changed (SyncConnected)
>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>
>>>> Config overridden:
>>>> Consumer:
>>>> fetch.size=3MB
>>>> autooffset.reset=largest
>>>> autocommit.interval.ms=500
>>>> Producer:
>>>> maxMessageSize=3MB
>>>>
>>>> Please let us know if we are doing something wrong, or if we are facing a known issue here?
>>>>
>>>> Thanks,
>>>> Nihit
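P.S. On the earlier point that a single dead thread will eventually block consumption in the others: below is a rough sketch (hypothetical names, not our actual code) of the kind of guard we keep around each stream worker, so that a per-event failure is logged instead of silently ending the thread that drains that stream, and a worker that does exit at least leaves a trace in the logs.

import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.Message;

public class GuardedStreamWorker implements Runnable {
    private static final Logger LOG = Logger.getLogger(GuardedStreamWorker.class.getName());
    private final KafkaStream<Message> stream;

    public GuardedStreamWorker(KafkaStream<Message> stream) {
        this.stream = stream;
    }

    public void run() {
        try {
            ConsumerIterator<Message> it = stream.iterator();
            while (it.hasNext()) {
                ByteBuffer payload = it.next().payload();
                try {
                    handle(payload);                 // illustrative application hook
                } catch (RuntimeException e) {
                    // log and continue: letting this escape would end run(), leave the
                    // stream undrained, and eventually block the fetcher on put()
                    LOG.log(Level.SEVERE, "event_queue handler failed", e);
                }
            }
        } finally {
            // if this appears while the application is still running, a stream has
            // lost its worker and its chunk queue will stop being drained
            LOG.warning("worker for an event_queue stream exited");
        }
    }

    private void handle(ByteBuffer payload) {
        // decode and process one event
    }
}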