I see a bugs raised over the same.

which is still open.

do we have any solution for this ?

https://issues.apache.org/jira/browse/KAFKA-3916
http://mail-archives.apache.org/mod_mbox/kafka-dev/201606.
mbox/%3cjira.12984498.1467148670000.10722.1467148737...@atlassian.jira%3E

Regards,
Mazhar Shaikh.




On Thu, Aug 18, 2016 at 12:37 PM, Mazhar Shaikh <mazhar.shaikh...@gmail.com>
wrote:

> Hi Jun,
>
> Setting to -1, may solve this issue.
> But it will cause producer buffer full in load test resulting to failures
> and drop of messages from client(producer side)
> Hence, this will not actually solve the problem.
>
> I need to fix this from kafka broker side, so that there is no impact on
> producer or consumer.
>
> From the logs looks like there is connection problem during between
> brokers and kafka broker is loosing records during this process.
>
> But why is kafka broker loosing records,
>
> I feel this is a BUG in kafka.
>
>
> [2016-08-17 12:54:50,293] TRACE [Controller 2]: checking need to trigger
> partition rebalance (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,294] DEBUG [Controller 2]: preferred replicas by
> broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1),
> [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] ->
> List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
> [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] ->
> List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
> [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] ->
> List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3),
> [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] ->
> List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
> [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] ->
> List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
> [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] ->
> List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 ->
> Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] ->
> List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
> [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] ->
> List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
> [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] ->
> List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
> [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3]
> -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4),
> [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2,
> 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] ->
> List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4]
> -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4),
> [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30]
> -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5),
> [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] ->
> List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
> [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] ->
> List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
> [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53]
> -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2),
> [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] ->
> List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
> [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] ->
> List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
> [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] ->
> List(4, 2))) (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance ratio for
> broker 0 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance ratio for
> broker 5 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance ratio for
> broker 1 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance ratio for
> broker 2 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance ratio for
> broker 3 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance ratio for
> broker 4 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:55:32,783] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:32,894] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,36],
> [topic1,30], [topic1,31], [topic1,86], [topic1,78], [topic1,74],
> [topic1,82], [topic1,33]) (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:32,896] WARN [Controller-2-to-broker-2-send-thread],
> Controller 2 epoch 2 fails to send request {controller_id=2,controller_ep
> och=2,partition_states=[{topic=topic1,partition=82,controlle
> r_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=30,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]},{topic=topic1,partition=78,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 12,replicas=[5,3]},{topic=topic1,partition=86,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=31,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 20,replicas=[3,5]},{topic=topic1,partition=36,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]},{topic=topic1,partition=74,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=33,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]}],live_brokers=[{id=5,end_points=[{
> port=9092,host=b5.kafka,security_protocol_type=0}]},{
> id=3,end_points=[{port=9092,host=b3.kafka,security_protoco
> l_type=0}]},{id=2,end_points=[{port=9092,host=b2.kafka,
> security_protocol_type=0}]},{id=1,end_points=[{port=9092,
> host=b1.kafka,security_protocol_type=0}]},{id=4,end_points=[
> {port=9092,host=b4.kafka,security_protocol_type=0}]},{
> id=0,end_points=[{port=9092,host=b0.kafka,security_protocol_type=0}]}]}
> to broker Node(2, b2.kafka, 9092). Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.io.IOException: Connection to 2 was disconnected before the response
> was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlo
> ckingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlo
> ckingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClien
> tBlockingOps.scala:129)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkCli
> entBlockingOps$$pollUntilFound$extension(NetworkClientBlocki
> ngOps.scala:139)
>         at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive
> $extension(NetworkClientBlockingOps.scala:80)
>         at kafka.controller.RequestSendThread.liftedTree1$1(
> ControllerChannelManager.scala:180)
>         at kafka.controller.RequestSendThread.doWork(ControllerChannelM
> anager.scala:171)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-08-17 12:55:32,897] WARN [Controller-2-to-broker-5-send-thread],
> Controller 2 epoch 2 fails to send request {controller_id=2,controller_ep
> och=2,partition_states=[{topic=topic1,partition=82,controlle
> r_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=30,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]},{topic=topic1,partition=78,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 12,replicas=[5,3]},{topic=topic1,partition=86,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=31,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 20,replicas=[3,5]},{topic=topic1,partition=36,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]},{topic=topic1,partition=74,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=33,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]}],live_brokers=[{id=0,end_points=[{
> port=9092,host=b0.kafka,security_protocol_type=0}]},{
> id=5,end_points=[{port=9092,host=b5.kafka,security_protoco
> l_type=0}]},{id=3,end_points=[{port=9092,host=b3.kafka,
> security_protocol_type=0}]},{id=1,end_points=[{port=9092,
> host=b1.kafka,security_protocol_type=0}]},{id=2,end_points=[
> {port=9092,host=b2.kafka,security_protocol_type=0}]},{
> id=4,end_points=[{port=9092,host=b4.kafka,security_protocol_type=0}]}]}
> to broker Node(5, b5.kafka, 9092). Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.io.IOException: Connection to 5 was disconnected before the response
> was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlo
> ckingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlo
> ckingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClien
> tBlockingOps.scala:129)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkCli
> entBlockingOps$$pollUntilFound$extension(NetworkClientBlocki
> ngOps.scala:139)
>         at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive
> $extension(NetworkClientBlockingOps.scala:80)
>         at kafka.controller.RequestSendThread.liftedTree1$1(
> ControllerChannelManager.scala:180)
>         at kafka.controller.RequestSendThread.doWork(ControllerChannelM
> anager.scala:171)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-08-17 12:55:32,898] WARN [Controller-2-to-broker-4-send-thread],
> Controller 2 epoch 2 fails to send request {controller_id=2,controller_ep
> och=2,partition_states=[{topic=topic1,partition=82,controlle
> r_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=30,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]},{topic=topic1,partition=78,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 12,replicas=[5,3]},{topic=topic1,partition=86,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=31,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 20,replicas=[3,5]},{topic=topic1,partition=36,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]},{topic=topic1,partition=74,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=33,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]}],live_brokers=[{id=3,end_points=[{
> port=9092,host=b3.kafka,security_protocol_type=0}]},{
> id=1,end_points=[{port=9092,host=b1.kafka,security_protoco
> l_type=0}]},{id=4,end_points=[{port=9092,host=b4.kafka,
> security_protocol_type=0}]},{id=2,end_points=[{port=9092,
> host=b2.kafka,security_protocol_type=0}]},{id=0,end_points=[
> {port=9092,host=b0.kafka,security_protocol_type=0}]},{
> id=5,end_points=[{port=9092,host=b5.kafka,security_protocol_type=0}]}]}
> to broker Node(4, b4.kafka, 9092). Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.io.IOException: Connection to 4 was disconnected before the response
> was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlo
> ckingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlo
> ckingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClien
> tBlockingOps.scala:129)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkCli
> entBlockingOps$$pollUntilFound$extension(NetworkClientBlocki
> ngOps.scala:139)
>         at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive
> $extension(NetworkClientBlockingOps.scala:80)
>         at kafka.controller.RequestSendThread.liftedTree1$1(
> ControllerChannelManager.scala:180)
>         at kafka.controller.RequestSendThread.doWork(ControllerChannelM
> anager.scala:171)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-08-17 12:55:32,900] WARN [Controller-2-to-broker-1-send-thread],
> Controller 2 epoch 2 fails to send request {controller_id=2,controller_ep
> och=2,partition_states=[{topic=topic1,partition=82,controlle
> r_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=30,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]},{topic=topic1,partition=78,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 12,replicas=[5,3]},{topic=topic1,partition=86,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=31,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 20,replicas=[3,5]},{topic=topic1,partition=36,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]},{topic=topic1,partition=74,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=33,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]}],live_brokers=[{id=5,end_points=[{
> port=9092,host=b5.kafka,security_protocol_type=0}]},{
> id=0,end_points=[{port=9092,host=b0.kafka,security_protoco
> l_type=0}]},{id=3,end_points=[{port=9092,host=b3.kafka,
> security_protocol_type=0}]},{id=1,end_points=[{port=9092,
> host=b1.kafka,security_protocol_type=0}]},{id=4,end_points=[
> {port=9092,host=b4.kafka,security_protocol_type=0}]},{
> id=2,end_points=[{port=9092,host=b2.kafka,security_protocol_type=0}]}]}
> to broker Node(1, b1.kafka, 9092). Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.io.IOException: Connection to 1 was disconnected before the response
> was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlo
> ckingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlo
> ckingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClien
> tBlockingOps.scala:129)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkCli
> entBlockingOps$$pollUntilFound$extension(NetworkClientBlocki
> ngOps.scala:139)
>         at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive
> $extension(NetworkClientBlockingOps.scala:80)
>         at kafka.controller.RequestSendThread.liftedTree1$1(
> ControllerChannelManager.scala:180)
>         at kafka.controller.RequestSendThread.doWork(ControllerChannelM
> anager.scala:171)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-08-17 12:55:32,902] WARN [Controller-2-to-broker-3-send-thread],
> Controller 2 epoch 2 fails to send request {controller_id=2,controller_ep
> och=2,partition_states=[{topic=topic1,partition=82,controlle
> r_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=30,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]},{topic=topic1,partition=78,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 12,replicas=[5,3]},{topic=topic1,partition=86,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=31,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 20,replicas=[3,5]},{topic=topic1,partition=36,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]},{topic=topic1,partition=74,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=33,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]}],live_brokers=[{id=3,end_points=[{
> port=9092,host=b3.kafka,security_protocol_type=0}]},{
> id=0,end_points=[{port=9092,host=b0.kafka,security_protoco
> l_type=0}]},{id=5,end_points=[{port=9092,host=b5.kafka,
> security_protocol_type=0}]},{id=2,end_points=[{port=9092,
> host=b2.kafka,security_protocol_type=0}]},{id=1,end_points=[
> {port=9092,host=b1.kafka,security_protocol_type=0}]},{
> id=4,end_points=[{port=9092,host=b4.kafka,security_protocol_type=0}]}]}
> to broker Node(3, b3.kafka, 9092). Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.io.IOException: Connection to 3 was disconnected before the response
> was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlo
> ckingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlo
> ckingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClien
> tBlockingOps.scala:129)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkCli
> entBlockingOps$$pollUntilFound$extension(NetworkClientBlocki
> ngOps.scala:139)
>         at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive
> $extension(NetworkClientBlockingOps.scala:80)
>         at kafka.controller.RequestSendThread.liftedTree1$1(
> ControllerChannelManager.scala:180)
>         at kafka.controller.RequestSendThread.doWork(ControllerChannelM
> anager.scala:171)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-08-17 12:55:32,903] WARN [Controller-2-to-broker-0-send-thread],
> Controller 2 epoch 2 fails to send request {controller_id=2,controller_ep
> och=2,partition_states=[{topic=topic1,partition=82,controlle
> r_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=30,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]},{topic=topic1,partition=78,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 12,replicas=[5,3]},{topic=topic1,partition=86,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=31,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 20,replicas=[3,5]},{topic=topic1,partition=36,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]},{topic=topic1,partition=74,controller
> _epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=
> 14,replicas=[5,3]},{topic=topic1,partition=33,controller
> _epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=
> 18,replicas=[3,5]}],live_brokers=[{id=4,end_points=[{
> port=9092,host=b4.kafka,security_protocol_type=0}]},{
> id=3,end_points=[{port=9092,host=b3.kafka,security_protoco
> l_type=0}]},{id=1,end_points=[{port=9092,host=b1.kafka,
> security_protocol_type=0}]},{id=2,end_points=[{port=9092,
> host=b2.kafka,security_protocol_type=0}]},{id=5,end_points=[
> {port=9092,host=b5.kafka,security_protocol_type=0}]},{
> id=0,end_points=[{port=9092,host=b0.kafka,security_protocol_type=0}]}]}
> to broker Node(0, b0.kafka, 9092). Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.io.IOException: Connection to 0 was disconnected before the response
> was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlo
> ckingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlo
> ckingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAn
> dReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClien
> tBlockingOps.scala:129)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkCli
> entBlockingOps$$pollUntilFound$extension(NetworkClientBlocki
> ngOps.scala:139)
>         at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive
> $extension(NetworkClientBlockingOps.scala:80)
>         at kafka.controller.RequestSendThread.liftedTree1$1(
> ControllerChannelManager.scala:180)
>         at kafka.controller.RequestSendThread.doWork(ControllerChannelM
> anager.scala:171)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-08-17 12:55:32,927] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:33,162] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:33,169] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,50])
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:33,194] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:33,198] INFO [Controller-2-to-broker-2-send-thread],
> Controller 2 connected to Node(2, b2.kafka, 9092) for sending state change
> requests (kafka.controller.RequestSendThread)
> [2016-08-17 12:55:33,199] INFO [Controller-2-to-broker-5-send-thread],
> Controller 2 connected to Node(5, b5.kafka, 9092) for sending state change
> requests (kafka.controller.RequestSendThread)
> [2016-08-17 12:55:33,200] INFO [Controller-2-to-broker-4-send-thread],
> Controller 2 connected to Node(4, b4.kafka, 9092) for sending state change
> requests (kafka.controller.RequestSendThread)
> [2016-08-17 12:55:33,202] INFO [Controller-2-to-broker-1-send-thread],
> Controller 2 connected to Node(1, b1.kafka, 9092) for sending state change
> requests (kafka.controller.RequestSendThread)
> [2016-08-17 12:55:33,204] INFO [Controller-2-to-broker-0-send-thread],
> Controller 2 connected to Node(0, b0.kafka, 9092) for sending state change
> requests (kafka.controller.RequestSendThread)
> [2016-08-17 12:55:33,207] INFO [Controller-2-to-broker-3-send-thread],
> Controller 2 connected to Node(3, b3.kafka, 9092) for sending state change
> requests (kafka.controller.RequestSendThread)
> [2016-08-17 12:55:39,981] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:40,018] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,34],
> [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38],
> [topic1,35], [topic1,33]) (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:40,377] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:40,388] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,86],
> [topic1,78], [topic1,82]) (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:40,409] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:59:50,293] TRACE [Controller 2]: checking need to trigger
> partition rebalance (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] DEBUG [Controller 2]: preferred replicas by
> broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1),
> [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] ->
> List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
> [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] ->
> List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
> [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] ->
> List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3),
> [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] ->
> List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
> [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] ->
> List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
> [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] ->
> List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 ->
> Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] ->
> List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
> [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] ->
> List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
> [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] ->
> List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
> [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3]
> -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4),
> [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2,
> 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] ->
> List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4]
> -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4),
> [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30]
> -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5),
> [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] ->
> List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
> [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] ->
> List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
> [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53]
> -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2),
> [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] ->
> List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
> [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] ->
> List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
> [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] ->
> List(4, 2))) (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for
> broker 0 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for
> broker 5 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for
> broker 1 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for
> broker 2 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,295] TRACE [Controller 2]: leader imbalance ratio for
> broker 3 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,295] TRACE [Controller 2]: leader imbalance ratio for
> broker 4 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:00:39,546] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:39,604] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,5])
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:39,649] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:39,888] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:40,071] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,37],
> [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
> [topic1,36], [topic1,30], [topic1,31], [topic1,25], [topic1,29],
> [topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28])
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:40,103] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:40,261] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:40,283] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,72],
> [topic1,80]) (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:40,296] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:40,656] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:40,662] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,55])
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:40,934] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:47,335] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:47,393] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,37],
> [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
> [topic1,36], [topic1,30], [topic1,31], [topic1,25], [topic1,29],
> [topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28])
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:47,423] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:47,897] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:47,944] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,5],
> [topic1,3], [topic1,7], [topic1,11], [topic1,2], [topic1,6], [topic1,1],
> [topic1,10], [topic1,14], [topic1,9], [topic1,15])
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:48,020] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:04:50,293] TRACE [Controller 2]: checking need to trigger
> partition rebalance (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,295] DEBUG [Controller 2]: preferred replicas by
> broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1),
> [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] ->
> List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
> [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] ->
> List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
> [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] ->
> List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3),
> [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] ->
> List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
> [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] ->
> List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
> [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] ->
> List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 ->
> Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] ->
> List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
> [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] ->
> List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
> [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] ->
> List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
> [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3]
> -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4),
> [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2,
> 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] ->
> List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4]
> -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4),
> [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30]
> -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5),
> [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] ->
> List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
> [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] ->
> List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
> [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53]
> -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2),
> [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] ->
> List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
> [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] ->
> List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
> [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] ->
> List(4, 2))) (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,295] TRACE [Controller 2]: leader imbalance ratio for
> broker 0 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 5 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 1 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 2 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 3 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 4 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:05:34,317] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:34,365] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,80],
> [topic1,40], [topic1,21], [topic1,31], [topic1,84], [topic1,33])
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:34,388] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:36,426] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:36,437] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,92])
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:36,699] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:40,225] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:40,239] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,80],
> [topic1,84]) (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:40,246] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:40,958] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:41,006] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,22],
> [topic1,16], [topic1,20], [topic1,19], [topic1,40], [topic1,21],
> [topic1,18], [topic1,47], [topic1,44], [topic1,45], [topic1,42],
> [topic1,46], [topic1,43], [topic1,23]) (kafka.controller.IsrChangeNot
> ificationListener)
> [2016-08-17 13:05:41,067] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:42,517] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:42,622] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,37],
> [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
> [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38],
> [topic1,26], [topic1,35], [topic1,33], [topic1,28])
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:42,690] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:09:50,293] TRACE [Controller 2]: checking need to trigger
> partition rebalance (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,295] DEBUG [Controller 2]: preferred replicas by
> broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1),
> [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] ->
> List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
> [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] ->
> List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
> [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] ->
> List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3),
> [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] ->
> List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
> [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] ->
> List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
> [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] ->
> List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 ->
> Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] ->
> List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
> [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] ->
> List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
> [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] ->
> List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
> [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3]
> -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4),
> [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2,
> 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] ->
> List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4]
> -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4),
> [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30]
> -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5),
> [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] ->
> List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
> [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] ->
> List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
> [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53]
> -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2),
> [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] ->
> List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
> [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] ->
> List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
> [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] ->
> List(4, 2))) (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,295] TRACE [Controller 2]: leader imbalance ratio for
> broker 0 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 5 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 1 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 2 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 3 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,297] TRACE [Controller 2]: leader imbalance ratio for
> broker 4 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:10:37,278] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:10:37,292] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,67],
> [topic1,95]) (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:10:37,304] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:10:43,375] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:10:43,383] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,67],
> [topic1,95]) (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:10:43,394] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
>
> Thanks
>
> Regards,
> Mazhar Shaikh.
>
>
>
> On Wed, Aug 17, 2016 at 9:50 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Yes, you can try setting it to -1 in 0.8.1, which is the equivalent of
>> "all" in 0.9 and above.
>>
>> Thanks,
>>
>> Jun
>>
>> On Wed, Aug 17, 2016 at 8:32 AM, Mazhar Shaikh <
>> mazhar.shaikh...@gmail.com>
>> wrote:
>>
>> > Hi Jun,
>> >
>> > I'm using default configuration (ack=1),
>> > changing it t0 all or 2 will not help, as the producer queue will be
>> > exhausted is any kafka broker goes down for long time.
>> >
>> >
>> > Thanks.
>> >
>> > Regards,
>> > Mazhar Shaikh.
>> >
>> >
>> > On Wed, Aug 17, 2016 at 8:11 PM, Jun Rao <j...@confluent.io> wrote:
>> >
>> > > Are you using acks=1 or acks=all in the producer? Only the latter
>> > > guarantees acked messages won't be lost after leader failure.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > > On Wed, Aug 10, 2016 at 11:41 PM, Mazhar Shaikh <
>> > > mazhar.shaikh...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi Kafka Team,
>> > > >
>> > > > I'm using kafka (kafka_2.11-0.9.0.1) with librdkafka (0.8.1) API for
>> > > > producer
>> > > > During a run of 2hrs, I notice the total number of messaged ack'd by
>> > > > librdkafka delivery report is greater than the maxoffset of a
>> partition
>> > > in
>> > > > kafka broker.
>> > > > I'm running kafka broker with replication factor of 2.
>> > > >
>> > > > Here, message has been lost between librdkafka - kafka broker.
>> > > >
>> > > > As librdkafka is providing success delivery report for all the
>> > messages.
>> > > >
>> > > > Looks like kafka broker is dropping the messages after acknowledging
>> > > > librdkafka.
>> > > >
>> > > > Requesting you help in solving this issue.
>> > > >
>> > > > Thank you.
>> > > >
>> > > >
>> > > > Regards
>> > > > Mazhar Shaikh
>> > > >
>> > >
>> >
>>
>
>

Reply via email to