Hi Jun, the parser is being used by kafka/core/src/main/scala/kafka/consumer/TopicCount.scala:56. As per your suggestion, I've filed a JIRA (https://issues.apache.org/jira/browse/KAFKA-1595). Thanks for looking into it. jsh
> Date: Wed, 13 Aug 2014 08:22:22 -0700 > Subject: Re: Blocking Recursive parsing from > kafka.consumer.TopicCount$.constructTopicCount > From: jun...@gmail.com > To: users@kafka.apache.org > > Are you using Scala JSON in your consumer application? > > Yes, we probably need to switch off Scala JSON since it's being deprecated. > Could you file a jira and put the link there? > > Thanks, > > Jun > > > On Tue, Aug 12, 2014 at 11:14 PM, Jagbir Hooda <jsho...@hotmail.com> wrote: > > > > Date: Tue, 12 Aug 2014 16:35:35 -0700 > > > Subject: Re: Blocking Recursive parsing from > > kafka.consumer.TopicCount$.constructTopicCount > > > From: wangg...@gmail.com > > > To: users@kafka.apache.org > > > > > > Hi Jagbir, > > > > > > The thread dump you uploaded is not readable, could you re-parse it and > > > upload again? > > > > > > Guozhang > > Hi Guozhang, > > I'm sorry that the email got garbled. Below is another attempt. > > The first dump is for a recursive blocking thread holding the lock for > > 0x00000000d3a7e1d0 and the subsequent dump is for a waiting thread. > > (Please grep for 0x00000000d3a7e1d0 to see the locked object.) 
> > -------------------------8<-----------------------------------------"Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor"prio=10 > > tid=0x00007f24dc285800 nid=0xda9 runnable > > [0x00007f249e40b000]java.lang.Thread.State: RUNNABLEat > > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722)at > > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726)at > > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737)at > > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.lexical.Scanners$Scanner.<init>(Scanners.scala:49)at > > scala.util.parsing.combinator.lexical.Scanners$Scanner.rest(Scanners.scala:60)at > > scala.util.parsing.combinator.lexical.Scanners$Scanner.rest(Scanners.scala:44)at > > scala.util.parsing.combinator.Parsers$$anonfun$acceptIf$1.apply(Parsers.scala:608)at > > scala.util.parsing.combinator.Parsers$$anonfun$acceptIf$1.apply(Parsers.scala:606)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at 
> > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:736)at > > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)at > > 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)at > > 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)at > > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)at > > scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)at > > scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)at > > scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)at > > scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)at > > scala.util.parsing.json.JSON$.parseRaw(JSON.scala:54)at > > scala.util.parsing.json.JSON$.parseFull(JSON.scala:68)at > > kafka.utils.Json$.liftedTree1$1(Json.scala:37)at > > kafka.utils.Json$.parseFull(Json.scala:36)- locked <0x00000000c5a7cdd8> (a > > java.lang.Object)at > > kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:56)at > > kafka.utils.ZkUtils$$anonfun$getConsumersPerTopic$1.apply(ZkUtils.scala:678)at > > kafka.utils.ZkUtils$$anonfun$getConsumersPerTopic$1.apply(ZkUtils.scala:677)at > > scala.collection.Iterator$class.foreach(Iterator.scala:727)at > > scala.collection.AbstractIterator.foreach(Iterator.scala:1157)at > > scala.collection.IterableLike$class.foreach(IterableLike.scala:72)at > > scala.collection.AbstractIterable.foreach(Iterable.scala:54)at > > kafka.utils.ZkUtils$.getConsumersPerTopic(ZkUtils.scala:677)at > > kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:437)at > > kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:408)at > > scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)at > > 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:402)- > > locked <0x00000000d3a7e1d0> (a java.lang.Object)at > > kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)-------------------------------------------------------------------- > > Many BLOCKED Threads had stack trace similar to the following stack > > waiting to lock 0x00000000d3a7e1d0 > > -------------------------8<-----------------------------------------"application-my-context-105" > > prio=10 tid=0x00007f24e45aa800 nid=0x3494 waiting for monitor entry > > [0x00007f24afe26000]java.lang.Thread.State: BLOCKED (on object monitor)at > > kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)- > > waiting to lock <0x00000000d3a7e1d0> (a java.lang.Object)at > > kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110)at > > com.custom1.Consumer.cancel(Consumer.java:312)at > > com.custom1.Consumer.run(Consumer.java:302)at > > akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)at > > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)at > > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)at > > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)at > > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)at > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)------------------------------------------------------------------- > > > >