By the "reassignment tool", do you mean the tool as described at
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool,
which just writes the JSON to ZooKeeper, or the code in KafkaController
that is actually responsible for reassignment?

Would it be appropriate to add a warning to the documentation, to the
effect of "WARNING: in 0.8, this tool may corrupt the state of partition
assignment, rendering your topic unusable"?

Thanks,

Ryan


On Thu, Mar 13, 2014 at 7:09 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> There are some known bugs with the partition reassignment tool in 0.8. We
> fixed a lot of those, tested it on a large set of partitions, and found it to
> be stable in 0.8.1. Could you give 0.8.1 a spin instead?
>
>
> On Thu, Mar 13, 2014 at 4:05 PM, Ryan Berdeen <rberd...@hubspot.com>
> wrote:
>
> > After reassigning a topic to 5 new brokers, the topic metadata returned by
> > the brokers for the topic is causing an exception when a producer tries to
> > read it:
> >
> > java.util.NoSuchElementException: key not found: 7
> > ...
> > at kafka.api.PartitionMetadata$.readFrom(TopicMetadata.scala:104)
> >
> > The reassignment was from brokers 1, 4, 5, 7, 8 to 10, 11, 12, 13, 14. In
> > the topic metadata response, only the 5 new brokers (>= 10) are included,
> > but there is a partition whose ISR includes broker 7. It looks like this:
> >
> > {:error-code 0, :partition-id 13, :leader-id 10, :num-replicas 4,
> > :replica-ids [10 12 13 14], :num-isr 5, :isr-ids [10 14 13 12 7]}
> >
> > For all other partitions, the ISR and the replicas are the same brokers,
> > but for this partition, the ISR is larger. I stopped and started the
> > controller, and the topic metadata was corrected.
> >
> > Has anyone seen this before? Is there anything in particular to look for in
> > the logs?
> >
> > Thanks,
> >
> > Ryan
> >
> > Here's the full exception:
> >
> > java.util.NoSuchElementException: key not found: 7
> >   at scala.collection.MapLike$class.default(MapLike.scala:225)
> >   at scala.collection.immutable.HashMap.default(HashMap.scala:38)
> >   at scala.collection.MapLike$class.apply(MapLike.scala:135)
> >   at scala.collection.immutable.HashMap.apply(HashMap.scala:38)
> >   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> >   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> >   at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> >   at scala.collection.immutable.VectorIterator.foreach(Vector.scala:648)
> >   at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> >   at scala.collection.immutable.Vector.foreach(Vector.scala:63)
> >   at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> >   at scala.collection.immutable.Vector.map(Vector.scala:63)
> >   at kafka.api.PartitionMetadata$.readFrom(TopicMetadata.scala:104)
> >   at kafka.api.TopicMetadata$$anonfun$readFrom$1.apply(TopicMetadata.scala:37)
> >   at kafka.api.TopicMetadata$$anonfun$readFrom$1.apply(TopicMetadata.scala:36)
> >   at scala.collection.immutable.Range.foreach(Range.scala:78)
> >   at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36)
> >   at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
> >   at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
> >   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> >   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> >   at scala.collection.immutable.Range.foreach(Range.scala:81)
> >   at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> >   at scala.collection.immutable.Range.map(Range.scala:46)
> >   at kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31)
> >   at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> >   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> >   at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> >   at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> >   at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
> >   at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> >   at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
> >   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> >   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >   at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
> >   at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
> >   at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> >   at kafka.producer.Producer.send(Producer.scala:76)
> >   at kafka.javaapi.producer.Producer.send(Producer.scala:42)
> >
>
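
The mismatch described in the original report (an ISR entry that appears in
neither the partition's replica list nor the broker list) can be checked for
mechanically. What follows is a minimal sketch in Python, not Kafka code: the
function name `find_stale_isr` and the dict field names are assumptions chosen
to mirror the metadata dump quoted above, and the input would have to be built
from whatever metadata client is in use.

```python
def find_stale_isr(partitions, live_brokers):
    """Map partition id -> sorted ISR broker ids that are not both a replica
    and a broker present in the metadata response (hypothetical helper)."""
    stale = {}
    for p in partitions:
        # A broker is "known" only if it is a replica AND listed as a broker.
        known = set(p["replica_ids"]) & set(live_brokers)
        extra = sorted(set(p["isr_ids"]) - known)
        if extra:
            stale[p["partition_id"]] = extra
    return stale


# Values copied from the partition 13 metadata in the report.
partition_13 = {
    "partition_id": 13,
    "leader_id": 10,
    "replica_ids": [10, 12, 13, 14],
    "isr_ids": [10, 14, 13, 12, 7],
}
brokers = {10, 11, 12, 13, 14}  # only the 5 new brokers were returned

print(find_stale_isr([partition_13], brokers))  # {13: [7]}
```

Broker 7 is flagged because it is in the ISR but not among the brokers in the
response, which is consistent with the "key not found: 7" lookup failure in the
stack trace.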
