Hi, I am trying to run the following code for a Kafka consumer. import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.MessageAndOffset;
public class HelloKafkaConsumer extends Thread { final static String clientId = "SimpleConsumerDemoClient"; final static String TOPIC = "test"; ConsumerConnector consumerConnector; public static void main(String[] argv) throws UnsupportedEncodingException { HelloKafkaConsumer helloKafkaConsumer = new HelloKafkaConsumer(); helloKafkaConsumer.start(); } public HelloKafkaConsumer(){ try{ Properties properties = new Properties(); properties.put("zookeeper.connect","localhost:2181"); System.out.println("Hi"+properties); properties.put("group.id","test-group"); System.out.println("Hi1"+properties); ConsumerConfig consumerConfig = new ConsumerConfig(properties); System.out.println("hi2"+consumerConfig); consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); System.out.println("hi3"+consumerConnector); }catch(Exception e){} } @Override public void run() { try{ Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); System.out.println("topic count map "+topicCountMap); topicCountMap.put(TOPIC, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); System.out.println("------------------------->"); KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); while(it.hasNext()) System.out.println(new String(it.next().message())); }catch(Exception e){ System.out.print("Inside run"+e); e.printStackTrace(); } } private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException { for(MessageAndOffset messageAndOffset: messageSet) { try{ ByteBuffer payload = messageAndOffset.message().payload(); byte[] bytes = new byte[payload.limit()]; payload.get(bytes); System.out.println(new String(bytes, "UTF-8")); }catch(Exception e){} } } } I get EndOfStreamException in zookeeper [image: Inline image 2] Need help. Producer works fine. Regards, Vineet Salian