[ 
https://issues.apache.org/jira/browse/KAFKA-4596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15823797#comment-15823797
 ] 

Ismael Juma commented on KAFKA-4596:
------------------------------------

[~ewencp], I'm tentatively assigning this to Ben as he knows this code well and 
the fix is trivial, but important. Ben is back on Wednesday and if he can't do 
it, we'll find an alternative.

> KIP-73 rebalance throttling breaks on plans for specific partitions
> -------------------------------------------------------------------
>
>                 Key: KAFKA-4596
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4596
>             Project: Kafka
>          Issue Type: Bug
>         Environment: Kafka 0.10.1.1
>            Reporter: Tom Crayford
>            Assignee: Ben Stopford
>             Fix For: 0.10.2.0
>
>
> The reassign-partitions.sh command fails if you both *throttle* and give it a 
> specific partition reassignment. For example, upon reassigning 
> {code}__consumer_offsets{code} partition 19, you get the following error:
> {code}
> Save this to use as the --reassignment-json-file option during rollback
> Warning: You must run Verify periodically, until the reassignment completes, 
> to ensure the throttle is removed. You can also alter the throttle by 
> rerunning the Execute command passing a new value.
> The throttle limit was set to 1048576 B/s
> Partitions reassignment failed due to key not found: [__consumer_offsets,30]
> java.util.NoSuchElementException: key not found: [__consumer_offsets,30]
>         at scala.collection.MapLike$class.default(MapLike.scala:228)
>         at scala.collection.AbstractMap.default(Map.scala:59)
>         at scala.collection.MapLike$class.apply(MapLike.scala:141)
>         at scala.collection.AbstractMap.apply(Map.scala:59)
>         at 
> kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:372)
>         at 
> kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:371)
>         at 
> scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
>         at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>         at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>         at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>         at 
> scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
>         at 
> scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
>         at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
>         at 
> kafka.admin.ReassignPartitionsCommand.kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions(ReassignPartitionsCommand.scala:371)
>         at 
> kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:347)
>         at 
> kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:343)
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at 
> kafka.admin.ReassignPartitionsCommand.assignThrottledReplicas(ReassignPartitionsCommand.scala:343)
>         at 
> kafka.admin.ReassignPartitionsCommand.maybeThrottle(ReassignPartitionsCommand.scala:317)
>         at 
> kafka.admin.ReassignPartitionsCommand.reassignPartitions(ReassignPartitionsCommand.scala:387)
>         at 
> kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:165)
>         at 
> kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:149)
>         at 
> kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:46)
>         at 
> kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
> {code}
> This effectively breaks the throttling feature unless you want to rebalance 
> a large number of partitions at once.
> For reference the command that was run is:
> {code}
> kafka-reassign-partitions.sh --reassignment-json-file 9b137f13_7e91_4757_a7b9_554ff458e7df.3d4269d5-e829-4e86-9d05-91e9e2fcb7e7.reassignment_plan.json --throttle 1048576
> {code}
> and the contents of the plan file are:
> {code}
> {"version":1,"partitions":[{"topic":"__consumer_offsets","partition":19,"replicas":[2,1,0]}]}
> {code}
> This seems like a simple logic error to me: we're trying to look up a 
> partition that hasn't been proposed, when we should not be. It looks like the 
> logic assumes that {code}Map.apply{code} doesn't throw if the key isn't 
> found, when in fact it throws a NoSuchElementException.
> I checked that this cluster does indeed have the __consumer_offsets topic 
> populated.
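
The failure mode described above can be reproduced outside Kafka with a minimal Scala sketch. This is not the actual ReassignPartitionsCommand code: the TopicPartition case class, the replica lists for the existing assignment, and the movingUnsafe/movingSafe names are all assumptions made up for illustration. It only shows that Map.apply throws for a partition missing from the plan, and that one possible guard is to go through Map.get instead.

```scala
import scala.util.Try

object ThrottleLookupSketch {
  // Hypothetical stand-in for Kafka's topic/partition key type.
  final case class TopicPartition(topic: String, partition: Int)

  // Assumed current assignment covering every partition of the topic
  // (illustrative data, not taken from the report).
  val existing: Map[TopicPartition, Seq[Int]] = Map(
    TopicPartition("__consumer_offsets", 19) -> Seq(0, 1, 2),
    TopicPartition("__consumer_offsets", 30) -> Seq(1, 2, 0)
  )

  // The plan proposes a move for partition 19 only, as in the report.
  val proposed: Map[TopicPartition, Seq[Int]] = Map(
    TopicPartition("__consumer_offsets", 19) -> Seq(2, 1, 0)
  )

  // Unsafe: proposed(tp) is Map.apply, which throws NoSuchElementException
  // for partition 30 because that key is absent from the plan.
  def movingUnsafe: Map[TopicPartition, Seq[Int]] =
    existing.filter { case (tp, replicas) => proposed(tp) != replicas }

  // Safe: Map.get returns an Option, so partitions that are not in the
  // plan are simply treated as not moving.
  def movingSafe: Map[TopicPartition, Seq[Int]] =
    existing.filter { case (tp, replicas) => proposed.get(tp).exists(_ != replicas) }

  def main(args: Array[String]): Unit = {
    println(Try(movingUnsafe).isFailure)        // the lookup for partition 30 fails
    println(movingSafe.keySet.map(_.partition)) // only partition 19 is moving
  }
}
```

Whether the real fix should use Map.get, Map.contains, or restrict the existing assignment to the proposed partitions up front is a design choice for whoever picks up the ticket; the sketch only illustrates the lookup behaviour.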



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
