hi all, I am reading the book "apache kafka" and write a simple producer and consumer class. the producer works but the consumer hangs. The producer class: public static void main(String[] args) { String topic="test-topic"; Properties props = new Properties(); props.put("metadata.broker.list","linux157:9092"); props.put("serializer.class","kafka.serializer.StringEncoder"); props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); Producer<Integer, String> producer = new Producer<Integer,String>(config); for(int i=0;i<100;i++){ KeyedMessage<Integer, String> data = new KeyedMessage<Integer,String>(topic, "msg"+i); producer.send(data); } producer.close();
} public class TestKafkaConsumer { private final ConsumerConnector consumer; private final String topic; public TestKafkaConsumer(String zookeeper, String groupId, String topic) { Properties props = new Properties(); props.put("zookeeper.connect", zookeeper); props.put("group.id", groupId); props.put("zookeeper.session.timeout.ms", "500"); props.put("zookeeper.sync.time.ms", "250"); props.put("auto.commit.interval.ms", "1000"); consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig( props)); this.topic = topic; } public void testConsumer() { Map<String, Integer> topicCount = new HashMap<String, Integer>(); // Define single thread for topic topicCount.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer .createMessageStreams(topicCount); List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic); for (final KafkaStream stream : streams) { ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator(); while (consumerIte.hasNext()) System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message())); } if (consumer != null) consumer.shutdown(); } public static void main(String[] args) { String topic = "test-topic"; TestKafkaConsumer simpleHLConsumer = new TestKafkaConsumer("linux157:2181",testgroup22", topic); simpleHLConsumer.testConsumer(); } }