No,  I use java consumer connector, and set 10 seconds timeout.

   ConsumerConfig consumerConfig = new ConsumerConfig(props);
    _consumerConnector =
Consumer.createJavaConsumerConnector(consumerConfig);
   Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(_topic, 1);
    Map<String, List<KafkaStream<Message>>> topicMessageStreams =
_consumerConnector
        .createMessageStreams(topicCountMap);
    List<KafkaStream<Message>> streams = topicMessageStreams.get(_topic);
    KafkaStream<Message> KafkaStream = streams.iterator().next();
    _consumerIterator = KafkaStream.iterator();

2013/3/21 Jun Rao <jun...@gmail.com>

> So, it seems that your consume thread was interrupted and therefore the
> socket channel was closed. Are you using SimpleConsumer?
>
> Thanks,
>
> Jun
>
> On Wed, Mar 20, 2013 at 9:25 PM, Yonghui Zhao <zhaoyong...@gmail.com>
> wrote:
>
> > Hi Jun,
> >
> > I didn't find any error in producer log.
> > I did another test,  first I injected data to kafka server, then stop
> > producer, and start consumer.
> > The exception still happened, so the exception is not related with
> > producer.
> >
> > From the log below,  it seems consumer exception happened first.
> > *
> > Exceptions in consumers:*
> >
> > 2013/03/21* 12:07:17.940 *INFO [SimpleConsumer] [] Reconnect in
> multifetch
> > due to socket error:
> > java.nio.channels.ClosedByInterruptException
> >         at
> >
> >
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:201)
> >         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:281)
> >         at kafka.utils.Utils$.read(Utils.scala:538)
> >         at
> >
> >
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >         at
> > kafka.network.Receive$class.readCompletely(Transmission.scala:55)
> >         at
> >
> >
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >         at
> > kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
> >         at
> > kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
> >         at
> > kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
> >         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
> >
> > 2013/03/21* 12:07:18.176* INFO [SimpleConsumer] [] Reconnect in
> multifetch
> > due to socket error:
> > java.nio.channels.ClosedByInterruptException
> >         at
> >
> >
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:201)
> >         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:281)
> >         at kafka.utils.Utils$.read(Utils.scala:538)
> >         at
> >
> >
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
> >         at
> > kafka.network.Receive$class.readCompletely(Transmission.scala:55)
> >         at
> >
> >
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >         at
> > kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
> >         at
> > kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
> >         at
> > kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
> >         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
> >
> >
> > *Exceptions in kafka server:*
> >
> >  [2013-03-21 *12:07:18,128*] ERROR Closing socket for /127.0.0.1 because
> > of
> > error (kafka.network.Processor)
> >  java.io.IOException: Connection reset by peer
> >         at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> >         at
> > sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:456)
> >         at
> sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:557)
> >         at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:102)
> >         at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:53)
> >         at kafka.network.MultiSend.writeTo(Transmission.scala:91)
> >         at kafka.network.Processor.write(SocketServer.scala:339)
> >         at kafka.network.Processor.run(SocketServer.scala:216)
> >         at java.lang.Thread.run(Thread.java:679)
> > [2013-03-21 *12:07:19,263*] INFO Socket connection established to
> > localhost/
> > 127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> >  [2013-03-21* 12:07:18,055*] ERROR Closing socket for /127.0.0.1 because
> > of
> > error (kafka.network.Processor)
> >  java.io.IOException: Broken pipe
> >         at sun.nio.ch.FileDispatcher.write0(Native Method)
> >         at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> >         at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:122)
> >         at sun.nio.ch.IOUtil.write(IOUtil.java:93)
> >         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:352)
> >         at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:51)
> >         at kafka.network.MultiSend.writeTo(Transmission.scala:91)
> >         at kafka.network.Processor.write(SocketServer.scala:339)
> >         at kafka.network.Processor.run(SocketServer.scala:216)
> >         at java.lang.Thread.run(Thread.java:679)
> >
> > *
> > *
> >
> > 2013/3/20 Jun Rao <jun...@gmail.com>
> >
> > > "Connect reset by peer" means the other side of the socket has closed
> the
> > > connection for some reason. Could you provide the error/exception in
> both
> > > the producer and the broker when a produce request fails?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Mar 19, 2013 at 1:34 AM, Yonghui Zhao <zhaoyong...@gmail.com>
> > > wrote:
> > >
> > > >  Connection reset exception reproed.
> > > >
> > > > [2013-03-19 16:30:45,814] INFO Closing socket connection to /
> 127.0.0.1
> > .
> > > > (kafka.network.Processor)
> > > > [2013-03-19 16:30:55,253] ERROR Closing socket for /127.0.0.1because
> > of
> > > > error (kafka.network.Processor)
> > > > java.io.IOException: Connection reset by peer
> > > >     at sun.nio.ch.FileDispatcher.read0(Native Method)
> > > >     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> > > >     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)
> > > >     at sun.nio.ch.IOUtil.read(IOUtil.java:224)
> > > >     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)
> > > >     at kafka.utils.Utils$.read(Utils.scala:538)
> > > >     at
> > > >
> > > >
> > >
> >
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > >     at kafka.network.Processor.read(SocketServer.scala:311)
> > > >     at kafka.network.Processor.run(SocketServer.scala:214)
> > > >     at java.lang.Thread.run(Thread.java:679)
> > > > [2013-03-19 16:31:02,476] ERROR Closing socket for /127.0.0.1because
> > of
> > > > error (kafka.network.Processor)
> > > > java.io.IOException: Connection reset by peer
> > > >     at sun.nio.ch.FileDispatcher.read0(Native Method)
> > > >     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> > > >     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)
> > > >     at sun.nio.ch.IOUtil.read(IOUtil.java:224)
> > > >     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)
> > > >     at kafka.utils.Utils$.read(Utils.scala:538)
> > > >     at
> > > >
> > > >
> > >
> >
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > >     at kafka.network.Processor.read(SocketServer.scala:311)
> > > >     at kafka.network.Processor.run(SocketServer.scala:214)
> > > >     at java.lang.Thread.run(Thread.java:679)
> > > >
> > > >
> > > > 2013/3/19 Yonghui Zhao <zhaoyong...@gmail.com>
> > > >
> > > > > Thanks Jun.
> > > > >
> > > > > Now I use onebox to test kafka, kafka server ip on zk is 127.0.0.1,
> > > > > network is not affected by external factors.
> > > > >
> > > > > Reset connection is not reproed, but I still find Broken pipe
> > > exceptions
> > > > > and a few zk exceptions.
> > > > >
> > > > > [2013-03-19 15:23:28,660] INFO Closed socket connection for client
> /
> > > > > 127.0.0.1:51902 which had sessionid 0x13d8152007b002c
> > > > > (org.apache.zookeeper.server.NIOServerCnxn)
> > > > > [2013-03-19 15:23:28,672] ERROR Unexpected Exception:
> > > > > (org.apache.zookeeper.server.NIOServerCnxn)
> > > > > java.nio.channels.CancelledKeyException
> > > > >     at
> > > sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
> > > > >     at
> > > sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:77)
> > > > >     at
> > > > >
> > > >
> > >
> >
> org.apache.zookeeper.server.NIOServerCnxn.sendBuffer(NIOServerCnxn.java:418)
> > > > >     at
> > > > >
> > > >
> > >
> >
> org.apache.zookeeper.server.NIOServerCnxn.sendResponse(NIOServerCnxn.java:1509)
> > > > >     at
> > > > >
> > > >
> > >
> >
> org.apache.zookeeper.server.FinalRequestProcessor.processRequest(FinalRequestProcessor.java:171)
> > > > >     at
> > > > >
> > > >
> > >
> >
> org.apache.zookeeper.server.SyncRequestProcessor.run(SyncRequestProcessor.java:135)
> > > > >
> > > > >
> > > > >
> > > > > [2013-03-19 15:15:58,355] INFO Closing socket connection to /
> > 127.0.0.1
> > > .
> > > > > (kafka.network.Processor)
> > > > > [2013-03-19 15:16:00,161] INFO Closing socket connection to /
> > 127.0.0.1
> > > .
> > > > > (kafka.network.Processor)
> > > > > [2013-03-19 15:16:01,784] INFO Closing socket connection to /
> > 127.0.0.1
> > > .
> > > > > (kafka.network.Processor)
> > > > > [2013-03-19 15:16:04,751] INFO Closing socket connection to /
> > 127.0.0.1
> > > .
> > > > > (kafka.network.Processor)
> > > > > [2013-03-19 15:16:07,734] ERROR Closing socket for
> /127.0.0.1because
> > > of
> > > > > error (kafka.network.Processor)
> > > > > java.io.IOException: Broken pipe
> > > > >
> > > > >     at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> > > > >     at
> > > > >
> > sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:456)
> > > > >     at
> > sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:557)
> > > > >     at
> kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:102)
> > > > >     at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:53)
> > > > >     at kafka.network.MultiSend.writeTo(Transmission.scala:91)
> > > > >     at kafka.network.Processor.write(SocketServer.scala:339)
> > > > >     at kafka.network.Processor.run(SocketServer.scala:216)
> > > > >     at java.lang.Thread.run(Thread.java:679)
> > > > >
> > > > >
> > > > > 2013/3/19 Jun Rao <jun...@gmail.com>
> > > > >
> > > > >> The error you saw on the broker is for consumer requests, not for
> > > > >> producer.
> > > > >> For the issues in the producer, are you using a VIP? Is there any
> > > > firewall
> > > > >> btw producer and broker? The typical issues with "connection
> reset"
> > > that
> > > > >> we
> > > > >> have seen are caused by the load balancer or the firewall killing
> > idle
> > > > >> connections.
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Jun
> > > > >>
> > > > >> On Sun, Mar 17, 2013 at 8:24 PM, Yonghui Zhao <
> > zhaoyong...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >> > In kafka 0.7.2, I use a producer to send 200 million message to
> > > kafka
> > > > >> > server.
> > > > >> >
> > > > >> > After sent 100 million this exception happend:
> > > > >> >
> > > > >> > In producer:
> > > > >> >
> > > > >> > Exception in thread "main" java.io.IOException: Connection reset
> > by
> > > > peer
> > > > >> >     at sun.nio.ch.FileDispatcher.writev0(Native Method)
> > > > >> >     at
> > sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
> > > > >> >     at sun.nio.ch.IOUtil.write(IOUtil.java:182)
> > > > >> >     at
> > > sun.nio.ch.SocketChannelImpl.write0(SocketChannelImpl.java:383)
> > > > >> >     at
> > > sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:406)
> > > > >> >     at
> > java.nio.channels.SocketChannel.write(SocketChannel.java:384)
> > > > >> >     at
> > > > >> >
> > > > >>
> > > >
> > >
> >
> kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:49)
> > > > >> >     at
> > > kafka.network.Send$class.writeCompletely(Transmission.scala:73)
> > > > >> >     at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:25)
> > > > >> >     at
> > > > kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:95)
> > > > >> >     at kafka.producer.SyncProducer.send(SyncProducer.scala:94)
> > > > >> >     at kafka.producer.SyncProducer.send(SyncProducer.scala:125)
> > > > >> >     at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:114)
> > > > >> >     at
> > > > >> >
> > > > >>
> > > >
> > kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100)
> > > > >> >     at
> > > > >> >
> > > > >>
> > > >
> > kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100)
> > > > >> >     at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > > > >> >     at
> > > > >> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > > > >> >     at kafka.producer.ProducerPool.send(ProducerPool.scala:100)
> > > > >> >     at kafka.producer.Producer.zkSend(Producer.scala:137)
> > > > >> >     at kafka.producer.Producer.send(Producer.scala:99)
> > > > >> >     at kafka.javaapi.producer.Producer.send(Producer.scala:103)
> > > > >> >
> > > > >> >
> > > > >> > In kafka server:
> > > > >> >
> > > > >> > [2013-03-16 06:59:49,491] ERROR Closing socket for
> > > > /10.2.201.201because
> > > > >> > of
> > > > >> > error (kafka.network.Processor)
> > > > >> > java.io.IOException: Connection reset by peer
> > > > >> >     at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> > > > >> >     at
> > > > >> >
> > > >
> sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:456)
> > > > >> >     at
> > > sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:557)
> > > > >> >     at
> > > kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:102)
> > > > >> >     at
> > kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:53)
> > > > >> >     at kafka.network.MultiSend.writeTo(Transmission.scala:91)
> > > > >> >     at kafka.network.Processor.write(SocketServer.scala:339)
> > > > >> >     at kafka.network.Processor.run(SocketServer.scala:216)
> > > > >> >     at java.lang.Thread.run(Thread.java:679)
> > > > >> >
> > > > >> > Have you ever seen this exception before, what's the root cause?
> > > > Thanks
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to