It turns out that the code I posted does not read the topic from the beginning; it waits for the producer to publish something new. After starting the producer and sending messages, I was able to retrieve all of them. Is there a way to read the messages on a topic from the beginning?
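One thing that may be worth trying, assuming the group "hello" already has offsets committed in ZooKeeper from earlier runs: with the old high-level consumer, auto.offset.reset is only consulted when the group has no committed offset (or the offset is out of range), so "smallest" is ignored for a group that has already consumed. A minimal sketch of a workaround is to use a group.id that has never been used before. The class and method names below are hypothetical, and the sketch only builds the properties; it does not connect to Kafka.

```java
import java.util.Properties;
import java.util.UUID;

public class FreshGroupConsumerProps {

    // Hypothetical helper (not part of Kafka): builds properties for the
    // ZooKeeper-based high-level consumer with a group.id that has never
    // committed offsets, so that auto.offset.reset=smallest can take effect.
    static Properties build(String zookeeperConnect, String groupPrefix) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperConnect);
        // A random suffix makes the group new on every run (assumption:
        // throwaway groups are acceptable for this kind of debugging).
        props.put("group.id", groupPrefix + "-" + UUID.randomUUID());
        props.put("zookeeper.session.timeout.ms", "20000");
        props.put("auto.commit.interval.ms", "10000");
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("192.168.1.12:2181", "hello");
        System.out.println("group.id = " + props.getProperty("group.id"));
        System.out.println("auto.offset.reset = " + props.getProperty("auto.offset.reset"));
    }
}
```

The resulting Properties object would be passed to new ConsumerConfig(props) exactly as in the quoted code below. Each run leaves a new group entry behind in ZooKeeper, so this is better suited to testing than to a long-running consumer.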
On 26 June 2015 at 14:32, Gwen Shapira <gshap...@cloudera.com> wrote:

> Zookeeper actually doesn't show any errors - it shows a warning, which
> is pretty normal.
>
> What does your consumer and Kafka broker show? Are there any errors in
> the consumer? Or is it just hanging?
>
> You may want to consult our FAQ:
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
>
> Gwen
>
> On Fri, Jun 26, 2015 at 11:00 AM, shivam tiwari <bigbang...@gmail.com>
> wrote:
> > Kafka 0.8.2.2.3 and zookeper both are running inside VM. I was able to run
> > both producer and consumer within VM successfully using
> > kafka-console-producer.sh and kafka-console-consumer.sh respectively. Even
> > I was able to consume Kafka messages from host machine using
> > kafka-console-consumer.sh. But when I tried to run the consumer using java
> > from eclipse then zookeeper logs following error
> >
> > 2015-06-26 03:06:26,323 - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /192.168.1.12:59549 (no session established for client)
> > 2015-06-26 03:07:26,225 - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /192.168.1.12:59617
> > 2015-06-26 03:07:26,226 - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
> > EndOfStreamException: Unable to read additional data from client sessionid 0x0, likely client has closed socket
> >     at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
> >     at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > Below is my Kafka consume code
> >
> > package com.truckevent.producer;
> >
> > import java.util.HashMap;
> > import java.util.List;
> > import java.util.Map;
> > import java.util.Properties;
> >
> > import kafka.consumer.Consumer;
> > import kafka.consumer.ConsumerConfig;
> > import kafka.consumer.ConsumerIterator;
> > import kafka.consumer.KafkaStream;
> > import kafka.javaapi.consumer.ConsumerConnector;
> >
> > public class KafkaConsumer {
> >
> >     public static void main(String[] args) throws Exception {
> >
> >         String group = "hello" ;
> >
> >         Properties props = new Properties();
> >         props.put("zookeeper.connect", "192.168.1.12:2181");
> >         props.put("group.id", group);
> >         props.put("zookeeper.session.timeout.ms", "20000");
> >         props.put("zookeeper.sync.time.ms", "2030");
> >         props.put("auto.commit.interval.ms", "10000");
> >         props.put("auto.offset.reset", "smallest");
> >
> >         ConsumerConfig cf = new ConsumerConfig(props) ;
> >
> >         ConsumerConnector consumer = Consumer.createJavaConsumerConnector(cf) ;
> >
> >         String topic = "event" ;
> >
> >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >         topicCountMap.put(topic, new Integer(1));
> >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> >                 consumer.createMessageStreams(topicCountMap);
> >         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
> >
> >         KafkaStream<byte[],byte[]> stream = streams.get(0) ;
> >
> >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >         int i = 1 ;
> >         while (it.hasNext()) {
> >
> >             System.out.println(i + ": " + new String(it.next().message()));
> >             ++i;
> >         }
> >         consumer.shutdown();
> >     }
> > }
> >
> > I am not sure why I am not able to consume messages from java code. Kafka
> > is running on port 6667 and zookeeper on 2181.
> >
> > --
> > Regards
> >
> > Shivam Tiwari
>

--
Regards

Shivam Tiwari