Thanks, Jun! We will tune our GC settings.
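A rough way to check whether GC is a plausible cause before tuning is to compare the GC time the consumer JVM reports against the ZooKeeper session timeout it was configured with. The sketch below is only an illustration, not something from this thread: the class and method names are hypothetical, the group name and the 6000 ms timeout are placeholder values, and the property keys (`zk.connect`, `groupid`, `zk.sessiontimeout.ms`) are the 0.7-era consumer names as best recalled, so they should be checked against the documentation for the version in use.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.Properties;

final class GcPauseCheck {

    /**
     * Builds consumer properties with an explicit ZooKeeper session timeout.
     * ASSUMPTION: the keys below are the 0.7-era names; verify them for your version.
     */
    static Properties consumerProps(String zkConnect, String groupId, int zkSessionTimeoutMs) {
        Properties props = new Properties();
        props.put("zk.connect", zkConnect);
        props.put("groupid", groupId);
        props.put("zk.sessiontimeout.ms", Integer.toString(zkSessionTimeoutMs));
        return props;
    }

    /** Cumulative GC time in milliseconds, summed over all collectors of this JVM. */
    static long totalGcTimeMs() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime(); // -1 when the collector does not report a time
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    /** True when a pause is at least as long as the ZooKeeper session timeout. */
    static boolean pauseRisksSessionExpiry(long pauseMs, int zkSessionTimeoutMs) {
        return pauseMs >= zkSessionTimeoutMs;
    }

    public static void main(String[] args) throws InterruptedException {
        int timeoutMs = 6000; // placeholder value, not taken from the thread
        Properties props = consumerProps("localhost:2181", "sms-group", timeoutMs);

        long before = totalGcTimeMs();
        Thread.sleep(1000); // stand-in for one sampling interval of real consumer work
        long gcInInterval = totalGcTimeMs() - before;

        // GC time accumulated in the interval is only an upper bound on any single pause.
        System.out.println("session timeout (ms): " + props.getProperty("zk.sessiontimeout.ms"));
        System.out.println("GC time in interval (ms): " + gcInInterval);
        System.out.println("risk of session expiry: " + pauseRisksSessionExpiry(gcInInterval, timeoutMs));
    }
}
```

This is a coarse signal only. GC logs from the consumer JVM, together with the "expired session" messages Jun mentions, give a more reliable picture than sampled totals.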
On 2013-3-22, at 23:05, Jun Rao <jun...@gmail.com> wrote:

> A typical cause of frequent rebalances is GC on the consumer side. If so, you
> will see logs in the consumer saying something like "expired session" for ZK.
> Occasional rebalances are fine. Too many rebalances can slow down
> consumption, and you will need to tune your GC settings.
>
> Thanks,
>
> Jun
>
> On Thu, Mar 21, 2013 at 11:07 PM, Yonghui Zhao <zhaoyong...@gmail.com> wrote:
>
>> Yes, before the consumer exception:
>>
>> 2013/03/21 12:07:17.909 INFO [ZookeeperConsumerConnector] [] 0_lg-mc-db01.bj-1363784482043-f98c7868 *end rebalancing consumer* 0_lg-mc-db01.bj-1363784482043-f98c7868 try #0
>> 2013/03/21 12:07:17.911 INFO [ZookeeperConsumerConnector] [] 0_lg-mc-db01.bj-1363784482043-f98c7868 *begin rebalancing consumer* 0_lg-mc-db01.bj-1363784482043-f98c7868 try #0
>> 2013/03/21 12:07:17.934 INFO [FetcherRunnable] [] FetchRunnable-0 start fetching topic: sms part: 0 offset: 43667888259 from 127.0.0.1:9093
>> 2013/03/21 12:07:17.940 INFO [SimpleConsumer] [] Reconnect in multifetch due to socket error:
>> java.nio.channels.*ClosedByInterruptException*
>>     at java.nio.channels.spi.*AbstractInterruptibleChannel*.end(AbstractInterruptibleChannel.java:201)
>>
>> 2013/03/21 12:07:17.978 INFO [ZookeeperConsumerConnector] [] 0_lg-mc-db01.bj-1363784482043-f98c7868 *end rebalancing consumer* 0_lg-mc-db01.bj-1363784482043-f98c7868 try #0
>> 2013/03/21 12:07:18.004 INFO [FetcherRunnable] [] FetchRunnable-0 start fetching topic: sms part: 0 offset: 43667888259 from 127.0.0.1:9093
>> 2013/03/21 12:07:18.066 INFO [ZookeeperConsumerConnector] [] 0_lg-mc-db01.bj-1363784482043-f98c7868 *begin rebalancing consumer* 0_lg-mc-db01.bj-1363784482043-f98c7868 try #0
>> 2013/03/21 12:07:18.176 INFO [SimpleConsumer] [] Reconnect in multifetch due to socket error:
>> java.nio.channels.*ClosedByInterruptException*
>>     at java.nio.channels.spi.*AbstractInterruptibleChannel*.end(AbstractInterruptibleChannel.java:201)
>>
>> So you think this is normal? How can we avoid this exception?
>>
>> I use 4 partitions in Kafka; should I use only 1 partition?
>>
>> 2013/3/22 Jun Rao <jun...@gmail.com>
>>
>>> Do you see any rebalances in the consumer? Each rebalance will interrupt
>>> existing fetcher threads first.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Thu, Mar 21, 2013 at 9:40 PM, Yonghui Zhao <zhaoyong...@gmail.com> wrote:
>>>
>>>> The application won't shut down the consumer connector. The consumer is
>>>> always alive.
>>>>
>>>> 2013/3/22 Jun Rao <jun...@gmail.com>
>>>>
>>>>> If you use the high-level consumer, ClosedByInterruptException normally
>>>>> happens because the application calls shutdown on the consumer connector.
>>>>> Is that the case?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jun
>>>>>
>>>>> On Thu, Mar 21, 2013 at 8:38 PM, Yonghui Zhao <zhaoyong...@gmail.com> wrote:
>>>>>
>>>>>> No, I use the Java consumer connector, and set a 10-second timeout.
>>>>>>
>>>>>> ConsumerConfig consumerConfig = new ConsumerConfig(props);
>>>>>> _consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
>>>>>> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>>>>>> topicCountMap.put(_topic, 1);
>>>>>> Map<String, List<KafkaStream<Message>>> topicMessageStreams =
>>>>>>     _consumerConnector.createMessageStreams(topicCountMap);
>>>>>> List<KafkaStream<Message>> streams = topicMessageStreams.get(_topic);
>>>>>> KafkaStream<Message> kafkaStream = streams.iterator().next();
>>>>>> _consumerIterator = kafkaStream.iterator();
>>>>>>
>>>>>> 2013/3/21 Jun Rao <jun...@gmail.com>
>>>>>>
>>>>>>> So, it seems that your consumer thread was interrupted and therefore the
>>>>>>> socket channel was closed. Are you using SimpleConsumer?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jun
>>>>>>>
>>>>>>> On Wed, Mar 20, 2013 at 9:25 PM, Yonghui Zhao <zhaoyong...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Jun,
>>>>>>>>
>>>>>>>> I didn't find any error in the producer log.
>>>>>>>> I did another test: first I injected data into the Kafka server, then
>>>>>>>> stopped the producer and started the consumer.
>>>>>>>> The exception still happened, so the exception is not related to the
>>>>>>>> producer.
>>>>>>>>
>>>>>>>> From the log below, it seems the consumer exception happened first.
>>>>>>>>
>>>>>>>> *Exceptions in consumers:*
>>>>>>>>
>>>>>>>> 2013/03/21 *12:07:17.940* INFO [SimpleConsumer] [] Reconnect in multifetch due to socket error:
>>>>>>>> java.nio.channels.ClosedByInterruptException
>>>>>>>>     at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:201)
>>>>>>>>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:281)
>>>>>>>>     at kafka.utils.Utils$.read(Utils.scala:538)
>>>>>>>>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>>>>>>     at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
>>>>>>>>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>>>>>     at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
>>>>>>>>     at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
>>>>>>>>     at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
>>>>>>>>     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
>>>>>>>>
>>>>>>>> 2013/03/21 *12:07:18.176* INFO [SimpleConsumer] [] Reconnect in multifetch due to socket error:
>>>>>>>> java.nio.channels.ClosedByInterruptException
>>>>>>>>     at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:201)
>>>>>>>>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:281)
>>>>>>>>     at kafka.utils.Utils$.read(Utils.scala:538)
>>>>>>>>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
>>>>>>>>     at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
>>>>>>>>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>>>>>     at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
>>>>>>>>     at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
>>>>>>>>     at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
>>>>>>>>     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
>>>>>>>>
>>>>>>>> *Exceptions in kafka server:*
>>>>>>>>
>>>>>>>> [2013-03-21 *12:07:18,128*] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
>>>>>>>> java.io.IOException: Connection reset by peer
>>>>>>>>     at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
>>>>>>>>     at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:456)
>>>>>>>>     at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:557)
>>>>>>>>     at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:102)
>>>>>>>>     at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:53)
>>>>>>>>     at kafka.network.MultiSend.writeTo(Transmission.scala:91)
>>>>>>>>     at kafka.network.Processor.write(SocketServer.scala:339)
>>>>>>>>     at kafka.network.Processor.run(SocketServer.scala:216)
>>>>>>>>     at java.lang.Thread.run(Thread.java:679)
>>>>>>>> [2013-03-21 *12:07:19,263*] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
>>>>>>>> [2013-03-21 *12:07:18,055*] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
>>>>>>>> java.io.IOException: Broken pipe
>>>>>>>>     at sun.nio.ch.FileDispatcher.write0(Native Method)
>>>>>>>>     at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>>>>>>>>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:122)
>>>>>>>>     at sun.nio.ch.IOUtil.write(IOUtil.java:93)
>>>>>>>>     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:352)
>>>>>>>>     at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:51)
>>>>>>>>     at kafka.network.MultiSend.writeTo(Transmission.scala:91)
>>>>>>>>     at kafka.network.Processor.write(SocketServer.scala:339)
>>>>>>>>     at kafka.network.Processor.run(SocketServer.scala:216)
>>>>>>>>     at java.lang.Thread.run(Thread.java:679)
>>>>>>>>
>>>>>>>> 2013/3/20 Jun Rao <jun...@gmail.com>
>>>>>>>>
>>>>>>>>> "Connection reset by peer" means the other side of the socket has closed
>>>>>>>>> the connection for some reason. Could you provide the error/exception in
>>>>>>>>> both the producer and the broker when a produce request fails?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Jun
>>>>>>>>>
>>>>>>>>> On Tue, Mar 19, 2013 at 1:34 AM, Yonghui Zhao <zhaoyong...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Connection reset exception reproduced.
>>>>>>>>>>
>>>>>>>>>> [2013-03-19 16:30:45,814] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
>>>>>>>>>> [2013-03-19 16:30:55,253] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
>>>>>>>>>> java.io.IOException: Connection reset by peer
>>>>>>>>>>     at sun.nio.ch.FileDispatcher.read0(Native Method)
>>>>>>>>>>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>>>>>>>>>>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)
>>>>>>>>>>     at sun.nio.ch.IOUtil.read(IOUtil.java:224)
>>>>>>>>>>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)
>>>>>>>>>>     at kafka.utils.Utils$.read(Utils.scala:538)
>>>>>>>>>>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>>>>>>>>     at kafka.network.Processor.read(SocketServer.scala:311)
>>>>>>>>>>     at kafka.network.Processor.run(SocketServer.scala:214)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:679)
>>>>>>>>>> [2013-03-19 16:31:02,476] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
>>>>>>>>>> java.io.IOException: Connection reset by peer
>>>>>>>>>>     at sun.nio.ch.FileDispatcher.read0(Native Method)
>>>>>>>>>>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>>>>>>>>>>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)
>>>>>>>>>>     at sun.nio.ch.IOUtil.read(IOUtil.java:224)
>>>>>>>>>>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)
>>>>>>>>>>     at kafka.utils.Utils$.read(Utils.scala:538)
>>>>>>>>>>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>>>>>>>>     at kafka.network.Processor.read(SocketServer.scala:311)
>>>>>>>>>>     at kafka.network.Processor.run(SocketServer.scala:214)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:679)
>>>>>>>>>>
>>>>>>>>>> 2013/3/19 Yonghui Zhao <zhaoyong...@gmail.com>
>>>>>>>>>>
>>>>>>>>>>> Thanks Jun.
>>>>>>>>>>>
>>>>>>>>>>> Now I use a onebox setup to test Kafka; the Kafka server IP in ZK is
>>>>>>>>>>> 127.0.0.1, so the network is not affected by external factors.
>>>>>>>>>>>
>>>>>>>>>>> The connection reset did not reproduce, but I still see Broken pipe
>>>>>>>>>>> exceptions and a few ZK exceptions.
>>>>>>>>>>>
>>>>>>>>>>> [2013-03-19 15:23:28,660] INFO Closed socket connection for client /127.0.0.1:51902 which had sessionid 0x13d8152007b002c (org.apache.zookeeper.server.NIOServerCnxn)
>>>>>>>>>>> [2013-03-19 15:23:28,672] ERROR Unexpected Exception: (org.apache.zookeeper.server.NIOServerCnxn)
>>>>>>>>>>> java.nio.channels.CancelledKeyException
>>>>>>>>>>>     at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
>>>>>>>>>>>     at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:77)
>>>>>>>>>>>     at org.apache.zookeeper.server.NIOServerCnxn.sendBuffer(NIOServerCnxn.java:418)
>>>>>>>>>>>     at org.apache.zookeeper.server.NIOServerCnxn.sendResponse(NIOServerCnxn.java:1509)
>>>>>>>>>>>     at org.apache.zookeeper.server.FinalRequestProcessor.processRequest(FinalRequestProcessor.java:171)
>>>>>>>>>>>     at org.apache.zookeeper.server.SyncRequestProcessor.run(SyncRequestProcessor.java:135)
>>>>>>>>>>>
>>>>>>>>>>> [2013-03-19 15:15:58,355] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
>>>>>>>>>>> [2013-03-19 15:16:00,161] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
>>>>>>>>>>> [2013-03-19 15:16:01,784] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
>>>>>>>>>>> [2013-03-19 15:16:04,751] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
>>>>>>>>>>> [2013-03-19 15:16:07,734] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
>>>>>>>>>>> java.io.IOException: Broken pipe
>>>>>>>>>>>     at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
>>>>>>>>>>>     at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:456)
>>>>>>>>>>>     at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:557)
>>>>>>>>>>>     at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:102)
>>>>>>>>>>>     at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:53)
>>>>>>>>>>>     at kafka.network.MultiSend.writeTo(Transmission.scala:91)
>>>>>>>>>>>     at kafka.network.Processor.write(SocketServer.scala:339)
>>>>>>>>>>>     at kafka.network.Processor.run(SocketServer.scala:216)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:679)
>>>>>>>>>>>
>>>>>>>>>>> 2013/3/19 Jun Rao <jun...@gmail.com>
>>>>>>>>>>>
>>>>>>>>>>>> The error you saw on the broker is for consumer requests, not for
>>>>>>>>>>>> producer requests. For the issues in the producer, are you using a VIP?
>>>>>>>>>>>> Is there any firewall between the producer and the broker? The typical
>>>>>>>>>>>> "connection reset" issues that we have seen are caused by a load
>>>>>>>>>>>> balancer or firewall killing idle connections.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>
>>>>>>>>>>>> Jun
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Mar 17, 2013 at 8:24 PM, Yonghui Zhao <zhaoyong...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> In Kafka 0.7.2, I use a producer to send 200 million messages to the
>>>>>>>>>>>>> Kafka server.
>>>>>>>>>>>>>
>>>>>>>>>>>>> After sending 100 million messages, this exception happened:
>>>>>>>>>>>>>
>>>>>>>>>>>>> In producer:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Exception in thread "main" java.io.IOException: Connection reset by peer
>>>>>>>>>>>>>     at sun.nio.ch.FileDispatcher.writev0(Native Method)
>>>>>>>>>>>>>     at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
>>>>>>>>>>>>>     at sun.nio.ch.IOUtil.write(IOUtil.java:182)
>>>>>>>>>>>>>     at sun.nio.ch.SocketChannelImpl.write0(SocketChannelImpl.java:383)
>>>>>>>>>>>>>     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:406)
>>>>>>>>>>>>>     at java.nio.channels.SocketChannel.write(SocketChannel.java:384)
>>>>>>>>>>>>>     at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:49)
>>>>>>>>>>>>>     at kafka.network.Send$class.writeCompletely(Transmission.scala:73)
>>>>>>>>>>>>>     at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:25)
>>>>>>>>>>>>>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:95)
>>>>>>>>>>>>>     at kafka.producer.SyncProducer.send(SyncProducer.scala:94)
>>>>>>>>>>>>>     at kafka.producer.SyncProducer.send(SyncProducer.scala:125)
>>>>>>>>>>>>>     at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:114)
>>>>>>>>>>>>>     at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100)
>>>>>>>>>>>>>     at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100)
>>>>>>>>>>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>>>>>>>>>>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>>>>>>>>>>>>>     at kafka.producer.ProducerPool.send(ProducerPool.scala:100)
>>>>>>>>>>>>>     at kafka.producer.Producer.zkSend(Producer.scala:137)
>>>>>>>>>>>>>     at kafka.producer.Producer.send(Producer.scala:99)
>>>>>>>>>>>>>     at kafka.javaapi.producer.Producer.send(Producer.scala:103)
>>>>>>>>>>>>>
>>>>>>>>>>>>> In kafka server:
>>>>>>>>>>>>>
>>>>>>>>>>>>> [2013-03-16 06:59:49,491] ERROR Closing socket for /10.2.201.201 because of error (kafka.network.Processor)
>>>>>>>>>>>>> java.io.IOException: Connection reset by peer
>>>>>>>>>>>>>     at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
>>>>>>>>>>>>>     at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:456)
>>>>>>>>>>>>>     at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:557)
>>>>>>>>>>>>>     at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:102)
>>>>>>>>>>>>>     at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:53)
>>>>>>>>>>>>>     at kafka.network.MultiSend.writeTo(Transmission.scala:91)
>>>>>>>>>>>>>     at kafka.network.Processor.write(SocketServer.scala:339)
>>>>>>>>>>>>>     at kafka.network.Processor.run(SocketServer.scala:216)
>>>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:679)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Have you ever seen this exception before? What's the root cause? Thanks.