Kafka 0.8.2.2.3 and ZooKeeper are both running inside a VM. I was able to run both a producer and a consumer inside the VM using kafka-console-producer.sh and kafka-console-consumer.sh respectively. I was even able to consume Kafka messages from the host machine using kafka-console-consumer.sh. But when I run the consumer from Java in Eclipse, ZooKeeper logs the following error:

2015-06-26 03:06:26,323 - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /192.168.1.12:59549 (no session established for client)
2015-06-26 03:07:26,225 - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /192.168.1.12:59617
2015-06-26 03:07:26,226 - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x0, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:745)

Below is my Kafka consumer code:

    package com.truckevent.producer;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class KafkaConsumer {

        public static void main(String[] args) throws Exception {
            String group = "hello";

            Properties props = new Properties();
            props.put("zookeeper.connect", "192.168.1.12:2181");
            props.put("group.id", group);
            props.put("zookeeper.session.timeout.ms", "20000");
            props.put("zookeeper.sync.time.ms", "2030");
            props.put("auto.commit.interval.ms", "10000");
            props.put("auto.offset.reset", "smallest");

            ConsumerConfig cf = new ConsumerConfig(props);
            ConsumerConnector consumer = Consumer.createJavaConsumerConnector(cf);

            String topic = "event";

            // Request a single stream for the topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, new Integer(1));

            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
            KafkaStream<byte[], byte[]> stream = streams.get(0);

            // Print each message with a running counter.
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            int i = 1;
            while (it.hasNext()) {
                System.out.println(i + ": " + new String(it.next().message()));
                ++i;
            }

            consumer.shutdown();
        }
    }

I am not sure why I cannot consume messages from the Java code. Kafka is running on port 6667 and ZooKeeper on 2181.

--
Regards
Shivam Tiwari