Hi Jun / Kafka Team,

Do we have any solution for this issue ?

"During zookeeper re-init kafka broker truncates messages and ends up in
loosing records"

I'm ok with with duplicate messages being stored instead of dropping.

Is there any configuration in kafka where follower broker replicates these
messages to leader instead of dropping it ?

Thank you.

Regards,
Mazhar Shaikh.



On Tue, Aug 30, 2016 at 6:42 PM, Mazhar Shaikh <mazhar.shaikh...@gmail.com>
wrote:

> Hi Jun,
>
> Yes, the data is lost during leader broker failure.
> But the leader broker failed due to zookeeper session expiry.
> GC logs doesn't show any error/warns during this period.
>
> Its not easy reproduce. during long run (>12hrs) with 30k msg/sec load
> balanced across 96 partitions, some time in between this failure is noticed
> (once/twice).
>
> looks like this issue is similar to "https://issues.apache.org/
> jira/browse/KAFKA-1211".
>
> But here the leader broker could have synced/committed all the the
> existing data to replica before the replica is elected as leader.
>
> below are few log around this time.
>
> broker b2 was controller.
>
> b2:
> Server.log:
> [2016-08-26 16:15:49,701] INFO re-registering broker info in ZK for broker
> 1 (kafka.server.KafkaHealthcheck)
> [2016-08-26 16:15:49,738] INFO Registered broker 1 at path /brokers/ids/1
> with address b2.broker.com:9092. (kafka.utils.ZkUtils$)
> [2016-08-26 16:15:49,739] INFO done re-registering broker (kafka.server.
> KafkaHealthcheck)
> [2016-08-26 16:15:49,740] INFO Subscribing to /brokers/topics path to
> watch for new topics (kafka.server.KafkaHealthcheck)
> [2016-08-26 16:15:50,055] INFO New leader is 0 (kafka.server.
> ZookeeperLeaderElector$LeaderChangeListener)
> [2016-08-26 16:15:50,538] WARN [KafkaApi-1] Produce request with
> correlation id 422786 from client rdkafka on partition [topic,92] failed
> due to Leader not local for partition [topic,92] on broker 1
> (kafka.server.KafkaApis)
> [2016-08-26 16:15:50,544] INFO Truncating log topic-92 to offset 1746617.
> (kafka.log.Log)
> [2016-08-26 16:15:50,562] WARN [KafkaApi-1] Produce request with
> correlation id 422793 from client rdkafka on partition [topic,92] failed
> due to Leader not local for partition [topic,92] on broker 1
> (kafka.server.KafkaApis)
> [2016-08-26 16:15:50,578] WARN [KafkaApi-1] Produce request with
> correlation id 422897 from client rdkafka on partition [topic,92] failed
> due to Leader not local for partition [topic,92] on broker 1
> (kafka.server.KafkaApis)
> [2016-08-26 16:15:50,719] ERROR Closing socket for /169.254.2.116 because
> of error (kafka.network.Processor)
> kafka.common.KafkaException: Size of FileMessageSet
> /data/kafka/broker-b2/topic-66/00000000000000000000.log has been
> truncated during write: old size 1048576, new size 0
>         at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:144)
>         at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:70)
>         at kafka.network.MultiSend.writeTo(Transmission.scala:101)
>         at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125)
>         at kafka.network.MultiSend.writeTo(Transmission.scala:101)
>         at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
>         at kafka.network.Processor.write(SocketServer.scala:472)
>         at kafka.network.Processor.run(SocketServer.scala:342)
>         at java.lang.Thread.run(Thread.java:744)
> [2016-08-26 16:15:50,729] ERROR Closing socket for /169.254.2.116 because
> of error (kafka.network.Processor)
> kafka.common.KafkaException: Size of FileMessageSet
> /data/kafka/broker-b2/topic-68/00000000000000000000.log has been
> truncated during write: old size 1048576, new size 0
>         at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:144)
>         at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:70)
>         at kafka.network.MultiSend.writeTo(Transmission.scala:101)
>         at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125)
>         at kafka.network.MultiSend.writeTo(Transmission.scala:101)
>         at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
>         at kafka.network.Processor.write(SocketServer.scala:472)
>         at kafka.network.Processor.run(SocketServer.scala:342)
>         at java.lang.Thread.run(Thread.java:744)
>
>
> kafkaServer-gc.log:
> 2016-08-26T16:14:47.123+0530: 6123.763: [GC2016-08-26T16:14:47.123+0530:
> 6123.763: [ParNew: 285567K->5992K(314560K), 0.0115200 secs]
> 648907K->369981K(1013632K), 0.0116800 secs] [Times: user=0.19 sys=0.00,
> real=0.01 secs]
> 2016-08-26T16:14:56.327+0530: 6132.967: [GC2016-08-26T16:14:56.327+0530:
> 6132.967: [ParNew: 285608K->5950K(314560K), 0.0105600 secs]
> 649597K->370626K(1013632K), 0.0107550 secs] [Times: user=0.15 sys=0.01,
> real=0.01 secs]
> 2016-08-26T16:15:06.615+0530: 6143.255: [GC2016-08-26T16:15:06.615+0530:
> 6143.255: [ParNew: 285566K->6529K(314560K), 0.0214330 secs]
> 650242K->371864K(1013632K), 0.0216380 secs] [Times: user=0.30 sys=0.03,
> real=0.02 secs]
> 2016-08-26T16:15:17.255+0530: 6153.895: [GC2016-08-26T16:15:17.255+0530:
> 6153.895: [ParNew: 286145K->6413K(314560K), 0.0248930 secs]
> 651480K->372390K(1013632K), 0.0251060 secs] [Times: user=0.32 sys=0.09,
> real=0.03 secs]
> 2016-08-26T16:15:36.892+0530: 6173.533: [GC2016-08-26T16:15:36.892+0530:
> 6173.533: [ParNew: 286029K->5935K(314560K), 0.0083220 secs]
> 652006K->372627K(1013632K), 0.0085410 secs] [Times: user=0.11 sys=0.02,
> real=0.01 secs]
> 2016-08-26T16:15:50.113+0530: 6186.753: [GC2016-08-26T16:15:50.113+0530:
> 6186.753: [ParNew: 285551K->15693K(314560K), 0.0139890 secs]
> 652243K->383039K(1013632K), 0.0142240 secs] [Times: user=0.26 sys=0.00,
> real=0.01 secs]
> 2016-08-26T16:15:56.403+0530: 6193.044: [GC2016-08-26T16:15:56.403+0530:
> 6193.044: [ParNew: 295309K->4838K(314560K), 0.0147950 secs]
> 662655K->372758K(1013632K), 0.0149970 secs] [Times: user=0.21 sys=0.02,
> real=0.01 secs]
> 2016-08-26T16:16:06.321+0530: 6202.961: [GC2016-08-26T16:16:06.321+0530:
> 6202.962: [ParNew: 284454K->5116K(314560K), 0.0129300 secs]
> 652374K->373794K(1013632K), 0.0131210 secs] [Times: user=0.23 sys=0.00,
> real=0.01 secs]
> 2016-08-26T16:16:18.586+0530: 6215.226: [GC2016-08-26T16:16:18.586+0530:
> 6215.226: [ParNew: 284732K->4909K(314560K), 0.0115050 secs]
> 653410K->374210K(1013632K), 0.0116810 secs] [Times: user=0.18 sys=0.01,
> real=0.01 secs]
>
>
> controller.log:
> [2016-08-26 16:15:41,076] INFO [SessionExpirationListener on 1], ZK
> expired; shut down all controller components and try to re-elect
> (kafka.controller.KafkaController$SessionExpirationListener)
>
> zookeeper.log:
> [2016-08-26 16:15:36,001] INFO Expiring session 0x156c616475a0000, timeout
> of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-08-26 16:15:36,001] INFO Processed session termination for
> sessionid: 0x156c616475a0000 (org.apache.zookeeper.server.
> PrepRequestProcessor)
> [2016-08-26 16:15:36,901] INFO Client session timed out, have not heard
> from server in 9582ms for sessionid 0x156c616475a0000, closing socket
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2016-08-26 16:15:36,902] INFO Closing socket connection to /169.254.2.216.
> (kafka.network.Processor)
> [2016-08-26 16:15:36,960] ERROR Unexpected Exception:
> (org.apache.zookeeper.server.NIOServerCnxn)
> java.nio.channels.CancelledKeyException
>         at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.
> java:73)
>         at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.
> java:77)
>         at org.apache.zookeeper.server.NIOServerCnxn.sendBuffer(
> NIOServerCnxn.java:151)
>         at org.apache.zookeeper.server.NIOServerCnxn.sendResponse(
> NIOServerCnxn.java:1081)
>         at org.apache.zookeeper.server.NIOServerCnxn.process(
> NIOServerCnxn.java:1118)
>         at org.apache.zookeeper.server.WatchManager.triggerWatch(
> WatchManager.java:120)
>         at org.apache.zookeeper.server.WatchManager.triggerWatch(
> WatchManager.java:92)
>         at org.apache.zookeeper.server.DataTree.deleteNode(DataTree.
> java:594)
>         at org.apache.zookeeper.server.DataTree.killSession(DataTree.
> java:966)
>         at org.apache.zookeeper.server.DataTree.processTxn(DataTree.
> java:818)
>         at org.apache.zookeeper.server.ZKDatabase.processTxn(
> ZKDatabase.java:329)
>         at org.apache.zookeeper.server.ZooKeeperServer.processTxn(
> ZooKeeperServer.java:994)
>         at org.apache.zookeeper.server.FinalRequestProcessor.
> processRequest(FinalRequestProcessor.java:116)
>         at org.apache.zookeeper.server.quorum.Leader$
> ToBeAppliedRequestProcessor.processRequest(Leader.java:644)
>         at org.apache.zookeeper.server.quorum.CommitProcessor.run(
> CommitProcessor.java:74)
>
> another zookeeper.
> [2016-08-26 16:15:41,501] WARN caught end of stream exception
> (org.apache.zookeeper.server.NIOServerCnxn)
> EndOfStreamException: Unable to read additional data from client sessionid
> 0x356c616954f0000, likely client has closed socket
>         at org.apache.zookeeper.server.NIOServerCnxn.doIO(
> NIOServerCnxn.java:228)
>         at org.apache.zookeeper.server.NIOServerCnxnFactory.run(
> NIOServerCnxnFactory.java:208)
>         at java.lang.Thread.run(Thread.java:744)
> [2016-08-26 16:15:41,502] INFO Closed socket connection for client /
> 169.254.2.28:53561 which had sessionid 0x356c616954f0000
> (org.apache.zookeeper.server.NIOServerCnxn)
> [2016-08-26 16:15:41,503] WARN caught end of stream exception
> (org.apache.zookeeper.server.NIOServerCnxn)
> EndOfStreamException: Unable to read additional data from client sessionid
> 0x356c616954f0001, likely client has closed socket
>         at org.apache.zookeeper.server.NIOServerCnxn.doIO(
> NIOServerCnxn.java:228)
>         at org.apache.zookeeper.server.NIOServerCnxnFactory.run(
> NIOServerCnxnFactory.java:208)
>         at java.lang.Thread.run(Thread.java:744)
> [2016-08-26 16:15:41,503] INFO Closed socket connection for client /
> 169.254.2.27:49090 which had sessionid 0x356c616954f0001
> (org.apache.zookeeper.server.NIOServerCnxn)
> [2016-08-26 16:15:41,504] WARN caught end of stream exception
> (org.apache.zookeeper.server.NIOServerCnxn)
> EndOfStreamException: Unable to read additional data from client sessionid
> 0x256c616476a0000, likely client has closed socket
>         at org.apache.zookeeper.server.NIOServerCnxn.doIO(
> NIOServerCnxn.java:228)
>         at org.apache.zookeeper.server.NIOServerCnxnFactory.run(
> NIOServerCnxnFactory.java:208)
>         at java.lang.Thread.run(Thread.java:744)
> [2016-08-26 16:15:41,504] INFO Closed socket connection for client /
> 169.254.2.18:39069 which had sessionid 0x256c616476a0000
> (org.apache.zookeeper.server.NIOServerCnxn)
> [2016-08-26 16:15:41,602] INFO Closing socket connection to /169.254.2.216.
> (kafka.network.Processor)
> [2016-08-26 16:15:41,607] INFO Closing socket connection to /169.254.2.216.
> (kafka.network.Processor)
> [2016-08-26 16:15:42,090] INFO Closing socket connection to /169.254.2.216.
> (kafka.network.Processor)
> [2016-08-26 16:15:42,093] INFO Closing socket connection to /169.254.2.216.
> (kafka.network.Processor)
> [2016-08-26 16:15:42,461] INFO Client session timed out, have not heard
> from server in 4002ms for sessionid 0x156c616475a0001, closing socket
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2016-08-26 16:15:42,461] WARN caught end of stream exception
> (org.apache.zookeeper.server.NIOServerCnxn)
> EndOfStreamException: Unable to read additional data from client sessionid
> 0x156c616475a0001, likely client has closed socket
>         at org.apache.zookeeper.server.NIOServerCnxn.doIO(
> NIOServerCnxn.java:228)
>         at org.apache.zookeeper.server.NIOServerCnxnFactory.run(
> NIOServerCnxnFactory.java:208)
>         at java.lang.Thread.run(Thread.java:744)
>
>
> Thank you.
>
> Regards,
> Mazhar Shaikh
>
>
> On Fri, Aug 19, 2016 at 8:05 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Mazhar,
>>
>> Let's first confirm if this is indeed a bug. As I mentioned earlier, it's
>> possible to have message loss with ack=1 when there are (leader) broker
>> failures. If this is not the case, please file a jira and describe how to
>> reproduce the problem. Also, it would be useful to know if the message
>> loss
>> happens with the java producer too. This will help isolate whether this is
>> a server side or a client side issue.
>>
>> Thanks,
>>
>> Jun
>>
>> On Fri, Aug 19, 2016 at 2:15 AM, Mazhar Shaikh <
>> mazhar.shaikh...@gmail.com>
>> wrote:
>>
>> > Hi Jun,
>> >
>> > In my earlier runs, I had enabled delivery report (with and without
>> offset
>> > report) facility provided by librdkafka.
>> >
>> > The producer has received successful delivery report for the all msg
>> sent
>> > even than the messages where lost.
>> >
>> > as you mentioned. producer has nothing to do with this loss of messages.
>> >
>> > I just want to know, as when can we get the fix for this bug ?
>> >
>> > Thanks.
>> >
>> > Regards,
>> > Mazhar Shaikh
>> >
>> >
>> > On Fri, Aug 19, 2016 at 1:24 AM, Jun Rao <j...@confluent.io> wrote:
>> >
>> > > Mazhar,
>> > >
>> > > With ack=1, whether you lose messages or not is not deterministic. It
>> > > depends on the time when the broker receives/acks a message, the
>> follower
>> > > fetches the data and the broker fails. So, it's possible that you got
>> > lucky
>> > > in one version and unlucky in another.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > > On Thu, Aug 18, 2016 at 12:11 PM, Mazhar Shaikh <
>> > > mazhar.shaikh...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi Jun,
>> > > >
>> > > > Thanks for clarification, I'll give a try with ack=-1 (in producer).
>> > > >
>> > > > However, i did a fallback to older version of kafka
>> > > (*kafka_2.10-0.8.2.1*),
>> > > > and i don't see this issue (loss of messages).
>> > > >
>> > > > looks like kafka_2.11-0.9.0.1 has issues(BUG) during replication.
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Regards,
>> > > > Mazhar Shaikh.
>> > > >
>> > > >
>> > > >
>> > > > On Thu, Aug 18, 2016 at 10:30 PM, Jun Rao <j...@confluent.io> wrote:
>> > > >
>> > > > > Mazhar,
>> > > > >
>> > > > > There is probably a mis-understanding. Ack=-1 (or all) doesn't
>> mean
>> > > > waiting
>> > > > > for all replicas. It means waiting for all replicas that are in
>> sync.
>> > > So,
>> > > > > if a replica is down, it will be removed from the in-sync
>> replicas,
>> > > which
>> > > > > allows the producer to continue with fewer replicas.
>> > > > >
>> > > > > For the connection issue that you saw in the log, this could
>> happen
>> > > when
>> > > > a
>> > > > > connection is idle for some time. It won't break the replication
>> > logic
>> > > > > since a new connection will be created automatically. You can
>> > increase
>> > > > the
>> > > > > socket idle time on the broker if you want to turn off this
>> behavior.
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > > On Thu, Aug 18, 2016 at 12:07 AM, Mazhar Shaikh <
>> > > > > mazhar.shaikh...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi Jun,
>> > > > > >
>> > > > > > Setting to -1, may solve this issue.
>> > > > > > But it will cause producer buffer full in load test resulting to
>> > > > failures
>> > > > > > and drop of messages from client(producer side)
>> > > > > > Hence, this will not actually solve the problem.
>> > > > > >
>> > > > > > I need to fix this from kafka broker side, so that there is no
>> > impact
>> > > > on
>> > > > > > producer or consumer.
>> > > > > >
>> > > > > > From the logs looks like there is connection problem during
>> between
>> > > > > brokers
>> > > > > > and kafka broker is loosing records during this process.
>> > > > > >
>> > > > > > But why is kafka broker loosing records,
>> > > > > >
>> > > > > > I feel this is a BUG in kafka.
>> > > > > >
>> > > > > > [2016-08-17 12:54:50,293] TRACE [Controller 2]: checking need to
>> > > > trigger
>> > > > > > partition rebalance (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:54:50,294] DEBUG [Controller 2]: preferred
>> replicas
>> > by
>> > > > > > broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] ->
>> > List(0,
>> > > > 1),
>> > > > > > [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1),
>> [topic1,43]
>> > ->
>> > > > > > List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0,
>> 1),
>> > > > > > [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1),
>> [topic1,41]
>> > ->
>> > > > > > List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0,
>> 1),
>> > > > > > [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1),
>> [topic1,23]
>> > ->
>> > > > > > List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] ->
>> > > List(5,
>> > > > > 3),
>> > > > > > [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3),
>> [topic1,74]
>> > ->
>> > > > > > List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5,
>> 3),
>> > > > > > [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3),
>> [topic1,75]
>> > ->
>> > > > > > List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5,
>> 3),
>> > > > > > [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3),
>> [topic1,72]
>> > ->
>> > > > > > List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5,
>> 3)),
>> > 1
>> > > ->
>> > > > > > Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0),
>> > [topic1,69]
>> > > > ->
>> > > > > > List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1,
>> 0),
>> > > > > > [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0),
>> [topic1,88]
>> > ->
>> > > > > > List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1,
>> 0),
>> > > > > > [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0),
>> [topic1,89]
>> > ->
>> > > > > > List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1,
>> 0),
>> > > > > > [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4),
>> > > > [topic1,3]
>> > > > > > -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2,
>> 4),
>> > > > > > [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9]
>> ->
>> > > > List(2,
>> > > > > > 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4),
>> > [topic1,11]
>> > > > ->
>> > > > > > List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4),
>> > > > > [topic1,4]
>> > > > > > -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2,
>> 4),
>> > > > > > [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5),
>> > > > > [topic1,30]
>> > > > > > -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3,
>> > 5),
>> > > > > > [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5),
>> [topic1,27]
>> > ->
>> > > > > > List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3,
>> 5),
>> > > > > > [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5),
>> [topic1,32]
>> > ->
>> > > > > > List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3,
>> 5),
>> > > > > > [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 ->
>> > > > > Map([topic1,53]
>> > > > > > -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4,
>> > 2),
>> > > > > > [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2),
>> [topic1,58]
>> > ->
>> > > > > > List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4,
>> 2),
>> > > > > > [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2),
>> [topic1,62]
>> > ->
>> > > > > > List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4,
>> 2),
>> > > > > > [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2),
>> [topic1,59]
>> > ->
>> > > > > > List(4, 2))) (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 0 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 5 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 1 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 2 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 3 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 4 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:55:32,783] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 12:55:32,894] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,36],
>> > > > > > [topic1,30], [topic1,31], [topic1,86], [topic1,78], [topic1,74],
>> > > > > > [topic1,82], [topic1,33]) (kafka.controller.
>> > > > > IsrChangeNotificationListener)
>> > > > > > [2016-08-17 12:55:32,896] WARN [Controller-2-to-broker-2-
>> > > send-thread],
>> > > > > > Controller 2 epoch 2 fails to send request
>> > > {controller_id=2,controller_
>> > > > > > epoch=2,partition_states=[{topic=topic1,partition=82,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]}],live_brokers=[{id=5,end_
>> > > > > > points=[{port=9092,host=b5.kafka,security_protocol_type=
>> > > > > > 0}]},{id=3,end_points=[{port=9092,host=b3.kafka,security_
>> > > > > > protocol_type=0}]},{id=2,end_points=[{port=9092,host=b2.
>> > > > > > kafka,security_protocol_type=0}]},{id=1,end_points=[{port=
>> > > > > > 9092,host=b1.kafka,security_protocol_type=0}]},{id=4,end_
>> > > > > > points=[{port=9092,host=b4.kafka,security_protocol_type=
>> > > > > > 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
>> > > > > > protocol_type=0}]}]}
>> > > > > > to broker Node(2, b2.kafka, 9092). Reconnecting to broker.
>> > > > > > (kafka.controller.RequestSendThread)
>> > > > > > java.io.IOException: Connection to 2 was disconnected before the
>> > > > response
>> > > > > > was read
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
>> > > > > > NetworkClientBlockingOps.scala:87)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
>> > > > > > NetworkClientBlockingOps.scala:84)
>> > > > > >         at scala.Option.foreach(Option.scala:257)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlocki
>> ngOps.
>> > > > > > scala:84)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlocki
>> ngOps.
>> > > > > > scala:80)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
>> > > > > > NetworkClientBlockingOps.scala:129)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
>> > > > > > NetworkClientBlockingOps$$pollUntilFound$extension(
>> > > > > > NetworkClientBlockingOps.
>> > > > > > scala:139)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.
>> > > > blockingSendAndReceive$
>> > > > > > extension(NetworkClientBlockingOps.scala:80)
>> > > > > >         at kafka.controller.RequestSendThread.liftedTree1$
>> > > > > > 1(ControllerChannelManager.scala:180)
>> > > > > >         at kafka.controller.RequestSendThread.doWork(
>> > > > > > ControllerChannelManager.scala:171)
>> > > > > >         at kafka.utils.ShutdownableThread.run(
>> > > > > ShutdownableThread.scala:63)
>> > > > > > [2016-08-17 12:55:32,897] WARN [Controller-2-to-broker-5-
>> > > send-thread],
>> > > > > > Controller 2 epoch 2 fails to send request
>> > > {controller_id=2,controller_
>> > > > > > epoch=2,partition_states=[{topic=topic1,partition=82,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]}],live_brokers=[{id=0,end_
>> > > > > > points=[{port=9092,host=b0.kafka,security_protocol_type=
>> > > > > > 0}]},{id=5,end_points=[{port=9092,host=b5.kafka,security_
>> > > > > > protocol_type=0}]},{id=3,end_points=[{port=9092,host=b3.
>> > > > > > kafka,security_protocol_type=0}]},{id=1,end_points=[{port=
>> > > > > > 9092,host=b1.kafka,security_protocol_type=0}]},{id=2,end_
>> > > > > > points=[{port=9092,host=b2.kafka,security_protocol_type=
>> > > > > > 0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_
>> > > > > > protocol_type=0}]}]}
>> > > > > > to broker Node(5, b5.kafka, 9092). Reconnecting to broker.
>> > > > > > (kafka.controller.RequestSendThread)
>> > > > > > java.io.IOException: Connection to 5 was disconnected before the
>> > > > response
>> > > > > > was read
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
>> > > > > > NetworkClientBlockingOps.scala:87)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
>> > > > > > NetworkClientBlockingOps.scala:84)
>> > > > > >         at scala.Option.foreach(Option.scala:257)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlocki
>> ngOps.
>> > > > > > scala:84)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlocki
>> ngOps.
>> > > > > > scala:80)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
>> > > > > > NetworkClientBlockingOps.scala:129)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
>> > > > > > NetworkClientBlockingOps$$pollUntilFound$extension(
>> > > > > > NetworkClientBlockingOps.
>> > > > > > scala:139)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.
>> > > > blockingSendAndReceive$
>> > > > > > extension(NetworkClientBlockingOps.scala:80)
>> > > > > >         at kafka.controller.RequestSendThread.liftedTree1$
>> > > > > > 1(ControllerChannelManager.scala:180)
>> > > > > >         at kafka.controller.RequestSendThread.doWork(
>> > > > > > ControllerChannelManager.scala:171)
>> > > > > >         at kafka.utils.ShutdownableThread.run(
>> > > > > ShutdownableThread.scala:63)
>> > > > > > [2016-08-17 12:55:32,898] WARN [Controller-2-to-broker-4-
>> > > send-thread],
>> > > > > > Controller 2 epoch 2 fails to send request
>> > > {controller_id=2,controller_
>> > > > > > epoch=2,partition_states=[{topic=topic1,partition=82,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]}],live_brokers=[{id=3,end_
>> > > > > > points=[{port=9092,host=b3.kafka,security_protocol_type=
>> > > > > > 0}]},{id=1,end_points=[{port=9092,host=b1.kafka,security_
>> > > > > > protocol_type=0}]},{id=4,end_points=[{port=9092,host=b4.
>> > > > > > kafka,security_protocol_type=0}]},{id=2,end_points=[{port=
>> > > > > > 9092,host=b2.kafka,security_protocol_type=0}]},{id=0,end_
>> > > > > > points=[{port=9092,host=b0.kafka,security_protocol_type=
>> > > > > > 0}]},{id=5,end_points=[{port=9092,host=b5.kafka,security_
>> > > > > > protocol_type=0}]}]}
>> > > > > > to broker Node(4, b4.kafka, 9092). Reconnecting to broker.
>> > > > > > (kafka.controller.RequestSendThread)
>> > > > > > java.io.IOException: Connection to 4 was disconnected before the
>> > > > response
>> > > > > > was read
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
>> > > > > > NetworkClientBlockingOps.scala:87)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
>> > > > > > NetworkClientBlockingOps.scala:84)
>> > > > > >         at scala.Option.foreach(Option.scala:257)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlocki
>> ngOps.
>> > > > > > scala:84)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlocki
>> ngOps.
>> > > > > > scala:80)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
>> > > > > > NetworkClientBlockingOps.scala:129)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
>> > > > > > NetworkClientBlockingOps$$pollUntilFound$extension(
>> > > > > > NetworkClientBlockingOps.
>> > > > > > scala:139)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.
>> > > > blockingSendAndReceive$
>> > > > > > extension(NetworkClientBlockingOps.scala:80)
>> > > > > >         at kafka.controller.RequestSendThread.liftedTree1$
>> > > > > > 1(ControllerChannelManager.scala:180)
>> > > > > >         at kafka.controller.RequestSendThread.doWork(
>> > > > > > ControllerChannelManager.scala:171)
>> > > > > >         at kafka.utils.ShutdownableThread.run(
>> > > > > ShutdownableThread.scala:63)
>> > > > > > [2016-08-17 12:55:32,900] WARN [Controller-2-to-broker-1-
>> > > send-thread],
>> > > > > > Controller 2 epoch 2 fails to send request
>> > > {controller_id=2,controller_
>> > > > > > epoch=2,partition_states=[{topic=topic1,partition=82,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]}],live_brokers=[{id=5,end_
>> > > > > > points=[{port=9092,host=b5.kafka,security_protocol_type=
>> > > > > > 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
>> > > > > > protocol_type=0}]},{id=3,end_points=[{port=9092,host=b3.
>> > > > > > kafka,security_protocol_type=0}]},{id=1,end_points=[{port=
>> > > > > > 9092,host=b1.kafka,security_protocol_type=0}]},{id=4,end_
>> > > > > > points=[{port=9092,host=b4.kafka,security_protocol_type=
>> > > > > > 0}]},{id=2,end_points=[{port=9092,host=b2.kafka,security_
>> > > > > > protocol_type=0}]}]}
>> > > > > > to broker Node(1, b1.kafka, 9092). Reconnecting to broker.
>> > > > > > (kafka.controller.RequestSendThread)
>> > > > > > java.io.IOException: Connection to 1 was disconnected before the
>> > > > response
>> > > > > > was read
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
>> > > > > > NetworkClientBlockingOps.scala:87)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
>> > > > > > NetworkClientBlockingOps.scala:84)
>> > > > > >         at scala.Option.foreach(Option.scala:257)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlocki
>> ngOps.
>> > > > > > scala:84)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlocki
>> ngOps.
>> > > > > > scala:80)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
>> > > > > > NetworkClientBlockingOps.scala:129)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
>> > > > > > NetworkClientBlockingOps$$pollUntilFound$extension(
>> > > > > > NetworkClientBlockingOps.
>> > > > > > scala:139)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.
>> > > > blockingSendAndReceive$
>> > > > > > extension(NetworkClientBlockingOps.scala:80)
>> > > > > >         at kafka.controller.RequestSendThread.liftedTree1$
>> > > > > > 1(ControllerChannelManager.scala:180)
>> > > > > >         at kafka.controller.RequestSendThread.doWork(
>> > > > > > ControllerChannelManager.scala:171)
>> > > > > >         at kafka.utils.ShutdownableThread.run(
>> > > > > ShutdownableThread.scala:63)
>> > > > > > [2016-08-17 12:55:32,902] WARN [Controller-2-to-broker-3-
>> > > send-thread],
>> > > > > > Controller 2 epoch 2 fails to send request
>> > > {controller_id=2,controller_
>> > > > > > epoch=2,partition_states=[{topic=topic1,partition=82,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]}],live_brokers=[{id=3,end_
>> > > > > > points=[{port=9092,host=b3.kafka,security_protocol_type=
>> > > > > > 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
>> > > > > > protocol_type=0}]},{id=5,end_points=[{port=9092,host=b5.
>> > > > > > kafka,security_protocol_type=0}]},{id=2,end_points=[{port=
>> > > > > > 9092,host=b2.kafka,security_protocol_type=0}]},{id=1,end_
>> > > > > > points=[{port=9092,host=b1.kafka,security_protocol_type=
>> > > > > > 0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_
>> > > > > > protocol_type=0}]}]}
>> > > > > > to broker Node(3, b3.kafka, 9092). Reconnecting to broker.
>> > > > > > (kafka.controller.RequestSendThread)
>> > > > > > java.io.IOException: Connection to 3 was disconnected before the
>> > > > response
>> > > > > > was read
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
>> > > > > > NetworkClientBlockingOps.scala:87)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
>> > > > > > NetworkClientBlockingOps.scala:84)
>> > > > > >         at scala.Option.foreach(Option.scala:257)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlocki
>> ngOps.
>> > > > > > scala:84)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlocki
>> ngOps.
>> > > > > > scala:80)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
>> > > > > > NetworkClientBlockingOps.scala:129)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
>> > > > > > NetworkClientBlockingOps$$pollUntilFound$extension(
>> > > > > > NetworkClientBlockingOps.
>> > > > > > scala:139)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.
>> > > > blockingSendAndReceive$
>> > > > > > extension(NetworkClientBlockingOps.scala:80)
>> > > > > >         at kafka.controller.RequestSendThread.liftedTree1$
>> > > > > > 1(ControllerChannelManager.scala:180)
>> > > > > >         at kafka.controller.RequestSendThread.doWork(
>> > > > > > ControllerChannelManager.scala:171)
>> > > > > >         at kafka.utils.ShutdownableThread.run(
>> > > > > ShutdownableThread.scala:63)
>> > > > > > [2016-08-17 12:55:32,903] WARN [Controller-2-to-broker-0-
>> > > send-thread],
>> > > > > > Controller 2 epoch 2 fails to send request
>> > > {controller_id=2,controller_
>> > > > > > epoch=2,partition_states=[{topic=topic1,partition=82,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
>> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
>> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
>> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
>> > > > > > version=18,replicas=[3,5]}],live_brokers=[{id=4,end_
>> > > > > > points=[{port=9092,host=b4.kafka,security_protocol_type=
>> > > > > > 0}]},{id=3,end_points=[{port=9092,host=b3.kafka,security_
>> > > > > > protocol_type=0}]},{id=1,end_points=[{port=9092,host=b1.
>> > > > > > kafka,security_protocol_type=0}]},{id=2,end_points=[{port=
>> > > > > > 9092,host=b2.kafka,security_protocol_type=0}]},{id=5,end_
>> > > > > > points=[{port=9092,host=b5.kafka,security_protocol_type=
>> > > > > > 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
>> > > > > > protocol_type=0}]}]}
>> > > > > > to broker Node(0, b0.kafka, 9092). Reconnecting to broker.
>> > > > > > (kafka.controller.RequestSendThread)
>> > > > > > java.io.IOException: Connection to 0 was disconnected before the
>> > > > response
>> > > > > > was read
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
>> > > > > > NetworkClientBlockingOps.scala:87)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
>> > > > > > NetworkClientBlockingOps.scala:84)
>> > > > > >         at scala.Option.foreach(Option.scala:257)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlocki
>> ngOps.
>> > > > > > scala:84)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
>> > > > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlocki
>> ngOps.
>> > > > > > scala:80)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
>> > > > > > NetworkClientBlockingOps.scala:129)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
>> > > > > > NetworkClientBlockingOps$$pollUntilFound$extension(
>> > > > > > NetworkClientBlockingOps.
>> > > > > > scala:139)
>> > > > > >         at kafka.utils.NetworkClientBlockingOps$.
>> > > > blockingSendAndReceive$
>> > > > > > extension(NetworkClientBlockingOps.scala:80)
>> > > > > >         at kafka.controller.RequestSendThread.liftedTree1$
>> > > > > > 1(ControllerChannelManager.scala:180)
>> > > > > >         at kafka.controller.RequestSendThread.doWork(
>> > > > > > ControllerChannelManager.scala:171)
>> > > > > >         at kafka.utils.ShutdownableThread.run(
>> > > > > ShutdownableThread.scala:63)
>> > > > > > [2016-08-17 12:55:32,927] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 12:55:33,162] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 12:55:33,169] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,50])
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 12:55:33,194] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 12:55:33,198] INFO [Controller-2-to-broker-2-
>> > > send-thread],
>> > > > > > Controller 2 connected to Node(2, b2.kafka, 9092) for sending
>> state
>> > > > > change
>> > > > > > requests (kafka.controller.RequestSendThread)
>> > > > > > [2016-08-17 12:55:33,199] INFO [Controller-2-to-broker-5-
>> > > send-thread],
>> > > > > > Controller 2 connected to Node(5, b5.kafka, 9092) for sending
>> state
>> > > > > change
>> > > > > > requests (kafka.controller.RequestSendThread)
>> > > > > > [2016-08-17 12:55:33,200] INFO [Controller-2-to-broker-4-
>> > > send-thread],
>> > > > > > Controller 2 connected to Node(4, b4.kafka, 9092) for sending
>> state
>> > > > > change
>> > > > > > requests (kafka.controller.RequestSendThread)
>> > > > > > [2016-08-17 12:55:33,202] INFO [Controller-2-to-broker-1-
>> > > send-thread],
>> > > > > > Controller 2 connected to Node(1, b1.kafka, 9092) for sending
>> state
>> > > > > change
>> > > > > > requests (kafka.controller.RequestSendThread)
>> > > > > > [2016-08-17 12:55:33,204] INFO [Controller-2-to-broker-0-
>> > > send-thread],
>> > > > > > Controller 2 connected to Node(0, b0.kafka, 9092) for sending
>> state
>> > > > > change
>> > > > > > requests (kafka.controller.RequestSendThread)
>> > > > > > [2016-08-17 12:55:33,207] INFO [Controller-2-to-broker-3-
>> > > send-thread],
>> > > > > > Controller 2 connected to Node(3, b3.kafka, 9092) for sending
>> state
>> > > > > change
>> > > > > > requests (kafka.controller.RequestSendThread)
>> > > > > > [2016-08-17 12:55:39,981] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 12:55:40,018] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,34],
>> > > > > > [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38],
>> > > > > > [topic1,35], [topic1,33]) (kafka.controller.
>> > > > > IsrChangeNotificationListener)
>> > > > > > [2016-08-17 12:55:40,377] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 12:55:40,388] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,86],
>> > > > > > [topic1,78], [topic1,82]) (kafka.controller.
>> > > > > IsrChangeNotificationListener)
>> > > > > > [2016-08-17 12:55:40,409] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 12:59:50,293] TRACE [Controller 2]: checking need to
>> > > > trigger
>> > > > > > partition rebalance (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: preferred
>> replicas
>> > by
>> > > > > > broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] ->
>> > List(0,
>> > > > 1),
>> > > > > > [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1),
>> [topic1,43]
>> > ->
>> > > > > > List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0,
>> 1),
>> > > > > > [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1),
>> [topic1,41]
>> > ->
>> > > > > > List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0,
>> 1),
>> > > > > > [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1),
>> [topic1,23]
>> > ->
>> > > > > > List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] ->
>> > > List(5,
>> > > > > 3),
>> > > > > > [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3),
>> [topic1,74]
>> > ->
>> > > > > > List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5,
>> 3),
>> > > > > > [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3),
>> [topic1,75]
>> > ->
>> > > > > > List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5,
>> 3),
>> > > > > > [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3),
>> [topic1,72]
>> > ->
>> > > > > > List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5,
>> 3)),
>> > 1
>> > > ->
>> > > > > > Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0),
>> > [topic1,69]
>> > > > ->
>> > > > > > List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1,
>> 0),
>> > > > > > [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0),
>> [topic1,88]
>> > ->
>> > > > > > List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1,
>> 0),
>> > > > > > [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0),
>> [topic1,89]
>> > ->
>> > > > > > List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1,
>> 0),
>> > > > > > [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4),
>> > > > [topic1,3]
>> > > > > > -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2,
>> 4),
>> > > > > > [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9]
>> ->
>> > > > List(2,
>> > > > > > 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4),
>> > [topic1,11]
>> > > > ->
>> > > > > > List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4),
>> > > > > [topic1,4]
>> > > > > > -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2,
>> 4),
>> > > > > > [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5),
>> > > > > [topic1,30]
>> > > > > > -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3,
>> > 5),
>> > > > > > [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5),
>> [topic1,27]
>> > ->
>> > > > > > List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3,
>> 5),
>> > > > > > [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5),
>> [topic1,32]
>> > ->
>> > > > > > List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3,
>> 5),
>> > > > > > [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 ->
>> > > > > Map([topic1,53]
>> > > > > > -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4,
>> > 2),
>> > > > > > [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2),
>> [topic1,58]
>> > ->
>> > > > > > List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4,
>> 2),
>> > > > > > [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2),
>> [topic1,62]
>> > ->
>> > > > > > List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4,
>> 2),
>> > > > > > [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2),
>> [topic1,59]
>> > ->
>> > > > > > List(4, 2))) (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 0 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 5 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 1 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 2 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:59:50,295] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:59:50,295] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 3 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:59:50,295] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 12:59:50,295] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 4 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:00:39,546] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:39,604] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,5])
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:39,649] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:39,888] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:40,071] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,37],
>> > > > > > [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
>> > > > > > [topic1,36], [topic1,30], [topic1,31], [topic1,25], [topic1,29],
>> > > > > > [topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28])
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:40,103] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:40,261] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:40,283] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,72],
>> > > > > > [topic1,80]) (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:40,296] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:40,656] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:40,662] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,55])
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:40,934] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:47,335] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:47,393] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,37],
>> > > > > > [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
>> > > > > > [topic1,36], [topic1,30], [topic1,31], [topic1,25], [topic1,29],
>> > > > > > [topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28])
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:47,423] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:47,897] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:47,944] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,5],
>> > > > > > [topic1,3], [topic1,7], [topic1,11], [topic1,2], [topic1,6],
>> > > > [topic1,1],
>> > > > > > [topic1,10], [topic1,14], [topic1,9], [topic1,15])
>> > (kafka.controller.
>> > > > > > IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:00:48,020] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:04:50,293] TRACE [Controller 2]: checking need to
>> > > > trigger
>> > > > > > partition rebalance (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:04:50,295] DEBUG [Controller 2]: preferred
>> replicas
>> > by
>> > > > > > broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] ->
>> > List(0,
>> > > > 1),
>> > > > > > [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1),
>> [topic1,43]
>> > ->
>> > > > > > List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0,
>> 1),
>> > > > > > [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1),
>> [topic1,41]
>> > ->
>> > > > > > List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0,
>> 1),
>> > > > > > [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1),
>> [topic1,23]
>> > ->
>> > > > > > List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] ->
>> > > List(5,
>> > > > > 3),
>> > > > > > [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3),
>> [topic1,74]
>> > ->
>> > > > > > List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5,
>> 3),
>> > > > > > [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3),
>> [topic1,75]
>> > ->
>> > > > > > List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5,
>> 3),
>> > > > > > [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3),
>> [topic1,72]
>> > ->
>> > > > > > List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5,
>> 3)),
>> > 1
>> > > ->
>> > > > > > Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0),
>> > [topic1,69]
>> > > > ->
>> > > > > > List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1,
>> 0),
>> > > > > > [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0),
>> [topic1,88]
>> > ->
>> > > > > > List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1,
>> 0),
>> > > > > > [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0),
>> [topic1,89]
>> > ->
>> > > > > > List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1,
>> 0),
>> > > > > > [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4),
>> > > > [topic1,3]
>> > > > > > -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2,
>> 4),
>> > > > > > [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9]
>> ->
>> > > > List(2,
>> > > > > > 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4),
>> > [topic1,11]
>> > > > ->
>> > > > > > List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4),
>> > > > > [topic1,4]
>> > > > > > -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2,
>> 4),
>> > > > > > [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5),
>> > > > > [topic1,30]
>> > > > > > -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3,
>> > 5),
>> > > > > > [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5),
>> [topic1,27]
>> > ->
>> > > > > > List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3,
>> 5),
>> > > > > > [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5),
>> [topic1,32]
>> > ->
>> > > > > > List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3,
>> 5),
>> > > > > > [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 ->
>> > > > > Map([topic1,53]
>> > > > > > -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4,
>> > 2),
>> > > > > > [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2),
>> [topic1,58]
>> > ->
>> > > > > > List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4,
>> 2),
>> > > > > > [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2),
>> [topic1,62]
>> > ->
>> > > > > > List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4,
>> 2),
>> > > > > > [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2),
>> [topic1,59]
>> > ->
>> > > > > > List(4, 2))) (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:04:50,295] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:04:50,295] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 0 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:04:50,295] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 5 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 1 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 2 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 3 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 4 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:05:34,317] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:05:34,365] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,80],
>> > > > > > [topic1,40], [topic1,21], [topic1,31], [topic1,84], [topic1,33])
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:05:34,388] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:05:36,426] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:05:36,437] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,92])
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:05:36,699] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:05:40,225] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:05:40,239] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,80],
>> > > > > > [topic1,84]) (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:05:40,246] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:05:40,958] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:05:41,006] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,22],
>> > > > > > [topic1,16], [topic1,20], [topic1,19], [topic1,40], [topic1,21],
>> > > > > > [topic1,18], [topic1,47], [topic1,44], [topic1,45], [topic1,42],
>> > > > > > [topic1,46], [topic1,43], [topic1,23]) (kafka.controller.
>> > > > > > IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:05:41,067] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:05:42,517] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:05:42,622] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,37],
>> > > > > > [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
>> > > > > > [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38],
>> > > > > > [topic1,26], [topic1,35], [topic1,33], [topic1,28])
>> > > (kafka.controller.
>> > > > > > IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:05:42,690] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:09:50,293] TRACE [Controller 2]: checking need to
>> > > > trigger
>> > > > > > partition rebalance (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:09:50,295] DEBUG [Controller 2]: preferred
>> replicas
>> > by
>> > > > > > broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] ->
>> > List(0,
>> > > > 1),
>> > > > > > [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1),
>> [topic1,43]
>> > ->
>> > > > > > List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0,
>> 1),
>> > > > > > [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1),
>> [topic1,41]
>> > ->
>> > > > > > List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0,
>> 1),
>> > > > > > [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1),
>> [topic1,23]
>> > ->
>> > > > > > List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] ->
>> > > List(5,
>> > > > > 3),
>> > > > > > [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3),
>> [topic1,74]
>> > ->
>> > > > > > List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5,
>> 3),
>> > > > > > [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3),
>> [topic1,75]
>> > ->
>> > > > > > List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5,
>> 3),
>> > > > > > [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3),
>> [topic1,72]
>> > ->
>> > > > > > List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5,
>> 3)),
>> > 1
>> > > ->
>> > > > > > Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0),
>> > [topic1,69]
>> > > > ->
>> > > > > > List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1,
>> 0),
>> > > > > > [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0),
>> [topic1,88]
>> > ->
>> > > > > > List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1,
>> 0),
>> > > > > > [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0),
>> [topic1,89]
>> > ->
>> > > > > > List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1,
>> 0),
>> > > > > > [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4),
>> > > > [topic1,3]
>> > > > > > -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2,
>> 4),
>> > > > > > [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9]
>> ->
>> > > > List(2,
>> > > > > > 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4),
>> > [topic1,11]
>> > > > ->
>> > > > > > List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4),
>> > > > > [topic1,4]
>> > > > > > -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2,
>> 4),
>> > > > > > [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5),
>> > > > > [topic1,30]
>> > > > > > -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3,
>> > 5),
>> > > > > > [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5),
>> [topic1,27]
>> > ->
>> > > > > > List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3,
>> 5),
>> > > > > > [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5),
>> [topic1,32]
>> > ->
>> > > > > > List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3,
>> 5),
>> > > > > > [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 ->
>> > > > > Map([topic1,53]
>> > > > > > -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4,
>> > 2),
>> > > > > > [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2),
>> [topic1,58]
>> > ->
>> > > > > > List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4,
>> 2),
>> > > > > > [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2),
>> [topic1,62]
>> > ->
>> > > > > > List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4,
>> 2),
>> > > > > > [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2),
>> [topic1,59]
>> > ->
>> > > > > > List(4, 2))) (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:09:50,295] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:09:50,295] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 0 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:09:50,295] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 5 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 1 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 2 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 3 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in
>> > > preferred
>> > > > > > replica Map() (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:09:50,297] TRACE [Controller 2]: leader imbalance
>> > > ratio
>> > > > > for
>> > > > > > broker 4 is 0.000000 (kafka.controller.KafkaController)
>> > > > > > [2016-08-17 13:10:37,278] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:10:37,292] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,67],
>> > > > > > [topic1,95]) (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:10:37,304] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:10:43,375] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:10:43,383] DEBUG Sending MetadataRequest to
>> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
>> TopicAndPartitions:Set([
>> > > > > > topic1,67],
>> > > > > > [topic1,95]) (kafka.controller.IsrChangeNotificationListener)
>> > > > > > [2016-08-17 13:10:43,394] DEBUG [IsrChangeNotificationListener]
>> > > > Fired!!!
>> > > > > > (kafka.controller.IsrChangeNotificationListener)
>> > > > > >
>> > > > > > Thanks
>> > > > > >
>> > > > > > Regards,
>> > > > > > Mazhar Shaikh.
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Wed, Aug 17, 2016 at 9:50 PM, Jun Rao <j...@confluent.io>
>> wrote:
>> > > > > >
>> > > > > > > Yes, you can try setting it to -1 in 0.8.1, which is the
>> > equivalent
>> > > > of
>> > > > > > > "all" in 0.9 and above.
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > >
>> > > > > > > Jun
>> > > > > > >
>> > > > > > > On Wed, Aug 17, 2016 at 8:32 AM, Mazhar Shaikh <
>> > > > > > mazhar.shaikh...@gmail.com
>> > > > > > > >
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi Jun,
>> > > > > > > >
>> > > > > > > > I'm using default configuration (ack=1),
>> > > > > > > > changing it t0 all or 2 will not help, as the producer queue
>> > will
>> > > > be
>> > > > > > > > exhausted is any kafka broker goes down for long time.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Thanks.
>> > > > > > > >
>> > > > > > > > Regards,
>> > > > > > > > Mazhar Shaikh.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Wed, Aug 17, 2016 at 8:11 PM, Jun Rao <j...@confluent.io>
>> > > wrote:
>> > > > > > > >
>> > > > > > > > > Are you using acks=1 or acks=all in the producer? Only the
>> > > latter
>> > > > > > > > > guarantees acked messages won't be lost after leader
>> failure.
>> > > > > > > > >
>> > > > > > > > > Thanks,
>> > > > > > > > >
>> > > > > > > > > Jun
>> > > > > > > > >
>> > > > > > > > > On Wed, Aug 10, 2016 at 11:41 PM, Mazhar Shaikh <
>> > > > > > > > > mazhar.shaikh...@gmail.com>
>> > > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Kafka Team,
>> > > > > > > > > >
>> > > > > > > > > > I'm using kafka (kafka_2.11-0.9.0.1) with librdkafka
>> > (0.8.1)
>> > > > API
>> > > > > > for
>> > > > > > > > > > producer
>> > > > > > > > > > During a run of 2hrs, I notice the total number of
>> messaged
>> > > > ack'd
>> > > > > > by
>> > > > > > > > > > librdkafka delivery report is greater than the maxoffset
>> > of a
>> > > > > > > partition
>> > > > > > > > > in
>> > > > > > > > > > kafka broker.
>> > > > > > > > > > I'm running kafka broker with replication factor of 2.
>> > > > > > > > > >
>> > > > > > > > > > Here, message has been lost between librdkafka - kafka
>> > > broker.
>> > > > > > > > > >
>> > > > > > > > > > As librdkafka is providing success delivery report for
>> all
>> > > the
>> > > > > > > > messages.
>> > > > > > > > > >
>> > > > > > > > > > Looks like kafka broker is dropping the messages after
>> > > > > > acknowledging
>> > > > > > > > > > librdkafka.
>> > > > > > > > > >
>> > > > > > > > > > Requesting you help in solving this issue.
>> > > > > > > > > >
>> > > > > > > > > > Thank you.
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > Regards
>> > > > > > > > > > Mazhar Shaikh
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Reply via email to