[
https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14626050#comment-14626050
]
ASF GitHub Bot commented on KAFKA-1595:
---------------------------------------
GitHub user ijuma opened a pull request:
https://github.com/apache/kafka/pull/74
KAFKA-1595; Remove deprecated and slower scala JSON parser
A thin wrapper over Jackson's Tree Model API is used as the replacement.
This wrapper
increases safety while providing a simple, but powerful API through the
usage of the
`DecodeJson` type class. Even though this has a maintenance cost, it makes
the API
much more convenient from Scala. A number of tests were added to verify the
behaviour of this wrapper.
The Scala module for Jackson doesn't provide any help for our current
usage, so we don't
depend on it.
An attempt has been made to maintain the existing behaviour regarding when
exceptions
are thrown. There are a number of cases where `JsonMappingException` will
be thrown
instead of `ClassCastException`, however. It is expected that users would
not try to catch
`ClassCastException`.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ijuma/kafka
kafka-1595-remove-deprecated-json-parser-jackson
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/74.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #74
----
commit 61f20cc04a89200c28eb77137671235516c81847
Author: Ismael Juma <[email protected]>
Date: 2015-04-20T20:53:54Z
Introduce `testJsonParse`
Simple test that shows existing behaviour.
commit 4ca0f1111eb37e8be2d388b60efacc19bc6788b6
Author: Ismael Juma <[email protected]>
Date: 2015-04-21T00:15:02Z
KAFKA-1595; Remove deprecated and slower scala JSON parser from
kafka.consumer.TopicCount
A thin wrapper over Jackson's Tree Model API is used as the replacement.
This wrapper
increases safety while providing a simple, but powerful API through the
usage of the
`DecodeJson` type class. Even though this has a maintenance cost, it makes
the API
much more convenient from Scala. A number of tests were added to verify the
behaviour of this wrapper.
The Scala module for Jackson doesn't provide any help for our current
usage, so we don't
depend on it.
An attempt has been made to maintain the existing behaviour regarding when
exceptions
are thrown. There are a number of cases where `JsonMappingException` will
be thrown
instead of `ClassCastException`, however. It is expected that users would
not try to catch
`ClassCastException`.
commit f401990f13bddbd3d97e05756cf2f1abf367677e
Author: Ismael Juma <[email protected]>
Date: 2015-04-21T00:23:39Z
Minor clean-ups in `Json.encode`
----
> Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
> -----------------------------------------------------------------------------
>
> Key: KAFKA-1595
> URL: https://issues.apache.org/jira/browse/KAFKA-1595
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 0.8.1.1
> Reporter: Jagbir
> Assignee: Ismael Juma
> Labels: newbie
> Fix For: 0.8.3
>
> Attachments: KAFKA-1595.patch
>
>
> The following issue is created as a follow up suggested by Jun Rao
> in a kafka news group message with the Subject
> "Blocking Recursive parsing from
> kafka.consumer.TopicCount$.constructTopicCount"
> SUMMARY:
> An issue was detected in a typical cluster of 3 kafka instances backed
> by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3,
> java version 1.7.0_65). On consumer end, when consumers get recycled,
> there is a troubling JSON parsing recursion which takes a busy lock and
> blocks consumers thread pool.
> In 0.8.1.1 scala client library ZookeeperConsumerConnector.scala:355 takes
> a global lock (0x00000000d3a7e1d0) during the rebalance, and fires an
> expensive JSON parsing, while keeping the other consumers from shutting
> down, see, e.g,
> at
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> The deep recursive JSON parsing should be deprecated in favor
> of a better JSON parser, see, e.g,
> http://engineering.ooyala.com/blog/comparing-scala-json-libraries?
> DETAILS:
> The first dump is for a recursive blocking thread holding the lock for
> 0x00000000d3a7e1d0
> and the subsequent dump is for a waiting thread.
> (Please grep for 0x00000000d3a7e1d0 to see the locked object.)
> Â
> -------------------------8<-----------------------------------------
> "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor"
> prio=10 tid=0x00007f24dc285800 nid=0xda9 runnable [0x00007f249e40b000]
> java.lang.Thread.State: RUNNABLE
> at
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722)
> at
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726)
> at
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737)
> at
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.lexical.Scanners$Scanner.<init>(Scanners.scala:49)
> at
> scala.util.parsing.combinator.lexical.Scanners$Scanner.rest(Scanners.scala:60)
> at
> scala.util.parsing.combinator.lexical.Scanners$Scanner.rest(Scanners.scala:44)
> at
> scala.util.parsing.combinator.Parsers$$anonfun$acceptIf$1.apply(Parsers.scala:608)
> at
> scala.util.parsing.combinator.Parsers$$anonfun$acceptIf$1.apply(Parsers.scala:606)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:736)
> at
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
> at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:54)
> at scala.util.parsing.json.JSON$.parseFull(JSON.scala:68)
> at kafka.utils.Json$.liftedTree1$1(Json.scala:37)
> at kafka.utils.Json$.parseFull(Json.scala:36)
> - locked <0x00000000c5a7cdd8> (a java.lang.Object)
> at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:56)
> at
> kafka.utils.ZkUtils$$anonfun$getConsumersPerTopic$1.apply(ZkUtils.scala:678)
> at
> kafka.utils.ZkUtils$$anonfun$getConsumersPerTopic$1.apply(ZkUtils.scala:677)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at kafka.utils.ZkUtils$.getConsumersPerTopic(ZkUtils.scala:677)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:437)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:408)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:402)
> - locked <0x00000000d3a7e1d0> (a java.lang.Object)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)
> --------------------------------------------------------------------
> Many BLOCKED Threads had stack trace similar to the following stack waiting
> to lock 0x00000000d3a7e1d0
> -------------------------8<-----------------------------------------
> "application-my-context-105" prio=10 tid=0x00007f24e45aa800 nid=0x3494
> waiting for monitor entry [0x00007f24afe26000]
> java.lang.Thread.State: BLOCKED (on object monitor)
> at
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> - waiting to lock <0x00000000d3a7e1d0> (a java.lang.Object)
> at
> kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110)
> at com.custom1.Consumer.cancel(Consumer.java:312)
> at com.custom1.Consumer.run(Consumer.java:302)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> -------------------------------------------------------------------
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)