I'm calling ConsumerConnector.shutdown to close a consumer connection and kafka's log reports an error?
I don't see a similar error when using SimpleConsumer. Is there a way to close ConsumerConnector so that the errors aren't reported in the kafka log (this is making it very difficult to sift through the log a find real errors). Found this -- but I didn't see a fix https://github.com/claudemamo/kafka-web-console/issues/37 Here's what the kafka log produces: [2015-03-09 13:47:40,308] ERROR Closing socket for /172.18.251.1 because of error (kafka.network.Processor) java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:375) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:347) at kafka.network.Processor.run(SocketServer.scala:245) at java.lang.Thread.run(Thread.java:744) Here's how I'm using a consumer connector (in scala) -- did I forget to close something? @Test def consumerCloseTest(): Unit = { val topic = "UserEvent" val consumerConfig = KafkaUtil.makeConsumerConfig("UserEventTest", testProperties) var consumerConnector: ConsumerConnector = Consumer.create(consumerConfig) try { // Read one item val numThreads: Int = 1 val topicCountMap = Map[String, Int](topic -> numThreads); val topicMessageStreams: scala.collection.Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]] = consumerConnector.createMessageStreams(topicCountMap); val streams = topicMessageStreams.get(topic); val stream = streams.get(0); val it = stream.iterator(); while (it.hasNext()) { logger.info("DONE") return } } finally { consumerConnector.shutdown } }