Hello Jun,

Sorry for the delay in getting the logs.
Here are the 3 logs from the 3 servers, with trace-level logging enabled as suggested:

https://docs.google.com/file/d/0B5etsywBa-bkQnBESUJzNV9yRWc/edit?usp=sharing

Please have a look and let us know if you need anything else to further debug 
this problem.

Thanks,
Nihit

On 11-Jul-2013, at 4:41 PM, Nihit Purwar <npur...@sprinklr.com> wrote:

> Hi Jun,
> 
> I did put in only one topic while starting the consumer, and I used the same
> API, "createMessageStreams".
> As for the trace-level logs of the kafka consumer, we will send those to you
> soon.
> 
> Thanks again for replying.
> 
> Nihit
> 
> On 10-Jul-2013, at 10:38 PM, Jun Rao <jun...@gmail.com> wrote:
> 
>> Also, just so that we are on the same page: I assume that you used the
>> following api. Did you put just one topic in the topicCountMap?
>> def createMessageStreams(topicCountMap: Map[String,Int]): Map[String,
>> List[KafkaStream[Array[Byte],Array[Byte]]]]
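>> 
>> For concreteness, a minimal sketch in Java of what I mean, using the javaapi
>> ConsumerConnector and only one topic in the topicCountMap. The connection
>> properties and the topic name here are placeholders, and the exact stream
>> types depend on the Kafka version:
>> 
>>     import java.util.HashMap;
>>     import java.util.List;
>>     import java.util.Map;
>>     import java.util.Properties;
>>     import kafka.consumer.Consumer;
>>     import kafka.consumer.ConsumerConfig;
>>     import kafka.consumer.KafkaStream;
>>     import kafka.javaapi.consumer.ConsumerConnector;
>>     import kafka.message.Message;
>> 
>>     public class SingleTopicConsumer {
>>         public static void main(String[] args) {
>>             Properties props = new Properties();
>>             // Placeholder connection and group settings (0.7-style property names).
>>             props.put("zk.connect", "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka");
>>             props.put("groupid", "event_queue");
>> 
>>             ConsumerConnector connector =
>>                 Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
>> 
>>             // Only one topic in the map, mapped to the number of streams for it.
>>             Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>>             topicCountMap.put("event_queue", 150);
>> 
>>             Map<String, List<KafkaStream<Message>>> streams =
>>                 connector.createMessageStreams(topicCountMap);
>>         }
>>     }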
>> 
>> Thanks,
>> 
>> Jun
>> 
>> 
>> On Wed, Jul 10, 2013 at 8:30 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
>> 
>>> Hi Jun,
>>> 
>>> Thanks for helping out so far.
>>> 
>>> As per your explanation, we are doing exactly what you mentioned in your
>>> workaround below:
>>>> A workaround is to use different consumer connectors, each consuming a
>>>> single topic.
>>> 
>>> 
>>> Here is the problem...
>>> 
>>> We have a topic which gets a lot of events (around a million a day), so
>>> this topic has a high number of partitions on the server, and we have
>>> dedicated consumers listening only to this topic. The processing time is
>>> on the order of 15-30 millis, so we are confident that our consumers are
>>> not slow in processing.
>>> 
>>> Every now and then our consumer threads stall and do not receive any
>>> events (as shown in my previous email with the thread stack of the idle
>>> threads), even though we can see the offset lag increasing for the
>>> consumers.
>>> 
>>> We also noticed that if we force-rebalance the consumers (either by
>>> starting a new consumer or killing an existing one), data starts to flow
>>> in again to these consumer threads. The consumers remain stable
>>> (processing events) for about 20-30 mins before the threads go idle again
>>> and the backlog starts growing. This happens in a cycle for us and we are
>>> not able to figure out the cause of the events not flowing in.
>>> 
>>> As a side note, we are also monitoring the GC cycles and there are hardly
>>> any.
>>> 
>>> Please let us know if you need any additional details.
>>> 
>>> Thanks
>>> Nihit.
>>> 
>>> 
>>> On 10-Jul-2013, at 8:30 PM, Jun Rao <jun...@gmail.com> wrote:
>>> 
>>>> Ok. One of the issues is that when you have a consumer that consumes
>>>> multiple topics, if one of the consumer threads is slow in consuming
>>>> messages from one topic, it can block the consumption of other consumer
>>>> threads. This is because we use a shared fetcher to fetch all topics. There
>>>> is an in-memory queue per topic. If one of the queues is full, the fetcher
>>>> will block and can't put the data into other queues.
>>>> 
>>>> A workaround is to use different consumer connectors, each consuming a
>>>> single topic.
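>>>> 
>>>> In code, that workaround would look roughly like the sketch below: one
>>>> ConsumerConnector per topic, so a full queue for one topic only blocks its
>>>> own fetcher threads. The topic names, stream counts and properties here are
>>>> placeholders, and the exact class and stream types depend on the Kafka
>>>> version:
>>>> 
>>>>     import java.util.HashMap;
>>>>     import java.util.List;
>>>>     import java.util.Map;
>>>>     import java.util.Properties;
>>>>     import kafka.consumer.Consumer;
>>>>     import kafka.consumer.ConsumerConfig;
>>>>     import kafka.consumer.KafkaStream;
>>>>     import kafka.javaapi.consumer.ConsumerConnector;
>>>>     import kafka.message.Message;
>>>> 
>>>>     public class PerTopicConnectors {
>>>>         public static void main(String[] args) {
>>>>             // Placeholder topic -> stream-count mapping.
>>>>             Map<String, Integer> topics = new HashMap<String, Integer>();
>>>>             topics.put("event_queue", 150);
>>>>             topics.put("some_other_topic", 10);
>>>> 
>>>>             for (Map.Entry<String, Integer> entry : topics.entrySet()) {
>>>>                 Properties props = new Properties();
>>>>                 props.put("zk.connect", "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka");
>>>>                 props.put("groupid", "my_group"); // placeholder group name
>>>> 
>>>>                 // A dedicated connector (with its own fetcher threads) per topic.
>>>>                 ConsumerConnector connector =
>>>>                     Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
>>>> 
>>>>                 Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>>>>                 topicCountMap.put(entry.getKey(), entry.getValue());
>>>>                 Map<String, List<KafkaStream<Message>>> streams =
>>>>                     connector.createMessageStreams(topicCountMap);
>>>>                 // Hand the streams for this topic to its own worker threads here.
>>>>             }
>>>>         }
>>>>     }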
>>>> 
>>>> Thanks,
>>>> 
>>>> Jun
>>>> 
>>>> 
>>>> On Tue, Jul 9, 2013 at 11:12 PM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>>> 
>>>>> Hi Jun,
>>>>> 
>>>>> Please see my comments inline again :)
>>>>> 
>>>>> On 10-Jul-2013, at 9:13 AM, Jun Rao <jun...@gmail.com> wrote:
>>>>> 
>>>>>> This indicates our in-memory queue is empty. So the consumer thread is
>>>>>> blocked.
>>>>> 
>>>>> What should we do about this?
>>>>> As I mentioned in the previous mail, events are there to be consumed.
>>>>> Killing one consumer makes the other consumer consume events again.
>>>>> 
>>>>> 
>>>>>> What about the Kafka fetcher threads? Are they blocked on anything?
>>>>> 
>>>>> One of the fetcher threads is blocked on putting to a queue, the other is
>>>>> sleeping.
>>>>> Please look below:
>>>>> 
>>>>> "FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on condition [0x00007fcb833eb000]
>>>>>    java.lang.Thread.State: WAITING (parking)
>>>>>       at sun.misc.Unsafe.park(Native Method)
>>>>>       - parking to wait for  <0x00000006809e8000> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>>>>       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>>>       at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>>>>>       at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
>>>>>       at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
>>>>>       at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
>>>>>       at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>>>>>       at scala.collection.immutable.List.foreach(List.scala:45)
>>>>>       at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>>>>> 
>>>>> "FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on condition [0x00007fcb836ee000]
>>>>>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>>>>>       at java.lang.Thread.sleep(Native Method)
>>>>>       at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
>>>>> 
>>>>>> 
>>>>>> Thanks,
>>>>>> 
>>>>>> Jun
>>>>>> 
>>>>>> 
>>>>>> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>>>>> 
>>>>>>> Hello Jun,
>>>>>>> 
>>>>>>> Please see my comments inline.
>>>>>>> 
>>>>>>> On 09-Jul-2013, at 8:32 PM, Jun Rao <jun...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> I assume that each consumer instance consumes all 15 topics.
>>>>>>> No, we kept a dedicated consumer listening to the topic in question.
>>>>>>> We did this because this queue processes huge amounts of data.
>>>>>>> 
>>>>>>> 
>>>>>>>> Are all your
>>>>>>>> consumer threads alive? If one of your threads dies, it will eventually
>>>>>>>> block the consumption in other threads.
>>>>>>> 
>>>>>>> Yes. We can see all the threads in the thread dump.
>>>>>>> We have ensured that the threads do not die due to an Exception.
>>>>>>> 
>>>>>>> Please look at the stack trace below. We see all the threads waiting like
>>>>>>> this:
>>>>>>> 
>>>>>>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting on condition [0x00007efedae6d000]
>>>>>>>    java.lang.Thread.State: WAITING (parking)
>>>>>>>       at sun.misc.Unsafe.park(Native Method)
>>>>>>>       - parking to wait for  <0x0000000640248618> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>>>>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>>>>>>       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>>>>>       at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>>>>>>       at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
>>>>>>>       at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
>>>>>>>       at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>>>>>       at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>>>>>       at com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
>>>>>>>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>>>>>       at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>>>>>       at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>>>>>       at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>>>>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>>>>       at java.lang.Thread.run(Thread.java:662)
>>>>>>> 
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> 
>>>>>>>> Jun
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> 
>>>>>>>>> We are using kafka-0.7.2 with zookeeper (3.4.5)
>>>>>>>>> 
>>>>>>>>> Our cluster configuration:
>>>>>>>>> 3 brokers on 3 different machines. Each broker machine has a zookeeper
>>>>>>>>> instance running as well.
>>>>>>>>> We have 15 topics defined. We are trying to use them as queues (JMS-like)
>>>>>>>>> by defining the same group across different kafka consumers.
>>>>>>>>> On the consumer side, we are using the High Level Consumer.
>>>>>>>>> 
>>>>>>>>> However we are seeing a weird behaviour.
>>>>>>>>> One of our heavily used queues (event_queue) has 2 dedicated consumers
>>>>>>>>> listening to that queue only.
>>>>>>>>> This queue is defined with 150 partitions on each broker, and the number
>>>>>>>>> of streams defined on the 2 dedicated consumers is 150.
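>>>>>>>>> 
>>>>>>>>> Each dedicated consumer runs its 150 streams roughly like the sketch below
>>>>>>>>> (simplified; the class and method names are illustrative, not our exact
>>>>>>>>> code, and the exact stream/message types depend on the Kafka version):
>>>>>>>>> 
>>>>>>>>>     import java.util.List;
>>>>>>>>>     import java.util.concurrent.ExecutorService;
>>>>>>>>>     import java.util.concurrent.Executors;
>>>>>>>>>     import kafka.consumer.ConsumerIterator;
>>>>>>>>>     import kafka.consumer.KafkaStream;
>>>>>>>>>     import kafka.message.Message;
>>>>>>>>> 
>>>>>>>>>     public class EventQueueWorkers {
>>>>>>>>>         // One worker thread per stream; we create 150 streams per consumer.
>>>>>>>>>         static void start(List<KafkaStream<Message>> streams) {
>>>>>>>>>             ExecutorService pool = Executors.newFixedThreadPool(streams.size());
>>>>>>>>>             for (final KafkaStream<Message> stream : streams) {
>>>>>>>>>                 pool.submit(new Runnable() {
>>>>>>>>>                     public void run() {
>>>>>>>>>                         ConsumerIterator<Message> it = stream.iterator();
>>>>>>>>>                         // hasNext() blocks on the in-memory queue when it is empty.
>>>>>>>>>                         while (it.hasNext()) {
>>>>>>>>>                             process(it.next()); // illustrative handler
>>>>>>>>>                         }
>>>>>>>>>                     }
>>>>>>>>>                 });
>>>>>>>>>             }
>>>>>>>>>         }
>>>>>>>>> 
>>>>>>>>>         static void process(Message message) {
>>>>>>>>>             // Illustrative placeholder for the actual processing logic.
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>> 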
>>>>>>>>> After a while we see that most of the consumer threads keep waiting for
>>>>>>>>> events and the lag keeps growing.
>>>>>>>>> If we kill one of the dedicated consumers, then the other consumer starts
>>>>>>>>> getting messages in a hurry.
>>>>>>>>> 
>>>>>>>>> Consumer had no Full GCs.
>>>>>>>>> 
>>>>>>>>> How we measure lag?
>>>>>>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
>>>>>>>>>   --group event_queue \
>>>>>>>>>   --zkconnect zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka \
>>>>>>>>>   --topic event_queue
>>>>>>>>> 
>>>>>>>>> Around the time the events stopped coming to the new consumer, this was
>>>>>>>>> printed in the logs:
>>>>>>>>> 
>>>>>>>>> [INFO] zookeeper state changed (Disconnected)
>>>>>>>>> [INFO] zookeeper state changed (Disconnected)
>>>>>>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>>>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>>>>>> 
>>>>>>>>> Config overridden:
>>>>>>>>> Consumer:
>>>>>>>>> fetch.size=3MB
>>>>>>>>> autooffset.reset=largest
>>>>>>>>> autocommit.interval.ms=500
>>>>>>>>> Producer:
>>>>>>>>> maxMessageSize=3MB
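>>>>>>>>> 
>>>>>>>>> (On the consumer side these overrides are applied roughly as in the snippet
>>>>>>>>> below; the zookeeper connect string and group name are placeholders, and
>>>>>>>>> the property names are the 0.7-style ones.)
>>>>>>>>> 
>>>>>>>>>     import java.util.Properties;
>>>>>>>>>     import kafka.consumer.ConsumerConfig;
>>>>>>>>> 
>>>>>>>>>     public class ConsumerSettings {
>>>>>>>>>         static ConsumerConfig build() {
>>>>>>>>>             Properties props = new Properties();
>>>>>>>>>             // Placeholder connection and group settings.
>>>>>>>>>             props.put("zk.connect", "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka");
>>>>>>>>>             props.put("groupid", "event_queue");
>>>>>>>>>             // The overrides listed above.
>>>>>>>>>             props.put("fetch.size", String.valueOf(3 * 1024 * 1024)); // 3MB
>>>>>>>>>             props.put("autooffset.reset", "largest");
>>>>>>>>>             props.put("autocommit.interval.ms", "500");
>>>>>>>>>             return new ConsumerConfig(props);
>>>>>>>>>         }
>>>>>>>>>     }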
>>>>>>>>> 
>>>>>>>>> Please let us know if we are doing something wrong or facing a known
>>>>>>>>> issue here.
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Nihit
>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>> 
>>> 
> 
