>From the consumer logs, it seems senseidb is interrupting the simple
consumer thread. This causes the socket to close which then shows up as a
broken pipe on the server. I don't know senseidb to say if this thread
interruption makes sense. But there are better ways to close the consumer
properly. I would post this question on sensei db mailing list.

Thanks,
Neha


On Wed, Mar 20, 2013 at 1:35 AM, Yonghui Zhao <zhaoyong...@gmail.com> wrote:

> Thanks Neha,
>
> After enable INFO log in consumer,
> I find 2 exceptions in consumer side, any idea?
>
>
>
> 2013/03/20 14:52:00.585 INFO [SimpleConsumer] [] Reconnect in multifetch
> due to socket error:
> java.nio.channels.ClosedChannelException
>     at
> sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:120)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:163)
>     at kafka.utils.Utils$.read(Utils.scala:538)
>     at
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
>     at
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
>     at
> kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
>     at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
>     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
>
>
> 2013/03/20 14:52:03.678 INFO [SimpleConsumer] [] Reconnect in multifetch
> due to socket error:
> java.nio.channels.ClosedByInterruptException
>     at
>
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:270)
>     at kafka.utils.Utils$.read(Utils.scala:538)
>     at
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
>     at
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
>     at
> kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
>     at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
>     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
>
>
>
> 2013/3/19 Neha Narkhede <neha.narkh...@gmail.com>
>
> > Modify the log4j properties for senseidb and set kafka.consumer to INFO.
> > you can check the senseidb startup scripts on how they configure their
> > log4j.
> >
> > Thanks,
> > Neha
> >
> > On Tuesday, March 19, 2013, Yonghui Zhao wrote:
> >
> > > Hi Neha,
> > >
> > > How can I enable all kafka consumer log in senseidb?
> > > Btw: I am using kafka 0.7.2 java client.
> > >
> > > 2013/3/19 Neha Narkhede <neha.narkh...@gmail.com <javascript:;>>
> > >
> > > > The logs show that senseidb is prematurely closing the socket
> > connection
> > > to
> > > > the Kafka broker. I would enable atleast INFLO logging for Kafka in
> > > > Senseidb to see what the issue is.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > > On Monday, March 18, 2013, Yonghui Zhao wrote:
> > > >
> > > > > Thanks Neha,
> > > > >
> > > > > I use one kafka server with 4 partitions and 3 consumers(senseidb).
> > > > >
> > > > > Kafka server producer input rate is about 10k.
> > > > > And each consumer consuming rate is about 3k.
> > > > >
> > > > > I see this exceptions many times, kafka has this exception on each
> > > > > consumers, but I didn't find error log in consumer side,
> > > > consumer(senseidb)
> > > > > is alive all the time.
> > > > > Is it possible the exception is related with high input/output
> rate?
> > > > >
> > > > > And some times another exception(*Connection reset by peer*)
> > happened.
> > > > >
> > > > > [2013-03-18 21:18:29,107] ERROR Closing socket for
> > /10.2.201.203because
> > > > > of error (kafka.network.Processor)
> > > > > java.io.IOException: Connection reset by peer
> > > > >     at sun.nio.ch.FileChannelImpl.**transferTo0(Native Method)
> > > > >     at sun.nio.ch.FileChannelImpl.**transferToDirectly(**
> > > > > FileChannelImpl.java:456)
> > > > >     at
> > > > sun.nio.ch.FileChannelImpl.**transferTo(FileChannelImpl.**java:557)
> > > > >     at
> > > kafka.message.FileMessageSet.**writeTo(FileMessageSet.scala:**102)
> > > > >     at
> > kafka.server.MessageSetSend.**writeTo(MessageSetSend.scala:**53)
> > > > >     at kafka.network.MultiSend.**writeTo(Transmission.scala:91)
> > > > >     at kafka.network.Processor.write(**SocketServer.scala:339)
> > > > >     at kafka.network.Processor.run(**SocketServer.scala:216)
> > > > >     at java.lang.Thread.run(Thread.**java:679)
> > > > >
> > > > >
> > > > >
> > > > > Btw: the log is full of "*Closing socket connection*", is it
> normal?
> > > > >
> > > > > From the code each "Closing" log should  correspond to one
> exception
> > > > >
> > > > > In SocketServer.scala:
> > > > >
> > > > > *catch {**
> > > > > **            case e: EOFException => {**
> > > > > **              logger.info("Closing socket connection to
> > > > > %s.".format(channelFor(key).**socket.getInetAddress))**
> > > > > **              close(key)**
> > > > > **        }**
> > > > > *
> > > > >
> > > > > [2013-03-18 21:28:42,791] INFO *Closing socket connection* to /
> > > > > 10.2.201.201. (kafka.network.Processor)
> > > > > [2013-03-18 21:28:43,954] INFO *Closing socket connection* to /
> > > > > 10.2.201.201. (kafka.network.Processor)
> > > > > [2013-03-18 21:28:45,322] INFO *Closing socket connection* to /
> > > > > 10.2.201.201. (kafka.network.Processor)
> > > > > [2013-03-18 21:28:47,045] INFO *Closing socket connection *to /
> > > > > 10.2.201.201. (kafka.network.Processor)
> > > > > [2013-03-18 21:28:50,110] ERROR Closing socket for
> > /10.2.201.201because
> > > > > of error (kafka.network.Processor)
> > > > > java.io.IOException: Broken pipe
> > > > >     at sun.nio.ch.FileChannelImpl.**transferTo0(Native Method)
> > > > >     at sun.nio.ch.FileChannelImpl.**transferToDirectly(**
> > > > > FileChannelImpl.java:456)
> > > > >     at
> > > > sun.nio.ch.FileChannelImpl.**transferTo(FileChannelImpl.**java:557)
> > > > >     at
> > > kafka.message.FileMessageSet.**writeTo(FileMessageSet.scala:**102)
> > > > >     at
> > kafka.server.MessageSetSend.**writeTo(MessageSetSend.scala:**53)
> > > > >     at kafka.network.MultiSend.**writeTo(Transmission.scala:91)
> > > > >     at kafka.network.Processor.write(**SocketServer.scala:339)
> > > > >     at kafka.network.Processor.run(**SocketServer.scala:216)
> > > > >     at java.lang.Thread.run(Thread.**java:679)
> > > > >
> > > > >
> > > > >
> > > > > From the error it seems that the server was not able to write the
> > fetch
> > > > > response on the socket since the socket was closed. Do you see any
> > > errors
> > > > > on the consumer on 10.2.201.20 <http://10.2.201.202/> ? The
> consumer
> > > > > could've
> > > > > closed the socket due to some error or could've died.
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > > - Hide quoted text - <http://grokbase.com/t/kafka/**
> > > > > users/133fwd3r91/java-io-**ioexception-broken-pipe#<
> > > >
> > >
> >
> http://grokbase.com/t/kafka/users/133fwd3r91/java-io-ioexception-broken-pipe#
> > > > >>
> > > > >
> > > > > On Friday, March 15, 2013, Yonghui Zhao wrote:
> > > > >
> > > > > This exception happened many times in high through put in kafka
> 0.7.2
> > > > >
> > > > > [2013-03-15 17:27:16,691] ERROR Closing socket for
> > /10.2.201.202because
> > > > > of
> > > > > error (kafka.network.Processor)
> > > > > java.io.IOException: Broken pipe
> > > > > at sun.nio.ch.FileChannelImpl.**transferTo0(Native Method)
> > > > > at
> > > > > sun.nio.ch.FileChannelImpl.**transferToDirectly(**
> > > > > FileChannelImpl.java:456)
> > > > > at
> > sun.nio.ch.FileChannelImpl.**transferTo(FileChannelImpl.**java:557)
> > > > > at
> kafka.message.FileMessageSet.**writeTo(FileMessageSet.scala:**102)
> > > > > at kafka.server.MessageSetSend.**writeTo(MessageSetSend.scala:**53)
> > > > > at kafka.network.MultiSend.**writeTo(Transmission.scala:91)
> > > > > at kafka.network.Processor.write(**SocketServer.scala:339)
> > > > > at kafka.network.Processor.run(**SocketServer.scala:216)
> > > > > at java.lang.Thread.run(Thread.**java:679)
> > > > >
> > > > >
> > > > > It seems it doesn't matter much.
> > > > >
> > > > > I want to know if we need take actions if we see this exception.
> > > > >
> > > >
> > >
> >
>

Reply via email to