Hi Jun,

Yes, the data is lost during leader broker failure.
But the leader broker failed due to zookeeper session expiry.
GC logs doesn't show any error/warns during this period.

Its not easy reproduce. during long run (>12hrs) with 30k msg/sec load
balanced across 96 partitions, some time in between this failure is noticed
(once/twice).

looks like this issue is similar to "
https://issues.apache.org/jira/browse/KAFKA-1211";.

But here the leader broker could have synced/committed all the the existing
data to replica before the replica is elected as leader.

below are few log around this time.

broker b2 was controller.

b2:
Server.log:
[2016-08-26 16:15:49,701] INFO re-registering broker info in ZK for broker
1 (kafka.server.KafkaHealthcheck)
[2016-08-26 16:15:49,738] INFO Registered broker 1 at path /brokers/ids/1
with address b2.broker.com:9092. (kafka.utils.ZkUtils$)
[2016-08-26 16:15:49,739] INFO done re-registering broker
(kafka.server.KafkaHealthcheck)
[2016-08-26 16:15:49,740] INFO Subscribing to /brokers/topics path to watch
for new topics (kafka.server.KafkaHealthcheck)
[2016-08-26 16:15:50,055] INFO New leader is 0
(kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-08-26 16:15:50,538] WARN [KafkaApi-1] Produce request with
correlation id 422786 from client rdkafka on partition [topic,92] failed
due to Leader not local for partition [topic,92] on broker 1
(kafka.server.KafkaApis)
[2016-08-26 16:15:50,544] INFO Truncating log topic-92 to offset 1746617.
(kafka.log.Log)
[2016-08-26 16:15:50,562] WARN [KafkaApi-1] Produce request with
correlation id 422793 from client rdkafka on partition [topic,92] failed
due to Leader not local for partition [topic,92] on broker 1
(kafka.server.KafkaApis)
[2016-08-26 16:15:50,578] WARN [KafkaApi-1] Produce request with
correlation id 422897 from client rdkafka on partition [topic,92] failed
due to Leader not local for partition [topic,92] on broker 1
(kafka.server.KafkaApis)
[2016-08-26 16:15:50,719] ERROR Closing socket for /169.254.2.116 because
of error (kafka.network.Processor)
kafka.common.KafkaException: Size of FileMessageSet
/data/kafka/broker-b2/topic-66/00000000000000000000.log has been truncated
during write: old size 1048576, new size 0
        at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:144)
        at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:70)
        at kafka.network.MultiSend.writeTo(Transmission.scala:101)
        at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125)
        at kafka.network.MultiSend.writeTo(Transmission.scala:101)
        at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
        at kafka.network.Processor.write(SocketServer.scala:472)
        at kafka.network.Processor.run(SocketServer.scala:342)
        at java.lang.Thread.run(Thread.java:744)
[2016-08-26 16:15:50,729] ERROR Closing socket for /169.254.2.116 because
of error (kafka.network.Processor)
kafka.common.KafkaException: Size of FileMessageSet
/data/kafka/broker-b2/topic-68/00000000000000000000.log has been truncated
during write: old size 1048576, new size 0
        at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:144)
        at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:70)
        at kafka.network.MultiSend.writeTo(Transmission.scala:101)
        at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125)
        at kafka.network.MultiSend.writeTo(Transmission.scala:101)
        at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
        at kafka.network.Processor.write(SocketServer.scala:472)
        at kafka.network.Processor.run(SocketServer.scala:342)
        at java.lang.Thread.run(Thread.java:744)


kafkaServer-gc.log:
2016-08-26T16:14:47.123+0530: 6123.763: [GC2016-08-26T16:14:47.123+0530:
6123.763: [ParNew: 285567K->5992K(314560K), 0.0115200 secs]
648907K->369981K(1013632K), 0.0116800 secs] [Times: user=0.19 sys=0.00,
real=0.01 secs]
2016-08-26T16:14:56.327+0530: 6132.967: [GC2016-08-26T16:14:56.327+0530:
6132.967: [ParNew: 285608K->5950K(314560K), 0.0105600 secs]
649597K->370626K(1013632K), 0.0107550 secs] [Times: user=0.15 sys=0.01,
real=0.01 secs]
2016-08-26T16:15:06.615+0530: 6143.255: [GC2016-08-26T16:15:06.615+0530:
6143.255: [ParNew: 285566K->6529K(314560K), 0.0214330 secs]
650242K->371864K(1013632K), 0.0216380 secs] [Times: user=0.30 sys=0.03,
real=0.02 secs]
2016-08-26T16:15:17.255+0530: 6153.895: [GC2016-08-26T16:15:17.255+0530:
6153.895: [ParNew: 286145K->6413K(314560K), 0.0248930 secs]
651480K->372390K(1013632K), 0.0251060 secs] [Times: user=0.32 sys=0.09,
real=0.03 secs]
2016-08-26T16:15:36.892+0530: 6173.533: [GC2016-08-26T16:15:36.892+0530:
6173.533: [ParNew: 286029K->5935K(314560K), 0.0083220 secs]
652006K->372627K(1013632K), 0.0085410 secs] [Times: user=0.11 sys=0.02,
real=0.01 secs]
2016-08-26T16:15:50.113+0530: 6186.753: [GC2016-08-26T16:15:50.113+0530:
6186.753: [ParNew: 285551K->15693K(314560K), 0.0139890 secs]
652243K->383039K(1013632K), 0.0142240 secs] [Times: user=0.26 sys=0.00,
real=0.01 secs]
2016-08-26T16:15:56.403+0530: 6193.044: [GC2016-08-26T16:15:56.403+0530:
6193.044: [ParNew: 295309K->4838K(314560K), 0.0147950 secs]
662655K->372758K(1013632K), 0.0149970 secs] [Times: user=0.21 sys=0.02,
real=0.01 secs]
2016-08-26T16:16:06.321+0530: 6202.961: [GC2016-08-26T16:16:06.321+0530:
6202.962: [ParNew: 284454K->5116K(314560K), 0.0129300 secs]
652374K->373794K(1013632K), 0.0131210 secs] [Times: user=0.23 sys=0.00,
real=0.01 secs]
2016-08-26T16:16:18.586+0530: 6215.226: [GC2016-08-26T16:16:18.586+0530:
6215.226: [ParNew: 284732K->4909K(314560K), 0.0115050 secs]
653410K->374210K(1013632K), 0.0116810 secs] [Times: user=0.18 sys=0.01,
real=0.01 secs]


controller.log:
[2016-08-26 16:15:41,076] INFO [SessionExpirationListener on 1], ZK
expired; shut down all controller components and try to re-elect
(kafka.controller.KafkaController$SessionExpirationListener)

zookeeper.log:
[2016-08-26 16:15:36,001] INFO Expiring session 0x156c616475a0000, timeout
of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2016-08-26 16:15:36,001] INFO Processed session termination for sessionid:
0x156c616475a0000 (org.apache.zookeeper.server.PrepRequestProcessor)
[2016-08-26 16:15:36,901] INFO Client session timed out, have not heard
from server in 9582ms for sessionid 0x156c616475a0000, closing socket
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2016-08-26 16:15:36,902] INFO Closing socket connection to /169.254.2.216.
(kafka.network.Processor)
[2016-08-26 16:15:36,960] ERROR Unexpected Exception:
(org.apache.zookeeper.server.NIOServerCnxn)
java.nio.channels.CancelledKeyException
        at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
        at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:77)
        at
org.apache.zookeeper.server.NIOServerCnxn.sendBuffer(NIOServerCnxn.java:151)
        at
org.apache.zookeeper.server.NIOServerCnxn.sendResponse(NIOServerCnxn.java:1081)
        at
org.apache.zookeeper.server.NIOServerCnxn.process(NIOServerCnxn.java:1118)
        at
org.apache.zookeeper.server.WatchManager.triggerWatch(WatchManager.java:120)
        at
org.apache.zookeeper.server.WatchManager.triggerWatch(WatchManager.java:92)
        at
org.apache.zookeeper.server.DataTree.deleteNode(DataTree.java:594)
        at
org.apache.zookeeper.server.DataTree.killSession(DataTree.java:966)
        at
org.apache.zookeeper.server.DataTree.processTxn(DataTree.java:818)
        at
org.apache.zookeeper.server.ZKDatabase.processTxn(ZKDatabase.java:329)
        at
org.apache.zookeeper.server.ZooKeeperServer.processTxn(ZooKeeperServer.java:994)
        at
org.apache.zookeeper.server.FinalRequestProcessor.processRequest(FinalRequestProcessor.java:116)
        at
org.apache.zookeeper.server.quorum.Leader$ToBeAppliedRequestProcessor.processRequest(Leader.java:644)
        at
org.apache.zookeeper.server.quorum.CommitProcessor.run(CommitProcessor.java:74)

another zookeeper.
[2016-08-26 16:15:41,501] WARN caught end of stream exception
(org.apache.zookeeper.server.NIOServerCnxn)
EndOfStreamException: Unable to read additional data from client sessionid
0x356c616954f0000, likely client has closed socket
        at
org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
        at
org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
        at java.lang.Thread.run(Thread.java:744)
[2016-08-26 16:15:41,502] INFO Closed socket connection for client /
169.254.2.28:53561 which had sessionid 0x356c616954f0000
(org.apache.zookeeper.server.NIOServerCnxn)
[2016-08-26 16:15:41,503] WARN caught end of stream exception
(org.apache.zookeeper.server.NIOServerCnxn)
EndOfStreamException: Unable to read additional data from client sessionid
0x356c616954f0001, likely client has closed socket
        at
org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
        at
org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
        at java.lang.Thread.run(Thread.java:744)
[2016-08-26 16:15:41,503] INFO Closed socket connection for client /
169.254.2.27:49090 which had sessionid 0x356c616954f0001
(org.apache.zookeeper.server.NIOServerCnxn)
[2016-08-26 16:15:41,504] WARN caught end of stream exception
(org.apache.zookeeper.server.NIOServerCnxn)
EndOfStreamException: Unable to read additional data from client sessionid
0x256c616476a0000, likely client has closed socket
        at
org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
        at
org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
        at java.lang.Thread.run(Thread.java:744)
[2016-08-26 16:15:41,504] INFO Closed socket connection for client /
169.254.2.18:39069 which had sessionid 0x256c616476a0000
(org.apache.zookeeper.server.NIOServerCnxn)
[2016-08-26 16:15:41,602] INFO Closing socket connection to /169.254.2.216.
(kafka.network.Processor)
[2016-08-26 16:15:41,607] INFO Closing socket connection to /169.254.2.216.
(kafka.network.Processor)
[2016-08-26 16:15:42,090] INFO Closing socket connection to /169.254.2.216.
(kafka.network.Processor)
[2016-08-26 16:15:42,093] INFO Closing socket connection to /169.254.2.216.
(kafka.network.Processor)
[2016-08-26 16:15:42,461] INFO Client session timed out, have not heard
from server in 4002ms for sessionid 0x156c616475a0001, closing socket
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2016-08-26 16:15:42,461] WARN caught end of stream exception
(org.apache.zookeeper.server.NIOServerCnxn)
EndOfStreamException: Unable to read additional data from client sessionid
0x156c616475a0001, likely client has closed socket
        at
org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
        at
org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
        at java.lang.Thread.run(Thread.java:744)


Thank you.

Regards,
Mazhar Shaikh


On Fri, Aug 19, 2016 at 8:05 PM, Jun Rao <j...@confluent.io> wrote:

> Mazhar,
>
> Let's first confirm if this is indeed a bug. As I mentioned earlier, it's
> possible to have message loss with ack=1 when there are (leader) broker
> failures. If this is not the case, please file a jira and describe how to
> reproduce the problem. Also, it would be useful to know if the message loss
> happens with the java producer too. This will help isolate whether this is
> a server side or a client side issue.
>
> Thanks,
>
> Jun
>
> On Fri, Aug 19, 2016 at 2:15 AM, Mazhar Shaikh <mazhar.shaikh...@gmail.com
> >
> wrote:
>
> > Hi Jun,
> >
> > In my earlier runs, I had enabled delivery report (with and without
> offset
> > report) facility provided by librdkafka.
> >
> > The producer has received successful delivery report for the all msg sent
> > even than the messages where lost.
> >
> > as you mentioned. producer has nothing to do with this loss of messages.
> >
> > I just want to know, as when can we get the fix for this bug ?
> >
> > Thanks.
> >
> > Regards,
> > Mazhar Shaikh
> >
> >
> > On Fri, Aug 19, 2016 at 1:24 AM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Mazhar,
> > >
> > > With ack=1, whether you lose messages or not is not deterministic. It
> > > depends on the time when the broker receives/acks a message, the
> follower
> > > fetches the data and the broker fails. So, it's possible that you got
> > lucky
> > > in one version and unlucky in another.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Aug 18, 2016 at 12:11 PM, Mazhar Shaikh <
> > > mazhar.shaikh...@gmail.com>
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Thanks for clarification, I'll give a try with ack=-1 (in producer).
> > > >
> > > > However, i did a fallback to older version of kafka
> > > (*kafka_2.10-0.8.2.1*),
> > > > and i don't see this issue (loss of messages).
> > > >
> > > > looks like kafka_2.11-0.9.0.1 has issues(BUG) during replication.
> > > >
> > > > Thanks,
> > > >
> > > > Regards,
> > > > Mazhar Shaikh.
> > > >
> > > >
> > > >
> > > > On Thu, Aug 18, 2016 at 10:30 PM, Jun Rao <j...@confluent.io> wrote:
> > > >
> > > > > Mazhar,
> > > > >
> > > > > There is probably a mis-understanding. Ack=-1 (or all) doesn't mean
> > > > waiting
> > > > > for all replicas. It means waiting for all replicas that are in
> sync.
> > > So,
> > > > > if a replica is down, it will be removed from the in-sync replicas,
> > > which
> > > > > allows the producer to continue with fewer replicas.
> > > > >
> > > > > For the connection issue that you saw in the log, this could happen
> > > when
> > > > a
> > > > > connection is idle for some time. It won't break the replication
> > logic
> > > > > since a new connection will be created automatically. You can
> > increase
> > > > the
> > > > > socket idle time on the broker if you want to turn off this
> behavior.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Thu, Aug 18, 2016 at 12:07 AM, Mazhar Shaikh <
> > > > > mazhar.shaikh...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > Setting to -1, may solve this issue.
> > > > > > But it will cause producer buffer full in load test resulting to
> > > > failures
> > > > > > and drop of messages from client(producer side)
> > > > > > Hence, this will not actually solve the problem.
> > > > > >
> > > > > > I need to fix this from kafka broker side, so that there is no
> > impact
> > > > on
> > > > > > producer or consumer.
> > > > > >
> > > > > > From the logs looks like there is connection problem during
> between
> > > > > brokers
> > > > > > and kafka broker is loosing records during this process.
> > > > > >
> > > > > > But why is kafka broker loosing records,
> > > > > >
> > > > > > I feel this is a BUG in kafka.
> > > > > >
> > > > > > [2016-08-17 12:54:50,293] TRACE [Controller 2]: checking need to
> > > > trigger
> > > > > > partition rebalance (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:54:50,294] DEBUG [Controller 2]: preferred
> replicas
> > by
> > > > > > broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] ->
> > List(0,
> > > > 1),
> > > > > > [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43]
> > ->
> > > > > > List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
> > > > > > [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41]
> > ->
> > > > > > List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
> > > > > > [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23]
> > ->
> > > > > > List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] ->
> > > List(5,
> > > > > 3),
> > > > > > [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74]
> > ->
> > > > > > List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
> > > > > > [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75]
> > ->
> > > > > > List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
> > > > > > [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72]
> > ->
> > > > > > List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5,
> 3)),
> > 1
> > > ->
> > > > > > Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0),
> > [topic1,69]
> > > > ->
> > > > > > List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
> > > > > > [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88]
> > ->
> > > > > > List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
> > > > > > [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89]
> > ->
> > > > > > List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
> > > > > > [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4),
> > > > [topic1,3]
> > > > > > -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2,
> 4),
> > > > > > [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] ->
> > > > List(2,
> > > > > > 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4),
> > [topic1,11]
> > > > ->
> > > > > > List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4),
> > > > > [topic1,4]
> > > > > > -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2,
> 4),
> > > > > > [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5),
> > > > > [topic1,30]
> > > > > > -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3,
> > 5),
> > > > > > [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27]
> > ->
> > > > > > List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
> > > > > > [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32]
> > ->
> > > > > > List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
> > > > > > [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 ->
> > > > > Map([topic1,53]
> > > > > > -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4,
> > 2),
> > > > > > [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58]
> > ->
> > > > > > List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
> > > > > > [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62]
> > ->
> > > > > > List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
> > > > > > [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59]
> > ->
> > > > > > List(4, 2))) (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 0 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 5 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 1 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 2 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 3 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 4 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:55:32,783] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 12:55:32,894] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,36],
> > > > > > [topic1,30], [topic1,31], [topic1,86], [topic1,78], [topic1,74],
> > > > > > [topic1,82], [topic1,33]) (kafka.controller.
> > > > > IsrChangeNotificationListener)
> > > > > > [2016-08-17 12:55:32,896] WARN [Controller-2-to-broker-2-
> > > send-thread],
> > > > > > Controller 2 epoch 2 fails to send request
> > > {controller_id=2,controller_
> > > > > > epoch=2,partition_states=[{topic=topic1,partition=82,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]}],live_brokers=[{id=5,end_
> > > > > > points=[{port=9092,host=b5.kafka,security_protocol_type=
> > > > > > 0}]},{id=3,end_points=[{port=9092,host=b3.kafka,security_
> > > > > > protocol_type=0}]},{id=2,end_points=[{port=9092,host=b2.
> > > > > > kafka,security_protocol_type=0}]},{id=1,end_points=[{port=
> > > > > > 9092,host=b1.kafka,security_protocol_type=0}]},{id=4,end_
> > > > > > points=[{port=9092,host=b4.kafka,security_protocol_type=
> > > > > > 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
> > > > > > protocol_type=0}]}]}
> > > > > > to broker Node(2, b2.kafka, 9092). Reconnecting to broker.
> > > > > > (kafka.controller.RequestSendThread)
> > > > > > java.io.IOException: Connection to 2 was disconnected before the
> > > > response
> > > > > > was read
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > > > NetworkClientBlockingOps.scala:87)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > > > NetworkClientBlockingOps.scala:84)
> > > > > >         at scala.Option.foreach(Option.scala:257)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1.apply(
> NetworkClientBlockingOps.
> > > > > > scala:84)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1.apply(
> NetworkClientBlockingOps.
> > > > > > scala:80)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> > > > > > NetworkClientBlockingOps.scala:129)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > > > > > NetworkClientBlockingOps$$pollUntilFound$extension(
> > > > > > NetworkClientBlockingOps.
> > > > > > scala:139)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.
> > > > blockingSendAndReceive$
> > > > > > extension(NetworkClientBlockingOps.scala:80)
> > > > > >         at kafka.controller.RequestSendThread.liftedTree1$
> > > > > > 1(ControllerChannelManager.scala:180)
> > > > > >         at kafka.controller.RequestSendThread.doWork(
> > > > > > ControllerChannelManager.scala:171)
> > > > > >         at kafka.utils.ShutdownableThread.run(
> > > > > ShutdownableThread.scala:63)
> > > > > > [2016-08-17 12:55:32,897] WARN [Controller-2-to-broker-5-
> > > send-thread],
> > > > > > Controller 2 epoch 2 fails to send request
> > > {controller_id=2,controller_
> > > > > > epoch=2,partition_states=[{topic=topic1,partition=82,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]}],live_brokers=[{id=0,end_
> > > > > > points=[{port=9092,host=b0.kafka,security_protocol_type=
> > > > > > 0}]},{id=5,end_points=[{port=9092,host=b5.kafka,security_
> > > > > > protocol_type=0}]},{id=3,end_points=[{port=9092,host=b3.
> > > > > > kafka,security_protocol_type=0}]},{id=1,end_points=[{port=
> > > > > > 9092,host=b1.kafka,security_protocol_type=0}]},{id=2,end_
> > > > > > points=[{port=9092,host=b2.kafka,security_protocol_type=
> > > > > > 0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_
> > > > > > protocol_type=0}]}]}
> > > > > > to broker Node(5, b5.kafka, 9092). Reconnecting to broker.
> > > > > > (kafka.controller.RequestSendThread)
> > > > > > java.io.IOException: Connection to 5 was disconnected before the
> > > > response
> > > > > > was read
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > > > NetworkClientBlockingOps.scala:87)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > > > NetworkClientBlockingOps.scala:84)
> > > > > >         at scala.Option.foreach(Option.scala:257)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1.apply(
> NetworkClientBlockingOps.
> > > > > > scala:84)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1.apply(
> NetworkClientBlockingOps.
> > > > > > scala:80)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> > > > > > NetworkClientBlockingOps.scala:129)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > > > > > NetworkClientBlockingOps$$pollUntilFound$extension(
> > > > > > NetworkClientBlockingOps.
> > > > > > scala:139)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.
> > > > blockingSendAndReceive$
> > > > > > extension(NetworkClientBlockingOps.scala:80)
> > > > > >         at kafka.controller.RequestSendThread.liftedTree1$
> > > > > > 1(ControllerChannelManager.scala:180)
> > > > > >         at kafka.controller.RequestSendThread.doWork(
> > > > > > ControllerChannelManager.scala:171)
> > > > > >         at kafka.utils.ShutdownableThread.run(
> > > > > ShutdownableThread.scala:63)
> > > > > > [2016-08-17 12:55:32,898] WARN [Controller-2-to-broker-4-
> > > send-thread],
> > > > > > Controller 2 epoch 2 fails to send request
> > > {controller_id=2,controller_
> > > > > > epoch=2,partition_states=[{topic=topic1,partition=82,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]}],live_brokers=[{id=3,end_
> > > > > > points=[{port=9092,host=b3.kafka,security_protocol_type=
> > > > > > 0}]},{id=1,end_points=[{port=9092,host=b1.kafka,security_
> > > > > > protocol_type=0}]},{id=4,end_points=[{port=9092,host=b4.
> > > > > > kafka,security_protocol_type=0}]},{id=2,end_points=[{port=
> > > > > > 9092,host=b2.kafka,security_protocol_type=0}]},{id=0,end_
> > > > > > points=[{port=9092,host=b0.kafka,security_protocol_type=
> > > > > > 0}]},{id=5,end_points=[{port=9092,host=b5.kafka,security_
> > > > > > protocol_type=0}]}]}
> > > > > > to broker Node(4, b4.kafka, 9092). Reconnecting to broker.
> > > > > > (kafka.controller.RequestSendThread)
> > > > > > java.io.IOException: Connection to 4 was disconnected before the
> > > > response
> > > > > > was read
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > > > NetworkClientBlockingOps.scala:87)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > > > NetworkClientBlockingOps.scala:84)
> > > > > >         at scala.Option.foreach(Option.scala:257)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1.apply(
> NetworkClientBlockingOps.
> > > > > > scala:84)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1.apply(
> NetworkClientBlockingOps.
> > > > > > scala:80)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> > > > > > NetworkClientBlockingOps.scala:129)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > > > > > NetworkClientBlockingOps$$pollUntilFound$extension(
> > > > > > NetworkClientBlockingOps.
> > > > > > scala:139)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.
> > > > blockingSendAndReceive$
> > > > > > extension(NetworkClientBlockingOps.scala:80)
> > > > > >         at kafka.controller.RequestSendThread.liftedTree1$
> > > > > > 1(ControllerChannelManager.scala:180)
> > > > > >         at kafka.controller.RequestSendThread.doWork(
> > > > > > ControllerChannelManager.scala:171)
> > > > > >         at kafka.utils.ShutdownableThread.run(
> > > > > ShutdownableThread.scala:63)
> > > > > > [2016-08-17 12:55:32,900] WARN [Controller-2-to-broker-1-
> > > send-thread],
> > > > > > Controller 2 epoch 2 fails to send request
> > > {controller_id=2,controller_
> > > > > > epoch=2,partition_states=[{topic=topic1,partition=82,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]}],live_brokers=[{id=5,end_
> > > > > > points=[{port=9092,host=b5.kafka,security_protocol_type=
> > > > > > 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
> > > > > > protocol_type=0}]},{id=3,end_points=[{port=9092,host=b3.
> > > > > > kafka,security_protocol_type=0}]},{id=1,end_points=[{port=
> > > > > > 9092,host=b1.kafka,security_protocol_type=0}]},{id=4,end_
> > > > > > points=[{port=9092,host=b4.kafka,security_protocol_type=
> > > > > > 0}]},{id=2,end_points=[{port=9092,host=b2.kafka,security_
> > > > > > protocol_type=0}]}]}
> > > > > > to broker Node(1, b1.kafka, 9092). Reconnecting to broker.
> > > > > > (kafka.controller.RequestSendThread)
> > > > > > java.io.IOException: Connection to 1 was disconnected before the
> > > > response
> > > > > > was read
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > > > NetworkClientBlockingOps.scala:87)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > > > NetworkClientBlockingOps.scala:84)
> > > > > >         at scala.Option.foreach(Option.scala:257)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1.apply(
> NetworkClientBlockingOps.
> > > > > > scala:84)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1.apply(
> NetworkClientBlockingOps.
> > > > > > scala:80)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> > > > > > NetworkClientBlockingOps.scala:129)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > > > > > NetworkClientBlockingOps$$pollUntilFound$extension(
> > > > > > NetworkClientBlockingOps.
> > > > > > scala:139)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.
> > > > blockingSendAndReceive$
> > > > > > extension(NetworkClientBlockingOps.scala:80)
> > > > > >         at kafka.controller.RequestSendThread.liftedTree1$
> > > > > > 1(ControllerChannelManager.scala:180)
> > > > > >         at kafka.controller.RequestSendThread.doWork(
> > > > > > ControllerChannelManager.scala:171)
> > > > > >         at kafka.utils.ShutdownableThread.run(
> > > > > ShutdownableThread.scala:63)
> > > > > > [2016-08-17 12:55:32,902] WARN [Controller-2-to-broker-3-
> > > send-thread],
> > > > > > Controller 2 epoch 2 fails to send request
> > > {controller_id=2,controller_
> > > > > > epoch=2,partition_states=[{topic=topic1,partition=82,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]}],live_brokers=[{id=3,end_
> > > > > > points=[{port=9092,host=b3.kafka,security_protocol_type=
> > > > > > 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
> > > > > > protocol_type=0}]},{id=5,end_points=[{port=9092,host=b5.
> > > > > > kafka,security_protocol_type=0}]},{id=2,end_points=[{port=
> > > > > > 9092,host=b2.kafka,security_protocol_type=0}]},{id=1,end_
> > > > > > points=[{port=9092,host=b1.kafka,security_protocol_type=
> > > > > > 0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_
> > > > > > protocol_type=0}]}]}
> > > > > > to broker Node(3, b3.kafka, 9092). Reconnecting to broker.
> > > > > > (kafka.controller.RequestSendThread)
> > > > > > java.io.IOException: Connection to 3 was disconnected before the
> > > > response
> > > > > > was read
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > > > NetworkClientBlockingOps.scala:87)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > > > NetworkClientBlockingOps.scala:84)
> > > > > >         at scala.Option.foreach(Option.scala:257)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1.apply(
> NetworkClientBlockingOps.
> > > > > > scala:84)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1.apply(
> NetworkClientBlockingOps.
> > > > > > scala:80)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> > > > > > NetworkClientBlockingOps.scala:129)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > > > > > NetworkClientBlockingOps$$pollUntilFound$extension(
> > > > > > NetworkClientBlockingOps.
> > > > > > scala:139)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.
> > > > blockingSendAndReceive$
> > > > > > extension(NetworkClientBlockingOps.scala:80)
> > > > > >         at kafka.controller.RequestSendThread.liftedTree1$
> > > > > > 1(ControllerChannelManager.scala:180)
> > > > > >         at kafka.controller.RequestSendThread.doWork(
> > > > > > ControllerChannelManager.scala:171)
> > > > > >         at kafka.utils.ShutdownableThread.run(
> > > > > ShutdownableThread.scala:63)
> > > > > > [2016-08-17 12:55:32,903] WARN [Controller-2-to-broker-0-
> > > send-thread],
> > > > > > Controller 2 epoch 2 fails to send request
> > > {controller_id=2,controller_
> > > > > > epoch=2,partition_states=[{topic=topic1,partition=82,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
> > > > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
> > > > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > > > version=18,replicas=[3,5]}],live_brokers=[{id=4,end_
> > > > > > points=[{port=9092,host=b4.kafka,security_protocol_type=
> > > > > > 0}]},{id=3,end_points=[{port=9092,host=b3.kafka,security_
> > > > > > protocol_type=0}]},{id=1,end_points=[{port=9092,host=b1.
> > > > > > kafka,security_protocol_type=0}]},{id=2,end_points=[{port=
> > > > > > 9092,host=b2.kafka,security_protocol_type=0}]},{id=5,end_
> > > > > > points=[{port=9092,host=b5.kafka,security_protocol_type=
> > > > > > 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
> > > > > > protocol_type=0}]}]}
> > > > > > to broker Node(0, b0.kafka, 9092). Reconnecting to broker.
> > > > > > (kafka.controller.RequestSendThread)
> > > > > > java.io.IOException: Connection to 0 was disconnected before the
> > > > response
> > > > > > was read
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > > > NetworkClientBlockingOps.scala:87)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > > > NetworkClientBlockingOps.scala:84)
> > > > > >         at scala.Option.foreach(Option.scala:257)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1.apply(
> NetworkClientBlockingOps.
> > > > > > scala:84)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > > > blockingSendAndReceive$extension$1.apply(
> NetworkClientBlockingOps.
> > > > > > scala:80)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> > > > > > NetworkClientBlockingOps.scala:129)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > > > > > NetworkClientBlockingOps$$pollUntilFound$extension(
> > > > > > NetworkClientBlockingOps.
> > > > > > scala:139)
> > > > > >         at kafka.utils.NetworkClientBlockingOps$.
> > > > blockingSendAndReceive$
> > > > > > extension(NetworkClientBlockingOps.scala:80)
> > > > > >         at kafka.controller.RequestSendThread.liftedTree1$
> > > > > > 1(ControllerChannelManager.scala:180)
> > > > > >         at kafka.controller.RequestSendThread.doWork(
> > > > > > ControllerChannelManager.scala:171)
> > > > > >         at kafka.utils.ShutdownableThread.run(
> > > > > ShutdownableThread.scala:63)
> > > > > > [2016-08-17 12:55:32,927] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 12:55:33,162] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 12:55:33,169] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,50])
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 12:55:33,194] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 12:55:33,198] INFO [Controller-2-to-broker-2-
> > > send-thread],
> > > > > > Controller 2 connected to Node(2, b2.kafka, 9092) for sending
> state
> > > > > change
> > > > > > requests (kafka.controller.RequestSendThread)
> > > > > > [2016-08-17 12:55:33,199] INFO [Controller-2-to-broker-5-
> > > send-thread],
> > > > > > Controller 2 connected to Node(5, b5.kafka, 9092) for sending
> state
> > > > > change
> > > > > > requests (kafka.controller.RequestSendThread)
> > > > > > [2016-08-17 12:55:33,200] INFO [Controller-2-to-broker-4-
> > > send-thread],
> > > > > > Controller 2 connected to Node(4, b4.kafka, 9092) for sending
> state
> > > > > change
> > > > > > requests (kafka.controller.RequestSendThread)
> > > > > > [2016-08-17 12:55:33,202] INFO [Controller-2-to-broker-1-
> > > send-thread],
> > > > > > Controller 2 connected to Node(1, b1.kafka, 9092) for sending
> state
> > > > > change
> > > > > > requests (kafka.controller.RequestSendThread)
> > > > > > [2016-08-17 12:55:33,204] INFO [Controller-2-to-broker-0-
> > > send-thread],
> > > > > > Controller 2 connected to Node(0, b0.kafka, 9092) for sending
> state
> > > > > change
> > > > > > requests (kafka.controller.RequestSendThread)
> > > > > > [2016-08-17 12:55:33,207] INFO [Controller-2-to-broker-3-
> > > send-thread],
> > > > > > Controller 2 connected to Node(3, b3.kafka, 9092) for sending
> state
> > > > > change
> > > > > > requests (kafka.controller.RequestSendThread)
> > > > > > [2016-08-17 12:55:39,981] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 12:55:40,018] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,34],
> > > > > > [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38],
> > > > > > [topic1,35], [topic1,33]) (kafka.controller.
> > > > > IsrChangeNotificationListener)
> > > > > > [2016-08-17 12:55:40,377] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 12:55:40,388] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,86],
> > > > > > [topic1,78], [topic1,82]) (kafka.controller.
> > > > > IsrChangeNotificationListener)
> > > > > > [2016-08-17 12:55:40,409] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 12:59:50,293] TRACE [Controller 2]: checking need to
> > > > trigger
> > > > > > partition rebalance (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: preferred
> replicas
> > by
> > > > > > broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] ->
> > List(0,
> > > > 1),
> > > > > > [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43]
> > ->
> > > > > > List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
> > > > > > [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41]
> > ->
> > > > > > List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
> > > > > > [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23]
> > ->
> > > > > > List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] ->
> > > List(5,
> > > > > 3),
> > > > > > [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74]
> > ->
> > > > > > List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
> > > > > > [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75]
> > ->
> > > > > > List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
> > > > > > [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72]
> > ->
> > > > > > List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5,
> 3)),
> > 1
> > > ->
> > > > > > Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0),
> > [topic1,69]
> > > > ->
> > > > > > List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
> > > > > > [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88]
> > ->
> > > > > > List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
> > > > > > [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89]
> > ->
> > > > > > List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
> > > > > > [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4),
> > > > [topic1,3]
> > > > > > -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2,
> 4),
> > > > > > [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] ->
> > > > List(2,
> > > > > > 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4),
> > [topic1,11]
> > > > ->
> > > > > > List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4),
> > > > > [topic1,4]
> > > > > > -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2,
> 4),
> > > > > > [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5),
> > > > > [topic1,30]
> > > > > > -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3,
> > 5),
> > > > > > [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27]
> > ->
> > > > > > List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
> > > > > > [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32]
> > ->
> > > > > > List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
> > > > > > [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 ->
> > > > > Map([topic1,53]
> > > > > > -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4,
> > 2),
> > > > > > [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58]
> > ->
> > > > > > List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
> > > > > > [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62]
> > ->
> > > > > > List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
> > > > > > [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59]
> > ->
> > > > > > List(4, 2))) (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 0 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 5 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 1 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 2 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:59:50,295] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:59:50,295] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 3 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:59:50,295] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 12:59:50,295] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 4 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:00:39,546] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:39,604] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,5])
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:39,649] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:39,888] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:40,071] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,37],
> > > > > > [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
> > > > > > [topic1,36], [topic1,30], [topic1,31], [topic1,25], [topic1,29],
> > > > > > [topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28])
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:40,103] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:40,261] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:40,283] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,72],
> > > > > > [topic1,80]) (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:40,296] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:40,656] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:40,662] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,55])
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:40,934] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:47,335] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:47,393] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,37],
> > > > > > [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
> > > > > > [topic1,36], [topic1,30], [topic1,31], [topic1,25], [topic1,29],
> > > > > > [topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28])
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:47,423] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:47,897] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:47,944] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,5],
> > > > > > [topic1,3], [topic1,7], [topic1,11], [topic1,2], [topic1,6],
> > > > [topic1,1],
> > > > > > [topic1,10], [topic1,14], [topic1,9], [topic1,15])
> > (kafka.controller.
> > > > > > IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:00:48,020] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:04:50,293] TRACE [Controller 2]: checking need to
> > > > trigger
> > > > > > partition rebalance (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:04:50,295] DEBUG [Controller 2]: preferred
> replicas
> > by
> > > > > > broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] ->
> > List(0,
> > > > 1),
> > > > > > [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43]
> > ->
> > > > > > List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
> > > > > > [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41]
> > ->
> > > > > > List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
> > > > > > [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23]
> > ->
> > > > > > List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] ->
> > > List(5,
> > > > > 3),
> > > > > > [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74]
> > ->
> > > > > > List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
> > > > > > [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75]
> > ->
> > > > > > List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
> > > > > > [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72]
> > ->
> > > > > > List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5,
> 3)),
> > 1
> > > ->
> > > > > > Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0),
> > [topic1,69]
> > > > ->
> > > > > > List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
> > > > > > [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88]
> > ->
> > > > > > List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
> > > > > > [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89]
> > ->
> > > > > > List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
> > > > > > [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4),
> > > > [topic1,3]
> > > > > > -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2,
> 4),
> > > > > > [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] ->
> > > > List(2,
> > > > > > 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4),
> > [topic1,11]
> > > > ->
> > > > > > List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4),
> > > > > [topic1,4]
> > > > > > -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2,
> 4),
> > > > > > [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5),
> > > > > [topic1,30]
> > > > > > -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3,
> > 5),
> > > > > > [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27]
> > ->
> > > > > > List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
> > > > > > [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32]
> > ->
> > > > > > List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
> > > > > > [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 ->
> > > > > Map([topic1,53]
> > > > > > -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4,
> > 2),
> > > > > > [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58]
> > ->
> > > > > > List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
> > > > > > [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62]
> > ->
> > > > > > List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
> > > > > > [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59]
> > ->
> > > > > > List(4, 2))) (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:04:50,295] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:04:50,295] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 0 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:04:50,295] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 5 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 1 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 2 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 3 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 4 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:05:34,317] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:05:34,365] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,80],
> > > > > > [topic1,40], [topic1,21], [topic1,31], [topic1,84], [topic1,33])
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:05:34,388] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:05:36,426] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:05:36,437] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,92])
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:05:36,699] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:05:40,225] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:05:40,239] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,80],
> > > > > > [topic1,84]) (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:05:40,246] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:05:40,958] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:05:41,006] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,22],
> > > > > > [topic1,16], [topic1,20], [topic1,19], [topic1,40], [topic1,21],
> > > > > > [topic1,18], [topic1,47], [topic1,44], [topic1,45], [topic1,42],
> > > > > > [topic1,46], [topic1,43], [topic1,23]) (kafka.controller.
> > > > > > IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:05:41,067] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:05:42,517] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:05:42,622] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,37],
> > > > > > [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
> > > > > > [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38],
> > > > > > [topic1,26], [topic1,35], [topic1,33], [topic1,28])
> > > (kafka.controller.
> > > > > > IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:05:42,690] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:09:50,293] TRACE [Controller 2]: checking need to
> > > > trigger
> > > > > > partition rebalance (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:09:50,295] DEBUG [Controller 2]: preferred
> replicas
> > by
> > > > > > broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] ->
> > List(0,
> > > > 1),
> > > > > > [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43]
> > ->
> > > > > > List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
> > > > > > [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41]
> > ->
> > > > > > List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
> > > > > > [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23]
> > ->
> > > > > > List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] ->
> > > List(5,
> > > > > 3),
> > > > > > [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74]
> > ->
> > > > > > List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
> > > > > > [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75]
> > ->
> > > > > > List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
> > > > > > [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72]
> > ->
> > > > > > List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5,
> 3)),
> > 1
> > > ->
> > > > > > Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0),
> > [topic1,69]
> > > > ->
> > > > > > List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
> > > > > > [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88]
> > ->
> > > > > > List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
> > > > > > [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89]
> > ->
> > > > > > List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
> > > > > > [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4),
> > > > [topic1,3]
> > > > > > -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2,
> 4),
> > > > > > [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] ->
> > > > List(2,
> > > > > > 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4),
> > [topic1,11]
> > > > ->
> > > > > > List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4),
> > > > > [topic1,4]
> > > > > > -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2,
> 4),
> > > > > > [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5),
> > > > > [topic1,30]
> > > > > > -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3,
> > 5),
> > > > > > [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27]
> > ->
> > > > > > List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
> > > > > > [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32]
> > ->
> > > > > > List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
> > > > > > [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 ->
> > > > > Map([topic1,53]
> > > > > > -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4,
> > 2),
> > > > > > [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58]
> > ->
> > > > > > List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
> > > > > > [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62]
> > ->
> > > > > > List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
> > > > > > [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59]
> > ->
> > > > > > List(4, 2))) (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:09:50,295] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:09:50,295] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 0 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:09:50,295] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 5 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 1 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 2 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 3 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in
> > > preferred
> > > > > > replica Map() (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:09:50,297] TRACE [Controller 2]: leader imbalance
> > > ratio
> > > > > for
> > > > > > broker 4 is 0.000000 (kafka.controller.KafkaController)
> > > > > > [2016-08-17 13:10:37,278] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:10:37,292] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,67],
> > > > > > [topic1,95]) (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:10:37,304] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:10:43,375] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:10:43,383] DEBUG Sending MetadataRequest to
> > > > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for
> TopicAndPartitions:Set([
> > > > > > topic1,67],
> > > > > > [topic1,95]) (kafka.controller.IsrChangeNotificationListener)
> > > > > > [2016-08-17 13:10:43,394] DEBUG [IsrChangeNotificationListener]
> > > > Fired!!!
> > > > > > (kafka.controller.IsrChangeNotificationListener)
> > > > > >
> > > > > > Thanks
> > > > > >
> > > > > > Regards,
> > > > > > Mazhar Shaikh.
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Aug 17, 2016 at 9:50 PM, Jun Rao <j...@confluent.io>
> wrote:
> > > > > >
> > > > > > > Yes, you can try setting it to -1 in 0.8.1, which is the
> > equivalent
> > > > of
> > > > > > > "all" in 0.9 and above.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Wed, Aug 17, 2016 at 8:32 AM, Mazhar Shaikh <
> > > > > > mazhar.shaikh...@gmail.com
> > > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Jun,
> > > > > > > >
> > > > > > > > I'm using default configuration (ack=1),
> > > > > > > > changing it t0 all or 2 will not help, as the producer queue
> > will
> > > > be
> > > > > > > > exhausted is any kafka broker goes down for long time.
> > > > > > > >
> > > > > > > >
> > > > > > > > Thanks.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Mazhar Shaikh.
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Aug 17, 2016 at 8:11 PM, Jun Rao <j...@confluent.io>
> > > wrote:
> > > > > > > >
> > > > > > > > > Are you using acks=1 or acks=all in the producer? Only the
> > > latter
> > > > > > > > > guarantees acked messages won't be lost after leader
> failure.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > > On Wed, Aug 10, 2016 at 11:41 PM, Mazhar Shaikh <
> > > > > > > > > mazhar.shaikh...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Kafka Team,
> > > > > > > > > >
> > > > > > > > > > I'm using kafka (kafka_2.11-0.9.0.1) with librdkafka
> > (0.8.1)
> > > > API
> > > > > > for
> > > > > > > > > > producer
> > > > > > > > > > During a run of 2hrs, I notice the total number of
> messaged
> > > > ack'd
> > > > > > by
> > > > > > > > > > librdkafka delivery report is greater than the maxoffset
> > of a
> > > > > > > partition
> > > > > > > > > in
> > > > > > > > > > kafka broker.
> > > > > > > > > > I'm running kafka broker with replication factor of 2.
> > > > > > > > > >
> > > > > > > > > > Here, message has been lost between librdkafka - kafka
> > > broker.
> > > > > > > > > >
> > > > > > > > > > As librdkafka is providing success delivery report for
> all
> > > the
> > > > > > > > messages.
> > > > > > > > > >
> > > > > > > > > > Looks like kafka broker is dropping the messages after
> > > > > > acknowledging
> > > > > > > > > > librdkafka.
> > > > > > > > > >
> > > > > > > > > > Requesting you help in solving this issue.
> > > > > > > > > >
> > > > > > > > > > Thank you.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Regards
> > > > > > > > > > Mazhar Shaikh
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to