I have seen the ZooKeeper consumers directory get corrupted when a consumer
is run again and again with changed settings, and that causes this issue. I
would remove the group's directory from the Kafka server's ZooKeeper, from
inside the ZooKeeper shell, like below:

rmr /consumers/hello

and then exit the ZooKeeper shell.

Then run your consumer again and it should start clean and work as expected.
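
If you would rather not touch ZooKeeper, here is a rough sketch of another
option. It assumes the old ZooKeeper-based high-level consumer, where
auto.offset.reset is only consulted when the group has no offsets stored yet:
give the consumer a group.id that has never been used, so "smallest" takes
effect and reading starts from the earliest available message. The class
name, the build helper and the suffix scheme are made up for illustration;
the property keys are the ones from the code quoted below.

```java
import java.util.Properties;

class FreshGroupConfig {

    // Builds consumer properties with a group id derived from a base name
    // plus a suffix. A group id that has no offsets stored in ZooKeeper
    // lets auto.offset.reset=smallest apply on startup.
    static Properties build(String zkConnect, String baseGroup, long suffix) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", baseGroup + "-" + suffix);
        props.put("zookeeper.session.timeout.ms", "20000");
        props.put("auto.commit.interval.ms", "10000");
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        // Using the current time as the suffix is just one hypothetical way
        // to get a group id that has not been used before.
        Properties props = build("192.168.1.12:2181", "hello",
                System.currentTimeMillis());
        System.out.println("group.id = " + props.getProperty("group.id"));
    }
}
```

The resulting Properties can be passed to new ConsumerConfig(props) the same
way as in the original code. Note that every run with a new group id leaves
another entry under /consumers, so this is more of a debugging aid than
something to keep in production.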


Regards
Sai

On Mon, Jun 29, 2015 at 10:10 AM, JIEFU GONG <jg...@berkeley.edu> wrote:

> Yes, try this command replacing the bracketed things with the correct
> identification for your project:
>
> *bin/kafka-console-consumer.sh --zookeeper [zookeeper info] --topic
> [your-topic-here] --from-beginning*
>
> On Mon, Jun 29, 2015 at 9:56 AM, shivam tiwari <bigbang...@gmail.com>
> wrote:
>
> > It turns out that the code mentioned does not read the topic from the
> > beginning and actually waits for the producer to produce something fresh.
> > After starting the producer and sending messages I was able to retrieve
> > all the messages. Is there a way to get messages on the topic from the
> > beginning?
> >
> > On 26 June 2015 at 14:32, Gwen Shapira <gshap...@cloudera.com> wrote:
> >
> > > Zookeeper actually doesn't show any errors - it shows a warning, which
> > > is pretty normal.
> > >
> > > What do your consumer and Kafka broker show? Are there any errors in
> > > the consumer? Or is it just hanging?
> > >
> > > You may want to consult our FAQ:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
> > >
> > > Gwen
> > >
> > > On Fri, Jun 26, 2015 at 11:00 AM, shivam tiwari <bigbang...@gmail.com>
> > > wrote:
> > > > Kafka 0.8.2.2.3 and ZooKeeper are both running inside a VM. I was able
> > > > to run both producer and consumer within the VM successfully using
> > > > kafka-console-producer.sh and kafka-console-consumer.sh respectively. I
> > > > was even able to consume Kafka messages from the host machine using
> > > > kafka-console-consumer.sh. But when I tried to run the consumer using
> > > > Java from Eclipse, ZooKeeper logged the following error:
> > > >
> > > > 2015-06-26 03:06:26,323 - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /192.168.1.12:59549 (no session established for client)
> > > > 2015-06-26 03:07:26,225 - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /192.168.1.12:59617
> > > > 2015-06-26 03:07:26,226 - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
> > > > EndOfStreamException: Unable to read additional data from client sessionid 0x0, likely client has closed socket
> > > >     at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
> > > >     at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
> > > >     at java.lang.Thread.run(Thread.java:745)
> > > >
> > > > Below is my Kafka consumer code:
> > > >
> > > > package com.truckevent.producer;
> > > >
> > > > import java.util.HashMap;
> > > > import java.util.List;
> > > > import java.util.Map;
> > > > import java.util.Properties;
> > > >
> > > > import kafka.consumer.Consumer;
> > > > import kafka.consumer.ConsumerConfig;
> > > > import kafka.consumer.ConsumerIterator;
> > > > import kafka.consumer.KafkaStream;
> > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > >
> > > > public class KafkaConsumer {
> > > >
> > > >     public static void main(String[] args) throws Exception {
> > > >
> > > >         String group = "hello";
> > > >
> > > >         Properties props = new Properties();
> > > >         props.put("zookeeper.connect", "192.168.1.12:2181");
> > > >         props.put("group.id", group);
> > > >         props.put("zookeeper.session.timeout.ms", "20000");
> > > >         props.put("zookeeper.sync.time.ms", "2030");
> > > >         props.put("auto.commit.interval.ms", "10000");
> > > >         props.put("auto.offset.reset", "smallest");
> > > >
> > > >         ConsumerConfig cf = new ConsumerConfig(props);
> > > >
> > > >         ConsumerConnector consumer = Consumer.createJavaConsumerConnector(cf);
> > > >
> > > >         String topic = "event";
> > > >
> > > >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > > >         topicCountMap.put(topic, new Integer(1));
> > > >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > >                 consumer.createMessageStreams(topicCountMap);
> > > >         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
> > > >
> > > >         KafkaStream<byte[], byte[]> stream = streams.get(0);
> > > >
> > > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > >         int i = 1;
> > > >         while (it.hasNext()) {
> > > >             System.out.println(i + ": " + new String(it.next().message()));
> > > >             ++i;
> > > >         }
> > > >         consumer.shutdown();
> > > >     }
> > > > }
> > > >
> > > >
> > > > I am not sure why I am not able to consume messages from Java code.
> > > > Kafka is running on port 6667 and ZooKeeper on 2181.
> > > >
> > > >
> > > > --
> > > > Regards
> > > >
> > > > Shivam Tiwari
> > >
> >
> >
> >
> > --
> > Regards
> >
> > Shivam Tiwari
> >
>
>
>
> --
>
> Jiefu Gong
> University of California, Berkeley | Class of 2017
> B.A Computer Science | College of Letters and Sciences
>
> jg...@berkeley.edu <elise...@berkeley.edu> | (925) 400-3427
>
