[ 
https://issues.apache.org/jira/browse/KAFKA-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15634855#comment-15634855
 ] 

Bernard Leach commented on KAFKA-2284:
--------------------------------------

I just came across this as a failing test in 
https://github.com/apache/kafka/pull/2101, where I had fixed the underlying bug. 
I've split the fix out into its own PR: 
https://github.com/apache/kafka/pull/2102.

> ConsumerRebalanceListener receives wrong type in partitionOwnership values
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-2284
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2284
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.2.0
>            Reporter: E. Sammer
>            Assignee: Neha Narkhede
>
> The ConsumerRebalanceListener's beforeReleasingPartitions() method is 
> supposed to receive an argument of type Map<String, Set<Integer>> (topic -> 
> Set(partitions)). Even though the type of the map value is specified as 
> java.util.Set, a scala.collection.convert.Wrappers$JSetWrapper is passed 
> instead. That class does not implement Set<T>, so a ClassCastException is 
> thrown as soon as one attempts to access any value of the map. It looks as 
> if this method was never tested against the actual types specified by the 
> interface.
> Here's what happens if you call {{Set<T> foo = 
> partitionOwnership.get(topic)}}:
> {code}
> 2015-06-18 07:28:43,776 (search-consumer_esammer-mbp.local-1434637723383-12126c1b_watcher_executor) [WARN - com.rocana.search.consumer.ConsumerService$1.beforeReleasingPartitions(ConsumerService.java:246)] Exception while rebalancing!
> java.lang.ClassCastException: scala.collection.convert.Wrappers$JSetWrapper cannot be cast to java.util.Set
>       at com.rocana.search.consumer.IndexConsumerWorker.onRebalance(IndexConsumerWorker.java:80)
>       at com.rocana.search.consumer.ConsumerService$1.beforeReleasingPartitions(ConsumerService.java:244)
>       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:675)
>       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:625)
>       at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:142)
>       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:619)
>       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
>       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:615)
>       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:568)
> {code}
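
For readers wondering how a value that is not a java.util.Set can end up in a Map<String, Set<Integer>> at all, the sketch below shows the general mechanism, which is presumably what is at work here: generic type parameters are erased at runtime, so the map accepts any object, and the failure only surfaces at the compiler-inserted cast when the value is read. This is not Kafka's code and not the fix in the PR above. The class NotAJavaSet is a hypothetical stand-in for scala.collection.convert.Wrappers$JSetWrapper, and the method names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class OwnershipTypeDemo {

    // Hypothetical stand-in for the Scala wrapper: a class that does NOT
    // implement java.util.Set.
    static final class NotAJavaSet {
        @Override
        public String toString() {
            return "NotAJavaSet";
        }
    }

    // Builds a map declared as Map<String, Set<Integer>> whose value is not a Set.
    // The raw-type put compiles (with an unchecked warning) because generic type
    // parameters are erased and never checked when the value is stored.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Map<String, Set<Integer>> buildBrokenOwnership(String topic) {
        Map<String, Set<Integer>> ownership = new HashMap<>();
        ((Map) ownership).put(topic, new NotAJavaSet());
        return ownership;
    }

    // Returns true if reading the value as a Set throws ClassCastException.
    static boolean accessThrowsClassCast(Map<String, Set<Integer>> ownership, String topic) {
        try {
            // The compiler inserts a cast to Set here; that cast is what fails.
            Set<Integer> partitions = ownership.get(topic);
            System.out.println("partitions: " + partitions);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> ownership = buildBrokenOwnership("my-topic");
        // Operations that never cast the value work, so the problem stays hidden.
        System.out.println("containsKey: " + ownership.containsKey("my-topic"));
        // The first typed read of the value is where the exception appears.
        System.out.println("typed read throws ClassCastException: "
                + accessThrowsClassCast(ownership, "my-topic"));
    }
}
```

This is consistent with the report that the exception appears "as soon as one attempts to access any value of the map" rather than when the listener is invoked.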



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
