Hi Ewen and Jay, I am not saying this is a logging issue (logging is absolutely needed, and we cannot turn off the ERROR or WARN level in any environment). What I am saying is that a TCP connection is established and then leaked until the other side closes it, the producer is closed, or the JVM exits.
Imagine you have a "TCP routing proxy", like the one you described, that diverts traffic to the appropriate Kafka cluster in the cloud, per DC, or in the nearest DC. You want to divert the producer's first request to the nearest DC's cluster (you can do this via DNS routing, but the configuration is expensive to maintain). Instead, you change or define a rule in the proxy. I think the desired behavior would be this: the producer first discovers the broker topology and the advertised IPs and ports, and if these do not match the seed (bootstrap) host:port, the producer closes the initial TCP connection to the bootstrap or seed server.

Would you agree with this approach and behavior?

Thanks,
Bhavesh

On Wed, Feb 11, 2015 at 12:13 AM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

> Agree with Jay. It's unfortunate that this gets logged because in this case it's just noise, but this is an exception that can happen both in potentially bad cases (remote peer closed connection forcibly with outstanding unprocessed data) or in normal cases that aren't problematic (TCP connection timeout). I'm pretty sure some load balancers, e.g. HAProxy, disable socket lingering to avoid time-wait (i.e. they send a RST even when they could use a FIN), which helps them avoid socket starvation.
>
> I think the generalized bug this is an instance of is that we're relying on timeouts in lower layers, like TCP timeouts, to clean up after us. Ideally anything that might trigger a timeout in a lower layer could, with the correct settings, be caught and cleaned up earlier by Kafka. This means adding timeouts on resources managed by Kafka, such as TCP connections to brokers as KAFKA-1941 suggests.
>
> On Tue, Feb 10, 2015 at 1:45 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > I don't think this is a bug. Currently we don't support timing out connections in the clients, which would be a good feature to add. As a result the connection remains until the LB kills it.
> > When that happens you get a message logged that the connection was unexpectedly closed, which I think is what should happen (you can disable the logging in log4j if you don't want it).
> >
> > It would be nice to implement a client-side connection LRU for unused connections. I filed a ticket to track this:
> > https://issues.apache.org/jira/browse/KAFKA-1941
> >
> > -Jay
> >
> > On Tue, Feb 10, 2015 at 11:33 AM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> >
> > > Hi Ewen,
> > >
> > > The root of the problem is a leaked TCP connection that has been idle for a while. It is just a log message, as you mentioned, but suppose you have 50 or more producer instances created by an application and every one of them prints the above log; that becomes a bit of a concern.
> > >
> > > We configured the producer with the bootstrap list "LB:port1", which is set up as a TCP port forward to "broker:port2". When the producer fetches cluster metadata and discovers that the TCP connection "LB:port1" is not part of the broker cluster or topology, it should close the connection to "LB:port1" (in my opinion, this would be the expected behavior).
> > >
> > > As you mentioned, the producer behavior is normal and this error is harmless. If you consider this a bug, please let me know and I will file a Jira ticket for it.
> > >
> > > We are on a non-release 0.8.2 build from trunk.
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > > On Tue, Feb 10, 2015 at 12:29 AM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
> > >
> > > > Bhavesh,
> > > >
> > > > I'm unclear what the impact is here. The line numbers don't match up exactly with trunk or 0.8.2.0, but it looks like this exception is just caught and logged. As far as I can tell the producer would continue to function normally.
> > > > Does this have any impact on the producer or is the concern just that the exception is being logged?
> > > >
> > > > On Mon, Feb 9, 2015 at 11:21 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > > >
> > > > > Hi Kafka Team,
> > > > >
> > > > > Please confirm whether you would like a Jira issue opened to track this.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Bhavesh
> > > > >
> > > > > On Mon, Feb 9, 2015 at 12:39 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > > > >
> > > > > > Hi Kafka Team,
> > > > > >
> > > > > > We are getting this "Connection reset by peer" exception a couple of minutes after producer start-up, due to the infrastructure deployment strategies we have adopted from LinkedIn.
> > > > > >
> > > > > > We have an LB hostname and port as the seed server, and all producers are getting the following exception because of the TCP idle connection timeout set on the LB (which is 2 minutes, while the Kafka TCP connection idle timeout is set to 10 minutes). It seems to be a minor bug that the producer does not close the TCP connection immediately after discovering that the seed server is not part of the broker list.
> > > > > >
> > > > > > java.io.IOException: Connection reset by peer
> > > > > >     at sun.nio.ch.FileDispatcher.read0(Native Method)
> > > > > >     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> > > > > >     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> > > > > >     at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> > > > > >     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
> > > > > >     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
> > > > > >     at org.apache.kafka.common.network.Selector.poll(Selector.java:242)
> > > > > >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
> > > > > >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> > > > > >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> > > > > >     at java.lang.Thread.run(Thread.java:662)
> > > > > > java.io.IOException: Connection reset by peer
> > > > > >     at sun.nio.ch.FileDispatcher.read0(Native Method)
> > > > > >     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> > > > > >     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> > > > > >     at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> > > > > >     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
> > > > > >     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
> > > > > >     at org.apache.kafka.common.network.Selector.poll(Selector.java:242)
> > > > > >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
> > > > > >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> > > > > >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> > > > > >     at java.lang.Thread.run(Thread.java:662)
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Bhavesh
> > > >
> > > > --
> > > > Thanks,
> > > > Ewen
>
> --
> Thanks,
> Ewen
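
For illustration, the behavior proposed at the top of this thread (close the bootstrap connection once metadata shows that the seed address is not among the advertised brokers) might look roughly like the Java sketch below. This is a sketch under stated assumptions, not how the Kafka producer is implemented: `BootstrapCleanup` and `staleConnections` are hypothetical names, and the addresses are the placeholder host:port strings used in the thread. A real client would also have to confirm that the metadata fetch has completed and that no requests are in flight on a connection before closing it.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper for illustration only; not part of the Kafka client API. */
final class BootstrapCleanup {

    /**
     * Returns the open connections (as host:port strings) that no broker
     * advertises in the cluster metadata. These are the candidates to close.
     */
    public static Set<String> staleConnections(Set<String> openConnections,
                                               Set<String> advertisedBrokers) {
        // Copy first so the caller's set of open connections is not modified.
        Set<String> stale = new LinkedHashSet<>(openConnections);
        stale.removeAll(advertisedBrokers);
        return stale;
    }

    public static void main(String[] args) {
        // Placeholder addresses taken from the thread: the producer bootstrapped
        // through the load balancer, then connected to a real broker.
        Set<String> open = new LinkedHashSet<>(Arrays.asList("LB:port1", "broker1:port2"));
        Set<String> advertised = new HashSet<>(Arrays.asList("broker1:port2", "broker2:port2"));

        for (String address : staleConnections(open, advertised)) {
            System.out.println("would close bootstrap connection to " + address);
        }
    }
}
```

Comparing plain host:port strings is the simplest possible matching rule. If the proxy and the brokers can resolve to the same address under different names, the comparison would need to be done on resolved addresses instead.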