[ https://issues.apache.org/jira/browse/KAFKA-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15634855#comment-15634855 ]
Bernard Leach commented on KAFKA-2284:
--------------------------------------

Just came across this as a failing test in https://github.com/apache/kafka/pull/2101 where I had fixed the underlying bug. I've separated the fix out to a separate PR https://github.com/apache/kafka/pull/2102.

> ConsumerRebalanceListener receives wrong type in partitionOwnership values
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-2284
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2284
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.2.0
>            Reporter: E. Sammer
>            Assignee: Neha Narkhede
>
> The ConsumerRebalanceListener's beforeReleasingPartitions() method is
> supposed to receive an arg of Map<String, Set<Integer>> (topic ->
> Set(partitions)). Even though the type of the map value is specified as a
> java.util.Set, a scala.collection.convert.Wrappers$JSetWrapper is passed
> instead which does not implement Set<T> causing a class cast exception as
> soon as one attempts to access any value of the map. It looks as if this
> method was never tested against the actual types specified by the interface.
> Here's what happens if you call
> {{Set<T> foo = partitionOwnership.get(topic)}}:
> {code}
> 2015-06-18 07:28:43,776 (search-consumer_esammer-mbp.local-1434637723383-12126c1b_watcher_executor) [WARN - com.rocana.search.consumer.ConsumerService$1.beforeReleasingPartitions(ConsumerService.java:246)] Exception while rebalancing!
> java.lang.ClassCastException: scala.collection.convert.Wrappers$JSetWrapper cannot be cast to java.util.Set
>     at com.rocana.search.consumer.IndexConsumerWorker.onRebalance(IndexConsumerWorker.java:80)
>     at com.rocana.search.consumer.ConsumerService$1.beforeReleasingPartitions(ConsumerService.java:244)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:675)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:625)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:142)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:619)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:615)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:568)
> {code}
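
The failure mode described in the report (the exception appears only when a value is read from the map, not when the map itself is handed to the listener) is consistent with how Java erases generic types: the declared Map<String, Set<Integer>> is not checked at runtime, and the compiler inserts the cast to Set only where a value is read into a Set-typed variable. Below is a minimal Java-only sketch of that mechanism. It does not use Kafka or Scala; OwnershipTypeDemo, NotAJavaSet, and the method names are hypothetical and exist only for illustration, with NotAJavaSet standing in for the Scala wrapper.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class OwnershipTypeDemo {

    // Hypothetical stand-in for the Scala wrapper: a class that does NOT
    // implement java.util.Set.
    static final class NotAJavaSet {}

    // Builds a map whose static type promises Set<Integer> values but whose
    // actual value is not a Set. A raw map is used so the compiler accepts it;
    // this mimics a caller that bypasses Java's generic checks.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Map<String, Set<Integer>> pollutedOwnership(String topic) {
        Map raw = new HashMap();
        raw.put(topic, new NotAJavaSet());
        return (Map<String, Set<Integer>>) raw;
    }

    // Reads the value through the declared type. The compiler inserts a cast
    // to Set at the assignment, and that cast is where the exception surfaces.
    @SuppressWarnings("unused")
    static boolean typedReadFails(Map<String, Set<Integer>> ownership, String topic) {
        try {
            Set<Integer> partitions = ownership.get(topic);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Reads the value as a plain Object, so no cast to Set is performed and
    // the real runtime class can be inspected safely.
    static String actualValueType(Map<?, ?> ownership, String topic) {
        Object value = ownership.get(topic);
        return value == null ? "null" : value.getClass().getName();
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> ownership = pollutedOwnership("my-topic");
        System.out.println("actual value type: " + actualValueType(ownership, "my-topic"));
        System.out.println("typed read fails:  " + typedReadFails(ownership, "my-topic"));
    }
}
```

Reading the value as Object, as actualValueType does, is a way to log what a listener really receives without triggering the cast. It is a diagnostic aid only, not a fix for the underlying bug.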