Perhaps you will have to lower the TCP keepalive interval to keep the socket connection from being killed.
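For example (just an illustration; the exact values depend on your environment, and they are kernel-wide so they affect every TCP connection on the host), something like the following on the broker hosts would make the kernel send keepalive probes well before the 5m haproxy idle timeout in your config, and clean up dead connections much sooner than the 2-hour default:

  # start probing after 2 minutes of idle time instead of the 7200s default
  sysctl -w net.ipv4.tcp_keepalive_time=120
  # probe every 30 seconds, give up after 5 failed probes
  sysctl -w net.ipv4.tcp_keepalive_intvl=30
  sysctl -w net.ipv4.tcp_keepalive_probes=5

The keepalive time needs to stay below the haproxy client/server timeouts so the probes count as traffic and the load balancer does not close the idle controller-to-broker connection in the first place.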
Thanks, Jun On Fri, May 16, 2014 at 2:12 PM, Robin Yamaguchi <ro...@hasoffers.com>wrote: > The only errors logged are in the controller log: > > [2014-05-16 20:42:31,846] DEBUG [TopicChangeListener on Controller 1]: > Topic change listener fired for path /brokers/topics with children > Test2,Test1,Test3,Test4 > (kafka.controller.PartitionStateMachine$TopicChangeListener) > [2014-05-16 20:42:31,897] INFO [TopicChangeListener on Controller 1]: New > topics: [Set(Test4)], deleted topics: [Set()], new partition replica > assignment [Map([Test4,10] -> List(0, 1), [Test4,15] -> List(1, 0), > [Test4,5] -> List(1, 0), [Test4,9] -> List(1, 0), [Test4,14] -> List(0, 1), > [Test4,2] -> List(0, 1), [Test4,1] -> List(1, 0), [Test4,4] -> List(0, 1), > [T > est4,0] -> List(0, 1), [Test4,12] -> List(0, 1), [Test4,8] -> List(0, 1), > [Test4,6] -> List(0, 1), [Test4,7] -> List(1, 0), [Test4,13] -> List(1, 0), > [Test4,3] -> List(1, 0), [Test4,11] -> List(1, 0))] > (kafka.controller.PartitionStateMachine$TopicChangeListener) > [2014-05-16 20:42:31,908] INFO [Controller 1]: New topic creation callback > for > > [Test4,3],[Test4,14],[Test4,2],[Test4,9],[Test4,10],[Test4,6],[Test4,1],[Test4,8],[Test4,0],[Test4,5],[Test4,15],[Test4,13],[Test4,4],[Test4,7],[Test4,12],[Test4,11] > (kafka.controller.KafkaController) > [2014-05-16 20:42:31,911] INFO [Controller 1]: New partition creation > callback for > > [Test4,3],[Test4,14],[Test4,2],[Test4,9],[Test4,10],[Test4,6],[Test4,1],[Test4,8],[Test4,0],[Test4,5],[Test4,15],[Test4,13],[Test4,4],[Test4,7],[Test4,12],[Test4,11] > (kafka.controller.KafkaController) > [2014-05-16 20:42:31,912] INFO [Partition state machine on Controller 1]: > Invoking state change to NewPartition for partitions > > [Test4,3],[Test4,14],[Test4,2],[Test4,9],[Test4,10],[Test4,6],[Test4,1],[Test4,8],[Test4,0],[Test4,5],[Test4,15],[Test4,13],[Test4,4],[Test4,7],[Test4,12],[Test4,11] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,069] INFO [Replica state machine on controller 1]: > Invoking state change to NewReplica for replicas > > [Topic=Test4,Partition=8,Replica=0],[Topic=Test4,Partition=11,Replica=0],[Topic=Test4,Partition=15,Replica=0],[Topic=Test4,Partition=5,Replica=1],[Topic=Test4,Partition=12,Replica=1],[Topic=Test4,Partition=10,Replica=0],[Topic=Test4,Partition= > > 3,Replica=1],[Topic=Test4,Partition=4,Replica=1],[Topic=Test4,Partition=7,Replica=1],[Topic=Test4,Partition=10,Replica=1],[Topic=Test4,Partition=0,Replica=1],[Topic=Test4,Partition=8,Replica=1],[Topic=Test4,Partition=14,Replica=0],[Topic=Test4,Partition=1,Replica=0],[Topic=Test4,Partition=2,Replica=1],[Topic=Test4,Partition=4,Replica=0],[Topic=Test4,Partition=1, > > Replica=1],[Topic=Test4,Partition=13,Replica=1],[Topic=Test4,Partition=13,Replica=0],[Topic=Test4,Partition=6,Replica=1],[Topic=Test4,Partition=14,Replica=1],[Topic=Test4,Partition=3,Replica=0],[Topic=Test4,Partition=5,Replica=0],[Topic=Test4,Partition=9,Replica=1],[Topic=Test4,Partition=2,Replica=0],[Topic=Test4,Partition=11,Replica=1],[Topic=Test4,Partition=9, > > Replica=0],[Topic=Test4,Partition=0,Replica=0],[Topic=Test4,Partition=7,Replica=0],[Topic=Test4,Partition=12,Replica=0],[Topic=Test4,Partition=15,Replica=1],[Topic=Test4,Partition=6,Replica=0] > (kafka.controller.ReplicaStateMachine) > [2014-05-16 20:42:32,091] INFO [Partition state machine on Controller 1]: > Invoking state change to OnlinePartition for partitions > > 
[Test4,3],[Test4,14],[Test4,2],[Test4,9],[Test4,10],[Test4,6],[Test4,1],[Test4,8],[Test4,0],[Test4,5],[Test4,15],[Test4,13],[Test4,4],[Test4,7],[Test4,12],[Test4,11] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,092] DEBUG [Partition state machine on Controller 1]: > Live assigned replicas for partition [Test4,3] are: [List(1, 0)] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,093] DEBUG [Partition state machine on Controller 1]: > Initializing leader and isr for partition [Test4,3] to > (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:13) > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,100] DEBUG [Partition state machine on Controller 1]: > Live assigned replicas for partition [Test4,14] are: [List(0, 1)] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,100] DEBUG [Partition state machine on Controller 1]: > Initializing leader and isr for partition [Test4,14] to > (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:13) > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,103] DEBUG [Partition state machine on Controller 1]: > Live assigned replicas for partition [Test4,2] are: [List(0, 1)] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,103] DEBUG [Partition state machine on Controller 1]: > Initializing leader and isr for partition [Test4,2] to > (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:13) > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,107] DEBUG [Partition state machine on Controller 1]: > Live assigned replicas for partition [Test4,9] are: [List(1, 0)] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,107] DEBUG [Partition state machine on Controller 1]: > Initializing leader and isr for partition [Test4,9] to > (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:13) > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,110] DEBUG [Partition state machine on Controller 1]: > Live assigned replicas for partition [Test4,10] are: [List(0, 1)] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,110] DEBUG [Partition state machine on Controller 1]: > Initializing leader and isr for partition [Test4,10] to > (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:13) > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,114] DEBUG [Partition state machine on Controller 1]: > Live assigned replicas for partition [Test4,6] are: [List(0, 1)] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,114] DEBUG [Partition state machine on Controller 1]: > Initializing leader and isr for partition [Test4,6] to > (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:13) > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,117] DEBUG [Partition state machine on Controller 1]: > Live assigned replicas for partition [Test4,1] are: [List(1, 0)] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,117] DEBUG [Partition state machine on Controller 1]: > Initializing leader and isr for partition [Test4,1] to > (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:13) > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,121] DEBUG [Partition state machine on Controller 1]: > Live assigned replicas for partition [Test4,8] are: [List(0, 1)] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,121] DEBUG [Partition state machine on Controller 1]: > Initializing leader and isr for partition [Test4,8] to > (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:13) > 
(kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,124] DEBUG [Partition state machine on Controller 1]: > Live assigned replicas for partition [Test4,0] are: [List(0, 1)] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,124] DEBUG [Partition state machine on Controller 1]: > Initializing leader and isr for partition [Test4,0] to > (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:13) > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,130] DEBUG [Partition state machine on Controller 1]: > Live assigned replicas for partition [Test4,5] are: [List(1, 0)] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,130] DEBUG [Partition state machine on Controller 1]: > Initializing leader and isr for partition [Test4,5] to > (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:13) > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,135] DEBUG [Partition state machine on Controller 1]: > Live assigned replicas for partition [Test4,15] are: [List(1, 0)] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,135] DEBUG [Partition state machine on Controller 1]: > Initializing leader and isr for partition [Test4,15] to > (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:13) > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,146] DEBUG [Partition state machine on Controller 1]: > Live assigned replicas for partition [Test4,13] are: [List(1, 0)] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,146] DEBUG [Partition state machine on Controller 1]: > Initializing leader and isr for partition [Test4,13] to > (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:13) > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,150] DEBUG [Partition state machine on Controller 1]: > Live assigned replicas for partition [Test4,4] are: [List(0, 1)] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,150] DEBUG [Partition state machine on Controller 1]: > Initializing leader and isr for partition [Test4,4] to > (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:13) > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,153] DEBUG [Partition state machine on Controller 1]: > Live assigned replicas for partition [Test4,7] are: [List(1, 0)] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,153] DEBUG [Partition state machine on Controller 1]: > Initializing leader and isr for partition [Test4,7] to > (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:13) > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,157] DEBUG [Partition state machine on Controller 1]: > Live assigned replicas for partition [Test4,12] are: [List(0, 1)] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,157] DEBUG [Partition state machine on Controller 1]: > Initializing leader and isr for partition [Test4,12] to > (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:13) > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,161] DEBUG [Partition state machine on Controller 1]: > Live assigned replicas for partition [Test4,11] are: [List(1, 0)] > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,161] DEBUG [Partition state machine on Controller 1]: > Initializing leader and isr for partition [Test4,11] to > (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:13) > (kafka.controller.PartitionStateMachine) > [2014-05-16 20:42:32,167] WARN [Controller-1-to-broker-1-send-thread], > Controller 1 fails to send a request to broker > id:1,host:localhost,port:13001 
(kafka.controller.RequestSendThread) > java.io.EOFException: Received -1 when reading from channel, socket has > likely been closed. > at kafka.utils.Utils$.read(Utils.scala:376) > at > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > at > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) > at > > kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) > [2014-05-16 20:42:32,171] WARN [Controller-1-to-broker-0-send-thread], > Controller 1 fails to send a request to broker > id:0,host:localhost,port:13000 (kafka.controller.RequestSendThread) > java.io.EOFException: Received -1 when reading from channel, socket has > likely been closed. > at kafka.utils.Utils$.read(Utils.scala:376) > at > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > at > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) > at > > kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) > [2014-05-16 20:42:32,179] ERROR [Controller-1-to-broker-1-send-thread], > Controller 1 epoch 13 failed to send UpdateMetadata request with > correlation id 11 to broker id:1,host:localhost,port:13001. Reconnecting to > broker. (kafka.controller.RequestSendThread) > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:89) > at > > kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) > at > > kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) > [2014-05-16 20:42:32,179] INFO [Controller-1-to-broker-1-send-thread], > Controller 1 connected to id:1,host:localhost,port:13001 for sending state > change requests (kafka.controller.RequestSendThread) > [2014-05-16 20:42:32,182] ERROR [Controller-1-to-broker-0-send-thread], > Controller 1 epoch 13 failed to send UpdateMetadata request with > correlation id 11 to broker id:0,host:localhost,port:13000. Reconnecting to > broker. 
(kafka.controller.RequestSendThread) > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:89) > at > > kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) > at > > kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) > [2014-05-16 20:42:32,183] INFO [Controller-1-to-broker-0-send-thread], > Controller 1 connected to id:0,host:localhost,port:13000 for sending state > change requests (kafka.controller.RequestSendThread) > [2014-05-16 20:42:32,185] INFO [Replica state machine on controller 1]: > Invoking state change to OnlineReplica for replicas > > [Topic=Test4,Partition=8,Replica=0],[Topic=Test4,Partition=11,Replica=0],[Topic=Test4,Partition=15,Replica=0],[Topic=Test4,Partition=5,Replica=1],[Topic=Test4,Partition=12,Replica=1],[Topic=Test4,Partition=10,Replica=0],[Topic=Test4,Partiti > > on=3,Replica=1],[Topic=Test4,Partition=4,Replica=1],[Topic=Test4,Partition=7,Replica=1],[Topic=Test4,Partition=10,Replica=1],[Topic=Test4,Partition=0,Replica=1],[Topic=Test4,Partition=8,Replica=1],[Topic=Test4,Partition=14,Replica=0],[Topic=Test4,Partition=1,Replica=0],[Topic=Test4,Partition=2,Replica=1],[Topic=Test4,Partition=4,Replica=0],[Topic=Test4,Partition > > =1,Replica=1],[Topic=Test4,Partition=13,Replica=1],[Topic=Test4,Partition=13,Replica=0],[Topic=Test4,Partition=6,Replica=1],[Topic=Test4,Partition=14,Replica=1],[Topic=Test4,Partition=3,Replica=0],[Topic=Test4,Partition=5,Replica=0],[Topic=Test4,Partition=9,Replica=1],[Topic=Test4,Partition=2,Replica=0],[Topic=Test4,Partition=11,Replica=1],[Topic=Test4,Partition > > =9,Replica=0],[Topic=Test4,Partition=0,Replica=0],[Topic=Test4,Partition=7,Replica=0],[Topic=Test4,Partition=12,Replica=0],[Topic=Test4,Partition=15,Replica=1],[Topic=Test4,Partition=6,Replica=0] > (kafka.controller.ReplicaStateMachine) > > > The state-change log contains no errors, and Kafka thinks the new topic is > online and in ISR: > > [r...@dp-robin01-dev.sea1.office.priv kafka01]# ./bin/kafka-topics.sh > --describe --zookeeper localhost:2181 --topic Test4 > Topic:Test4 PartitionCount:16 ReplicationFactor:2 Configs: > Topic: Test4 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 > Topic: Test4 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0 > Topic: Test4 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1 > Topic: Test4 Partition: 3 Leader: 1 Replicas: 1,0 Isr: 1,0 > Topic: Test4 Partition: 4 Leader: 0 Replicas: 0,1 Isr: 0,1 > Topic: Test4 Partition: 5 Leader: 1 Replicas: 1,0 Isr: 1,0 > Topic: Test4 Partition: 6 Leader: 0 Replicas: 0,1 Isr: 0,1 > Topic: Test4 Partition: 7 Leader: 1 Replicas: 1,0 Isr: 1,0 > Topic: Test4 Partition: 8 Leader: 0 Replicas: 0,1 Isr: 0,1 > Topic: Test4 Partition: 9 Leader: 1 Replicas: 1,0 Isr: 1,0 > Topic: Test4 Partition: 10 Leader: 0 Replicas: 0,1 Isr: 0,1 > Topic: Test4 Partition: 11 Leader: 1 Replicas: 1,0 Isr: 1,0 > Topic: Test4 Partition: 12 Leader: 0 Replicas: 0,1 Isr: 0,1 > Topic: Test4 Partition: 13 Leader: 1 Replicas: 1,0 Isr: 1,0 > Topic: Test4 Partition: 14 Leader: 0 Replicas: 0,1 Isr: 0,1 > Topic: Test4 Partition: 15 Leader: 1 Replicas: 1,0 Isr: 1,0 > > > But partitions were not written to disk: > > [r...@dp-robin01-dev.sea1.office.priv kafka01]# ls /data/kafka > recovery-point-offset-checkpoint Test1-0 Test1-10 Test1-12 Test1-14 > Test1-2 Test1-4 Test1-6 Test1-8 Test2-0 Test2-10 Test2-12 Test2-14 > Test2-2 Test2-4 Test2-6 Test2-8 Test3-0 Test3-10 Test3-12 Test3-14 > 
Test3-2 Test3-4 Test3-6 Test3-8 > replication-offset-checkpoint Test1-1 Test1-11 Test1-13 Test1-15 > Test1-3 Test1-5 Test1-7 Test1-9 Test2-1 Test2-11 Test2-13 Test2-15 > Test2-3 Test2-5 Test2-7 Test2-9 Test3-1 Test3-11 Test3-13 Test3-15 > Test3-3 Test3-5 Test3-7 Test3-9 > > [r...@dp-robin01-dev.sea1.office.priv kafka01]# ls /data/kafka01/ > recovery-point-offset-checkpoint Test1-0 Test1-10 Test1-12 Test1-14 > Test1-2 Test1-4 Test1-6 Test1-8 Test2-0 Test2-10 Test2-12 Test2-14 > Test2-2 Test2-4 Test2-6 Test2-8 Test3-0 Test3-10 Test3-12 Test3-14 > Test3-2 Test3-4 Test3-6 Test3-8 > replication-offset-checkpoint Test1-1 Test1-11 Test1-13 Test1-15 > Test1-3 Test1-5 Test1-7 Test1-9 Test2-1 Test2-11 Test2-13 Test2-15 > Test2-3 Test2-5 Test2-7 Test2-9 Test3-1 Test3-11 Test3-13 Test3-15 > Test3-3 Test3-5 Test3-7 Test3-9 > > > > On Wed, May 14, 2014 at 7:57 AM, Jun Rao <jun...@gmail.com> wrote: > > > Any error in the controller and state-change log? > > > > Thanks, > > > > Jun > > > > > > On Tue, May 13, 2014 at 9:16 PM, Robin Yamaguchi <ro...@hasoffers.com > > >wrote: > > > > > 0.8.1.1 > > > > > > > > > On Tue, May 13, 2014 at 9:02 PM, Jun Rao <jun...@gmail.com> wrote: > > > > > > > Which version of Kafka are you using? > > > > > > > > Thanks, > > > > > > > > Jun > > > > > > > > > > > > On Tue, May 13, 2014 at 5:56 PM, Robin Yamaguchi < > ro...@hasoffers.com > > > > >wrote: > > > > > > > > > It seems like this mailing list wasn't updating through the web > > > archives > > > > > for a few days last week, so I wanted to send this out again in > case > > it > > > > > wasn't seen. My apologies for the repost. > > > > > > > > > > In further troubleshooting, I've also observed if a broker is shut > > down > > > > > while a connection in is CLOSE_WAIT, this error is generated on the > > > > broker > > > > > that is still up: > > > > > > > > > > 2014-05-13 20:57:35,794 - INFO > > > > > [ZkClient-EventThread-12-localhost:2181:Logging$class@68] - > > > [Controller > > > > > 0]: New leader and ISR for partition [Test3,10] is > > > > > {"leader":0,"leader_epoch":2,"isr":[0]} > > > > > 2014-05-13 20:57:35,796 - WARN > > > > > [Controller-0-to-broker-0-send-thread:Logging$class@89] - > > > > > [Controller-0-to-broker-0-send-thread], Controller 0 fails to send > a > > > > > request to broker id:0,host:localhost,port:13000 > > > > > java.io.EOFException: Received -1 when reading from channel, socket > > has > > > > > likely been closed. > > > > > at kafka.utils.Utils$.read(Utils.scala:376) > > > > > at > > > > > > > > > > > > > > > > > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > > > > at > kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > > > > at > > > > > > > > > > > > > > > > > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > > > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) > > > > > at > > > > > > > > > > > > > > > > > > > > kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146) > > > > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) > > > > > 2014-05-13 20:57:35,799 - ERROR > > > > > [Controller-0-to-broker-0-send-thread:Logging$class@103] - > > > > > [Controller-0-to-broker-0-send-thread], Controller 0 epoch 4 failed > > to > > > > send > > > > > UpdateMetadata request with correlation id 9 to broker > > > > > id:0,host:localhost,port:13000. Reconnecting to broker. 
> > > > > java.nio.channels.ClosedChannelException > > > > > at kafka.network.BlockingChannel.send(BlockingChannel.scala:89) > > > > > at > > > > > > > > > > > > > > > > > > > > kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) > > > > > at > > > > > > > > > > > > > > > > > > > > kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) > > > > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) > > > > > 2014-05-13 20:57:35,800 - INFO > > > > > [Controller-0-to-broker-0-send-thread:Logging$class@68] - > > > > > [Controller-0-to-broker-0-send-thread], Controller 0 connected to > > > > > id:0,host:localhost,port:13000 for sending state change requests > > > > > > > > > > > > > > > This WARN is logged for every partition: > > > > > > > > > > 2014-05-13 20:57:35,806 - WARN > > > > > [ZkClient-EventThread-12-localhost:2181:Logging$class@83] - > > [Channel > > > > > manager on controller 0]: Not sending request Name: > > StopReplicaRequest; > > > > > Version: 0; CorrelationId: 9; ClientId: ; DeletePartitions: false; > > > > > ControllerId: 0; ControllerEpoch: 4; Partitions: [Test2,8] to > broker > > 1, > > > > > since it is offline. > > > > > > > > > > > > > > > This ERROR is then logged for every partition continuously: > > > > > > > > > > 2014-05-13 20:57:45,839 - INFO [kafka-scheduler-3:Logging$class@68 > ] > > - > > > > > Partition [Test1,6] on broker 0: Shrinking ISR for partition > > [Test1,6] > > > > from > > > > > 0,1 to 0 > > > > > 2014-05-13 20:57:45,841 - ERROR [kafka-scheduler-3:Logging$class@97 > ] > > - > > > > > Conditional update of path /brokers/topics/Test1/partitions/6/state > > > with > > > > > data > > > > > > > > > {"controller_epoch":4,"leader":0,"version":1,"leader_epoch":3,"isr":[0]} > > > > > and expected version 6 failed due to > > > > > org.apache.zookeeper.KeeperException$BadVersionException: > > > > KeeperErrorCode = > > > > > BadVersion for /brokers/topics/Test1/partitions/6/state > > > > > 2014-05-13 20:57:45,841 - INFO [kafka-scheduler-3:Logging$class@68 > ] > > - > > > > > Partition [Test1,6] on broker 0: Cached zkVersion [6] not equal to > > that > > > > in > > > > > zookeeper, skip updating ISR > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, May 8, 2014 at 2:19 PM, Robin Yamaguchi < > ro...@hasoffers.com > > > > > > > > wrote: > > > > > > > > > > > Greetings, > > > > > > > > > > > > I'm looking for some feedback with using advertised.host.nameand > > > > > > advertised.port on kafka 0.8.1.1 through a load balancer. The > > > brokers > > > > > are > > > > > > fronted with haproxy to support our cluster mirroring > > configuration. > > > > The > > > > > > setup has been working as expected, where producers, consumers, > and > > > > > broker > > > > > > connections go through haproxy. I am however sometimes getting > > > errors > > > > > when > > > > > > attempting to create a new topic: > > > > > > > > > > > > 2014-05-08 19:00:49,757 - WARN > > > > > > [Controller-0-to-broker-0-send-thread:Logging$class@89] - > > > > > > [Controller-0-to-broker-0-send-thread], Controller 0 fails to > send > > a > > > > > > request to broker id:0,host:localhost,port:13000 > > > > > > java.io.EOFException: Received -1 when reading from channel, > socket > > > has > > > > > > likely been closed. 
> > > > > > at kafka.utils.Utils$.read(Utils.scala:376) > > > > > > at > > > > > > > > > > > > > > > > > > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > > > > > at > > kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > > > > > at > > > > > > > > > > > > > > > > > > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > > > > > at > > kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) > > > > > > at > > > > > > > > > > > > > > > > > > > > > kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146) > > > > > > at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) > > > > > > 2014-05-08 19:00:49,769 - ERROR > > > > > > [Controller-0-to-broker-0-send-thread:Logging$class@103] - > > > > > > [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 > failed > > > to > > > > > send > > > > > > UpdateMetadata request with correlation id 7 to broker > > > > > > id:0,host:localhost,port:13000. Reconnecting to broker. > > > > > > java.nio.channels.ClosedChannelException > > > > > > at kafka.network.BlockingChannel.send(BlockingChannel.scala:89) > > > > > > at > > > > > > > > > > > > > > > > > > > > > kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) > > > > > > at > > > > > > > > > > > > > > > > > > > > > kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) > > > > > > at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) > > > > > > 2014-05-08 19:00:49,770 - INFO > > > > > > [Controller-0-to-broker-0-send-thread:Logging$class@68] - > > > > > > [Controller-0-to-broker-0-send-thread], Controller 0 connected to > > > > > > id:0,host:localhost,port:13000 for sending state change requests > > > > > > > > > > > > > > > > > > When receiving this error, the new topic is registered in > > zookeeper, > > > > but > > > > > > not written to disk by the broker. The topic however will be > > written > > > > to > > > > > > disk the next time the kafka broker is restarted. I did not > > > experience > > > > > > this behavior in other clusters that are not fronted by a load > > > > balancer. > > > > > I > > > > > > also do not get this error when kafka is initially started. 
> > > > > > > > > > > > To help simplify troubleshooting, I setup a single host with > kafka, > > > > > > zookeeper, and haproxy running on it with these relevant > > > > configurations: > > > > > > > > > > > > Kafka: > > > > > > advertised.host.name = localhost > > > > > > advertised.port = 13000 > > > > > > > > > > > > Zookeeper: > > > > > > port = default > > > > > > > > > > > > Haproxy: > > > > > > listen kafka_13000 0.0.0.0:13000 > > > > > > mode tcp > > > > > > option tcpka > > > > > > timeout client 5m > > > > > > timeout server 5m > > > > > > timeout connect 5m > > > > > > server h-kafka01-1b localhost:9092 > > > > > > > > > > > > Here are the network sockets Kafka creates on start-up: > > > > > > > > > > > > [r...@dp-robin01-dev.sea1.office.priv kafka]# lsof -i -P | grep > -i > > > > kafka > > > > > > java 25532 kafka 18u IPv6 14717680 0t0 TCP > > *:44398 > > > > > > (LISTEN) > > > > > > java 25532 kafka 23u IPv6 14717684 0t0 TCP > > > > > > localhost.localdomain:58093->localhost.localdomain:2181 > > (ESTABLISHED) > > > > > > java 25532 kafka 38u IPv6 14717692 0t0 TCP > *:9092 > > > > > > (LISTEN) > > > > > > java 25532 kafka 39u IPv6 14717694 0t0 TCP > > > > > > localhost.localdomain:45037->localhost.localdomain:13000 > > > (ESTABLISHED) > > > > > > java 25532 kafka 40u IPv6 14717698 0t0 TCP > > > > > > localhost.localdomain:9092->localhost.localdomain:46448 > > (ESTABLISHED) > > > > > > > > > > > > > > > > > > After the 5m timeout configured in haproxy is surpassed, the > > > connection > > > > > > through port 13000 is closed (from kafka.log): > > > > > > > > > > > > 2014-05-08 19:05:40,904 - INFO > > > > [kafka-processor-9092-0:Logging$class@68 > > > > > ] > > > > > > - Closing socket connection to /127.0.0.1. > > > > > > > > > > > > > > > > > > Looking again at the network sockets, the controller to broker > > > > connection > > > > > > is in a CLOSE_WAIT state: > > > > > > > > > > > > [r...@dp-robin01-dev.sea1.office.priv kafka]# lsof -i -P | grep > -i > > > > kafka > > > > > > java 25532 kafka 18u IPv6 14717680 0t0 TCP > > *:44398 > > > > > > (LISTEN) > > > > > > java 25532 kafka 23u IPv6 14717684 0t0 TCP > > > > > > localhost.localdomain:58093->localhost.localdomain:2181 > > (ESTABLISHED) > > > > > > java 25532 kafka 38u IPv6 14717692 0t0 TCP > *:9092 > > > > > > (LISTEN) > > > > > > java 25532 kafka 39u IPv6 14717694 0t0 TCP > > > > > > localhost.localdomain:45037->localhost.localdomain:13000 > > (CLOSE_WAIT) > > > > > > > > > > > > > > > > > > This is when attemping to create a topic will error with: > > > > > > java.io.EOFException: Received -1 when reading from channel, > socket > > > has > > > > > > likely been closed. 
> > > > > > > > > > > > The linux kernel will remove the socket in a CLOSE_WAIT state > after > > > the > > > > > > tcp keepalive expires, which defaults to 2 hours: > > > > > > > > > > > > [r...@dp-robin01-dev.sea1.office.priv kafka]# ss -o | grep 13000 > > > > > > CLOSE-WAIT 1 0 ::ffff:127.0.0.1:45040 ::ffff: > > > > > > 127.0.0.1:13000 timer:(keepalive,46sec,0) > > > > > > > > > > > > > > > > > > List of kafka sockets after the controller to broker connection > has > > > > been > > > > > > completely removed: > > > > > > > > > > > > [r...@dp-robin01-dev.sea1.office.priv kafka]# lsof -i -P | grep > -i > > > > kafka > > > > > > java 25532 kafka 18u IPv6 14717680 0t0 TCP > > *:44398 > > > > > > (LISTEN) > > > > > > java 25532 kafka 23u IPv6 14717684 0t0 TCP > > > > > > localhost.localdomain:58093->localhost.localdomain:2181 > > (ESTABLISHED) > > > > > > java 25532 kafka 38u IPv6 14717692 0t0 TCP > *:9092 > > > > > > (LISTEN) > > > > > > > > > > > > > > > > > > Now when attempting to create a new topic, Kafka detects that the > > > > > > controller to broker connection is down, reconnects successfully, > > and > > > > is > > > > > > able to write topic to disk: > > > > > > > > > > > > 2014-05-08 21:02:47,685 - INFO > > > > > > [ZkClient-EventThread-12-localhost:2181:Logging$class@68] - > > > > [Partition > > > > > > state machine on Controller 0]: Invoking state change to > > > > OnlinePartition > > > > > > for partitions > > > > > > > > > > > > > > > > > > > > > [Test2,1],[Test2,14],[Test2,3],[Test2,12],[Test2,0],[Test2,13],[Test2,4],[Test2,6],[Test2,9],[Test2,15],[Test2,2],[Test2,7],[Test2,11],[Test2,5],[Test2,8],[Test2,10] > > > > > > 2014-05-08 21:02:47,796 - ERROR > > > > > > [Controller-0-to-broker-0-send-thread:Logging$class@103] - > > > > > > [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 > failed > > > to > > > > > send > > > > > > LeaderAndIsr request with correlation id 11 to broker > > > > > > id:0,host:localhost,port:13000. Reconnecting to broker. 
> > > > > > java.io.IOException: Broken pipe > > > > > > at sun.nio.ch.FileDispatcherImpl.writev0(Native Method) > > > > > > at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51) > > > > > > at sun.nio.ch.IOUtil.write(IOUtil.java:148) > > > > > > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:524) > > > > > > at java.nio.channels.SocketChannel.write(SocketChannel.java:493) > > > > > > at > > > > > > > > > > > > > > > > > > > > > kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56) > > > > > > at > kafka.network.Send$class.writeCompletely(Transmission.scala:75) > > > > > > at > > > > > > > > > > > > > > > > > > > > > kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26) > > > > > > at kafka.network.BlockingChannel.send(BlockingChannel.scala:92) > > > > > > at > > > > > > > > > > > > > > > > > > > > > kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) > > > > > > at > > > > > > > > > > > > > > > > > > > > > kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) > > > > > > at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) > > > > > > 2014-05-08 21:02:47,802 - INFO > > > > > > [Controller-0-to-broker-0-send-thread:Logging$class@68] - > > > > > > [Controller-0-to-broker-0-send-thread], Controller 0 connected to > > > > > > id:0,host:localhost,port:13000 for sending state change requests > > > > > > > > > > > > > > > > > > It seems that the controller isn't able to properly resolve a > > > > connection > > > > > > in a CLOSE_WAIT state. The exceptions thrown is different from > > when > > > > the > > > > > > socket is in a CLOSE_WAIT vs not existing at all. > > > > > > > > > > > > I can somewhat work around this issue by lowering the kernel tcp > > > > > keepalive > > > > > > settings and increasing my haproxy timeouts, but thats not very > > > > desirable > > > > > > and wouldn't work 100% of the time. I've looked through the > broker > > > > > > configuration documentation, and didn't get any meaningful > results > > > > > changing > > > > > > controller.socket.timeout.ms. > > > > > > > > > > > > Any feedback / suggestions would be greatly appreciated. > > > > > > > > > > > > Thank you, > > > > > > Robin > > > > > > > > > > > > > > > > > > > > >