for (final KafkaStream stream : streams) { ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator(); while (consumerIte.hasNext()) System.out.println("Message from Single Topic :: " + new String(consumerIte.next(). message())); }
Besides what Guozhang suggested, this code has a bug. Since each of the streams is blocking, you will have to start each stream in a separate thread. Please take a look at https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example Thanks, Neha On Mon, Jun 23, 2014 at 8:05 AM, Guozhang Wang <wangg...@gmail.com> wrote: > Did you start the consumer after the producer? The default behavior of the > consumer is to "consume from the tail of the log", and hence if there are no > new messages coming in after the consumer started, it will get nothing. You > may set > > auto.offset.reset="smallest" > > and try again. > > Guozhang > > > On Sun, Jun 22, 2014 at 9:10 PM, Li Li <fancye...@gmail.com> wrote: > > > hi all, > > I am reading the book "apache kafka" and wrote a simple producer > > and consumer class. the producer works but the consumer hangs. > > The producer class: > > public static void main(String[] args) { > > String topic="test-topic"; > > Properties props = new Properties(); > > props.put("metadata.broker.list","linux157:9092"); > > props.put("serializer.class","kafka.serializer.StringEncoder"); > > props.put("request.required.acks", "1"); > > ProducerConfig config = new ProducerConfig(props); > > Producer<Integer, String> producer = new > > Producer<Integer,String>(config); > > for(int i=0;i<100;i++){ > > KeyedMessage<Integer, String> data = new > > KeyedMessage<Integer,String>(topic, "msg"+i); > > producer.send(data); > > } > > producer.close(); > > > > } > > > > public class TestKafkaConsumer { > > private final ConsumerConnector consumer; > > private final String topic; > > > > public TestKafkaConsumer(String zookeeper, String groupId, String topic) > { > > Properties props = new Properties(); > > props.put("zookeeper.connect", zookeeper); > > props.put("group.id", groupId); > > props.put("zookeeper.session.timeout.ms", "500"); > > props.put("zookeeper.sync.time.ms", "250"); > > props.put("auto.commit.interval.ms", "1000"); > > consumer = 
Consumer.createJavaConsumerConnector(new ConsumerConfig( > > props)); > > this.topic = topic; > > } > > > > public void testConsumer() { > > Map<String, Integer> topicCount = new HashMap<String, Integer>(); > > // Define single thread for topic > > topicCount.put(topic, new Integer(1)); > > Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = > > consumer > > .createMessageStreams(topicCount); > > List<KafkaStream<byte[], byte[]>> streams = > consumerStreams.get(topic); > > for (final KafkaStream stream : streams) { > > ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator(); > > while (consumerIte.hasNext()) > > System.out.println("Message from Single Topic :: " > > + new String(consumerIte.next().message())); > > } > > if (consumer != null) > > consumer.shutdown(); > > } > > > > public static void main(String[] args) { > > String topic = "test-topic"; > > TestKafkaConsumer simpleHLConsumer = new > > TestKafkaConsumer("linux157:2181","testgroup22", topic); > > simpleHLConsumer.testConsumer(); > > > > } > > > > } > > > > > > -- > -- Guozhang >