I have seen the ZooKeeper consumers directory get corrupted when a consumer is run again and again with some settings changed, and that can cause this issue. I would remove that directory from the Kafka server's ZooKeeper, as shown below:
rmr /consumers/hello

Then exit the ZooKeeper shell and run your consumer again; it should work cleanly, as expected.

Regards
Sai

On Mon, Jun 29, 2015 at 10:10 AM, JIEFU GONG <jg...@berkeley.edu> wrote:

> Yes, try this command replacing the bracketed things with the correct
> identification for your project:
>
> *bin/kafka-console-consumer.sh --zookeeper [zookeeper info] --topic
> [your-topic-here] --from-beginning*
>
> On Mon, Jun 29, 2015 at 9:56 AM, shivam tiwari <bigbang...@gmail.com>
> wrote:
>
> > It turns out that the code mentioned does not read the topic from the
> > beginning and actually waits for the producer to produce something fresh.
> > After starting the producer and sending messages I was able to retrieve
> > all the messages. Is there a way to get messages on the topic from the
> > beginning?
> >
> > On 26 June 2015 at 14:32, Gwen Shapira <gshap...@cloudera.com> wrote:
> >
> > > Zookeeper actually doesn't show any errors - it shows a warning, which
> > > is pretty normal.
> > >
> > > What does your consumer and Kafka broker show? Are there any errors in
> > > the consumer? Or is it just hanging?
> > >
> > > You may want to consult our FAQ:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
> > >
> > > Gwen
> > >
> > > On Fri, Jun 26, 2015 at 11:00 AM, shivam tiwari <bigbang...@gmail.com>
> > > wrote:
> > > > Kafka 0.8.2.2.3 and zookeeper are both running inside a VM. I was able
> > > > to run both producer and consumer within the VM successfully using
> > > > kafka-console-producer.sh and kafka-console-consumer.sh respectively.
> > > > I was even able to consume Kafka messages from the host machine using
> > > > kafka-console-consumer.sh.
> > > > But when I tried to run the consumer using java
> > > > from eclipse then zookeeper logs following error
> > > >
> > > > 2015-06-26 03:06:26,323 - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /192.168.1.12:59549 (no session established for client)
> > > > 2015-06-26 03:07:26,225 - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /192.168.1.12:59617
> > > > 2015-06-26 03:07:26,226 - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
> > > > EndOfStreamException: Unable to read additional data from client sessionid 0x0, likely client has closed socket
> > > >     at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
> > > >     at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
> > > >     at java.lang.Thread.run(Thread.java:745)
> > > >
> > > > Below is my Kafka consume code
> > > >
> > > > package com.truckevent.producer;
> > > >
> > > > import java.util.HashMap;
> > > > import java.util.List;
> > > > import java.util.Map;
> > > > import java.util.Properties;
> > > >
> > > > import kafka.consumer.Consumer;
> > > > import kafka.consumer.ConsumerConfig;
> > > > import kafka.consumer.ConsumerIterator;
> > > > import kafka.consumer.KafkaStream;
> > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > >
> > > > public class KafkaConsumer {
> > > >
> > > >     public static void main(String[] args) throws Exception {
> > > >
> > > >         String group = "hello" ;
> > > >
> > > >         Properties props = new Properties();
> > > >         props.put("zookeeper.connect", "192.168.1.12:2181");
> > > >         props.put("group.id", group);
> > > >         props.put("zookeeper.session.timeout.ms", "20000");
> > > >         props.put("zookeeper.sync.time.ms", "2030");
> > > >         props.put("auto.commit.interval.ms", "10000");
> > > >         props.put("auto.offset.reset", "smallest");
> > > >
> > > >         ConsumerConfig cf = new ConsumerConfig(props) ;
> > > >
> > > >         ConsumerConnector consumer = Consumer.createJavaConsumerConnector(cf) ;
> > > >
> > > >         String topic = "event" ;
> > > >
> > > >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > > >         topicCountMap.put(topic, new Integer(1));
> > > >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > >                 consumer.createMessageStreams(topicCountMap);
> > > >         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
> > > >
> > > >         KafkaStream<byte[],byte[]> stream = streams.get(0) ;
> > > >
> > > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > >         int i = 1 ;
> > > >         while (it.hasNext()) {
> > > >             System.out.println(i + ": " + new String(it.next().message()));
> > > >             ++i;
> > > >         }
> > > >         consumer.shutdown();
> > > >     }
> > > > }
> > > >
> > > > I am not sure why I am not able to consume messages from java code. Kafka
> > > > is running on port 6667 and zookeeper on 2181.
> > > >
> > > > --
> > > > Regards
> > > >
> > > > Shivam Tiwari
> >
> > --
> > Regards
> >
> > Shivam Tiwari
>
> --
>
> Jiefu Gong
> University of California, Berkeley | Class of 2017
> B.A Computer Science | College of Letters and Sciences
>
> jg...@berkeley.edu <elise...@berkeley.edu> | (925) 400-3427
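
A footnote on the advice at the top of the thread: with the old ZooKeeper-based consumer, auto.offset.reset only takes effect when the group has no valid offset stored in ZooKeeper. That is likely why clearing /consumers/hello lets the "hello" group start from the beginning again. An alternative that avoids touching ZooKeeper is to give the consumer a group id that has never been used. The sketch below is only an illustration under that assumption: the class FreshGroupProps and its build method are hypothetical names, not part of Kafka, and it uses only the property keys that already appear in the quoted code.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper (not part of Kafka): builds settings for the old
// ZooKeeper-based consumer using a group id that has not been used before.
public class FreshGroupProps {

    static Properties build(String zookeeperConnect, String groupPrefix) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperConnect);
        // A brand-new group id has no offsets stored under /consumers/<group>,
        // so auto.offset.reset decides where reading starts.
        props.put("group.id", groupPrefix + "-" + UUID.randomUUID());
        // "smallest" asks the consumer to start from the earliest available offset.
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        // Address and prefix are taken from the quoted code; adjust for your setup.
        Properties props = build("192.168.1.12:2181", "hello");
        System.out.println("group.id = " + props.getProperty("group.id"));
        // These properties would then be passed to new ConsumerConfig(props),
        // as in the consumer code quoted in this thread.
    }
}
```

Note that every run with a random group id leaves a new entry under /consumers, so this suits one-off debugging more than a long-running service.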