Which version of Kafka are you using? Thanks,
Jun

On Tue, May 13, 2014 at 5:56 PM, Robin Yamaguchi <ro...@hasoffers.com> wrote:

> It seems like this mailing list wasn't updating through the web archives
> for a few days last week, so I wanted to send this out again in case it
> wasn't seen. My apologies for the repost.
>
> In further troubleshooting, I've also observed that if a broker is shut
> down while a connection is in CLOSE_WAIT, this error is generated on the
> broker that is still up:
>
> 2014-05-13 20:57:35,794 - INFO [ZkClient-EventThread-12-localhost:2181:Logging$class@68] - [Controller 0]: New leader and ISR for partition [Test3,10] is {"leader":0,"leader_epoch":2,"isr":[0]}
> 2014-05-13 20:57:35,796 - WARN [Controller-0-to-broker-0-send-thread:Logging$class@89] - [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:localhost,port:13000
> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>     at kafka.utils.Utils$.read(Utils.scala:376)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 2014-05-13 20:57:35,799 - ERROR [Controller-0-to-broker-0-send-thread:Logging$class@103] - [Controller-0-to-broker-0-send-thread], Controller 0 epoch 4 failed to send UpdateMetadata request with correlation id 9 to broker id:0,host:localhost,port:13000. Reconnecting to broker.
> java.nio.channels.ClosedChannelException
>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
>     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 2014-05-13 20:57:35,800 - INFO [Controller-0-to-broker-0-send-thread:Logging$class@68] - [Controller-0-to-broker-0-send-thread], Controller 0 connected to id:0,host:localhost,port:13000 for sending state change requests
>
> This WARN is logged for every partition:
>
> 2014-05-13 20:57:35,806 - WARN [ZkClient-EventThread-12-localhost:2181:Logging$class@83] - [Channel manager on controller 0]: Not sending request Name: StopReplicaRequest; Version: 0; CorrelationId: 9; ClientId: ; DeletePartitions: false; ControllerId: 0; ControllerEpoch: 4; Partitions: [Test2,8] to broker 1, since it is offline.
>
> This ERROR is then logged for every partition continuously:
>
> 2014-05-13 20:57:45,839 - INFO [kafka-scheduler-3:Logging$class@68] - Partition [Test1,6] on broker 0: Shrinking ISR for partition [Test1,6] from 0,1 to 0
> 2014-05-13 20:57:45,841 - ERROR [kafka-scheduler-3:Logging$class@97] - Conditional update of path /brokers/topics/Test1/partitions/6/state with data {"controller_epoch":4,"leader":0,"version":1,"leader_epoch":3,"isr":[0]} and expected version 6 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/Test1/partitions/6/state
> 2014-05-13 20:57:45,841 - INFO [kafka-scheduler-3:Logging$class@68] - Partition [Test1,6] on broker 0: Cached zkVersion [6] not equal to that in zookeeper, skip updating ISR
>
> On Thu, May 8, 2014 at 2:19 PM, Robin Yamaguchi <ro...@hasoffers.com> wrote:
>
> > Greetings,
> >
> > I'm looking for some feedback on using advertised.host.name and
> > advertised.port on kafka 0.8.1.1 through a load balancer. The brokers
> > are fronted with haproxy to support our cluster mirroring configuration.
> > The setup has been working as expected, where producers, consumers, and
> > broker connections go through haproxy. I am however sometimes getting
> > errors when attempting to create a new topic:
> >
> > 2014-05-08 19:00:49,757 - WARN [Controller-0-to-broker-0-send-thread:Logging$class@89] - [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:localhost,port:13000
> > java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
> >     at kafka.utils.Utils$.read(Utils.scala:376)
> >     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
> >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > 2014-05-08 19:00:49,769 - ERROR [Controller-0-to-broker-0-send-thread:Logging$class@103] - [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send UpdateMetadata request with correlation id 7 to broker id:0,host:localhost,port:13000. Reconnecting to broker.
> > java.nio.channels.ClosedChannelException
> >     at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
> >     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
> >     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > 2014-05-08 19:00:49,770 - INFO [Controller-0-to-broker-0-send-thread:Logging$class@68] - [Controller-0-to-broker-0-send-thread], Controller 0 connected to id:0,host:localhost,port:13000 for sending state change requests
> >
> > When receiving this error, the new topic is registered in zookeeper, but
> > not written to disk by the broker. The topic however will be written to
> > disk the next time the kafka broker is restarted. I did not experience
> > this behavior in other clusters that are not fronted by a load balancer.
> > I also do not get this error when kafka is initially started.
> >
> > To help simplify troubleshooting, I set up a single host with kafka,
> > zookeeper, and haproxy running on it with these relevant configurations:
> >
> > Kafka:
> > advertised.host.name = localhost
> > advertised.port = 13000
> >
> > Zookeeper:
> > port = default
> >
> > Haproxy:
> > listen kafka_13000 0.0.0.0:13000
> >     mode tcp
> >     option tcpka
> >     timeout client 5m
> >     timeout server 5m
> >     timeout connect 5m
> >     server h-kafka01-1b localhost:9092
> >
> > Here are the network sockets Kafka creates on start-up:
> >
> > [r...@dp-robin01-dev.sea1.office.priv kafka]# lsof -i -P | grep -i kafka
> > java 25532 kafka 18u IPv6 14717680 0t0 TCP *:44398 (LISTEN)
> > java 25532 kafka 23u IPv6 14717684 0t0 TCP localhost.localdomain:58093->localhost.localdomain:2181 (ESTABLISHED)
> > java 25532 kafka 38u IPv6 14717692 0t0 TCP *:9092 (LISTEN)
> > java 25532 kafka 39u IPv6 14717694 0t0 TCP localhost.localdomain:45037->localhost.localdomain:13000 (ESTABLISHED)
> > java 25532 kafka 40u IPv6 14717698 0t0 TCP localhost.localdomain:9092->localhost.localdomain:46448 (ESTABLISHED)
> >
> > After the 5m timeout configured in haproxy is surpassed, the connection
> > through port 13000 is closed (from kafka.log):
> >
> > 2014-05-08 19:05:40,904 - INFO [kafka-processor-9092-0:Logging$class@68] - Closing socket connection to /127.0.0.1.
> >
> > Looking again at the network sockets, the controller to broker
> > connection is in a CLOSE_WAIT state:
> >
> > [r...@dp-robin01-dev.sea1.office.priv kafka]# lsof -i -P | grep -i kafka
> > java 25532 kafka 18u IPv6 14717680 0t0 TCP *:44398 (LISTEN)
> > java 25532 kafka 23u IPv6 14717684 0t0 TCP localhost.localdomain:58093->localhost.localdomain:2181 (ESTABLISHED)
> > java 25532 kafka 38u IPv6 14717692 0t0 TCP *:9092 (LISTEN)
> > java 25532 kafka 39u IPv6 14717694 0t0 TCP localhost.localdomain:45037->localhost.localdomain:13000 (CLOSE_WAIT)
> >
> > At this point, attempting to create a topic fails with:
> > java.io.EOFException: Received -1 when reading from channel, socket has
> > likely been closed.
> >
> > The Linux kernel will remove the socket in a CLOSE_WAIT state after the
> > TCP keepalive expires, which defaults to 2 hours:
> >
> > [r...@dp-robin01-dev.sea1.office.priv kafka]# ss -o | grep 13000
> > CLOSE-WAIT 1 0 ::ffff:127.0.0.1:45040 ::ffff:127.0.0.1:13000 timer:(keepalive,46sec,0)
> >
> > List of kafka sockets after the controller to broker connection has been
> > completely removed:
> >
> > [r...@dp-robin01-dev.sea1.office.priv kafka]# lsof -i -P | grep -i kafka
> > java 25532 kafka 18u IPv6 14717680 0t0 TCP *:44398 (LISTEN)
> > java 25532 kafka 23u IPv6 14717684 0t0 TCP localhost.localdomain:58093->localhost.localdomain:2181 (ESTABLISHED)
> > java 25532 kafka 38u IPv6 14717692 0t0 TCP *:9092 (LISTEN)
> >
> > Now when attempting to create a new topic, Kafka detects that the
> > controller to broker connection is down, reconnects successfully, and is
> > able to write the topic to disk:
> >
> > 2014-05-08 21:02:47,685 - INFO [ZkClient-EventThread-12-localhost:2181:Logging$class@68] - [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [Test2,1],[Test2,14],[Test2,3],[Test2,12],[Test2,0],[Test2,13],[Test2,4],[Test2,6],[Test2,9],[Test2,15],[Test2,2],[Test2,7],[Test2,11],[Test2,5],[Test2,8],[Test2,10]
> > 2014-05-08 21:02:47,796 - ERROR [Controller-0-to-broker-0-send-thread:Logging$class@103] - [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send LeaderAndIsr request with correlation id 11 to broker id:0,host:localhost,port:13000. Reconnecting to broker.
> > java.io.IOException: Broken pipe
> >     at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
> >     at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
> >     at sun.nio.ch.IOUtil.write(IOUtil.java:148)
> >     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:524)
> >     at java.nio.channels.SocketChannel.write(SocketChannel.java:493)
> >     at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
> >     at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
> >     at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
> >     at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
> >     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
> >     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > 2014-05-08 21:02:47,802 - INFO [Controller-0-to-broker-0-send-thread:Logging$class@68] - [Controller-0-to-broker-0-send-thread], Controller 0 connected to id:0,host:localhost,port:13000 for sending state change requests
> >
> > It seems that the controller isn't able to properly handle a connection
> > in a CLOSE_WAIT state. The exception thrown differs depending on whether
> > the socket is in CLOSE_WAIT (EOFException) or no longer exists at all
> > (IOException: Broken pipe).
> >
> > I can somewhat work around this issue by lowering the kernel TCP
> > keepalive settings and increasing my haproxy timeouts, but that's not
> > very desirable and wouldn't work 100% of the time. I've looked through
> > the broker configuration documentation, and didn't get any meaningful
> > results changing controller.socket.timeout.ms.
> >
> > Any feedback / suggestions would be greatly appreciated.
> >
> > Thank you,
> > Robin
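
For anyone trying to reproduce this, the stale controller connection can be spotted before a topic creation fails by scanning the same `lsof -i -P` output quoted above. What follows is only a rough sketch, not anything Kafka ships: the function name `close_wait_sockets` and the `SAMPLE` text are made up for illustration, and the parsing assumes the output layout shown in this thread (endpoint pair followed by the state in parentheses).

```python
import re

# Matches the last column of `lsof -i -P` for a connected TCP socket, e.g.
# "localhost.localdomain:45037->localhost.localdomain:13000 (CLOSE_WAIT)".
_CONN = re.compile(r"(\S+):(\d+)->(\S+):(\d+)\s+\((\w+)\)\s*$")


def close_wait_sockets(lsof_output, remote_port):
    """Return (local_port, remote_host) for sockets in CLOSE_WAIT to remote_port.

    Hypothetical helper: assumes the lsof layout quoted in this thread.
    """
    hits = []
    for line in lsof_output.splitlines():
        match = _CONN.search(line)
        if match is None:
            continue  # LISTEN rows have no "local->remote" endpoint pair
        _local_host, local_port, remote_host, rport, state = match.groups()
        if state == "CLOSE_WAIT" and int(rport) == remote_port:
            hits.append((int(local_port), remote_host))
    return hits


# Rows copied from the lsof listing in the original report.
SAMPLE = """\
java 25532 kafka 18u IPv6 14717680 0t0 TCP *:44398 (LISTEN)
java 25532 kafka 23u IPv6 14717684 0t0 TCP localhost.localdomain:58093->localhost.localdomain:2181 (ESTABLISHED)
java 25532 kafka 38u IPv6 14717692 0t0 TCP *:9092 (LISTEN)
java 25532 kafka 39u IPv6 14717694 0t0 TCP localhost.localdomain:45037->localhost.localdomain:13000 (CLOSE_WAIT)
"""

if __name__ == "__main__":
    for local_port, remote_host in close_wait_sockets(SAMPLE, 13000):
        print("stale connection from local port", local_port, "to", remote_host)
```

In practice the input would come from running `lsof -i -P` on the broker host, and 13000 is the advertised.port from the test setup. A non-empty result means the controller is still holding the half-closed socket that haproxy already dropped.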