[ https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16034038#comment-16034038 ]
ASF GitHub Bot commented on KAFKA-1595: --------------------------------------- GitHub user ijuma reopened a pull request: https://github.com/apache/kafka/pull/83 KAFKA-1595; Remove deprecated and slower scala JSON parser Tested that we only use Jackson methods introduced in 2.0 in the main codebase by compiling it with the older version locally. We use a constructor introduced in 2.4 in one test, but I didn't remove it as it seemed harmless. The reasoning for this is explained in the mailing list thread: http://search-hadoop.com/m/uyzND1FWbWw1qUbWe You can merge this pull request into a Git repository by running: $ git pull https://github.com/ijuma/kafka kafka-1595-remove-deprecated-json-parser-jackson Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/83.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #83 ---- commit bdceb1080ef78258c81bb3b8405be1f5e07a12b1 Author: Ismael Juma <ism...@juma.me.uk> Date: 2015-07-17T10:43:22Z Update to JUnit 4.12. It includes `assertNotEquals`, which is used in a subsequent commit. commit 117cd1db07798d5d8ff021fa2a1569a301d775e6 Author: Ismael Juma <ism...@juma.me.uk> Date: 2015-07-17T11:03:00Z Introduce `testJsonParse` Simple test that shows existing behaviour. commit 26b0525df2c04b0ffd7718b1ef31b94ce12e486c Author: Ismael Juma <ism...@juma.me.uk> Date: 2015-07-17T11:05:00Z KAFKA-1595; Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount A thin wrapper over Jackson's Tree Model API is used as the replacement. This wrapper increases safety while providing a simple, but powerful API through the usage of the `DecodeJson` type class. Even though this has a maintenance cost, it makes the API much more convenient from Scala. A number of tests were added to verify the behaviour of this wrapper. The Scala module for Jackson doesn't provide any help for our current usage, so we don't depend on it. An attempt has been made to maintain the existing behaviour regarding when exceptions are thrown. There are a number of cases where `JsonMappingException` will be thrown instead of `ClassCastException`, however. It is expected that users would not try to catch `ClassCastException`. commit 8a952c31866933fd769840221293ba9c95fed162 Author: Ismael Juma <ism...@juma.me.uk> Date: 2015-07-17T11:06:00Z Minor clean-ups in `Json.encode` ---- > Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount > ----------------------------------------------------------------------------- > > Key: KAFKA-1595 > URL: https://issues.apache.org/jira/browse/KAFKA-1595 > Project: Kafka > Issue Type: Improvement > Affects Versions: 0.8.1.1 > Reporter: Jagbir > Assignee: Ismael Juma > Labels: newbie > > The following issue is created as a follow up suggested by Jun Rao > in a kafka news group message with the Subject > "Blocking Recursive parsing from > kafka.consumer.TopicCount$.constructTopicCount" > SUMMARY: > An issue was detected in a typical cluster of 3 kafka instances backed > by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3, > java version 1.7.0_65). On consumer end, when consumers get recycled, > there is a troubling JSON parsing recursion which takes a busy lock and > blocks consumers thread pool. > In 0.8.1.1 scala client library ZookeeperConsumerConnector.scala:355 takes > a global lock (0x00000000d3a7e1d0) during the rebalance, and fires an > expensive JSON parsing, while keeping the other consumers from shutting > down, see, e.g, > at > kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161) > The deep recursive JSON parsing should be deprecated in favor > of a better JSON parser, see, e.g, > http://engineering.ooyala.com/blog/comparing-scala-json-libraries? > DETAILS: > The first dump is for a recursive blocking thread holding the lock for > 0x00000000d3a7e1d0 > and the subsequent dump is for a waiting thread. > (Please grep for 0x00000000d3a7e1d0 to see the locked object.) > Â > -------------------------8<----------------------------------------- > "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor" > prio=10 tid=0x00007f24dc285800 nid=0xda9 runnable [0x00007f249e40b000] > java.lang.Thread.State: RUNNABLE > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.lexical.Scanners$Scanner.<init>(Scanners.scala:49) > at > scala.util.parsing.combinator.lexical.Scanners$Scanner.rest(Scanners.scala:60) > at > scala.util.parsing.combinator.lexical.Scanners$Scanner.rest(Scanners.scala:44) > at > scala.util.parsing.combinator.Parsers$$anonfun$acceptIf$1.apply(Parsers.scala:608) > at > scala.util.parsing.combinator.Parsers$$anonfun$acceptIf$1.apply(Parsers.scala:606) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:736) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) > at > scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890) > at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:54) > at scala.util.parsing.json.JSON$.parseFull(JSON.scala:68) > at kafka.utils.Json$.liftedTree1$1(Json.scala:37) > at kafka.utils.Json$.parseFull(Json.scala:36) > - locked <0x00000000c5a7cdd8> (a java.lang.Object) > at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:56) > at > kafka.utils.ZkUtils$$anonfun$getConsumersPerTopic$1.apply(ZkUtils.scala:678) > at > kafka.utils.ZkUtils$$anonfun$getConsumersPerTopic$1.apply(ZkUtils.scala:677) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.utils.ZkUtils$.getConsumersPerTopic(ZkUtils.scala:677) > at > kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:437) > at > kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:408) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) > at > kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:402) > - locked <0x00000000d3a7e1d0> (a java.lang.Object) > at > kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355) > -------------------------------------------------------------------- > Many BLOCKED Threads had stack trace similar to the following stack waiting > to lock 0x00000000d3a7e1d0 > -------------------------8<----------------------------------------- > "application-my-context-105" prio=10 tid=0x00007f24e45aa800 nid=0x3494 > waiting for monitor entry [0x00007f24afe26000] > java.lang.Thread.State: BLOCKED (on object monitor) > at > kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161) > - waiting to lock <0x00000000d3a7e1d0> (a java.lang.Object) > at > kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) > at com.custom1.Consumer.cancel(Consumer.java:312) > at com.custom1.Consumer.run(Consumer.java:302) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > ------------------------------------------------------------------- -- This message was sent by Atlassian JIRA (v6.3.15#6346)