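When debugging, be careful not to print the stream itself (for example, by logging the map returned by createMessageStreams), since converting a KafkaStream to a string iterates over it and will block forever waiting for messages.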
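One way to keep such debug logging safe is to log only the topic names and the number of streams, never the stream objects. The following is a minimal sketch in plain Java with no Kafka classes; `StreamLogging`, `describe` and `UnprintableStream` are hypothetical names made up for illustration, not part of the Kafka API, and `UnprintableStream` only stands in for an object whose toString() must not be called.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StreamLogging {

    // Builds a log line from topic names and stream counts only.
    // It never calls toString() on the stream objects themselves.
    static <S> String describe(Map<String, List<S>> streamsByTopic) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, List<S>> entry : streamsByTopic.entrySet()) {
            if (sb.length() > 0) {
                sb.append(", ");
            }
            sb.append(entry.getKey())
              .append(" -> ")
              .append(entry.getValue().size())
              .append(" stream(s)");
        }
        return sb.toString();
    }

    // Hypothetical stand-in for a stream. A real stream would block in
    // toString(); this one throws so that an accidental call is obvious.
    static final class UnprintableStream {
        @Override
        public String toString() {
            throw new IllegalStateException("toString() on a stream would block");
        }
    }

    public static void main(String[] args) {
        Map<String, List<UnprintableStream>> streams =
                new LinkedHashMap<String, List<UnprintableStream>>();
        List<UnprintableStream> forTopic = new ArrayList<UnprintableStream>();
        forTopic.add(new UnprintableStream());
        forTopic.add(new UnprintableStream());
        streams.put("topicA", forTopic);

        // Safe: only the key and the list size are read.
        System.out.println("created: " + describe(streams));
    }
}
```

With the real consumer, the same idea would apply to the map returned by createMessageStreams: pass it to a helper like this instead of concatenating the map, or any stream in it, into a log message.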
Thanks,

Jun


On Thu, Dec 12, 2013 at 12:41 PM, Tarang Dawer <tarang.da...@gmail.com> wrote:

> While debugging, the code got stuck on this call. tried the same by placing
> logs after each call, noticed that whenever consumer got stuck, only the
> logs before createMessageStreams were printed.
>
>
> On Fri, Dec 6, 2013 at 9:40 PM, Jun Rao <jun...@gmail.com> wrote:
>
> > I don't see createMessageStreams in the thread dump though. Are you sure
> > it's stuck there?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Dec 6, 2013 at 1:58 AM, Tarang Dawer <tarang.da...@gmail.com>
> > wrote:
> >
> > > Hi Jun
> > > please find the consumer thread dump in my previous reply.
> > >
> > >
> > > On Fri, Dec 6, 2013 at 3:27 PM, Tarang Dawer <tarang.da...@gmail.com>
> > > wrote:
> > >
> > > > Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.8-b03 mixed mode):
> > > >
> > > > "DestroyJavaVM" prio=10 tid=0x00007feb18006800 nid=0xcc2 waiting on condition [0x0000000000000000]
> > > >    java.lang.Thread.State: RUNNABLE
> > > >
> > > > "consumerGroup675437781_impetus-d898-1386323237902-426eefe2_watcher_executor" prio=10 tid=0x00007feb18059000 nid=0xcd9 waiting on condition [0x00007feb143cd000]
> > > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > > >         at sun.misc.Unsafe.park(Native Method)
> > > >         - parking to wait for <0x00000000ffbe80f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
> > > >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2116)
> > > >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:319)
> > > >
> > > > "main-EventThread" daemon prio=10 tid=0x00007feb18321800 nid=0xcd8 waiting on condition [0x00007feb144ce000]
> > > >    java.lang.Thread.State: WAITING (parking)
> > > >         at sun.misc.Unsafe.park(Native Method)
> > > >         - parking to wait for <0x00000000ffbf00a0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> > > >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> > > >         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> > > >         at org.apache.zookeeper.ClientCnxn$EventThread.run(zookeeper:ClientCnxn.java):414)
> > > >
> > > > "main-SendThread" daemon prio=10 tid=0x00007feb18145800 nid=0xcd7 runnable [0x00007feb145cf000]
> > > >    java.lang.Thread.State: RUNNABLE
> > > >         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > > >         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
> > > >         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
> > > >         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
> > > >         - locked <0x00000000ffbf8118> (a sun.nio.ch.Util$2)
> > > >         - locked <0x00000000ffbf8128> (a java.util.Collections$UnmodifiableSet)
> > > >         - locked <0x00000000ffbf80d0> (a sun.nio.ch.EPollSelectorImpl)
> > > >         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
> > > >         at org.apache.zookeeper.ClientCnxn$SendThread.run(zookeeper:ClientCnxn.java):921)
> > > >
> > > > "ZkClient-EventThread-15-192.168.145.144:2181" daemon prio=10 tid=0x00007feb181d6800 nid=0xcd6 waiting on condition [0x00007feb147d8000]
> > > >    java.lang.Thread.State: WAITING (parking)
> > > >         at sun.misc.Unsafe.park(Native Method)
> > > >         - parking to wait for <0x00000000ffbe81c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> > > >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> > > >         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> > > >         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:68)
> > > >
> > > > "ProducerSendThread-" prio=10 tid=0x00007feb182d4000 nid=0xcd5 waiting on condition [0x00007feb148d9000]
> > > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > > >         at sun.misc.Unsafe.park(Native Method)
> > > >         - parking to wait for <0x00000000ffbe82f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
> > > >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
> > > >         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
> > > >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
> > > >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
> > > >         at scala.collection.immutable.Stream$.continually(Stream.scala:598)
> > > >         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:65)
> > > >         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> > > >
> > > > "metrics-meter-tick-thread-2" daemon prio=10 tid=0x00007feb1811b000 nid=0xcd4 waiting on condition [0x00007feb149da000]
> > > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > > >         at sun.misc.Unsafe.park(Native Method)
> > > >         - parking to wait for <0x00000000ffbf81a8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
> > > >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
> > > >         at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
> > > >         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
> > > >         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
> > > >         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
> > > >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> > > >         at java.lang.Thread.run(Thread.java:662)
> > > >
> > > > "metrics-meter-tick-thread-1" daemon prio=10 tid=0x00007feb181a8000 nid=0xcd3 waiting on condition [0x00007feb14adb000]
> > > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > > >         at sun.misc.Unsafe.park(Native Method)
> > > >         - parking to wait for <0x00000000ffbf81a8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
> > > >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
> > > >         at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
> > > >         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
> > > >         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
> > > >         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
> > > >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> > > >         at java.lang.Thread.run(Thread.java:662)
> > > >
> > > > "Abandoned connection cleanup thread" daemon prio=10 tid=0x00007feb18151000 nid=0xcd2 in Object.wait() [0x00007feb14ce3000]
> > > >    java.lang.Thread.State: WAITING (on object monitor)
> > > >         at java.lang.Object.wait(Native Method)
> > > >         - waiting on <0x00000000ffbf8330> (a java.lang.ref.ReferenceQueue$Lock)
> > > >         at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
> > > >         - locked <0x00000000ffbf8330> (a java.lang.ref.ReferenceQueue$Lock)
> > > >         at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
> > > >         at com.mysql.jdbc.NonRegisteringDriver$1.run(NonRegisteringDriver.java:93)
> > > >
> > > > "Low Memory Detector" daemon prio=10 tid=0x00007feb18090800 nid=0xccd runnable [0x0000000000000000]
> > > >    java.lang.Thread.State: RUNNABLE
> > > >
> > > > "C2 CompilerThread1" daemon prio=10 tid=0x00007feb1808e000 nid=0xccc waiting on condition [0x0000000000000000]
> > > >    java.lang.Thread.State: RUNNABLE
> > > >
> > > > "C2 CompilerThread0" daemon prio=10 tid=0x00007feb1808b800 nid=0xccb waiting on condition [0x0000000000000000]
> > > >    java.lang.Thread.State: RUNNABLE
> > > >
> > > > "Signal Dispatcher" daemon prio=10 tid=0x00007feb18089000 nid=0xcca waiting on condition [0x0000000000000000]
> > > >    java.lang.Thread.State: RUNNABLE
> > > >
> > > > "Finalizer" daemon prio=10 tid=0x00007feb1806d000 nid=0xcc9 in Object.wait() [0x00007feb1d407000]
> > > >    java.lang.Thread.State: WAITING (on object monitor)
> > > >         at java.lang.Object.wait(Native Method)
> > > >         - waiting on <0x00000000ffbe8910> (a java.lang.ref.ReferenceQueue$Lock)
> > > >         at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
> > > >         - locked <0x00000000ffbe8910> (a java.lang.ref.ReferenceQueue$Lock)
> > > >         at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
> > > >         at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
> > > >
> > > > "Reference Handler" daemon prio=10 tid=0x00007feb1806b000 nid=0xcc8 in Object.wait() [0x00007feb1d508000]
> > > >    java.lang.Thread.State: WAITING (on object monitor)
> > > >         at java.lang.Object.wait(Native Method)
> > > >         - waiting on <0x00000000ffbe8aa0> (a java.lang.ref.Reference$Lock)
> > > >         at java.lang.Object.wait(Object.java:485)
> > > >         at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
> > > >         - locked <0x00000000ffbe8aa0> (a java.lang.ref.Reference$Lock)
> > > >
> > > > "VM Thread" prio=10 tid=0x00007feb18064800 nid=0xcc7 runnable
> > > >
> > > > "GC task thread#0 (ParallelGC)" prio=10 tid=0x00007feb18019800 nid=0xcc3 runnable
> > > >
> > > > "GC task thread#1 (ParallelGC)" prio=10 tid=0x00007feb1801b800 nid=0xcc4 runnable
> > > >
> > > > "GC task thread#2 (ParallelGC)" prio=10 tid=0x00007feb1801d800 nid=0xcc5 runnable
> > > >
> > > > "GC task thread#3 (ParallelGC)" prio=10 tid=0x00007feb1801f000 nid=0xcc6 runnable
> > > >
> > > > "VM Periodic Task Thread" prio=10 tid=0x00007feb180a3000 nid=0xcce waiting on condition
> > > >
> > > > JNI global references: 1310
> > > >
> > > > Heap
> > > >  PSYoungGen      total 29888K, used 26071K [0x00000000fdeb0000, 0x0000000100000000, 0x0000000100000000)
> > > >   eden space 25664K, 92% used [0x00000000fdeb0000,0x00000000ff5f5f60,0x00000000ff7c0000)
> > > >   from space 4224K, 53% used [0x00000000ffbe0000,0x00000000ffe10050,0x0000000100000000)
> > > >   to   space 4224K, 0% used [0x00000000ff7c0000,0x00000000ff7c0000,0x00000000ffbe0000)
> > > >  PSOldGen        total 68288K, used 0K [0x00000000f9c00000, 0x00000000fdeb0000, 0x00000000fdeb0000)
> > > >   object space 68288K, 0% used [0x00000000f9c00000,0x00000000f9c00000,0x00000000fdeb0000)
> > > >  PSPermGen       total 29888K, used 29878K [0x00000000f4a00000, 0x00000000f6730000, 0x00000000f9c00000)
> > > >   object space 29888K, 99% used [0x00000000f4a00000,0x00000000f672dad8,0x00000000f6730000)
> > > >
> > > >
> > > > On Wed, Dec 4, 2013 at 11:11 PM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > >> Could you take a thread dump and see where the createMessageStreams is
> > > >> stuck?
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jun
> > > >>
> > > >>
> > > >> On Wed, Dec 4, 2013 at 4:10 AM, Tarang Dawer <tarang.da...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Hi All
> > > >> >
> > > >> > I am using Kafka 0.8 , with inbuilt zookeeper.
> > > >> > I am runnings consumers in a jar.
> > > >> >
> > > >> > Configuration properties for creating consumerConfig : -
> > > >> >
> > > >> > zookeeper.connect=IP
> > > >> > group.id=consumerGroup
> > > >> > fetch.message.max.bytes=1000000000
> > > >> > zookeeper.session.timeout.ms=60000
> > > >> > auto.offset.reset=smallest
> > > >> > zookeeper.sync.time.ms=200
> > > >> > auto.commit.enable=false
> > > >> >
> > > >> > While running it, some times the consumer gets stuck on the message
> > > >> > stream creation call i.e
> > > >> >
> > > >> > ConsumerConfig consumerConfig = new ConsumerConfig(this.props);
> > > >> >
> > > >> > ConsumerConnector consumerConnector = Consumer
> > > >> >         .createJavaConsumerConnector(consumerConfig);
> > > >> >
> > > >> > *Map<String, List<KafkaStream<byte[], byte[]>>> messageStreamMap =
> > > >> >         consumerConnector.createMessageStreams(topicMap)*;
> > > >> >
> > > >> > I have tested this thing on a number of partition settings , i.e 1,2,5
> > > >> > etc.
> > > >> >
> > > >> > Whenever the consumer gets stuck, i have to kill the jar, and start it
> > > >> > again. This thing happens only a select couple of times , i.e 2 or 3
> > > >> > times out of 10, and for rest of the times, the consumer starts fine.
> > > >> >
> > > >> > Is there some property configuration that i may have been missing,
> > > >> > that's causing this inconsistent behaviour ?
> > > >> >
> > > >> > Any guidance would be of great help.
> > > >> >
> > > >> > Thanks
> > > >> > Tarang Dawer