Hello Jun,

Sorry for the delay in getting the logs. Here are the 3 logs from the 3 servers,
captured at trace level as suggested:
https://docs.google.com/file/d/0B5etsywBa-bkQnBESUJzNV9yRWc/edit?usp=sharing

Please have a look and let us know if you need anything else to further debug
this problem.

Thanks,
Nihit

On 11-Jul-2013, at 4:41 PM, Nihit Purwar <npur...@sprinklr.com> wrote:

> Hi Jun,
>
> I did put in only one topic while starting the consumer and have used the
> same API "createMessageStreams".
> As for the trace-level logs of the Kafka consumer, we will send those to you soon.
>
> Thanks again for replying.
>
> Nihit
>
> On 10-Jul-2013, at 10:38 PM, Jun Rao <jun...@gmail.com> wrote:
>
>> Also, just so that we are on the same page, I assume that you used the
>> following API. Did you put just one topic in the topicCountMap?
>> def createMessageStreams(topicCountMap: Map[String,Int]): Map[String,
>> List[KafkaStream[Array[Byte],Array[Byte]]]]
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Wed, Jul 10, 2013 at 8:30 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>
>>> Hi Jun,
>>>
>>> Thanks for helping out so far.
>>>
>>> As per your explanation, we are doing exactly what you mentioned in your
>>> workaround below:
>>>> A workaround is to use different consumer connectors, each consuming a
>>>> single topic.
>>>
>>> Here is the problem...
>>>
>>> We have a topic which gets a lot of events (around a million a day), so
>>> this topic has a high number of partitions on the server, and we have
>>> dedicated consumers listening only to this topic. The processing time is
>>> on the order of 15-30 millis, so we are sure that our consumers are not
>>> slow in processing.
>>>
>>> Every now and then, our consumer threads stall and do not receive any
>>> events (as shown in my previous email with the thread stack of the idle
>>> threads), even though we can see the offset lag increasing for the
>>> consumers.
>>>
>>> We also noticed that if we force a rebalance of the consumers (either by
>>> starting a new consumer or killing an existing one), data starts to flow
>>> into these consumer threads again. The consumers remain stable (processing
>>> events) for about 20-30 mins before the threads go idle again and the
>>> backlog starts growing. This happens in a cycle for us, and we are not
>>> able to figure out why events stop flowing in.
>>>
>>> As a side note, we are also monitoring the GC cycles and there are hardly
>>> any.
>>>
>>> Please let us know if you need any additional details.
>>>
>>> Thanks,
>>> Nihit
>>>
>>>
>>> On 10-Jul-2013, at 8:30 PM, Jun Rao <jun...@gmail.com> wrote:
>>>
>>>> Ok. One of the issues is that when you have a consumer that consumes
>>>> multiple topics, if one of the consumer threads is slow in consuming
>>>> messages from one topic, it can block the consumption of other consumer
>>>> threads. This is because we use a shared fetcher to fetch all topics.
>>>> There is an in-memory queue per topic. If one of the queues is full, the
>>>> fetcher will block and can't put the data into other queues.
>>>>
>>>> A workaround is to use different consumer connectors, each consuming a
>>>> single topic.
>>>>
>>>> Thanks,
>>>>
>>>> Jun
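
For reference, a minimal sketch of the workaround described just above (one
consumer connector per topic, each created with a single-entry topicCountMap)
might look roughly like this with the high-level Java consumer. The generic
types follow the createMessageStreams signature quoted earlier in the thread;
on 0.7.2 the streams may instead carry kafka.message.Message. The property
names, values, and the per-topic group naming are assumptions for illustration,
not taken from the thread.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class PerTopicConnectors {

    // One connector per topic: each connector gets its own fetcher and its own
    // in-memory queues, so a slow or blocked topic cannot stall the others.
    static List<KafkaStream<byte[], byte[]>> streamsFor(String topic, int numStreams) {
        Properties props = new Properties();
        // 0.7.x-style property names (assumed); values are illustrative.
        props.put("zk.connect", "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka");
        props.put("groupid", topic + "_group");   // hypothetical per-topic group name
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Exactly one topic in the topicCountMap, as discussed above.
        Map<String, Integer> topicCountMap = Collections.singletonMap(topic, numStreams);
        return connector.createMessageStreams(topicCountMap).get(topic);
    }
}

Because every connector has its own fetcher and in-memory queues, back-pressure
on one topic can no longer block fetching for the other topics, which is the
failure mode described above.
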
>>>> On Tue, Jul 9, 2013 at 11:12 PM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>>>
>>>>> Hi Jun,
>>>>>
>>>>> Please see my comments inline again :)
>>>>>
>>>>> On 10-Jul-2013, at 9:13 AM, Jun Rao <jun...@gmail.com> wrote:
>>>>>
>>>>>> This indicates our in-memory queue is empty. So the consumer thread is
>>>>>> blocked.
>>>>>
>>>>> What should we do about this?
>>>>> As I mentioned in the previous mail, events are there to be consumed.
>>>>> Killing one consumer makes the other consumer consume events again.
>>>>>
>>>>>> What about the Kafka fetcher threads? Are they blocked on anything?
>>>>>
>>>>> One of the fetcher threads is blocked on putting to a queue; the other
>>>>> is sleeping.
>>>>> Please look below:
>>>>>
>>>>> "FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on condition [0x00007fcb833eb000]
>>>>>    java.lang.Thread.State: WAITING (parking)
>>>>>         at sun.misc.Unsafe.park(Native Method)
>>>>>         - parking to wait for <0x00000006809e8000> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>>>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>>>         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>>>>>         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
>>>>>         at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
>>>>>         at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
>>>>>         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>>>>>         at scala.collection.immutable.List.foreach(List.scala:45)
>>>>>         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>>>>>
>>>>> "FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on condition [0x00007fcb836ee000]
>>>>>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>>>>>         at java.lang.Thread.sleep(Native Method)
>>>>>         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jun
>>>>>>
>>>>>>
>>>>>> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>>>>>
>>>>>>> Hello Jun,
>>>>>>>
>>>>>>> Please see my comments inline.
>>>>>>>
>>>>>>> On 09-Jul-2013, at 8:32 PM, Jun Rao <jun...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I assume that each consumer instance consumes all 15 topics.
>>>>>>>
>>>>>>> No, we kept dedicated consumers listening only to the topic in question.
>>>>>>> We did this because this queue processes huge amounts of data.
>>>>>>>
>>>>>>>> Are all your consumer threads alive? If one of your threads dies, it
>>>>>>>> will eventually block the consumption in other threads.
>>>>>>>
>>>>>>> Yes. We can see all the threads in the thread dump.
>>>>>>> We have ensured that the threads do not die due to an Exception.
>>>>>>>
>>>>>>> Please look at the stack trace below.
>>>>>>> We see all the threads waiting like this:
>>>>>>>
>>>>>>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting on condition [0x00007efedae6d000]
>>>>>>>    java.lang.Thread.State: WAITING (parking)
>>>>>>>         at sun.misc.Unsafe.park(Native Method)
>>>>>>>         - parking to wait for <0x0000000640248618> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>>>>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>>>>>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>>>>>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>>>>>>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
>>>>>>>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
>>>>>>>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>>>>>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>>>>>         at com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
>>>>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>>>>>         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>>>>         at java.lang.Thread.run(Thread.java:662)
>>>>>>>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Jun
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> We are using kafka-0.7.2 with zookeeper (3.4.5).
>>>>>>>>>
>>>>>>>>> Our cluster configuration:
>>>>>>>>> 3 brokers on 3 different machines. Each broker machine has a zookeeper
>>>>>>>>> instance running as well.
>>>>>>>>> We have 15 topics defined. We are trying to use them as queues (JMS-like)
>>>>>>>>> by defining the same group across different kafka consumers.
>>>>>>>>> On the consumer side, we are using the High Level Consumer.
>>>>>>>>>
>>>>>>>>> However, we are seeing some weird behaviour.
>>>>>>>>> One of our heavily used queues (event_queue) has 2 dedicated consumers
>>>>>>>>> listening to that queue only.
>>>>>>>>> This queue is defined with 150 partitions on each broker, and the number
>>>>>>>>> of streams defined on the 2 dedicated consumers is 150.
>>>>>>>>> After a while we see that most of the consumer threads keep waiting for
>>>>>>>>> events and the lag keeps growing.
>>>>>>>>> If we kill one of the dedicated consumers, then the other consumer starts
>>>>>>>>> getting messages in a hurry.
>>>>>>>>>
>>>>>>>>> The consumer had no Full GCs.
>>>>>>>>>
>>>>>>>>> How we measure lag:
>>>>>>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
>>>>>>>>> event_queue --zkconnect
>>>>>>>>> zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic event_queue
>>>>>>>>>
>>>>>>>>> Around the time the events stopped coming to the new consumer, this was
>>>>>>>>> printed in the logs:
>>>>>>>>>
>>>>>>>>> [INFO] zookeeper state changed (Disconnected)
>>>>>>>>> [INFO] zookeeper state changed (Disconnected)
>>>>>>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>>>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>>>>>>
>>>>>>>>> Config overridden:
>>>>>>>>> Consumer:
>>>>>>>>> fetch.size=3MB
>>>>>>>>> autooffset.reset=largest
>>>>>>>>> autocommit.interval.ms=500
>>>>>>>>> Producer:
>>>>>>>>> maxMessageSize=3MB
>>>>>>>>>
>>>>>>>>> Please let us know if we are doing something wrong or hitting a known
>>>>>>>>> issue here.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Nihit
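
For completeness, here is a rough, hedged sketch of the kind of consumer setup
described in the original message above: the overridden config, 150 streams on
the event_queue topic, and one worker thread per stream blocking on the
ConsumerIterator. It mirrors, but is not, the KafkaStreamRunnable seen in the
thread dumps; the 0.7.x property names, the generic types, and the handle()
method are assumptions for illustration.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class EventQueueConsumer {

    private static final String TOPIC = "event_queue";

    public static void main(String[] args) {
        Properties props = new Properties();
        // Overridden consumer config from the original message (0.7.x names assumed).
        props.put("zk.connect", "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka");
        props.put("groupid", "event_queue");
        props.put("fetch.size", String.valueOf(3 * 1024 * 1024));  // 3MB
        props.put("autooffset.reset", "largest");
        props.put("autocommit.interval.ms", "500");

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // 150 streams for the single heavily used topic, one worker thread per stream.
        int numStreams = 150;
        Map<String, Integer> topicCountMap = Collections.singletonMap(TOPIC, numStreams);
        List<KafkaStream<byte[], byte[]>> streams =
                connector.createMessageStreams(topicCountMap).get(TOPIC);

        ExecutorService pool = Executors.newFixedThreadPool(numStreams);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            pool.submit(new Runnable() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    // hasNext() blocks on the stream's in-memory queue; this is the
                    // ConsumerIterator.makeNext() frame seen in the idle-thread dumps above.
                    while (it.hasNext()) {
                        MessageAndMetadata<byte[], byte[]> record = it.next();
                        handle(record.message());  // stand-in for the 15-30 ms of processing
                    }
                }
            });
        }
    }

    private static void handle(byte[] payload) {
        // application-specific processing (placeholder)
    }
}

The iterator blocks inside hasNext() whenever the stream's in-memory queue is
empty, which matches the ConsumerIterator.makeNext / LinkedBlockingQueue.take
frames visible in the idle-thread dumps earlier in this thread.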