ZooKeeper isn't actually showing any errors. What you see is a warning,
which is pretty normal: it just means a client closed its socket before a
session was established.

What do your consumer and Kafka broker logs show? Are there any errors in
the consumer? Or is it just hanging?

You may want to consult our FAQ:
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
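
If the consumer is just hanging, one thing worth ruling out first is plain
network reachability from the machine running Eclipse. Below is a minimal
sketch that uses only the JDK. The class name PortCheck and the helper
canConnect are made up for illustration, and the default address and ports
are the ones from your mail (ZooKeeper on 2181, Kafka on 6667). It only
checks that a TCP connection can be opened, not that the consumer itself is
configured correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port opens within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable host: all count as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Address and ports are assumptions taken from the quoted mail;
        // pass a different host as the first argument if needed.
        String host = args.length > 0 ? args[0] : "192.168.1.12";
        int[] ports = {2181, 6667}; // ZooKeeper, Kafka broker
        for (int port : ports) {
            System.out.println(host + ":" + port + " reachable: "
                    + canConnect(host, port, 2000));
        }
    }
}
```

Keep in mind that the consumer looks up the broker's host and port in
ZooKeeper, so whatever address the broker registered there has to be
reachable from the host as well.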

Gwen

On Fri, Jun 26, 2015 at 11:00 AM, shivam tiwari <bigbang...@gmail.com> wrote:
> Kafka 0.8.2.2.3 and ZooKeeper are both running inside a VM. I was able to run
> both producer and consumer within the VM successfully using
> kafka-console-producer.sh and kafka-console-consumer.sh respectively. I was
> even able to consume Kafka messages from the host machine using
> kafka-console-consumer.sh. But when I tried to run the consumer using Java
> from Eclipse, ZooKeeper logged the following error:
>
> 2015-06-26 03:06:26,323 - INFO
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] -
> Closed socket connection for client /192.168.1.12:59549 (no session
> established for client)
> 2015-06-26 03:07:26,225 - INFO
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] -
> Accepted socket connection from /192.168.1.12:59617
> 2015-06-26 03:07:26,226 - WARN
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught
> end of stream exception
> EndOfStreamException: Unable to read additional data from client
> sessionid 0x0, likely client has closed socket
>     at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
>     at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
>     at java.lang.Thread.run(Thread.java:745)
>
> Below is my Kafka consumer code:
>
> package com.truckevent.producer;
>
>
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
>
> import kafka.consumer.Consumer;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
>
>
> public class KafkaConsumer {
>
>     public static void main(String[] args) throws Exception {
>
>         String group = "hello" ;
>
>
>         Properties props = new Properties();
>         props.put("zookeeper.connect", "192.168.1.12:2181");
>         props.put("group.id", group);
>         props.put("zookeeper.session.timeout.ms", "20000");
>         props.put("zookeeper.sync.time.ms", "2030");
>         props.put("auto.commit.interval.ms", "10000");
>         props.put("auto.offset.reset", "smallest");
>
>         ConsumerConfig cf = new ConsumerConfig(props) ;
>
>         ConsumerConnector consumer = Consumer.createJavaConsumerConnector(cf);
>
>         String topic = "event" ;
>
>         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>         topicCountMap.put(topic, new Integer(1));
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>                 consumer.createMessageStreams(topicCountMap);
>         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
>
>
>         KafkaStream<byte[],byte[]> stream = streams.get(0) ;
>
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>         int i = 1 ;
>         while (it.hasNext()) {
>
>             System.out.println(i + ": " + new String(it.next().message()));
>             ++i;
>         }
>         consumer.shutdown();
>     }
> }
>
>
> I am not sure why I cannot consume messages from the Java code. Kafka
> is running on port 6667 and ZooKeeper on 2181.
>
>
> --
> Regards
>
> Shivam Tiwari
