Yes, try this command, replacing the bracketed placeholders with the correct values for your setup:
*bin/kafka-console-consumer.sh --zookeeper [zookeeper info] --topic [your-topic-here] --from-beginning*

On Mon, Jun 29, 2015 at 9:56 AM, shivam tiwari <bigbang...@gmail.com> wrote:

> It turns out to be the code mentioned does not read topic from beginning
> and actually waits for producer to produce something fresh. After starting
> producer and sending messages I was able to retrieve all the messages. Is
> there a way to get messages on topic from beginning?
>
> On 26 June 2015 at 14:32, Gwen Shapira <gshap...@cloudera.com> wrote:
>
> > Zookeeper actually doesn't show any errors - it shows a warning, which
> > is pretty normal.
> >
> > What does your consumer and Kafka broker show? Are there any errors in
> > the consumer? Or is it just hanging?
> >
> > You may want to consult our FAQ:
> > https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
> >
> > Gwen
> >
> > On Fri, Jun 26, 2015 at 11:00 AM, shivam tiwari <bigbang...@gmail.com> wrote:
> > > Kafka 0.8.2.2.3 and zookeper both are running inside VM. I was able to run
> > > both producer and consumer within VM successfully using
> > > kafka-console-producer.sh and kafka-console-consumer.sh respectively. Even
> > > I was able to consume Kafka messages from host machine using
> > > kafka-console-consumer.sh. But when I tried to run the consumer using java
> > > from eclipse then zookeeper logs following error
> > >
> > > 2015-06-26 03:06:26,323 - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /192.168.1.12:59549 (no session established for client)
> > > 2015-06-26 03:07:26,225 - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /192.168.1.12:59617
> > > 2015-06-26 03:07:26,226 - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
> > > EndOfStreamException: Unable to read additional data from client sessionid 0x0, likely client has closed socket
> > >     at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
> > >     at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > > Below is my Kafka consume code
> > >
> > > package com.truckevent.producer;
> > >
> > > import java.util.HashMap;
> > > import java.util.List;
> > > import java.util.Map;
> > > import java.util.Properties;
> > >
> > > import kafka.consumer.Consumer;
> > > import kafka.consumer.ConsumerConfig;
> > > import kafka.consumer.ConsumerIterator;
> > > import kafka.consumer.KafkaStream;
> > > import kafka.javaapi.consumer.ConsumerConnector;
> > >
> > > public class KafkaConsumer {
> > >
> > >     public static void main(String[] args) throws Exception {
> > >
> > >         String group = "hello";
> > >
> > >         Properties props = new Properties();
> > >         props.put("zookeeper.connect", "192.168.1.12:2181");
> > >         props.put("group.id", group);
> > >         props.put("zookeeper.session.timeout.ms", "20000");
> > >         props.put("zookeeper.sync.time.ms", "2030");
> > >         props.put("auto.commit.interval.ms", "10000");
> > >         props.put("auto.offset.reset", "smallest");
> > >
> > >         ConsumerConfig cf = new ConsumerConfig(props);
> > >
> > >         ConsumerConnector consumer = Consumer.createJavaConsumerConnector(cf);
> > >
> > >         String topic = "event";
> > >
> > >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > >         topicCountMap.put(topic, new Integer(1));
> > >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > >                 consumer.createMessageStreams(topicCountMap);
> > >         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
> > >
> > >         KafkaStream<byte[], byte[]> stream = streams.get(0);
> > >
> > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > >         int i = 1;
> > >         while (it.hasNext()) {
> > >             System.out.println(i + ": " + new String(it.next().message()));
> > >             ++i;
> > >         }
> > >         consumer.shutdown();
> > >     }
> > > }
> > >
> > > I am not sure why I am not able to consume messages from java code. Kafka
> > > is running on port 6667 and zookeeper on 2181.
> > >
> > > --
> > > Regards
> > >
> > > Shivam Tiwari
>
> --
> Regards
>
> Shivam Tiwari

--
Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jg...@berkeley.edu <elise...@berkeley.edu> | (925) 400-3427
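
A note on the Java consumer quoted above, in case you want the same from-the-beginning behavior there as well. With the old ZooKeeper-based high-level consumer, auto.offset.reset is only consulted when the group has no committed offset (or the offset is out of range). My guess, and it is only a guess, is that the group "hello" already had offsets committed from an earlier run, so "smallest" was ignored. One way to rule that out is to use a group id that has never been used before. Below is a rough sketch using only java.util; the class FromBeginningConfig and the helper fromBeginningProps are made-up names for illustration, not part of Kafka:

```java
import java.util.Properties;
import java.util.UUID;

public class FromBeginningConfig {

    // Hypothetical helper (not a Kafka API): builds settings for the old
    // ZooKeeper-based high-level consumer used in the quoted code.
    static Properties fromBeginningProps(String zookeeper, String groupPrefix) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        // A group id that has never committed offsets, so the reset policy applies.
        props.put("group.id", groupPrefix + "-" + UUID.randomUUID());
        // Only consulted when the group has no committed offset (or it is out of range).
        props.put("auto.offset.reset", "smallest");
        props.put("zookeeper.session.timeout.ms", "20000");
        props.put("auto.commit.interval.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        // Address and group prefix taken from the quoted code; adjust for your setup.
        Properties props = fromBeginningProps("192.168.1.12:2181", "hello");
        System.out.println("group.id = " + props.getProperty("group.id"));
        System.out.println("auto.offset.reset = " + props.getProperty("auto.offset.reset"));
    }
}
```

You would pass the returned Properties to new ConsumerConfig(props) exactly as in the quoted code. The trade-off is that every run creates a new consumer group, which is fine for debugging but probably not what you want in production.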