E. Sammer created KAFKA-2284:
--------------------------------

             Summary: ConsumerRebalanceListener receives wrong type in partitionOwnership values
                 Key: KAFKA-2284
                 URL: https://issues.apache.org/jira/browse/KAFKA-2284
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.8.2.0
            Reporter: E. Sammer
            Assignee: Neha Narkhede
            Priority: Blocker


The ConsumerRebalanceListener's beforeReleasingPartitions() method is declared 
to receive a Map<String, Set<Integer>> (topic -> set of partitions). Although 
the map's value type is declared as java.util.Set, the values actually passed 
are instances of scala.collection.convert.Wrappers$JSetWrapper, which does not 
implement java.util.Set. As a result, a ClassCastException is thrown as soon 
as one attempts to access any value of the map. It looks as if this method was 
never tested against the actual types specified by the interface.

Here's what happens if you call {{Set<T> foo = partitionOwnership.get(topic)}}:

{code}
2015-06-18 07:28:43,776 (search-consumer_esammer-mbp.local-1434637723383-12126c1b_watcher_executor) [WARN - com.rocana.search.consumer.ConsumerService$1.beforeReleasingPartitions(ConsumerService.java:246)] Exception while rebalancing!
java.lang.ClassCastException: scala.collection.convert.Wrappers$JSetWrapper cannot be cast to java.util.Set
        at com.rocana.search.consumer.IndexConsumerWorker.onRebalance(IndexConsumerWorker.java:80)
        at com.rocana.search.consumer.ConsumerService$1.beforeReleasingPartitions(ConsumerService.java:244)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:675)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:625)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:142)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:619)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:615)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:568)
{code}
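
For readers wondering why the failure only appears when a value is read rather than when the map is handed to the listener, here is a minimal, Kafka-free sketch of the mechanism. It assumes the cause is ordinary generic type erasure. The class ErasureDemo, the stand-in type NotAJavaSet (playing the role of the Scala wrapper), and the topic name "my-topic" are all hypothetical and are not taken from the Kafka code base.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class ErasureDemo {
    /** Hypothetical stand-in for the Scala wrapper: it does not implement java.util.Set. */
    static final class NotAJavaSet {
    }

    /** Builds a map declared as Map<String, Set<Integer>> whose values are not Sets. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Map<String, Set<Integer>> pollutedOwnership() {
        Map raw = new HashMap();
        raw.put("my-topic", new NotAJavaSet());
        // Unchecked cast: after erasure the JVM sees only a Map, so nothing fails here.
        return (Map<String, Set<Integer>>) raw;
    }

    /** Returns true if reading the value for the topic as a Set throws ClassCastException. */
    static boolean readingValueFails(Map<String, Set<Integer>> ownership, String topic) {
        try {
            // The compiler inserts a cast to java.util.Set on this assignment.
            Set<Integer> partitions = ownership.get(topic);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> ownership = pollutedOwnership();
        // Passing the map around and inspecting its keys works without error.
        System.out.println("has topic: " + ownership.containsKey("my-topic"));
        // Only reading a value as Set<Integer> triggers the exception.
        System.out.println("read fails: " + readingValueFails(ownership, "my-topic"));
    }
}
```

Under these assumptions, key-only operations on the map succeed, and the exception surfaces at the first typed read of a value, which matches the top frame of the trace above.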



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
