For those partitions that are lagging, do you see fetch requests in the log?

Thanks,

Jun


On Fri, Jul 19, 2013 at 12:30 AM, Nihit Purwar <npur...@sprinklr.com> wrote:

> Hello Jun,
>
> Sorry for the delay in getting the logs.
> Here are the 3 logs from the 3 servers with trace level as suggested:
>
>
> https://docs.google.com/file/d/0B5etsywBa-bkQnBESUJzNV9yRWc/edit?usp=sharing
>
> Please have a look and let us know if you need anything else to further
> debug this problem.
>
> Thanks,
> Nihit
>
> On 11-Jul-2013, at 4:41 PM, Nihit Purwar <npur...@sprinklr.com> wrote:
>
> > Hi Jun,
> >
> > I did put in only one topic while starting the consumer and have used the
> > same API "createMessageStreams".
> > As for the trace-level logs of the Kafka consumer, we will send those to you
> > soon.
> >
> > Thanks again for replying.
> >
> > Nihit
> >
> > On 10-Jul-2013, at 10:38 PM, Jun Rao <jun...@gmail.com> wrote:
> >
> >> Also, just so that we are on the same page, I assume that you used the
> >> following API. Did you just put in one topic in the topicCountMap?
> >> def createMessageStreams(topicCountMap: Map[String,Int]): Map[String,
> >> List[KafkaStream[Array[Byte],Array[Byte]]]]
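> >>
> >> In other words, for the single-topic case I mean something roughly like the
> >> sketch below (illustrative only, based on the signature above; consumerConfig
> >> stands for your own ConsumerConfig, and "event_queue"/150 for your topic name
> >> and stream count):
> >>
> >>   val connector = kafka.consumer.Consumer.create(consumerConfig)
> >>   // a single entry in the topicCountMap: the topic name and the number of streams
> >>   val streamsByTopic = connector.createMessageStreams(Map("event_queue" -> 150))
> >>   // the 150 KafkaStreams for that one topic, typically one per worker thread
> >>   val streams = streamsByTopic("event_queue")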
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Wed, Jul 10, 2013 at 8:30 AM, Nihit Purwar <npur...@sprinklr.com>
> wrote:
> >>
> >>> Hi Jun,
> >>>
> >>> Thanks for helping out so far.
> >>>
> >>> As per your explanation, we are doing exactly what you mentioned in your
> >>> workaround below.
> >>>> A workaround is to use different consumer connectors, each consuming a
> >>>> single topic.
> >>>
> >>>
> >>> Here is the problem...
> >>>
> >>> We have a topic which gets a lot of events (around a million a day), so
> >>> this topic has a high number of partitions on the server, and we have
> >>> dedicated consumers listening only to this topic. The processing time per
> >>> event is in the order of 15-30 millis, so we are confident that our
> >>> consumers are not slow in processing.
> >>>
> >>> Every now and then our consumer threads stall and do not receive any
> >>> events (as shown in my previous email with the thread stacks of the idle
> >>> threads), even though we can see the offset lag increasing for the
> >>> consumers.
> >>>
> >>> We also noticed that if we force a rebalance of the consumers (either by
> >>> starting a new consumer or killing an existing one), data starts to flow
> >>> into these consumer threads again. The consumers remain stable (processing
> >>> events) for about 20-30 mins before the threads go idle again and the
> >>> backlog starts growing. This happens in a cycle for us, and we are not able
> >>> to figure out why the events stop flowing in.
> >>>
> >>> As a side note, we are also monitoring the GC cycles and there are
> hardly
> >>> any.
> >>>
> >>> Please let us know if you need any additional details.
> >>>
> >>> Thanks
> >>> Nihit.
> >>>
> >>>
> >>> On 10-Jul-2013, at 8:30 PM, Jun Rao <jun...@gmail.com> wrote:
> >>>
> >>>> Ok. One of the issues is that when you have a consumer that consumes
> >>>> multiple topics, if one of the consumer threads is slow in consuming
> >>>> messages from one topic, it can block the consumption of other consumer
> >>>> threads. This is because we use a shared fetcher to fetch all topics.
> >>>> There is an in-memory queue per topic. If one of the queues is full, the
> >>>> fetcher will block and can't put the data into other queues.
> >>>>
> >>>> A workaround is to use different consumer connectors, each consuming a
> >>>> single topic.
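> >>>>
> >>>> In code, that workaround looks roughly like the sketch below (illustrative
> >>>> only; topics, props and streamsPerTopic are placeholder names, and error
> >>>> handling/shutdown are omitted):
> >>>>
> >>>>   // one ConsumerConnector per topic, so each topic gets its own fetcher
> >>>>   // threads and its own in-memory queues
> >>>>   val connectors = topics.map { topic =>
> >>>>     val connector = kafka.consumer.Consumer.create(new kafka.consumer.ConsumerConfig(props))
> >>>>     val streams = connector.createMessageStreams(Map(topic -> streamsPerTopic))(topic)
> >>>>     topic -> (connector, streams)
> >>>>   }.toMap
> >>>>
> >>>> That way, a slow consumer thread on one topic can only fill up that topic's
> >>>> own queues; it no longer stalls a fetcher shared with other topics.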
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Jun
> >>>>
> >>>>
> >>>> On Tue, Jul 9, 2013 at 11:12 PM, Nihit Purwar <npur...@sprinklr.com>
> >>> wrote:
> >>>>
> >>>>> Hi Jun,
> >>>>>
> >>>>> Please see my comments inline again :)
> >>>>>
> >>>>> On 10-Jul-2013, at 9:13 AM, Jun Rao <jun...@gmail.com> wrote:
> >>>>>
> >>>>>> This indicates our in-memory queue is empty. So the consumer thread is
> >>>>>> blocked.
> >>>>>
> >>>>> What should we do about this?
> >>>>> As I mentioned in the previous mail, events are there to be consumed.
> >>>>> Killing one consumer makes the other consumer consume events again.
> >>>>>
> >>>>>
> >>>>>> What about the Kafka fetcher threads? Are they blocked on anything?
> >>>>>
> >>>>> One of the fetcher threads is blocked on putting to a queue, the other is
> >>>>> sleeping.
> >>>>> Please look below:
> >>>>>
> >>>>> "FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on condition [0x00007fcb833eb000]
> >>>>>    java.lang.Thread.State: WAITING (parking)
> >>>>>       at sun.misc.Unsafe.park(Native Method)
> >>>>>       - parking to wait for  <0x00000006809e8000> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>>>>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> >>>>>       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> >>>>>       at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> >>>>>       at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
> >>>>>       at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
> >>>>>       at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
> >>>>>       at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> >>>>>       at scala.collection.immutable.List.foreach(List.scala:45)
> >>>>>       at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
> >>>>>
> >>>>> "FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on condition [0x00007fcb836ee000]
> >>>>>    java.lang.Thread.State: TIMED_WAITING (sleeping)
> >>>>>       at java.lang.Thread.sleep(Native Method)
> >>>>>       at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
> >>>>>
> >>>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>> Jun
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <npur...@sprinklr.com>
> >>>>> wrote:
> >>>>>>
> >>>>>>> Hello Jun,
> >>>>>>>
> >>>>>>> Please see my comments inline.
> >>>>>>>
> >>>>>>> On 09-Jul-2013, at 8:32 PM, Jun Rao <jun...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> I assume that each consumer instance consumes all 15 topics.
> >>>>>>> No, we kept a dedicated consumer listening to the topic in question.
> >>>>>>> We did this because this queue processes huge amounts of data.
> >>>>>>>
> >>>>>>>
> >>>>>>>> Are all your consumer threads alive? If one of your threads dies, it
> >>>>>>>> will eventually block the consumption in other threads.
> >>>>>>>
> >>>>>>> Yes. We can see all the threads in the thread dump.
> >>>>>>> We have ensured that the threads do not die due to an Exception.
> >>>>>>>
> >>>>>>> Please look at the stack trace below. We see all the threads waiting
> >>>>>>> like this:
> >>>>>>>
> >>>>>>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting on condition [0x00007efedae6d000]
> >>>>>>>    java.lang.Thread.State: WAITING (parking)
> >>>>>>>       at sun.misc.Unsafe.park(Native Method)
> >>>>>>>       - parking to wait for  <0x0000000640248618> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>>>>>>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> >>>>>>>       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> >>>>>>>       at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> >>>>>>>       at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
> >>>>>>>       at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
> >>>>>>>       at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >>>>>>>       at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >>>>>>>       at com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
> >>>>>>>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> >>>>>>>       at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>>>>>       at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>>>>>       at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >>>>>>>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >>>>>>>       at java.lang.Thread.run(Thread.java:662)
> >>>>>>>
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>>
> >>>>>>>> Jun
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <
> npur...@sprinklr.com>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> We are using kafka-0.7.2 with zookeeper (3.4.5)
> >>>>>>>>>
> >>>>>>>>> Our cluster configuration:
> >>>>>>>>> 3 brokers on 3 different machines. Each broker machine has a
> >>> zookeeper
> >>>>>>>>> instance running as well.
> >>>>>>>>> We have 15 topics defined. We are trying to use them as queue
> (JMS
> >>>>> like)
> >>>>>>>>> by defining the same group across different kafka consumers.
> >>>>>>>>> On the consumer side, we are using High Level Consumer.
> >>>>>>>>>
> >>>>>>>>> However, we are seeing a weird behaviour.
> >>>>>>>>> One of our heavily used queues (event_queue) has 2 dedicated consumers
> >>>>>>>>> listening to that queue only.
> >>>>>>>>> This queue is defined with 150 partitions on each broker, and the number
> >>>>>>>>> of streams defined on the 2 dedicated consumers is 150.
> >>>>>>>>> After a while we see that most of the consumer threads keep waiting for
> >>>>>>>>> events and the lag keeps growing.
> >>>>>>>>> If we kill one of the dedicated consumers, then the other consumer starts
> >>>>>>>>> getting messages in a hurry.
> >>>>>>>>>
> >>>>>>>>> Consumer had no Full GCs.
> >>>>>>>>>
> >>>>>>>>> How we measure lag:
> >>>>>>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group event_queue --zkconnect zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic event_queue
> >>>>>>>>>
> >>>>>>>>> Around the time the events stopped coming to the new consumer, this was
> >>>>>>>>> printed in the logs:
> >>>>>>>>>
> >>>>>>>>> [INFO] zookeeper state changed (Disconnected)
> >>>>>>>>> [INFO] zookeeper state changed (Disconnected)
> >>>>>>>>> [INFO] zookeeper state changed (SyncConnected)
> >>>>>>>>> [INFO] zookeeper state changed (SyncConnected)
> >>>>>>>>>
> >>>>>>>>> Config overridden (consumer settings also sketched in code below):
> >>>>>>>>> Consumer:
> >>>>>>>>> fetch.size=3MB
> >>>>>>>>> autooffset.reset=largest
> >>>>>>>>> autocommit.interval.ms=500
> >>>>>>>>> Producer:
> >>>>>>>>> maxMessageSize=3MB
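> >>>>>>>>>
> >>>>>>>>> For completeness, this is roughly how we apply the consumer-side
> >>>>>>>>> overrides in code (a sketch only; the zk.connect and groupid property
> >>>>>>>>> names are assumptions about the 0.7-style consumer config we believe we
> >>>>>>>>> are using):
> >>>>>>>>>
> >>>>>>>>>   val props = new java.util.Properties()
> >>>>>>>>>   props.put("zk.connect", "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka")
> >>>>>>>>>   props.put("groupid", "event_queue")
> >>>>>>>>>   props.put("fetch.size", (3 * 1024 * 1024).toString)  // 3MB
> >>>>>>>>>   props.put("autooffset.reset", "largest")
> >>>>>>>>>   props.put("autocommit.interval.ms", "500")
> >>>>>>>>>   val consumerConfig = new kafka.consumer.ConsumerConfig(props)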
> >>>>>>>>>
> >>>>>>>>> Please let us know if we are doing something wrong or facing a known
> >>>>>>>>> issue here.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Nihit
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>
> >>>
> >
>
>
