While debugging, the code got stuck on this call. I tried the same by placing logs after each call, and noticed that whenever the consumer got stuck, only the logs before createMessageStreams were printed.
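
One way to double-check where the call blocks, sketched with plain JDK classes only (no Kafka API is used here): run the suspect call on a worker thread, wait with a timeout, and print that worker's stack if it has not returned. The class name StuckCallProbe and the method probe are hypothetical names chosen for illustration; in the consumer, the Callable would presumably wrap consumerConnector.createMessageStreams(topicMap).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, not part of Kafka: runs a possibly blocking call on a
// worker thread and reports that thread's stack if the call does not return in time.
public class StuckCallProbe {

    // Returns null if the task finished within timeoutMs; otherwise returns the
    // worker thread's name, state and stack frames as text. An exception thrown
    // by the task itself surfaces as an ExecutionException from future.get().
    public static String probe(final Callable<?> task, long timeoutMs) throws Exception {
        final AtomicReference<Thread> worker = new AtomicReference<Thread>();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Object> wrapped = () -> {
                worker.set(Thread.currentThread()); // remember which thread runs the task
                return task.call();
            };
            Future<Object> future = pool.submit(wrapped);
            try {
                future.get(timeoutMs, TimeUnit.MILLISECONDS);
                return null; // completed in time
            } catch (TimeoutException e) {
                StringBuilder sb = new StringBuilder();
                Thread t = worker.get();
                if (t == null) {
                    sb.append("worker thread had not started yet\n");
                } else {
                    sb.append('"').append(t.getName()).append("\" state=")
                      .append(t.getState()).append('\n');
                    for (StackTraceElement frame : t.getStackTrace()) {
                        sb.append("    at ").append(frame).append('\n');
                    }
                }
                future.cancel(true); // ask the worker to stop via interrupt
                return sb.toString();
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the real blocking call (an assumption for this demo).
        String report = probe(() -> { Thread.sleep(5000); return null; }, 200);
        System.out.println(report == null ? "call returned in time" : report);
    }
}
```

The interrupt sent on timeout only helps if the blocked call reacts to interruption; if it does not, the worker thread stays alive and a jstack dump of the process is still needed.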
On Fri, Dec 6, 2013 at 9:40 PM, Jun Rao <jun...@gmail.com> wrote:

> I don't see createMessageStreams in the thread dump though. Are you sure
> it's stuck there?
>
> Thanks,
>
> Jun
>
>
> On Fri, Dec 6, 2013 at 1:58 AM, Tarang Dawer <tarang.da...@gmail.com> wrote:
>
> > Hi Jun
> > please find the consumer thread dump in my previous reply.
> >
> >
> > On Fri, Dec 6, 2013 at 3:27 PM, Tarang Dawer <tarang.da...@gmail.com> wrote:
> >
> > > Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.8-b03 mixed mode):
> > >
> > > "DestroyJavaVM" prio=10 tid=0x00007feb18006800 nid=0xcc2 waiting on condition [0x0000000000000000]
> > >    java.lang.Thread.State: RUNNABLE
> > >
> > > "consumerGroup675437781_impetus-d898-1386323237902-426eefe2_watcher_executor" prio=10 tid=0x00007feb18059000 nid=0xcd9 waiting on condition [0x00007feb143cd000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for <0x00000000ffbe80f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2116)
> > >     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:319)
> > >
> > > "main-EventThread" daemon prio=10 tid=0x00007feb18321800 nid=0xcd8 waiting on condition [0x00007feb144ce000]
> > >    java.lang.Thread.State: WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for <0x00000000ffbf00a0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> > >     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> > >     at org.apache.zookeeper.ClientCnxn$EventThread.run(zookeeper:ClientCnxn.java):414)
> > >
> > > "main-SendThread" daemon prio=10 tid=0x00007feb18145800 nid=0xcd7 runnable [0x00007feb145cf000]
> > >    java.lang.Thread.State: RUNNABLE
> > >     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > >     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
> > >     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
> > >     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
> > >     - locked <0x00000000ffbf8118> (a sun.nio.ch.Util$2)
> > >     - locked <0x00000000ffbf8128> (a java.util.Collections$UnmodifiableSet)
> > >     - locked <0x00000000ffbf80d0> (a sun.nio.ch.EPollSelectorImpl)
> > >     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
> > >     at org.apache.zookeeper.ClientCnxn$SendThread.run(zookeeper:ClientCnxn.java):921)
> > >
> > > "ZkClient-EventThread-15-192.168.145.144:2181" daemon prio=10 tid=0x00007feb181d6800 nid=0xcd6 waiting on condition [0x00007feb147d8000]
> > >    java.lang.Thread.State: WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for <0x00000000ffbe81c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> > >     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> > >     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:68)
> > >
> > > "ProducerSendThread-" prio=10 tid=0x00007feb182d4000 nid=0xcd5 waiting on condition [0x00007feb148d9000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for <0x00000000ffbe82f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
> > >     at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
> > >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
> > >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
> > >     at scala.collection.immutable.Stream$.continually(Stream.scala:598)
> > >     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:65)
> > >     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> > >
> > > "metrics-meter-tick-thread-2" daemon prio=10 tid=0x00007feb1811b000 nid=0xcd4 waiting on condition [0x00007feb149da000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for <0x00000000ffbf81a8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
> > >     at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
> > >     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
> > >     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
> > >     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
> > >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> > >     at java.lang.Thread.run(Thread.java:662)
> > >
> > > "metrics-meter-tick-thread-1" daemon prio=10 tid=0x00007feb181a8000 nid=0xcd3 waiting on condition [0x00007feb14adb000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for <0x00000000ffbf81a8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
> > >     at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
> > >     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
> > >     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
> > >     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
> > >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> > >     at java.lang.Thread.run(Thread.java:662)
> > >
> > > "Abandoned connection cleanup thread" daemon prio=10 tid=0x00007feb18151000 nid=0xcd2 in Object.wait() [0x00007feb14ce3000]
> > >    java.lang.Thread.State: WAITING (on object monitor)
> > >     at java.lang.Object.wait(Native Method)
> > >     - waiting on <0x00000000ffbf8330> (a java.lang.ref.ReferenceQueue$Lock)
> > >     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
> > >     - locked <0x00000000ffbf8330> (a java.lang.ref.ReferenceQueue$Lock)
> > >     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
> > >     at com.mysql.jdbc.NonRegisteringDriver$1.run(NonRegisteringDriver.java:93)
> > >
> > > "Low Memory Detector" daemon prio=10 tid=0x00007feb18090800 nid=0xccd runnable [0x0000000000000000]
> > >    java.lang.Thread.State: RUNNABLE
> > >
> > > "C2 CompilerThread1" daemon prio=10 tid=0x00007feb1808e000 nid=0xccc waiting on condition [0x0000000000000000]
> > >    java.lang.Thread.State: RUNNABLE
> > >
> > > "C2 CompilerThread0" daemon prio=10 tid=0x00007feb1808b800 nid=0xccb waiting on condition [0x0000000000000000]
> > >    java.lang.Thread.State: RUNNABLE
> > >
> > > "Signal Dispatcher" daemon prio=10 tid=0x00007feb18089000 nid=0xcca waiting on condition [0x0000000000000000]
> > >    java.lang.Thread.State: RUNNABLE
> > >
> > > "Finalizer" daemon prio=10 tid=0x00007feb1806d000 nid=0xcc9 in Object.wait() [0x00007feb1d407000]
> > >    java.lang.Thread.State: WAITING (on object monitor)
> > >     at java.lang.Object.wait(Native Method)
> > >     - waiting on <0x00000000ffbe8910> (a java.lang.ref.ReferenceQueue$Lock)
> > >     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
> > >     - locked <0x00000000ffbe8910> (a java.lang.ref.ReferenceQueue$Lock)
> > >     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
> > >     at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
> > >
> > > "Reference Handler" daemon prio=10 tid=0x00007feb1806b000 nid=0xcc8 in Object.wait() [0x00007feb1d508000]
> > >    java.lang.Thread.State: WAITING (on object monitor)
> > >     at java.lang.Object.wait(Native Method)
> > >     - waiting on <0x00000000ffbe8aa0> (a java.lang.ref.Reference$Lock)
> > >     at java.lang.Object.wait(Object.java:485)
> > >     at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
> > >     - locked <0x00000000ffbe8aa0> (a java.lang.ref.Reference$Lock)
> > >
> > > "VM Thread" prio=10 tid=0x00007feb18064800 nid=0xcc7 runnable
> > >
> > > "GC task thread#0 (ParallelGC)" prio=10 tid=0x00007feb18019800 nid=0xcc3 runnable
> > >
> > > "GC task thread#1 (ParallelGC)" prio=10 tid=0x00007feb1801b800 nid=0xcc4 runnable
> > >
> > > "GC task thread#2 (ParallelGC)" prio=10 tid=0x00007feb1801d800 nid=0xcc5 runnable
> > >
> > > "GC task thread#3 (ParallelGC)" prio=10 tid=0x00007feb1801f000 nid=0xcc6 runnable
> > >
> > > "VM Periodic Task Thread" prio=10 tid=0x00007feb180a3000 nid=0xcce waiting on condition
> > >
> > > JNI global references: 1310
> > >
> > > Heap
> > >  PSYoungGen      total 29888K, used 26071K [0x00000000fdeb0000, 0x0000000100000000, 0x0000000100000000)
> > >   eden space 25664K, 92% used [0x00000000fdeb0000,0x00000000ff5f5f60,0x00000000ff7c0000)
> > >   from space 4224K, 53% used [0x00000000ffbe0000,0x00000000ffe10050,0x0000000100000000)
> > >   to   space 4224K, 0% used [0x00000000ff7c0000,0x00000000ff7c0000,0x00000000ffbe0000)
> > >  PSOldGen        total 68288K, used 0K [0x00000000f9c00000, 0x00000000fdeb0000, 0x00000000fdeb0000)
> > >   object space 68288K, 0% used [0x00000000f9c00000,0x00000000f9c00000,0x00000000fdeb0000)
> > >  PSPermGen       total 29888K, used 29878K [0x00000000f4a00000, 0x00000000f6730000, 0x00000000f9c00000)
> > >   object space 29888K, 99% used [0x00000000f4a00000,0x00000000f672dad8,0x00000000f6730000)
> > >
> > >
> > > On Wed, Dec 4, 2013 at 11:11 PM, Jun Rao <jun...@gmail.com> wrote:
> > >
> > >> Could you take a thread dump and see where the createMessageStreams is
> > >> stuck?
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >>
> > >> On Wed, Dec 4, 2013 at 4:10 AM, Tarang Dawer <tarang.da...@gmail.com> wrote:
> > >>
> > >> > Hi All
> > >> >
> > >> > I am using Kafka 0.8 with the inbuilt ZooKeeper.
> > >> > I am running consumers in a jar.
> > >> >
> > >> > Configuration properties for creating consumerConfig:
> > >> >
> > >> > zookeeper.connect=IP
> > >> > group.id=consumerGroup
> > >> > fetch.message.max.bytes=1000000000
> > >> > zookeeper.session.timeout.ms=60000
> > >> > auto.offset.reset=smallest
> > >> > zookeeper.sync.time.ms=200
> > >> > auto.commit.enable=false
> > >> >
> > >> > While running it, the consumer sometimes gets stuck on the message
> > >> > stream creation call, i.e.
> > >> >
> > >> > ConsumerConfig consumerConfig = new ConsumerConfig(this.props);
> > >> >
> > >> > ConsumerConnector consumerConnector = Consumer
> > >> >         .createJavaConsumerConnector(consumerConfig);
> > >> >
> > >> > *Map<String, List<KafkaStream<byte[], byte[]>>> messageStreamMap =
> > >> >         consumerConnector.createMessageStreams(topicMap)*;
> > >> >
> > >> > I have tested this with a number of partition settings, i.e. 1, 2, 5 etc.
> > >> >
> > >> > Whenever the consumer gets stuck, I have to kill the jar and start it
> > >> > again. This happens only some of the time, i.e. 2 or 3 times out of 10;
> > >> > the rest of the time, the consumer starts fine.
> > >> >
> > >> > Is there some property configuration that I may have been missing
> > >> > that's causing this inconsistent behaviour?
> > >> >
> > >> > Any guidance would be of great help.
> > >> >
> > >> > Thanks
> > >> > Tarang Dawer