Hi Jun,

In my earlier runs, I had enabled the delivery report facility provided by
librdkafka (with and without offset reporting).

The producer received a successful delivery report for every message sent,
even though the messages were lost.
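
For reference, this is roughly how the delivery report path is wired up on my
side. It is only a minimal sketch against the librdkafka C API (newer releases
expose rd_kafka_conf_set_dr_msg_cb; 0.8.1 has the older rd_kafka_conf_set_dr_cb
variant of the same callback), the payload is a placeholder and error handling
is trimmed:

    #include <stdio.h>
    #include <string.h>
    #include <librdkafka/rdkafka.h>

    /* Delivery report callback: librdkafka invokes this from rd_kafka_poll()
     * once per produced message, with the broker-assigned partition/offset
     * (or an error code if delivery failed). */
    static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
                          void *opaque) {
        if (rkmessage->err)
            fprintf(stderr, "delivery failed: %s\n",
                    rd_kafka_err2str(rkmessage->err));
        else
            fprintf(stderr, "delivered to partition %d at offset %lld\n",
                    (int)rkmessage->partition, (long long)rkmessage->offset);
    }

    int main(void) {
        char errstr[512];

        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);  /* enable delivery reports */

        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                      errstr, sizeof(errstr));
        rd_kafka_brokers_add(rk, "b0.kafka:9092");

        rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new();
        /* acks=1: wait for the leader only; "-1" waits for all in-sync replicas */
        rd_kafka_topic_conf_set(tconf, "request.required.acks", "1",
                                errstr, sizeof(errstr));
        /* report the broker-assigned offset back in the delivery report */
        rd_kafka_topic_conf_set(tconf, "produce.offset.report", "true",
                                errstr, sizeof(errstr));
        rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "topic1", tconf);

        const char *payload = "test message";
        rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                         (void *)payload, strlen(payload), NULL, 0, NULL);

        /* serve delivery report callbacks until the producer queue drains */
        while (rd_kafka_outq_len(rk) > 0)
            rd_kafka_poll(rk, 100);

        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);
        return 0;
    }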

As you mentioned, the producer has nothing to do with this loss of messages.
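
The loss only shows up when the per-partition delivery-report counts are
compared against the broker's latest offsets, which I read with the stock
offset tool, roughly like this (any live broker can be passed in --broker-list):

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
        --broker-list b0.kafka:9092 --topic topic1 --time -1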

I just want to know when we can get a fix for this bug.

Thanks.

Regards,
Mazhar Shaikh


On Fri, Aug 19, 2016 at 1:24 AM, Jun Rao <j...@confluent.io> wrote:

> Mazhar,
>
> With ack=1, whether you lose messages or not is not deterministic. It
> depends on the timing of when the broker receives/acks a message, when the
> follower fetches the data, and when the broker fails. So, it's possible that
> you got lucky in one version and unlucky in another.
>
> Thanks,
>
> Jun
>
> On Thu, Aug 18, 2016 at 12:11 PM, Mazhar Shaikh <
> mazhar.shaikh...@gmail.com>
> wrote:
>
> > Hi Jun,
> >
> > Thanks for the clarification, I'll give it a try with acks=-1 (in the producer).
> >
> > However, I fell back to an older version of Kafka (*kafka_2.10-0.8.2.1*),
> > and I don't see this issue (loss of messages).
> >
> > It looks like kafka_2.11-0.9.0.1 has an issue (a bug) during replication.
> >
> > Thanks,
> >
> > Regards,
> > Mazhar Shaikh.
> >
> >
> >
> > On Thu, Aug 18, 2016 at 10:30 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Mazhar,
> > >
> > > There is probably a misunderstanding. Ack=-1 (or all) doesn't mean waiting
> > > for all replicas. It means waiting for all replicas that are in sync. So,
> > > if a replica is down, it will be removed from the in-sync replicas, which
> > > allows the producer to continue with fewer replicas.
> > >
> > > For the connection issue that you saw in the log, this could happen when a
> > > connection is idle for some time. It won't break the replication logic
> > > since a new connection will be created automatically. You can increase the
> > > socket idle time on the broker if you want to turn off this behavior.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Aug 18, 2016 at 12:07 AM, Mazhar Shaikh <
> > > mazhar.shaikh...@gmail.com>
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Setting acks to -1 may solve this issue.
> > > > But it will cause the producer buffer to fill up under load testing,
> > > > resulting in failures and dropped messages on the client (producer) side.
> > > > Hence, this will not actually solve the problem.
> > > >
> > > > I need to fix this on the Kafka broker side, so that there is no impact
> > > > on the producer or consumer.
> > > >
> > > > From the logs, it looks like there is a connection problem between brokers,
> > > > and the Kafka broker is losing records during this process.
> > > >
> > > > But why is the Kafka broker losing records?
> > > >
> > > > I feel this is a BUG in Kafka.
> > > >
> > > > [2016-08-17 12:54:50,293] TRACE [Controller 2]: checking need to
> > trigger
> > > > partition rebalance (kafka.controller.KafkaController)
> > > > [2016-08-17 12:54:50,294] DEBUG [Controller 2]: preferred replicas by
> > > > broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0,
> > 1),
> > > > [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] ->
> > > > List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
> > > > [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] ->
> > > > List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
> > > > [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] ->
> > > > List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] ->
> List(5,
> > > 3),
> > > > [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] ->
> > > > List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
> > > > [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] ->
> > > > List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
> > > > [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] ->
> > > > List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1
> ->
> > > > Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69]
> > ->
> > > > List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
> > > > [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] ->
> > > > List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
> > > > [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] ->
> > > > List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
> > > > [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4),
> > [topic1,3]
> > > > -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4),
> > > > [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] ->
> > List(2,
> > > > 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11]
> > ->
> > > > List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4),
> > > [topic1,4]
> > > > -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4),
> > > > [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5),
> > > [topic1,30]
> > > > -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5),
> > > > [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] ->
> > > > List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
> > > > [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] ->
> > > > List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
> > > > [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 ->
> > > Map([topic1,53]
> > > > -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2),
> > > > [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] ->
> > > > List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
> > > > [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] ->
> > > > List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
> > > > [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] ->
> > > > List(4, 2))) (kafka.controller.KafkaController)
> > > > [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 0 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 5 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 1 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 2 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 3 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 4 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 12:55:32,783] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 12:55:32,894] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,36],
> > > > [topic1,30], [topic1,31], [topic1,86], [topic1,78], [topic1,74],
> > > > [topic1,82], [topic1,33]) (kafka.controller.
> > > IsrChangeNotificationListener)
> > > > [2016-08-17 12:55:32,896] WARN [Controller-2-to-broker-2-
> send-thread],
> > > > Controller 2 epoch 2 fails to send request
> {controller_id=2,controller_
> > > > epoch=2,partition_states=[{topic=topic1,partition=82,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]}],live_brokers=[{id=5,end_
> > > > points=[{port=9092,host=b5.kafka,security_protocol_type=
> > > > 0}]},{id=3,end_points=[{port=9092,host=b3.kafka,security_
> > > > protocol_type=0}]},{id=2,end_points=[{port=9092,host=b2.
> > > > kafka,security_protocol_type=0}]},{id=1,end_points=[{port=
> > > > 9092,host=b1.kafka,security_protocol_type=0}]},{id=4,end_
> > > > points=[{port=9092,host=b4.kafka,security_protocol_type=
> > > > 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
> > > > protocol_type=0}]}]}
> > > > to broker Node(2, b2.kafka, 9092). Reconnecting to broker.
> > > > (kafka.controller.RequestSendThread)
> > > > java.io.IOException: Connection to 2 was disconnected before the
> > response
> > > > was read
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > NetworkClientBlockingOps.scala:87)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > NetworkClientBlockingOps.scala:84)
> > > >         at scala.Option.foreach(Option.scala:257)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > > > scala:84)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > > > scala:80)
> > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> > > > NetworkClientBlockingOps.scala:129)
> > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > > > NetworkClientBlockingOps$$pollUntilFound$extension(
> > > > NetworkClientBlockingOps.
> > > > scala:139)
> > > >         at kafka.utils.NetworkClientBlockingOps$.
> > blockingSendAndReceive$
> > > > extension(NetworkClientBlockingOps.scala:80)
> > > >         at kafka.controller.RequestSendThread.liftedTree1$
> > > > 1(ControllerChannelManager.scala:180)
> > > >         at kafka.controller.RequestSendThread.doWork(
> > > > ControllerChannelManager.scala:171)
> > > >         at kafka.utils.ShutdownableThread.run(
> > > ShutdownableThread.scala:63)
> > > > [2016-08-17 12:55:32,897] WARN [Controller-2-to-broker-5-
> send-thread],
> > > > Controller 2 epoch 2 fails to send request
> {controller_id=2,controller_
> > > > epoch=2,partition_states=[{topic=topic1,partition=82,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]}],live_brokers=[{id=0,end_
> > > > points=[{port=9092,host=b0.kafka,security_protocol_type=
> > > > 0}]},{id=5,end_points=[{port=9092,host=b5.kafka,security_
> > > > protocol_type=0}]},{id=3,end_points=[{port=9092,host=b3.
> > > > kafka,security_protocol_type=0}]},{id=1,end_points=[{port=
> > > > 9092,host=b1.kafka,security_protocol_type=0}]},{id=2,end_
> > > > points=[{port=9092,host=b2.kafka,security_protocol_type=
> > > > 0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_
> > > > protocol_type=0}]}]}
> > > > to broker Node(5, b5.kafka, 9092). Reconnecting to broker.
> > > > (kafka.controller.RequestSendThread)
> > > > java.io.IOException: Connection to 5 was disconnected before the
> > response
> > > > was read
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > NetworkClientBlockingOps.scala:87)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > NetworkClientBlockingOps.scala:84)
> > > >         at scala.Option.foreach(Option.scala:257)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > > > scala:84)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > > > scala:80)
> > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> > > > NetworkClientBlockingOps.scala:129)
> > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > > > NetworkClientBlockingOps$$pollUntilFound$extension(
> > > > NetworkClientBlockingOps.
> > > > scala:139)
> > > >         at kafka.utils.NetworkClientBlockingOps$.
> > blockingSendAndReceive$
> > > > extension(NetworkClientBlockingOps.scala:80)
> > > >         at kafka.controller.RequestSendThread.liftedTree1$
> > > > 1(ControllerChannelManager.scala:180)
> > > >         at kafka.controller.RequestSendThread.doWork(
> > > > ControllerChannelManager.scala:171)
> > > >         at kafka.utils.ShutdownableThread.run(
> > > ShutdownableThread.scala:63)
> > > > [2016-08-17 12:55:32,898] WARN [Controller-2-to-broker-4-
> send-thread],
> > > > Controller 2 epoch 2 fails to send request
> {controller_id=2,controller_
> > > > epoch=2,partition_states=[{topic=topic1,partition=82,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]}],live_brokers=[{id=3,end_
> > > > points=[{port=9092,host=b3.kafka,security_protocol_type=
> > > > 0}]},{id=1,end_points=[{port=9092,host=b1.kafka,security_
> > > > protocol_type=0}]},{id=4,end_points=[{port=9092,host=b4.
> > > > kafka,security_protocol_type=0}]},{id=2,end_points=[{port=
> > > > 9092,host=b2.kafka,security_protocol_type=0}]},{id=0,end_
> > > > points=[{port=9092,host=b0.kafka,security_protocol_type=
> > > > 0}]},{id=5,end_points=[{port=9092,host=b5.kafka,security_
> > > > protocol_type=0}]}]}
> > > > to broker Node(4, b4.kafka, 9092). Reconnecting to broker.
> > > > (kafka.controller.RequestSendThread)
> > > > java.io.IOException: Connection to 4 was disconnected before the
> > response
> > > > was read
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > NetworkClientBlockingOps.scala:87)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > NetworkClientBlockingOps.scala:84)
> > > >         at scala.Option.foreach(Option.scala:257)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > > > scala:84)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > > > scala:80)
> > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> > > > NetworkClientBlockingOps.scala:129)
> > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > > > NetworkClientBlockingOps$$pollUntilFound$extension(
> > > > NetworkClientBlockingOps.
> > > > scala:139)
> > > >         at kafka.utils.NetworkClientBlockingOps$.
> > blockingSendAndReceive$
> > > > extension(NetworkClientBlockingOps.scala:80)
> > > >         at kafka.controller.RequestSendThread.liftedTree1$
> > > > 1(ControllerChannelManager.scala:180)
> > > >         at kafka.controller.RequestSendThread.doWork(
> > > > ControllerChannelManager.scala:171)
> > > >         at kafka.utils.ShutdownableThread.run(
> > > ShutdownableThread.scala:63)
> > > > [2016-08-17 12:55:32,900] WARN [Controller-2-to-broker-1-
> send-thread],
> > > > Controller 2 epoch 2 fails to send request
> {controller_id=2,controller_
> > > > epoch=2,partition_states=[{topic=topic1,partition=82,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]}],live_brokers=[{id=5,end_
> > > > points=[{port=9092,host=b5.kafka,security_protocol_type=
> > > > 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
> > > > protocol_type=0}]},{id=3,end_points=[{port=9092,host=b3.
> > > > kafka,security_protocol_type=0}]},{id=1,end_points=[{port=
> > > > 9092,host=b1.kafka,security_protocol_type=0}]},{id=4,end_
> > > > points=[{port=9092,host=b4.kafka,security_protocol_type=
> > > > 0}]},{id=2,end_points=[{port=9092,host=b2.kafka,security_
> > > > protocol_type=0}]}]}
> > > > to broker Node(1, b1.kafka, 9092). Reconnecting to broker.
> > > > (kafka.controller.RequestSendThread)
> > > > java.io.IOException: Connection to 1 was disconnected before the
> > response
> > > > was read
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > NetworkClientBlockingOps.scala:87)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > NetworkClientBlockingOps.scala:84)
> > > >         at scala.Option.foreach(Option.scala:257)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > > > scala:84)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > > > scala:80)
> > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> > > > NetworkClientBlockingOps.scala:129)
> > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > > > NetworkClientBlockingOps$$pollUntilFound$extension(
> > > > NetworkClientBlockingOps.
> > > > scala:139)
> > > >         at kafka.utils.NetworkClientBlockingOps$.
> > blockingSendAndReceive$
> > > > extension(NetworkClientBlockingOps.scala:80)
> > > >         at kafka.controller.RequestSendThread.liftedTree1$
> > > > 1(ControllerChannelManager.scala:180)
> > > >         at kafka.controller.RequestSendThread.doWork(
> > > > ControllerChannelManager.scala:171)
> > > >         at kafka.utils.ShutdownableThread.run(
> > > ShutdownableThread.scala:63)
> > > > [2016-08-17 12:55:32,902] WARN [Controller-2-to-broker-3-
> send-thread],
> > > > Controller 2 epoch 2 fails to send request
> {controller_id=2,controller_
> > > > epoch=2,partition_states=[{topic=topic1,partition=82,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]}],live_brokers=[{id=3,end_
> > > > points=[{port=9092,host=b3.kafka,security_protocol_type=
> > > > 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
> > > > protocol_type=0}]},{id=5,end_points=[{port=9092,host=b5.
> > > > kafka,security_protocol_type=0}]},{id=2,end_points=[{port=
> > > > 9092,host=b2.kafka,security_protocol_type=0}]},{id=1,end_
> > > > points=[{port=9092,host=b1.kafka,security_protocol_type=
> > > > 0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_
> > > > protocol_type=0}]}]}
> > > > to broker Node(3, b3.kafka, 9092). Reconnecting to broker.
> > > > (kafka.controller.RequestSendThread)
> > > > java.io.IOException: Connection to 3 was disconnected before the
> > response
> > > > was read
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > NetworkClientBlockingOps.scala:87)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > NetworkClientBlockingOps.scala:84)
> > > >         at scala.Option.foreach(Option.scala:257)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > > > scala:84)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > > > scala:80)
> > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> > > > NetworkClientBlockingOps.scala:129)
> > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > > > NetworkClientBlockingOps$$pollUntilFound$extension(
> > > > NetworkClientBlockingOps.
> > > > scala:139)
> > > >         at kafka.utils.NetworkClientBlockingOps$.
> > blockingSendAndReceive$
> > > > extension(NetworkClientBlockingOps.scala:80)
> > > >         at kafka.controller.RequestSendThread.liftedTree1$
> > > > 1(ControllerChannelManager.scala:180)
> > > >         at kafka.controller.RequestSendThread.doWork(
> > > > ControllerChannelManager.scala:171)
> > > >         at kafka.utils.ShutdownableThread.run(
> > > ShutdownableThread.scala:63)
> > > > [2016-08-17 12:55:32,903] WARN [Controller-2-to-broker-0-
> send-thread],
> > > > Controller 2 epoch 2 fails to send request
> {controller_id=2,controller_
> > > > epoch=2,partition_states=[{topic=topic1,partition=82,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=30,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]},{topic=topic1,partition=78,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=12,replicas=[5,3]},{topic=topic1,partition=86,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=31,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=20,replicas=[3,5]},{topic=topic1,partition=36,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]},{topic=topic1,partition=74,
> > > > controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> > > > version=14,replicas=[5,3]},{topic=topic1,partition=33,
> > > > controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> > > > version=18,replicas=[3,5]}],live_brokers=[{id=4,end_
> > > > points=[{port=9092,host=b4.kafka,security_protocol_type=
> > > > 0}]},{id=3,end_points=[{port=9092,host=b3.kafka,security_
> > > > protocol_type=0}]},{id=1,end_points=[{port=9092,host=b1.
> > > > kafka,security_protocol_type=0}]},{id=2,end_points=[{port=
> > > > 9092,host=b2.kafka,security_protocol_type=0}]},{id=5,end_
> > > > points=[{port=9092,host=b5.kafka,security_protocol_type=
> > > > 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
> > > > protocol_type=0}]}]}
> > > > to broker Node(0, b0.kafka, 9092). Reconnecting to broker.
> > > > (kafka.controller.RequestSendThread)
> > > > java.io.IOException: Connection to 0 was disconnected before the
> > response
> > > > was read
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > NetworkClientBlockingOps.scala:87)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > > > NetworkClientBlockingOps.scala:84)
> > > >         at scala.Option.foreach(Option.scala:257)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > > > scala:84)
> > > >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > > > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > > > scala:80)
> > > >         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> > > > NetworkClientBlockingOps.scala:129)
> > > >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > > > NetworkClientBlockingOps$$pollUntilFound$extension(
> > > > NetworkClientBlockingOps.
> > > > scala:139)
> > > >         at kafka.utils.NetworkClientBlockingOps$.
> > blockingSendAndReceive$
> > > > extension(NetworkClientBlockingOps.scala:80)
> > > >         at kafka.controller.RequestSendThread.liftedTree1$
> > > > 1(ControllerChannelManager.scala:180)
> > > >         at kafka.controller.RequestSendThread.doWork(
> > > > ControllerChannelManager.scala:171)
> > > >         at kafka.utils.ShutdownableThread.run(
> > > ShutdownableThread.scala:63)
> > > > [2016-08-17 12:55:32,927] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 12:55:33,162] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 12:55:33,169] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,50])
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 12:55:33,194] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 12:55:33,198] INFO [Controller-2-to-broker-2-
> send-thread],
> > > > Controller 2 connected to Node(2, b2.kafka, 9092) for sending state
> > > change
> > > > requests (kafka.controller.RequestSendThread)
> > > > [2016-08-17 12:55:33,199] INFO [Controller-2-to-broker-5-
> send-thread],
> > > > Controller 2 connected to Node(5, b5.kafka, 9092) for sending state
> > > change
> > > > requests (kafka.controller.RequestSendThread)
> > > > [2016-08-17 12:55:33,200] INFO [Controller-2-to-broker-4-
> send-thread],
> > > > Controller 2 connected to Node(4, b4.kafka, 9092) for sending state
> > > change
> > > > requests (kafka.controller.RequestSendThread)
> > > > [2016-08-17 12:55:33,202] INFO [Controller-2-to-broker-1-
> send-thread],
> > > > Controller 2 connected to Node(1, b1.kafka, 9092) for sending state
> > > change
> > > > requests (kafka.controller.RequestSendThread)
> > > > [2016-08-17 12:55:33,204] INFO [Controller-2-to-broker-0-
> send-thread],
> > > > Controller 2 connected to Node(0, b0.kafka, 9092) for sending state
> > > change
> > > > requests (kafka.controller.RequestSendThread)
> > > > [2016-08-17 12:55:33,207] INFO [Controller-2-to-broker-3-
> send-thread],
> > > > Controller 2 connected to Node(3, b3.kafka, 9092) for sending state
> > > change
> > > > requests (kafka.controller.RequestSendThread)
> > > > [2016-08-17 12:55:39,981] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 12:55:40,018] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,34],
> > > > [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38],
> > > > [topic1,35], [topic1,33]) (kafka.controller.
> > > IsrChangeNotificationListener)
> > > > [2016-08-17 12:55:40,377] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 12:55:40,388] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,86],
> > > > [topic1,78], [topic1,82]) (kafka.controller.
> > > IsrChangeNotificationListener)
> > > > [2016-08-17 12:55:40,409] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 12:59:50,293] TRACE [Controller 2]: checking need to
> > trigger
> > > > partition rebalance (kafka.controller.KafkaController)
> > > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: preferred replicas by
> > > > broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0,
> > 1),
> > > > [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] ->
> > > > List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
> > > > [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] ->
> > > > List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
> > > > [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] ->
> > > > List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] ->
> List(5,
> > > 3),
> > > > [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] ->
> > > > List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
> > > > [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] ->
> > > > List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
> > > > [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] ->
> > > > List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1
> ->
> > > > Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69]
> > ->
> > > > List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
> > > > [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] ->
> > > > List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
> > > > [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] ->
> > > > List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
> > > > [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4),
> > [topic1,3]
> > > > -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4),
> > > > [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] ->
> > List(2,
> > > > 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11]
> > ->
> > > > List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4),
> > > [topic1,4]
> > > > -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4),
> > > > [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5),
> > > [topic1,30]
> > > > -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5),
> > > > [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] ->
> > > > List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
> > > > [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] ->
> > > > List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
> > > > [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 ->
> > > Map([topic1,53]
> > > > -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2),
> > > > [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] ->
> > > > List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
> > > > [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] ->
> > > > List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
> > > > [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] ->
> > > > List(4, 2))) (kafka.controller.KafkaController)
> > > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 0 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 5 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 1 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 2 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 12:59:50,295] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 12:59:50,295] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 3 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 12:59:50,295] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 12:59:50,295] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 4 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 13:00:39,546] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:39,604] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,5])
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:39,649] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:39,888] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:40,071] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,37],
> > > > [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
> > > > [topic1,36], [topic1,30], [topic1,31], [topic1,25], [topic1,29],
> > > > [topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28])
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:40,103] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:40,261] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:40,283] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,72],
> > > > [topic1,80]) (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:40,296] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:40,656] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:40,662] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,55])
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:40,934] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:47,335] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:47,393] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,37],
> > > > [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
> > > > [topic1,36], [topic1,30], [topic1,31], [topic1,25], [topic1,29],
> > > > [topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28])
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:47,423] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:47,897] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:47,944] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,5],
> > > > [topic1,3], [topic1,7], [topic1,11], [topic1,2], [topic1,6],
> > [topic1,1],
> > > > [topic1,10], [topic1,14], [topic1,9], [topic1,15]) (kafka.controller.
> > > > IsrChangeNotificationListener)
> > > > [2016-08-17 13:00:48,020] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:04:50,293] TRACE [Controller 2]: checking need to
> > trigger
> > > > partition rebalance (kafka.controller.KafkaController)
> > > > [2016-08-17 13:04:50,295] DEBUG [Controller 2]: preferred replicas by
> > > > broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0,
> > 1),
> > > > [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] ->
> > > > List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
> > > > [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] ->
> > > > List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
> > > > [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] ->
> > > > List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] ->
> List(5,
> > > 3),
> > > > [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] ->
> > > > List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
> > > > [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] ->
> > > > List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
> > > > [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] ->
> > > > List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1
> ->
> > > > Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69]
> > ->
> > > > List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
> > > > [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] ->
> > > > List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
> > > > [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] ->
> > > > List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
> > > > [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4),
> > [topic1,3]
> > > > -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4),
> > > > [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] ->
> > List(2,
> > > > 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11]
> > ->
> > > > List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4),
> > > [topic1,4]
> > > > -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4),
> > > > [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5),
> > > [topic1,30]
> > > > -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5),
> > > > [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] ->
> > > > List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
> > > > [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] ->
> > > > List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
> > > > [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 ->
> > > Map([topic1,53]
> > > > -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2),
> > > > [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] ->
> > > > List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
> > > > [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] ->
> > > > List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
> > > > [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] ->
> > > > List(4, 2))) (kafka.controller.KafkaController)
> > > > [2016-08-17 13:04:50,295] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 13:04:50,295] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 0 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 13:04:50,295] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 5 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 1 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 2 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 3 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 4 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 13:05:34,317] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:05:34,365] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,80],
> > > > [topic1,40], [topic1,21], [topic1,31], [topic1,84], [topic1,33])
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:05:34,388] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:05:36,426] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:05:36,437] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,92])
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:05:36,699] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:05:40,225] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:05:40,239] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,80],
> > > > [topic1,84]) (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:05:40,246] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:05:40,958] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:05:41,006] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,22],
> > > > [topic1,16], [topic1,20], [topic1,19], [topic1,40], [topic1,21],
> > > > [topic1,18], [topic1,47], [topic1,44], [topic1,45], [topic1,42],
> > > > [topic1,46], [topic1,43], [topic1,23]) (kafka.controller.
> > > > IsrChangeNotificationListener)
> > > > [2016-08-17 13:05:41,067] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:05:42,517] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:05:42,622] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,37],
> > > > [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
> > > > [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38],
> > > > [topic1,26], [topic1,35], [topic1,33], [topic1,28])
> (kafka.controller.
> > > > IsrChangeNotificationListener)
> > > > [2016-08-17 13:05:42,690] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:09:50,293] TRACE [Controller 2]: checking need to
> > trigger
> > > > partition rebalance (kafka.controller.KafkaController)
> > > > [2016-08-17 13:09:50,295] DEBUG [Controller 2]: preferred replicas by
> > > > broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0,
> > 1),
> > > > [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] ->
> > > > List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
> > > > [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] ->
> > > > List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
> > > > [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] ->
> > > > List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] ->
> List(5,
> > > 3),
> > > > [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] ->
> > > > List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
> > > > [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] ->
> > > > List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
> > > > [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] ->
> > > > List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1
> ->
> > > > Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69]
> > ->
> > > > List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
> > > > [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] ->
> > > > List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
> > > > [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] ->
> > > > List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
> > > > [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4),
> > [topic1,3]
> > > > -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4),
> > > > [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] ->
> > List(2,
> > > > 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11]
> > ->
> > > > List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4),
> > > [topic1,4]
> > > > -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4),
> > > > [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5),
> > > [topic1,30]
> > > > -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5),
> > > > [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] ->
> > > > List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
> > > > [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] ->
> > > > List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
> > > > [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 ->
> > > Map([topic1,53]
> > > > -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2),
> > > > [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] ->
> > > > List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
> > > > [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] ->
> > > > List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
> > > > [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] ->
> > > > List(4, 2))) (kafka.controller.KafkaController)
> > > > [2016-08-17 13:09:50,295] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 13:09:50,295] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 0 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 13:09:50,295] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 5 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 1 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 2 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 3 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in
> preferred
> > > > replica Map() (kafka.controller.KafkaController)
> > > > [2016-08-17 13:09:50,297] TRACE [Controller 2]: leader imbalance
> ratio
> > > for
> > > > broker 4 is 0.000000 (kafka.controller.KafkaController)
> > > > [2016-08-17 13:10:37,278] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:10:37,292] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,67],
> > > > [topic1,95]) (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:10:37,304] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:10:43,375] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:10:43,383] DEBUG Sending MetadataRequest to
> > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> > > > topic1,67],
> > > > [topic1,95]) (kafka.controller.IsrChangeNotificationListener)
> > > > [2016-08-17 13:10:43,394] DEBUG [IsrChangeNotificationListener]
> > Fired!!!
> > > > (kafka.controller.IsrChangeNotificationListener)
> > > >
> > > > Thanks
> > > >
> > > > Regards,
> > > > Mazhar Shaikh.
> > > >
> > > >
> > > >
> > > > On Wed, Aug 17, 2016 at 9:50 PM, Jun Rao <j...@confluent.io> wrote:
> > > >
> > > > > Yes, you can try setting it to -1 in 0.8.1, which is the equivalent
> > of
> > > > > "all" in 0.9 and above.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Aug 17, 2016 at 8:32 AM, Mazhar Shaikh <
> > > > mazhar.shaikh...@gmail.com
> > > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > I'm using the default configuration (acks=1).
> > > > > > Changing it to all or 2 will not help, as the producer queue will be
> > > > > > exhausted if any Kafka broker goes down for a long time.
> > > > > >
> > > > > >
> > > > > > Thanks.
> > > > > >
> > > > > > Regards,
> > > > > > Mazhar Shaikh.
> > > > > >
> > > > > >
> > > > > > On Wed, Aug 17, 2016 at 8:11 PM, Jun Rao <j...@confluent.io>
> wrote:
> > > > > >
> > > > > > > Are you using acks=1 or acks=all in the producer? Only the
> latter
> > > > > > > guarantees acked messages won't be lost after leader failure.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Wed, Aug 10, 2016 at 11:41 PM, Mazhar Shaikh <
> > > > > > > mazhar.shaikh...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Kafka Team,
> > > > > > > >
> > > > > > > > I'm using Kafka (kafka_2.11-0.9.0.1) with the librdkafka (0.8.1) API
> > > > > > > > for the producer.
> > > > > > > > During a run of 2 hours, I noticed that the total number of messages
> > > > > > > > ack'd by the librdkafka delivery report is greater than the max offset
> > > > > > > > of a partition on the Kafka broker.
> > > > > > > > I'm running the Kafka broker with a replication factor of 2.
> > > > > > > >
> > > > > > > > Here, messages have been lost between librdkafka and the Kafka broker.
> > > > > > > >
> > > > > > > > librdkafka is providing successful delivery reports for all of the
> > > > > > > > messages.
> > > > > > > >
> > > > > > > > So it looks like the Kafka broker is dropping the messages after
> > > > > > > > acknowledging librdkafka.
> > > > > > > >
> > > > > > > > Requesting your help in solving this issue.
> > > > > > > >
> > > > > > > > Thank you.
> > > > > > > >
> > > > > > > >
> > > > > > > > Regards
> > > > > > > > Mazhar Shaikh
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
