Mazhar,

With ack=1, whether you lose messages or not is not deterministic. It depends on the relative timing of three events: when the broker receives and acks a message, when the follower fetches the data, and when the broker fails. So it's possible that you got lucky in one version and unlucky in another.
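To make the timing dependence concrete, here is a toy model of the race. It is not Kafka code: the function name `lost_on_leader_failure` and the offsets are made up for illustration, and the only assumption is that the leader acks a message as soon as it has it locally, while the follower copies it some time later.

```python
def lost_on_leader_failure(acked_offsets, follower_fetched_upto):
    """Return the acked offsets the follower had not yet copied when the leader failed.

    acked_offsets: offsets the leader already acknowledged to the producer.
    follower_fetched_upto: the follower holds every offset strictly below this value.
    """
    return [offset for offset in acked_offsets if offset >= follower_fetched_upto]


# The leader has acked offsets 0..9 to the producer.
acked = list(range(10))

# Case 1: the follower had fetched everything before the leader failed.
lucky = lost_on_leader_failure(acked, follower_fetched_upto=10)

# Case 2: the follower was three messages behind when the leader failed.
unlucky = lost_on_leader_failure(acked, follower_fetched_upto=7)

print(lucky)    # []
print(unlucky)  # [7, 8, 9]
```

The same producer settings and the same broker version can land in either case; only the moment of the failure differs.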
Thanks,

Jun

On Thu, Aug 18, 2016 at 12:11 PM, Mazhar Shaikh <mazhar.shaikh...@gmail.com> wrote:

> Hi Jun,
>
> Thanks for the clarification. I'll give it a try with ack=-1 (in the producer).
>
> However, I fell back to an older version of Kafka (*kafka_2.10-0.8.2.1*),
> and I don't see this issue (loss of messages).
>
> It looks like kafka_2.11-0.9.0.1 has issues (a bug) during replication.
>
> Thanks,
>
> Regards,
> Mazhar Shaikh.
>
> On Thu, Aug 18, 2016 at 10:30 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Mazhar,
> >
> > There is probably a misunderstanding. Ack=-1 (or all) doesn't mean waiting
> > for all replicas. It means waiting for all replicas that are in sync. So,
> > if a replica is down, it will be removed from the in-sync replicas, which
> > allows the producer to continue with fewer replicas.
> >
> > For the connection issue that you saw in the log, this could happen when a
> > connection is idle for some time. It won't break the replication logic
> > since a new connection will be created automatically. You can increase the
> > socket idle time on the broker if you want to turn off this behavior.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Aug 18, 2016 at 12:07 AM, Mazhar Shaikh <mazhar.shaikh...@gmail.com> wrote:
> >
> > > Hi Jun,
> > >
> > > Setting it to -1 may solve this issue.
> > > But it will cause the producer buffer to fill up under load test, resulting in
> > > failures and dropped messages on the client (producer) side.
> > > Hence, this will not actually solve the problem.
> > >
> > > I need to fix this on the Kafka broker side, so that there is no impact on
> > > the producer or consumer.
> > >
> > > From the logs, it looks like there is a connection problem between
> > > brokers, and the Kafka broker is losing records during this process.
> > >
> > > But why is the Kafka broker losing records?
> > >
> > > I feel this is a BUG in Kafka.
> > >
> > > [2016-08-17 12:54:50,293] TRACE [Controller 2]: checking need to trigger partition rebalance (kafka.controller.KafkaController)
> > > [2016-08-17 12:54:50,294] DEBUG [Controller 2]: preferred replicas by broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1), [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] -> List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1), [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] -> List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1), [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] -> List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3), [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] -> List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3), [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] -> List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3), [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] -> List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 -> Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] -> List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0), [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] -> List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0), [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] -> List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0), [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3] -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4), [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2, 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] -> List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4] -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4), [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30] -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5), [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] -> List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5), [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] -> List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5), [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53] -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2), [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] -> List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2), [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] -> List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2), [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] -> List(4, 2))) (kafka.controller.KafkaController)
> > > [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance ratio for broker 0 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance ratio for broker 5 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance ratio for broker 1 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance ratio for broker 2 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance ratio for broker 3 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance ratio for broker 4 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 12:55:32,783] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 12:55:32,894] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,36], [topic1,30], [topic1,31], [topic1,86], [topic1,78], [topic1,74], [topic1,82], [topic1,33]) (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 12:55:32,896] WARN [Controller-2-to-broker-2-send-thread], Controller 2 epoch 2 fails to send request {controller_id=2,controller_epoch=2,partition_states=[{topic=topic1,partition=82,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=30,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]},{topic=topic1,partition=78,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=12,replicas=[5,3]},{topic=topic1,partition=86,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=31,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=20,replicas=[3,5]},{topic=topic1,partition=36,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]},{topic=topic1,partition=74,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=33,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]}],live_brokers=[{id=5,end_points=[{port=9092,host=b5.kafka,security_protocol_type=0}]},{id=3,end_points=[{port=9092,host=b3.kafka,security_protocol_type=0}]},{id=2,end_points=[{port=9092,host=b2.kafka,security_protocol_type=0}]},{id=1,end_points=[{port=9092,host=b1.kafka,security_protocol_type=0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_protocol_type=0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_protocol_type=0}]}]} to broker Node(2, b2.kafka, 9092). Reconnecting to broker. (kafka.controller.RequestSendThread)
> > > java.io.IOException: Connection to 2 was disconnected before the response was read
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
> > >     at scala.Option.foreach(Option.scala:257)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
> > >     at kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClientBlockingOps.scala:129)
> > >     at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.scala:139)
> > >     at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
> > >     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:180)
> > >     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:171)
> > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > > [2016-08-17 12:55:32,897] WARN [Controller-2-to-broker-5-send-thread], Controller 2 epoch 2 fails to send request {controller_id=2,controller_epoch=2,partition_states=[{topic=topic1,partition=82,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=30,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]},{topic=topic1,partition=78,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=12,replicas=[5,3]},{topic=topic1,partition=86,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=31,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=20,replicas=[3,5]},{topic=topic1,partition=36,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]},{topic=topic1,partition=74,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=33,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]}],live_brokers=[{id=0,end_points=[{port=9092,host=b0.kafka,security_protocol_type=0}]},{id=5,end_points=[{port=9092,host=b5.kafka,security_protocol_type=0}]},{id=3,end_points=[{port=9092,host=b3.kafka,security_protocol_type=0}]},{id=1,end_points=[{port=9092,host=b1.kafka,security_protocol_type=0}]},{id=2,end_points=[{port=9092,host=b2.kafka,security_protocol_type=0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_protocol_type=0}]}]} to broker Node(5, b5.kafka, 9092). Reconnecting to broker. (kafka.controller.RequestSendThread)
> > > java.io.IOException: Connection to 5 was disconnected before the response was read
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
> > >     at scala.Option.foreach(Option.scala:257)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
> > >     at kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClientBlockingOps.scala:129)
> > >     at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.scala:139)
> > >     at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
> > >     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:180)
> > >     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:171)
> > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > > [2016-08-17 12:55:32,898] WARN [Controller-2-to-broker-4-send-thread], Controller 2 epoch 2 fails to send request {controller_id=2,controller_epoch=2,partition_states=[{topic=topic1,partition=82,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=30,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]},{topic=topic1,partition=78,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=12,replicas=[5,3]},{topic=topic1,partition=86,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=31,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=20,replicas=[3,5]},{topic=topic1,partition=36,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]},{topic=topic1,partition=74,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=33,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]}],live_brokers=[{id=3,end_points=[{port=9092,host=b3.kafka,security_protocol_type=0}]},{id=1,end_points=[{port=9092,host=b1.kafka,security_protocol_type=0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_protocol_type=0}]},{id=2,end_points=[{port=9092,host=b2.kafka,security_protocol_type=0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_protocol_type=0}]},{id=5,end_points=[{port=9092,host=b5.kafka,security_protocol_type=0}]}]} to broker Node(4, b4.kafka, 9092). Reconnecting to broker. (kafka.controller.RequestSendThread)
> > > java.io.IOException: Connection to 4 was disconnected before the response was read
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
> > >     at scala.Option.foreach(Option.scala:257)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
> > >     at kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClientBlockingOps.scala:129)
> > >     at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.scala:139)
> > >     at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
> > >     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:180)
> > >     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:171)
> > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > > [2016-08-17 12:55:32,900] WARN [Controller-2-to-broker-1-send-thread], Controller 2 epoch 2 fails to send request {controller_id=2,controller_epoch=2,partition_states=[{topic=topic1,partition=82,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=30,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]},{topic=topic1,partition=78,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=12,replicas=[5,3]},{topic=topic1,partition=86,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=31,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=20,replicas=[3,5]},{topic=topic1,partition=36,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]},{topic=topic1,partition=74,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=33,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]}],live_brokers=[{id=5,end_points=[{port=9092,host=b5.kafka,security_protocol_type=0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_protocol_type=0}]},{id=3,end_points=[{port=9092,host=b3.kafka,security_protocol_type=0}]},{id=1,end_points=[{port=9092,host=b1.kafka,security_protocol_type=0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_protocol_type=0}]},{id=2,end_points=[{port=9092,host=b2.kafka,security_protocol_type=0}]}]} to broker Node(1, b1.kafka, 9092). Reconnecting to broker. (kafka.controller.RequestSendThread)
> > > java.io.IOException: Connection to 1 was disconnected before the response was read
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
> > >     at scala.Option.foreach(Option.scala:257)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
> > >     at kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClientBlockingOps.scala:129)
> > >     at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.scala:139)
> > >     at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
> > >     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:180)
> > >     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:171)
> > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > > [2016-08-17 12:55:32,902] WARN [Controller-2-to-broker-3-send-thread], Controller 2 epoch 2 fails to send request {controller_id=2,controller_epoch=2,partition_states=[{topic=topic1,partition=82,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=30,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]},{topic=topic1,partition=78,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=12,replicas=[5,3]},{topic=topic1,partition=86,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=31,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=20,replicas=[3,5]},{topic=topic1,partition=36,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]},{topic=topic1,partition=74,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=33,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]}],live_brokers=[{id=3,end_points=[{port=9092,host=b3.kafka,security_protocol_type=0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_protocol_type=0}]},{id=5,end_points=[{port=9092,host=b5.kafka,security_protocol_type=0}]},{id=2,end_points=[{port=9092,host=b2.kafka,security_protocol_type=0}]},{id=1,end_points=[{port=9092,host=b1.kafka,security_protocol_type=0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_protocol_type=0}]}]} to broker Node(3, b3.kafka, 9092). Reconnecting to broker. (kafka.controller.RequestSendThread)
> > > java.io.IOException: Connection to 3 was disconnected before the response was read
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
> > >     at scala.Option.foreach(Option.scala:257)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
> > >     at kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClientBlockingOps.scala:129)
> > >     at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.scala:139)
> > >     at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
> > >     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:180)
> > >     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:171)
> > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > > [2016-08-17 12:55:32,903] WARN [Controller-2-to-broker-0-send-thread], Controller 2 epoch 2 fails to send request {controller_id=2,controller_epoch=2,partition_states=[{topic=topic1,partition=82,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=30,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]},{topic=topic1,partition=78,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=12,replicas=[5,3]},{topic=topic1,partition=86,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=31,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=20,replicas=[3,5]},{topic=topic1,partition=36,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]},{topic=topic1,partition=74,controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_version=14,replicas=[5,3]},{topic=topic1,partition=33,controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_version=18,replicas=[3,5]}],live_brokers=[{id=4,end_points=[{port=9092,host=b4.kafka,security_protocol_type=0}]},{id=3,end_points=[{port=9092,host=b3.kafka,security_protocol_type=0}]},{id=1,end_points=[{port=9092,host=b1.kafka,security_protocol_type=0}]},{id=2,end_points=[{port=9092,host=b2.kafka,security_protocol_type=0}]},{id=5,end_points=[{port=9092,host=b5.kafka,security_protocol_type=0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_protocol_type=0}]}]} to broker Node(0, b0.kafka, 9092). Reconnecting to broker. (kafka.controller.RequestSendThread)
> > > java.io.IOException: Connection to 0 was disconnected before the response was read
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
> > >     at scala.Option.foreach(Option.scala:257)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
> > >     at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
> > >     at kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClientBlockingOps.scala:129)
> > >     at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.scala:139)
> > >     at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
> > >     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:180)
> > >     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:171)
> > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > > [2016-08-17 12:55:32,927] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 12:55:33,162] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 12:55:33,169] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,50]) (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 12:55:33,194] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 12:55:33,198] INFO [Controller-2-to-broker-2-send-thread], Controller 2 connected to Node(2, b2.kafka, 9092) for sending state change requests (kafka.controller.RequestSendThread)
> > > [2016-08-17 12:55:33,199] INFO [Controller-2-to-broker-5-send-thread], Controller 2 connected to Node(5, b5.kafka, 9092) for sending state change requests (kafka.controller.RequestSendThread)
> > > [2016-08-17 12:55:33,200] INFO [Controller-2-to-broker-4-send-thread], Controller 2 connected to Node(4, b4.kafka, 9092) for sending state change requests (kafka.controller.RequestSendThread)
> > > [2016-08-17 12:55:33,202] INFO [Controller-2-to-broker-1-send-thread], Controller 2 connected to Node(1, b1.kafka, 9092) for sending state change requests (kafka.controller.RequestSendThread)
> > > [2016-08-17 12:55:33,204] INFO [Controller-2-to-broker-0-send-thread], Controller 2 connected to Node(0, b0.kafka, 9092) for sending state change requests (kafka.controller.RequestSendThread)
> > > [2016-08-17 12:55:33,207] INFO [Controller-2-to-broker-3-send-thread], Controller 2 connected to Node(3, b3.kafka, 9092) for sending state change requests (kafka.controller.RequestSendThread)
> > > [2016-08-17 12:55:39,981] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 12:55:40,018] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,34], [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38], [topic1,35], [topic1,33]) (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 12:55:40,377] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 12:55:40,388] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,86], [topic1,78], [topic1,82]) (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 12:55:40,409] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 12:59:50,293] TRACE [Controller 2]: checking need to trigger partition rebalance (kafka.controller.KafkaController)
> > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: preferred replicas by broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1), [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] -> List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1), [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] -> List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1), [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] -> List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3), [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] -> List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3), [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] -> List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3), [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] -> List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 -> Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] -> List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0), [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] -> List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0), [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] -> List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0), [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3] -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4), [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2, 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] -> List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4] -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4), [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30] -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5), [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] -> List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5), [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] -> List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5), [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53] -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2), [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] -> List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2), [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] -> List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2), [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] -> List(4, 2))) (kafka.controller.KafkaController)
> > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for broker 0 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for broker 5 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for broker 1 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for broker 2 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 12:59:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 12:59:50,295] TRACE [Controller 2]: leader imbalance ratio for broker 3 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 12:59:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 12:59:50,295] TRACE [Controller 2]: leader imbalance ratio for broker 4 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 13:00:39,546] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:00:39,604] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,5]) (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:00:39,649] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:00:39,888] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:00:40,071] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,37], [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39], [topic1,36], [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28]) (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:00:40,103] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:00:40,261] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:00:40,283] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,72], [topic1,80]) (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:00:40,296] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:00:40,656] DEBUG [IsrChangeNotificationListener] Fired!!!
> > > (kafka.controller.IsrChangeNotificationListener) > > > [2016-08-17 13:00:40,662] DEBUG Sending MetadataRequest to > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([ > > > topic1,55]) > > > (kafka.controller.IsrChangeNotificationListener) > > > [2016-08-17 13:00:40,934] DEBUG [IsrChangeNotificationListener] > Fired!!! > > > (kafka.controller.IsrChangeNotificationListener) > > > [2016-08-17 13:00:47,335] DEBUG [IsrChangeNotificationListener] > Fired!!! > > > (kafka.controller.IsrChangeNotificationListener) > > > [2016-08-17 13:00:47,393] DEBUG Sending MetadataRequest to > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([ > > > topic1,37], > > > [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39], > > > [topic1,36], [topic1,30], [topic1,31], [topic1,25], [topic1,29], > > > [topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28]) > > > (kafka.controller.IsrChangeNotificationListener) > > > [2016-08-17 13:00:47,423] DEBUG [IsrChangeNotificationListener] > Fired!!! > > > (kafka.controller.IsrChangeNotificationListener) > > > [2016-08-17 13:00:47,897] DEBUG [IsrChangeNotificationListener] > Fired!!! > > > (kafka.controller.IsrChangeNotificationListener) > > > [2016-08-17 13:00:47,944] DEBUG Sending MetadataRequest to > > > Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([ > > > topic1,5], > > > [topic1,3], [topic1,7], [topic1,11], [topic1,2], [topic1,6], > [topic1,1], > > > [topic1,10], [topic1,14], [topic1,9], [topic1,15]) (kafka.controller. > > > IsrChangeNotificationListener) > > > [2016-08-17 13:00:48,020] DEBUG [IsrChangeNotificationListener] > Fired!!! 
> > > (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:04:50,293] TRACE [Controller 2]: checking need to trigger partition rebalance (kafka.controller.KafkaController)
> > > [2016-08-17 13:04:50,295] DEBUG [Controller 2]: preferred replicas by broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1), [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] -> List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1), [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] -> List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1), [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] -> List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3), [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] -> List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3), [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] -> List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3), [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] -> List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 -> Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] -> List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0), [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] -> List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0), [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] -> List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0), [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3] -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4), [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2, 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] -> List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4] -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4), [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30] -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5), [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] -> List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5), [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] -> List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5), [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53] -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2), [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] -> List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2), [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] -> List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2), [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] -> List(4, 2))) (kafka.controller.KafkaController)
> > > [2016-08-17 13:04:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 13:04:50,295] TRACE [Controller 2]: leader imbalance ratio for broker 0 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 13:04:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 5 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 1 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 2 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 3 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 4 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 13:05:34,317] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:05:34,365] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,80], [topic1,40], [topic1,21], [topic1,31], [topic1,84], [topic1,33]) (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:05:34,388] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:05:36,426] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:05:36,437] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,92]) (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:05:36,699] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:05:40,225] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:05:40,239] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,80], [topic1,84]) (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:05:40,246] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:05:40,958] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:05:41,006] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,22], [topic1,16], [topic1,20], [topic1,19], [topic1,40], [topic1,21], [topic1,18], [topic1,47], [topic1,44], [topic1,45], [topic1,42], [topic1,46], [topic1,43], [topic1,23]) (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:05:41,067] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:05:42,517] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:05:42,622] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,37], [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39], [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28]) (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:05:42,690] DEBUG [IsrChangeNotificationListener] Fired!!!
> > > (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:09:50,293] TRACE [Controller 2]: checking need to trigger partition rebalance (kafka.controller.KafkaController)
> > > [2016-08-17 13:09:50,295] DEBUG [Controller 2]: preferred replicas by broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1), [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] -> List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1), [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] -> List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1), [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] -> List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3), [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] -> List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3), [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] -> List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3), [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] -> List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 -> Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] -> List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0), [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] -> List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0), [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] -> List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0), [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3] -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4), [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2, 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] -> List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4] -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4), [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30] -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5), [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] -> List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5), [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] -> List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5), [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53] -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2), [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] -> List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2), [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] -> List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2), [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] -> List(4, 2))) (kafka.controller.KafkaController)
> > > [2016-08-17 13:09:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 13:09:50,295] TRACE [Controller 2]: leader imbalance ratio for broker 0 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 13:09:50,295] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 5 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 1 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 2 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for broker 3 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> > > [2016-08-17 13:09:50,297] TRACE [Controller 2]: leader imbalance ratio for broker 4 is 0.000000 (kafka.controller.KafkaController)
> > > [2016-08-17 13:10:37,278] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:10:37,292] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,67], [topic1,95]) (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:10:37,304] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:10:43,375] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:10:43,383] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([topic1,67], [topic1,95]) (kafka.controller.IsrChangeNotificationListener)
> > > [2016-08-17 13:10:43,394] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
> > >
> > > Thanks
> > >
> > > Regards,
> > > Mazhar Shaikh.
> > >
> > > On Wed, Aug 17, 2016 at 9:50 PM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Yes, you can try setting it to -1 in 0.8.1, which is the equivalent of
> > > > "all" in 0.9 and above.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Wed, Aug 17, 2016 at 8:32 AM, Mazhar Shaikh <mazhar.shaikh...@gmail.com> wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > I'm using the default configuration (ack=1).
> > > > > Changing it to all or 2 will not help, as the producer queue will be
> > > > > exhausted if any Kafka broker goes down for a long time.
> > > > >
> > > > > Thanks.
> > > > >
> > > > > Regards,
> > > > > Mazhar Shaikh.
> > > > >
> > > > > On Wed, Aug 17, 2016 at 8:11 PM, Jun Rao <j...@confluent.io> wrote:
> > > > >
> > > > > > Are you using acks=1 or acks=all in the producer? Only the latter
> > > > > > guarantees acked messages won't be lost after leader failure.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Wed, Aug 10, 2016 at 11:41 PM, Mazhar Shaikh <mazhar.shaikh...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Kafka Team,
> > > > > > >
> > > > > > > I'm using Kafka (kafka_2.11-0.9.0.1) with the librdkafka (0.8.1) API
> > > > > > > for the producer.
> > > > > > > During a 2-hour run, I noticed that the total number of messages acked
> > > > > > > by the librdkafka delivery report is greater than the max offset of a
> > > > > > > partition in the Kafka broker.
> > > > > > > I'm running the Kafka broker with a replication factor of 2.
> > > > > > >
> > > > > > > Here, messages have been lost between librdkafka and the Kafka broker,
> > > > > > > as librdkafka is providing a successful delivery report for all the
> > > > > > > messages.
> > > > > > >
> > > > > > > Looks like the Kafka broker is dropping the messages after
> > > > > > > acknowledging them to librdkafka.
> > > > > > >
> > > > > > > Requesting your help in solving this issue.
> > > > > > >
> > > > > > > Thank you.
> > > > > > >
> > > > > > > Regards
> > > > > > > Mazhar Shaikh
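
To make the acks=1 versus acks=-1 (all) point from this thread concrete, here is a minimal toy sketch in Python. It is not Kafka's replication protocol and uses no Kafka client; the names `Replica`, `produce`, `lost_after_failover`, and the `fetch_lag` parameter are all hypothetical, introduced only for illustration. It assumes one leader and one in-sync follower (replication factor 2, as in the report above) and models "timing" as the number of newest messages the follower has not yet fetched when the leader fails.

```python
from dataclasses import dataclass, field


@dataclass
class Replica:
    """A toy replica: just an append-only list of messages."""
    log: list = field(default_factory=list)


def produce(messages, acks, fetch_lag):
    """Send messages to a leader with one in-sync follower, then fail the leader.

    fetch_lag: how many of the newest messages the follower has NOT fetched
    when the leader dies (an assumption standing in for real timing).
    Returns (acked, surviving): what the producer saw acked, and what the
    follower holds when it takes over as the new leader.
    """
    leader, follower = Replica(), Replica()
    acked = []
    for i, msg in enumerate(messages):
        leader.log.append(msg)
        replicated = i < len(messages) - fetch_lag
        if replicated:
            follower.log.append(msg)
        if acks == 1:
            # The leader's own write is enough to acknowledge.
            acked.append(msg)
        elif acks == -1 and replicated:
            # Acknowledge only once every in-sync replica has the message.
            acked.append(msg)
    # Leader fails here; only the follower's log survives.
    return acked, follower.log


def lost_after_failover(messages, acks, fetch_lag):
    """Messages the producer believes were delivered but that did not survive."""
    acked, surviving = produce(messages, acks, fetch_lag)
    return [m for m in acked if m not in surviving]


if __name__ == "__main__":
    msgs = list(range(10))
    print("acks=1, lag 3: ", lost_after_failover(msgs, acks=1, fetch_lag=3))
    print("acks=-1, lag 3:", lost_after_failover(msgs, acks=-1, fetch_lag=3))
    print("acks=1, lag 0: ", lost_after_failover(msgs, acks=1, fetch_lag=0))
```

In this toy model, acks=1 loses acked messages only when the follower is behind at the moment of failure, which matches the observation that the loss depends on timing rather than on a specific broker version. With acks=-1, the unreplicated messages are simply never acked, so the producer knows to retry them.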