Greetings,

I'm looking for some feedback on using advertised.host.name and
advertised.port on Kafka 0.8.1.1 through a load balancer.  The brokers are
fronted with haproxy to support our cluster mirroring configuration.  The
setup has been working as expected, with producer, consumer, and broker
connections all going through haproxy.  However, I sometimes get errors
when attempting to create a new topic:

2014-05-08 19:00:49,757 - WARN [Controller-0-to-broker-0-send-thread:Logging$class@89] - [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:localhost,port:13000
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at kafka.utils.Utils$.read(Utils.scala:376)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
2014-05-08 19:00:49,769 - ERROR [Controller-0-to-broker-0-send-thread:Logging$class@103] - [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send UpdateMetadata request with correlation id 7 to broker id:0,host:localhost,port:13000. Reconnecting to broker.
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
2014-05-08 19:00:49,770 - INFO [Controller-0-to-broker-0-send-thread:Logging$class@68] - [Controller-0-to-broker-0-send-thread], Controller 0 connected to id:0,host:localhost,port:13000 for sending state change requests


When this error occurs, the new topic is registered in ZooKeeper but not
written to disk by the broker.  The topic is, however, written to disk the
next time the Kafka broker is restarted.  I did not see this behavior in
other clusters that are not fronted by a load balancer, and I also do not
get this error right after Kafka is started.

To simplify troubleshooting, I set up a single host running kafka,
zookeeper, and haproxy with these relevant configurations:

Kafka:
advertised.host.name = localhost
advertised.port = 13000

Zookeeper:
port = default

Haproxy:
listen kafka_13000 0.0.0.0:13000
mode tcp
option tcpka
timeout client 5m
timeout server 5m
timeout connect 5m
server h-kafka01-1b localhost:9092

Here are the network sockets Kafka creates on start-up:

[r...@dp-robin01-dev.sea1.office.priv kafka]# lsof -i -P | grep -i kafka
java      25532     kafka   18u  IPv6 14717680      0t0  TCP *:44398 (LISTEN)
java      25532     kafka   23u  IPv6 14717684      0t0  TCP localhost.localdomain:58093->localhost.localdomain:2181 (ESTABLISHED)
java      25532     kafka   38u  IPv6 14717692      0t0  TCP *:9092 (LISTEN)
java      25532     kafka   39u  IPv6 14717694      0t0  TCP localhost.localdomain:45037->localhost.localdomain:13000 (ESTABLISHED)
java      25532     kafka   40u  IPv6 14717698      0t0  TCP localhost.localdomain:9092->localhost.localdomain:46448 (ESTABLISHED)


After the 5m timeout configured in haproxy elapses, the connection
through port 13000 is closed (from kafka.log):

2014-05-08 19:05:40,904 - INFO  [kafka-processor-9092-0:Logging$class@68] -
Closing socket connection to /127.0.0.1.


Looking again at the network sockets, the controller to broker connection
is in a CLOSE_WAIT state:

[r...@dp-robin01-dev.sea1.office.priv kafka]# lsof -i -P | grep -i kafka
java      25532     kafka   18u  IPv6 14717680      0t0  TCP *:44398 (LISTEN)
java      25532     kafka   23u  IPv6 14717684      0t0  TCP localhost.localdomain:58093->localhost.localdomain:2181 (ESTABLISHED)
java      25532     kafka   38u  IPv6 14717692      0t0  TCP *:9092 (LISTEN)
java      25532     kafka   39u  IPv6 14717694      0t0  TCP localhost.localdomain:45037->localhost.localdomain:13000 (CLOSE_WAIT)


At this point, attempting to create a topic fails with:
java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.

The Linux kernel removes the socket in the CLOSE_WAIT state after the TCP
keepalive expires, which defaults to 2 hours:

[r...@dp-robin01-dev.sea1.office.priv kafka]# ss -o | grep 13000
CLOSE-WAIT 1      0        ::ffff:127.0.0.1:45040   ::ffff:127.0.0.1:13000   timer:(keepalive,46sec,0)
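
For reference, the 2-hour default above can also be overridden per socket
rather than system-wide.  This is a minimal Python sketch of the per-socket
keepalive options that haproxy's "option tcpka" relies on (Linux-specific
constants, illustrative values; Kafka 0.8 does not expose these settings
itself, so this is context rather than a fix):

```python
import socket

# Open a TCP socket and turn on keepalive probing, the per-socket
# equivalent of haproxy's "option tcpka".
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

# Linux-specific per-socket overrides of the kernel defaults; 600s is an
# illustrative value, not a recommendation (the kernel default,
# net.ipv4.tcp_keepalive_time, is 7200s -- the 2 hours observed above).
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 600)   # idle time before first probe
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30)   # interval between probes
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4)      # failed probes before reset

keepalive_on = s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE)
idle_secs = s.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE)
s.close()
```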


List of kafka sockets after the controller to broker connection has been
completely removed:

[r...@dp-robin01-dev.sea1.office.priv kafka]# lsof -i -P | grep -i kafka
java      25532     kafka   18u  IPv6 14717680      0t0  TCP *:44398 (LISTEN)
java      25532     kafka   23u  IPv6 14717684      0t0  TCP localhost.localdomain:58093->localhost.localdomain:2181 (ESTABLISHED)
java      25532     kafka   38u  IPv6 14717692      0t0  TCP *:9092 (LISTEN)


Now, when attempting to create a new topic, Kafka detects that the
controller-to-broker connection is down, reconnects successfully, and is
able to write the topic to disk:

2014-05-08 21:02:47,685 - INFO [ZkClient-EventThread-12-localhost:2181:Logging$class@68] - [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [Test2,1],[Test2,14],[Test2,3],[Test2,12],[Test2,0],[Test2,13],[Test2,4],[Test2,6],[Test2,9],[Test2,15],[Test2,2],[Test2,7],[Test2,11],[Test2,5],[Test2,8],[Test2,10]
2014-05-08 21:02:47,796 - ERROR [Controller-0-to-broker-0-send-thread:Logging$class@103] - [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send LeaderAndIsr request with correlation id 11 to broker id:0,host:localhost,port:13000. Reconnecting to broker.
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
    at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
    at sun.nio.ch.IOUtil.write(IOUtil.java:148)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:524)
    at java.nio.channels.SocketChannel.write(SocketChannel.java:493)
    at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
    at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
    at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
2014-05-08 21:02:47,802 - INFO [Controller-0-to-broker-0-send-thread:Logging$class@68] - [Controller-0-to-broker-0-send-thread], Controller 0 connected to id:0,host:localhost,port:13000 for sending state change requests


It seems that the controller isn't able to detect that a connection in the
CLOSE_WAIT state is dead.  The exception thrown when the socket is in
CLOSE_WAIT is different from when the socket does not exist at all.
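
For what it's worth, the two different exceptions match ordinary socket
half-close semantics.  A small Python sketch of the behavior, using an
AF_UNIX socketpair as a stand-in for the controller-to-haproxy TCP
connection (so the exact timing differs slightly from real TCP):

```python
import socket

# A connected socket pair: "a" plays the controller's side, "b" the
# haproxy side of the connection.
a, b = socket.socketpair()

# haproxy closes its end after the idle timeout; "a" is now in the
# half-closed state that shows up as CLOSE_WAIT in lsof/ss.
b.close()

# A read on the half-closed socket returns EOF (b""), which the broker
# surfaces as "Received -1 when reading from channel".
eof = a.recv(1024)

# A write fails with "Broken pipe" (on a real TCP socket the kernel can
# even accept the first write after the peer's FIN, so only a later
# write fails -- consistent with the second exception in the logs).
try:
    a.send(b"request")
    write_error = None
except BrokenPipeError as exc:
    write_error = exc

a.close()
```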

I can partially work around this issue by lowering the kernel TCP keepalive
settings and increasing my haproxy timeouts, but that's not very desirable
and wouldn't work 100% of the time.  I've looked through the broker
configuration documentation, and changing controller.socket.timeout.ms did
not produce any meaningful results.

Any feedback / suggestions would be greatly appreciated.

Thank you,
Robin
