For those partitions that are lagging, do you see fetch requests in the log?
Thanks,

Jun

On Fri, Jul 19, 2013 at 12:30 AM, Nihit Purwar <npur...@sprinklr.com> wrote:

> Hello Jun,
>
> Sorry for the delay in getting the logs.
> Here are the 3 logs from the 3 servers with trace level enabled, as suggested:
>
> https://docs.google.com/file/d/0B5etsywBa-bkQnBESUJzNV9yRWc/edit?usp=sharing
>
> Please have a look and let us know if you need anything else to further
> debug this problem.
>
> Thanks,
> Nihit
>
> On 11-Jul-2013, at 4:41 PM, Nihit Purwar <npur...@sprinklr.com> wrote:
>
>> Hi Jun,
>>
>> I did put in only one topic while starting the consumer, and we have used
>> the same API, "createMessageStreams".
>> As for the trace-level logs of the Kafka consumer, we will send those to
>> you soon.
>>
>> Thanks again for replying.
>>
>> Nihit
>>
>> On 10-Jul-2013, at 10:38 PM, Jun Rao <jun...@gmail.com> wrote:
>>
>>> Also, just so that we are on the same page: I assume that you used the
>>> following API. Did you put just one topic in the topicCountMap?
>>>
>>>   def createMessageStreams(topicCountMap: Map[String,Int]):
>>>     Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]]
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Wed, Jul 10, 2013 at 8:30 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>>
>>>> Hi Jun,
>>>>
>>>> Thanks for helping out so far.
>>>>
>>>> As per your explanation, we are doing exactly what you mention in your
>>>> workaround below:
>>>>> A workaround is to use different consumer connectors, each consuming a
>>>>> single topic.
>>>>
>>>> Here is the problem...
>>>>
>>>> We have a topic which gets a lot of events (around a million a day), so
>>>> this topic has a high number of partitions on the server, and we have
>>>> dedicated consumers listening only to this topic. Processing time is on
>>>> the order of 15-30 ms, so we are confident that our consumers are not
>>>> slow in processing.
>>>>
>>>> Every now and then our consumer threads stall and do not receive any
>>>> events (as shown in my previous email with the thread stacks of the idle
>>>> threads), even though we can see the offset lag increasing for the
>>>> consumers.
>>>>
>>>> We also noticed that if we force a rebalance of the consumers (either by
>>>> starting a new consumer or killing an existing one), data starts to flow
>>>> in again to these consumer threads. The consumers remain stable
>>>> (processing events) for about 20-30 minutes before the threads go idle
>>>> again and the backlog starts growing. This happens in a cycle for us, and
>>>> we have not been able to figure out why events stop flowing in.
>>>>
>>>> As a side note, we are also monitoring the GC cycles, and there are
>>>> hardly any.
>>>>
>>>> Please let us know if you need any additional details.
>>>>
>>>> Thanks,
>>>> Nihit
>>>>
>>>> On 10-Jul-2013, at 8:30 PM, Jun Rao <jun...@gmail.com> wrote:
>>>>
>>>>> Ok. One of the issues is that when you have a consumer that consumes
>>>>> multiple topics, if one of the consumer threads is slow in consuming
>>>>> messages from one topic, it can block the consumption of other consumer
>>>>> threads. This is because we use a shared fetcher to fetch all topics.
>>>>> There is an in-memory queue per topic. If one of the queues is full, the
>>>>> fetcher will block and can't put the data into other queues.
>>>>>
>>>>> A workaround is to use different consumer connectors, each consuming a
>>>>> single topic.
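A minimal sketch of that workaround for reference: one high-level consumer connector per topic, so a full in-memory queue for one topic only blocks its own fetcher. This assumes the 0.7.x Java consumer API and property names (zk.connect, groupid, fetch.size); the topic name, group, stream count, fetch size and ZooKeeper connect string are copied from elsewhere in this thread purely for illustration, and the exact KafkaStream type parameters differ in later releases.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.Message;

    public class PerTopicConnectors {

        // One connector per topic: each connector gets its own fetchers and
        // per-topic queues, so a blocked queue only stalls its own topic.
        static ConsumerConnector connectorFor(String groupId) {
            Properties props = new Properties();
            // values below are taken from this thread; adjust for your setup
            props.put("zk.connect",
                      "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka");
            props.put("groupid", groupId);                   // 0.7 property name
            props.put("fetch.size", String.valueOf(3 * 1024 * 1024));
            props.put("autooffset.reset", "largest");
            props.put("autocommit.interval.ms", "500");
            return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        }

        public static void main(String[] args) {
            // Dedicated connector for the busy topic, with a single entry in
            // the topicCountMap and 150 streams (types per the 0.7.x quickstart).
            ConsumerConnector eventQueueConnector = connectorFor("event_queue");
            Map<String, List<KafkaStream<Message>>> streams =
                eventQueueConnector.createMessageStreams(
                    Collections.singletonMap("event_queue", 150));
            // hand each of the 150 streams for "event_queue" to a worker thread;
            // other topics get their own connectors created the same way
        }
    }

Each additional topic would get its own connector built the same way, so a stall on event_queue cannot hold up fetches for the other topics.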
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jun
>>>>>
>>>>> On Tue, Jul 9, 2013 at 11:12 PM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>>>>
>>>>>> Hi Jun,
>>>>>>
>>>>>> Please see my comments inline again :)
>>>>>>
>>>>>> On 10-Jul-2013, at 9:13 AM, Jun Rao <jun...@gmail.com> wrote:
>>>>>>
>>>>>>> This indicates our in-memory queue is empty. So the consumer thread is
>>>>>>> blocked.
>>>>>>
>>>>>> What should we do about this?
>>>>>> As I mentioned in the previous mail, there are events to be consumed.
>>>>>> Killing one consumer makes the other consumer consume events again.
>>>>>>
>>>>>>> What about the Kafka fetcher threads? Are they blocked on anything?
>>>>>>
>>>>>> One of the fetcher threads is blocked on putting to a queue; the other
>>>>>> is sleeping.
>>>>>> Please see below:
>>>>>>
>>>>>> "FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on condition [0x00007fcb833eb000]
>>>>>>    java.lang.Thread.State: WAITING (parking)
>>>>>>         at sun.misc.Unsafe.park(Native Method)
>>>>>>         - parking to wait for <0x00000006809e8000> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>>>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>>>>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>>>>         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>>>>>>         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
>>>>>>         at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
>>>>>>         at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
>>>>>>         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>>>>>>         at scala.collection.immutable.List.foreach(List.scala:45)
>>>>>>         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>>>>>>
>>>>>> "FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on condition [0x00007fcb836ee000]
>>>>>>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>>>>>>         at java.lang.Thread.sleep(Native Method)
>>>>>>         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jun
>>>>>>>
>>>>>>> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>>>>>>
>>>>>>>> Hello Jun,
>>>>>>>>
>>>>>>>> Please see my comments inline.
>>>>>>>>
>>>>>>>> On 09-Jul-2013, at 8:32 PM, Jun Rao <jun...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I assume that each consumer instance consumes all 15 topics.
>>>>>>>>
>>>>>>>> No, we kept a dedicated consumer listening to the topic in question.
>>>>>>>> We did this because this queue processes huge amounts of data.
>>>>>>>>
>>>>>>>>> Are all your consumer threads alive? If one of your threads dies, it
>>>>>>>>> will eventually block the consumption in other threads.
>>>>>>>>
>>>>>>>> Yes. We can see all the threads in the thread dump.
>>>>>>>> We have ensured that the threads do not die due to an exception.
>>>>>>>>
>>>>>>>> Please look at the stack trace below.
>>>>>>>> We see all the threads waiting like this:
>>>>>>>>
>>>>>>>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting on condition [0x00007efedae6d000]
>>>>>>>>    java.lang.Thread.State: WAITING (parking)
>>>>>>>>         at sun.misc.Unsafe.park(Native Method)
>>>>>>>>         - parking to wait for <0x0000000640248618> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>>>>>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>>>>>>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>>>>>>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>>>>>>>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
>>>>>>>>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
>>>>>>>>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>>>>>>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>>>>>>         at com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
>>>>>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>>>>>>         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>>>>>         at java.lang.Thread.run(Thread.java:662)
>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Jun
>>>>>>>>>
>>>>>>>>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> We are using kafka-0.7.2 with ZooKeeper 3.4.5.
>>>>>>>>>>
>>>>>>>>>> Our cluster configuration:
>>>>>>>>>> 3 brokers on 3 different machines. Each broker machine has a
>>>>>>>>>> ZooKeeper instance running as well.
>>>>>>>>>> We have 15 topics defined. We are trying to use them as queues
>>>>>>>>>> (JMS-like) by defining the same group across different Kafka
>>>>>>>>>> consumers.
>>>>>>>>>> On the consumer side, we are using the high-level consumer.
>>>>>>>>>>
>>>>>>>>>> However, we are seeing some weird behaviour.
>>>>>>>>>> One of our heavily used queues (event_queue) has 2 dedicated
>>>>>>>>>> consumers listening to that queue only.
>>>>>>>>>> This queue is defined with 150 partitions on each broker, and the
>>>>>>>>>> number of streams defined on the 2 dedicated consumers is 150.
>>>>>>>>>> After a while we see that most of the consumer threads keep waiting
>>>>>>>>>> for events and the lag keeps growing.
>>>>>>>>>> If we kill one of the dedicated consumers, then the other consumer
>>>>>>>>>> starts getting messages in a hurry.
>>>>>>>>>>
>>>>>>>>>> The consumer had no full GCs.
>>>>>>>>>>
>>>>>>>>>> How we measure lag:
>>>>>>>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group event_queue \
>>>>>>>>>>   --zkconnect zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic event_queue
>>>>>>>>>>
>>>>>>>>>> Around the time the events stopped coming to the new consumer, this
>>>>>>>>>> was printed in the logs:
>>>>>>>>>>
>>>>>>>>>> [INFO] zookeeper state changed (Disconnected)
>>>>>>>>>> [INFO] zookeeper state changed (Disconnected)
>>>>>>>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>>>>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>>>>>>>
>>>>>>>>>> Overridden config:
>>>>>>>>>> Consumer:
>>>>>>>>>>   fetch.size=3MB
>>>>>>>>>>   autooffset.reset=largest
>>>>>>>>>>   autocommit.interval.ms=500
>>>>>>>>>> Producer:
>>>>>>>>>>   maxMessageSize=3MB
>>>>>>>>>>
>>>>>>>>>> Please let us know if we are doing something wrong or facing a known
>>>>>>>>>> issue here.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Nihit
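As background for the thread dumps above: each stream returned by createMessageStreams() is normally handed to its own worker thread, and iterating the stream parks inside ConsumerIterator.hasNext() whenever the topic's in-memory queue is empty, which is the WAITING (parking) state shown for the event_queue threads. A rough sketch of that fan-out follows, assuming the 0.7.x quickstart class names (KafkaStream<Message>); KafkaStreamRunnable in the dumps is the application's own class and is not shown in this thread, and the stream's element type is left as Object since it differs across versions.

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import kafka.consumer.KafkaStream;
    import kafka.message.Message;

    public class StreamWorkers {

        // Illustrative only: one worker thread per stream, in the style of the
        // KafkaStreamRunnable / ThreadPoolExecutor frames in the dumps above.
        public static void start(List<KafkaStream<Message>> streams) {
            ExecutorService executor = Executors.newFixedThreadPool(streams.size());
            for (final KafkaStream<Message> stream : streams) {
                executor.submit(new Runnable() {
                    public void run() {
                        // Iteration parks in ConsumerIterator.hasNext() while the
                        // topic's in-memory queue is empty (the LinkedBlockingQueue.take
                        // frame in the consumer thread dump).
                        for (Object event : stream) {
                            // application-specific processing (~15-30 ms per event)
                        }
                    }
                });
            }
        }
    }

Each of the two dedicated consumers described above would run something like this over the 150 streams it gets back for event_queue.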