Hi , We are evaluating kafka 0.8 for our product as a queue system. Our architecture remains simple. Our producer (single) will send mesages to any of the topics in broker. Thread will be running for each of the topic for every 10 secs,which in turn consume from its corresponding topic. For each stream we will be using ThreadPool to increase the degree of parallelism. But when shutting down the Consumer, i am getting following exception in broker,
[2014-02-01 13:06:43,240] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcher.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:233) at sun.nio.ch.IOUtil.read(IOUtil.java:206) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236) at kafka.utils.Utils$.read(Utils.scala:395) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:347) at kafka.network.Processor.run(SocketServer.scala:245) at java.lang.Thread.run(Thread.java:619) I am using Highlevel consumer, and the configuration for consumer is given below, Properties props = new Properties(); props.put("zookeeper.connect", "localhost:2181"); props.put("group.id", groupid); props.put("zookeeper.session.timeout.ms", "12000"); props.put("zookeeper.sync.time.ms", "200"); // props.put("auto.commit.interval.ms", "1000"); props.put("auto.commit.enable", "false"); //props.put("consumer.timeout.ms", "10000"); props.put("auto.offset.reset", "smallest"); props.put("socket.timeout.ms", "10000"); Am i missing something ? Or this is a expected exception??? PS: We are using Java Client Thanks in advance Ranjith Venkatesan