So, it seems that your consumer thread was interrupted and the socket channel was closed as a result. Are you using SimpleConsumer?
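To illustrate the point about interruption: in Java NIO, interrupting a thread that is blocked in a read on a SocketChannel closes the channel and raises ClosedByInterruptException in that thread, which matches the top of the consumer stack trace quoted below. The following is a minimal standalone sketch, not Kafka code; the class name InterruptedReadDemo and the method runDemo are made up for this example, and the loopback server exists only to give the client something to block on.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; it is not part of Kafka.
class InterruptedReadDemo {

    // Blocks a reader thread on a socket read, interrupts it, and reports
    // which exception the read threw and whether the channel is still open.
    static String runDemo() throws Exception {
        AtomicReference<String> thrown = new AtomicReference<>("none");
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Bind to an ephemeral loopback port; the server side never writes.
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                Thread reader = new Thread(() -> {
                    try {
                        // Blocks because no data ever arrives.
                        client.read(ByteBuffer.allocate(64));
                    } catch (ClosedByInterruptException e) {
                        thrown.set("ClosedByInterruptException");
                    } catch (IOException e) {
                        thrown.set(e.getClass().getSimpleName());
                    }
                });
                reader.start();
                Thread.sleep(200);   // give the reader time to block in read()
                reader.interrupt();  // interrupting closes the channel
                reader.join(5000);
                return thrown.get() + " open=" + client.isOpen();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDemo());
    }
}
```

The reader thread sees ClosedByInterruptException and the channel is left closed, so any later use of that channel needs a reconnect. In the consumer this would mean that whatever calls Thread.interrupt() on the fetcher thread (for example a shutdown or rebalance path) is worth checking first.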
Thanks,

Jun

On Wed, Mar 20, 2013 at 9:25 PM, Yonghui Zhao <zhaoyong...@gmail.com> wrote:

> Hi Jun,
>
> I didn't find any error in producer log.
> I did another test, first I injected data to kafka server, then stop
> producer, and start consumer.
> The exception still happened, so the exception is not related with
> producer.
>
> From the log below, it seems consumer exception happened first.
>
> *Exceptions in consumers:*
>
> 2013/03/21 *12:07:17.940* INFO [SimpleConsumer] [] Reconnect in multifetch due to socket error:
> java.nio.channels.ClosedByInterruptException
>     at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:201)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:281)
>     at kafka.utils.Utils$.read(Utils.scala:538)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
>     at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
>     at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
>     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
>
> 2013/03/21 *12:07:18.176* INFO [SimpleConsumer] [] Reconnect in multifetch due to socket error:
> java.nio.channels.ClosedByInterruptException
>     at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:201)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:281)
>     at kafka.utils.Utils$.read(Utils.scala:538)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
>     at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
>     at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
>     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
>
> *Exceptions in kafka server:*
>
> [2013-03-21 *12:07:18,128*] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
>     at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
>     at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:456)
>     at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:557)
>     at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:102)
>     at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:53)
>     at kafka.network.MultiSend.writeTo(Transmission.scala:91)
>     at kafka.network.Processor.write(SocketServer.scala:339)
>     at kafka.network.Processor.run(SocketServer.scala:216)
>     at java.lang.Thread.run(Thread.java:679)
> [2013-03-21 *12:07:19,263*] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2013-03-21 *12:07:18,055*] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
> java.io.IOException: Broken pipe
>     at sun.nio.ch.FileDispatcher.write0(Native Method)
>     at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:122)
>     at sun.nio.ch.IOUtil.write(IOUtil.java:93)
>     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:352)
>     at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:51)
>     at kafka.network.MultiSend.writeTo(Transmission.scala:91)
>     at kafka.network.Processor.write(SocketServer.scala:339)
>     at kafka.network.Processor.run(SocketServer.scala:216)
>     at java.lang.Thread.run(Thread.java:679)
>
> 2013/3/20 Jun Rao <jun...@gmail.com>
>
> > "Connect reset by peer" means the other side of the socket has closed the
> > connection for some reason. Could you provide the error/exception in both
> > the producer and the broker when a produce request fails?
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Mar 19, 2013 at 1:34 AM, Yonghui Zhao <zhaoyong...@gmail.com> wrote:
> >
> > > Connection reset exception reproed.
> > >
> > > [2013-03-19 16:30:45,814] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
> > > [2013-03-19 16:30:55,253] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
> > > java.io.IOException: Connection reset by peer
> > >     at sun.nio.ch.FileDispatcher.read0(Native Method)
> > >     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> > >     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)
> > >     at sun.nio.ch.IOUtil.read(IOUtil.java:224)
> > >     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)
> > >     at kafka.utils.Utils$.read(Utils.scala:538)
> > >     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > >     at kafka.network.Processor.read(SocketServer.scala:311)
> > >     at kafka.network.Processor.run(SocketServer.scala:214)
> > >     at java.lang.Thread.run(Thread.java:679)
> > > [2013-03-19 16:31:02,476] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
> > > java.io.IOException: Connection reset by peer
> > >     at sun.nio.ch.FileDispatcher.read0(Native Method)
> > >     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> > >     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)
> > >     at sun.nio.ch.IOUtil.read(IOUtil.java:224)
> > >     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)
> > >     at kafka.utils.Utils$.read(Utils.scala:538)
> > >     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > >     at kafka.network.Processor.read(SocketServer.scala:311)
> > >     at kafka.network.Processor.run(SocketServer.scala:214)
> > >     at java.lang.Thread.run(Thread.java:679)
> > >
> > > 2013/3/19 Yonghui Zhao <zhaoyong...@gmail.com>
> > >
> > > > Thanks Jun.
> > > >
> > > > Now I use onebox to test kafka, kafka server ip on zk is 127.0.0.1,
> > > > network is not affected by external factors.
> > > >
> > > > Reset connection is not reproed, but I still find Broken pipe exceptions
> > > > and a few zk exceptions.
> > > >
> > > > [2013-03-19 15:23:28,660] INFO Closed socket connection for client /127.0.0.1:51902 which had sessionid 0x13d8152007b002c (org.apache.zookeeper.server.NIOServerCnxn)
> > > > [2013-03-19 15:23:28,672] ERROR Unexpected Exception: (org.apache.zookeeper.server.NIOServerCnxn)
> > > > java.nio.channels.CancelledKeyException
> > > >     at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
> > > >     at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:77)
> > > >     at org.apache.zookeeper.server.NIOServerCnxn.sendBuffer(NIOServerCnxn.java:418)
> > > >     at org.apache.zookeeper.server.NIOServerCnxn.sendResponse(NIOServerCnxn.java:1509)
> > > >     at org.apache.zookeeper.server.FinalRequestProcessor.processRequest(FinalRequestProcessor.java:171)
> > > >     at org.apache.zookeeper.server.SyncRequestProcessor.run(SyncRequestProcessor.java:135)
> > > >
> > > > [2013-03-19 15:15:58,355] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
> > > > [2013-03-19 15:16:00,161] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
> > > > [2013-03-19 15:16:01,784] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
> > > > [2013-03-19 15:16:04,751] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
> > > > [2013-03-19 15:16:07,734] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
> > > > java.io.IOException: Broken pipe
> > > >     at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> > > >     at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:456)
> > > >     at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:557)
> > > >     at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:102)
> > > >     at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:53)
> > > >     at kafka.network.MultiSend.writeTo(Transmission.scala:91)
> > > >     at kafka.network.Processor.write(SocketServer.scala:339)
> > > >     at kafka.network.Processor.run(SocketServer.scala:216)
> > > >     at java.lang.Thread.run(Thread.java:679)
> > > >
> > > > 2013/3/19 Jun Rao <jun...@gmail.com>
> > > >
> > > >> The error you saw on the broker is for consumer requests, not for
> > > >> producer.
> > > >> For the issues in the producer, are you using a VIP? Is there any firewall
> > > >> btw producer and broker? The typical issues with "connection reset" that
> > > >> we have seen are caused by the load balancer or the firewall killing idle
> > > >> connections.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jun
> > > >>
> > > >> On Sun, Mar 17, 2013 at 8:24 PM, Yonghui Zhao <zhaoyong...@gmail.com> wrote:
> > > >>
> > > >> > In kafka 0.7.2, I use a producer to send 200 million message to kafka
> > > >> > server.
> > > >> >
> > > >> > After sent 100 million this exception happend:
> > > >> >
> > > >> > In producer:
> > > >> >
> > > >> > Exception in thread "main" java.io.IOException: Connection reset by peer
> > > >> >     at sun.nio.ch.FileDispatcher.writev0(Native Method)
> > > >> >     at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
> > > >> >     at sun.nio.ch.IOUtil.write(IOUtil.java:182)
> > > >> >     at sun.nio.ch.SocketChannelImpl.write0(SocketChannelImpl.java:383)
> > > >> >     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:406)
> > > >> >     at java.nio.channels.SocketChannel.write(SocketChannel.java:384)
> > > >> >     at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:49)
> > > >> >     at kafka.network.Send$class.writeCompletely(Transmission.scala:73)
> > > >> >     at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:25)
> > > >> >     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:95)
> > > >> >     at kafka.producer.SyncProducer.send(SyncProducer.scala:94)
> > > >> >     at kafka.producer.SyncProducer.send(SyncProducer.scala:125)
> > > >> >     at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:114)
> > > >> >     at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100)
> > > >> >     at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100)
> > > >> >     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > > >> >     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > > >> >     at kafka.producer.ProducerPool.send(ProducerPool.scala:100)
> > > >> >     at kafka.producer.Producer.zkSend(Producer.scala:137)
> > > >> >     at kafka.producer.Producer.send(Producer.scala:99)
> > > >> >     at kafka.javaapi.producer.Producer.send(Producer.scala:103)
> > > >> >
> > > >> > In kafka server:
> > > >> >
> > > >> > [2013-03-16 06:59:49,491] ERROR Closing socket for /10.2.201.201 because of error (kafka.network.Processor)
> > > >> > java.io.IOException: Connection reset by peer
> > > >> >     at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> > > >> >     at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:456)
> > > >> >     at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:557)
> > > >> >     at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:102)
> > > >> >     at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:53)
> > > >> >     at kafka.network.MultiSend.writeTo(Transmission.scala:91)
> > > >> >     at kafka.network.Processor.write(SocketServer.scala:339)
> > > >> >     at kafka.network.Processor.run(SocketServer.scala:216)
> > > >> >     at java.lang.Thread.run(Thread.java:679)
> > > >> >
> > > >> > Have you ever seen this exception before, what's the root cause? Thanks