Hi Valentin,

Is inter.broker.protocol.version set correctly on brokers 1 and 2? It should be 0.10.0 so that they can talk to the older broker without issue.
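For reference, a minimal sketch of the relevant server.properties lines on the upgraded brokers while a 0.10.0.0 broker is still in the cluster (this follows the rolling-upgrade notes in the Kafka docs; log.message.format.version is optional and only needed if older clients/brokers must still read the on-disk format):

```properties
# server.properties on brokers 1 and 2 (the 0.10.1.0 brokers)
# Keep the inter-broker protocol at the oldest broker's version
# until every broker in the cluster runs 0.10.1.0.
inter.broker.protocol.version=0.10.0
# Optional, per the upgrade notes: keep the message format at the
# old version too until all brokers/clients are upgraded.
log.message.format.version=0.10.0
```

After all brokers are on 0.10.1.0, these can be bumped to 0.10.1 with one more rolling restart.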
Ismael

On Thu, Dec 22, 2016 at 4:42 PM, Valentin Golev <valentin.go...@gdeslon.ru> wrote:
> Hello,
>
> I have a three-broker Kafka setup (the ids are 1, 2 (Kafka 0.10.1.0) and
> 1001 (Kafka 0.10.0.0)). After a failure of two of them, a lot of the
> partitions have the third one (1001) as their leader. It looks like this:
>
> Topic: userevents0.open  Partition: 5   Leader: 1     Replicas: 1,2,1001   Isr: 1,1001,2
> Topic: userevents0.open  Partition: 6   Leader: 2     Replicas: 2,1,1001   Isr: 1,2,1001
> Topic: userevents0.open  Partition: 7   Leader: 1001  Replicas: 1001,2,1   Isr: 1001
> Topic: userevents0.open  Partition: 8   Leader: 1     Replicas: 1,1001,2   Isr: 1,1001,2
> Topic: userevents0.open  Partition: 9   Leader: 1001  Replicas: 2,1001,1   Isr: 1001
> Topic: userevents0.open  Partition: 10  Leader: 1001  Replicas: 1001,1,2   Isr: 1001
>
> As you can see, only the partitions whose leader is 1 or 2 have successfully
> replicated. Brokers 1 and 2, however, are unable to fetch data from 1001.
>
> All of the partitions are available to consumers and producers, so
> everything is fine except replication. 1001 is reachable from the other
> servers.
>
> I can't restart broker 1001 because it seems that would cause data
> loss (as you can see, it's the only ISR on many partitions). Restarting the
> other brokers didn't help at all. Neither did just plain waiting (it's the
> third day of this going on). So what do I do?
>
> The logs of broker 2 (the one which tries to fetch data) are full of this:
>
> [2016-12-22 16:38:52,199] WARN [ReplicaFetcherThread-0-1001], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@117a49bf (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 1001 was disconnected before the response was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
>         at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
>         at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>
> The logs of broker 1001 are full of this:
>
> [2016-12-22 16:38:54,226] ERROR Processor got uncaught exception.
> (kafka.network.Processor)
> java.nio.BufferUnderflowException
>         at java.nio.Buffer.nextGetIndex(Buffer.java:506)
>         at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:361)
>         at kafka.api.FetchRequest$$anonfun$1$$anonfun$apply$1.apply(FetchRequest.scala:55)
>         at kafka.api.FetchRequest$$anonfun$1$$anonfun$apply$1.apply(FetchRequest.scala:52)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.immutable.Range.foreach(Range.scala:160)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at kafka.api.FetchRequest$$anonfun$1.apply(FetchRequest.scala:52)
>         at kafka.api.FetchRequest$$anonfun$1.apply(FetchRequest.scala:49)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>         at scala.collection.immutable.Range.foreach(Range.scala:160)
>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>         at kafka.api.FetchRequest$.readFrom(FetchRequest.scala:49)
>         at kafka.network.RequestChannel$Request$$anonfun$2.apply(RequestChannel.scala:65)
>         at kafka.network.RequestChannel$Request$$anonfun$2.apply(RequestChannel.scala:65)
>         at kafka.network.RequestChannel$Request$$anonfun$4.apply(RequestChannel.scala:71)
>         at kafka.network.RequestChannel$Request$$anonfun$4.apply(RequestChannel.scala:71)
>         at scala.Option.map(Option.scala:146)
>         at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:71)
>         at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:488)
>         at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)
>         at kafka.network.Processor.run(SocketServer.scala:413)
>         at java.lang.Thread.run(Thread.java:745)
>
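For what it's worth, the BufferUnderflowException in FetchRequest.readFrom above is the 0.10.0 parser reading past the end of the request buffer, which is the symptom of being handed a request in a newer wire format than it understands. A minimal illustration of that failure mode (not Kafka's actual wire format; the two-int layout here is made up for the demo):

```java
import java.nio.ByteBuffer;
import java.nio.BufferUnderflowException;

public class UnderflowDemo {
    // Parse a buffer as if it held two int32 fields. A request laid out
    // differently (here: only one field) makes the second read run past
    // the end of the buffer, throwing BufferUnderflowException - the
    // same class of failure as FetchRequest.readFrom on the old broker.
    static String parseTwoInts(ByteBuffer buf) {
        try {
            int first = buf.getInt();   // first field parses fine
            int second = buf.getInt();  // reads past the end -> underflow
            return "ok: " + first + "," + second;
        } catch (BufferUnderflowException e) {
            return "BufferUnderflowException";
        }
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(4); // room for one int32 only
        buf.putInt(7);
        buf.flip();
        System.out.println(parseTwoInts(buf));   // prints "BufferUnderflowException"
    }
}
```

Once inter.broker.protocol.version is 0.10.0 on brokers 1 and 2, they send requests the old parser understands and the fetcher threads should recover without a restart of 1001.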