for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
            System.out.println("Message from Single Topic :: "
                       + new String(consumerIte.next().
message()));
        }

Besides what Guozhang suggested, this code has a bug. Since each of the
streams is blocking, you will have to start each stream in a separate
thread. Please take a look at
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

Thanks,
Neha


On Mon, Jun 23, 2014 at 8:05 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Did you start the consumer after the producer? The default behavior of the
> consumer is to "consume from the tail of the log", and hence if there are no
> new messages coming in after the consumer started, it will get nothing. You
> may set
>
> auto.offset.reset="smallest"
>
> and try again.
>
> Guozhang
>
>
> On Sun, Jun 22, 2014 at 9:10 PM, Li Li <fancye...@gmail.com> wrote:
>
> > hi all,
> >    I am reading the book "apache kafka" and wrote a simple producer
> > and consumer class. The producer works but the consumer hangs.
> >    The producer class:
> > public static void main(String[] args) {
> >     String topic="test-topic";
> >     Properties props = new Properties();
> >     props.put("metadata.broker.list","linux157:9092");
> >     props.put("serializer.class","kafka.serializer.StringEncoder");
> >     props.put("request.required.acks", "1");
> >     ProducerConfig config = new ProducerConfig(props);
> >     Producer<Integer, String> producer = new
> > Producer<Integer,String>(config);
> >     for(int i=0;i<100;i++){
> >         KeyedMessage<Integer, String> data = new
> > KeyedMessage<Integer,String>(topic, "msg"+i);
> >         producer.send(data);
> >     }
> >     producer.close();
> >
> > }
> >
> > public class TestKafkaConsumer {
> > private final ConsumerConnector consumer;
> > private final String topic;
> >
> > public TestKafkaConsumer(String zookeeper, String groupId, String topic)
> {
> > Properties props = new Properties();
> > props.put("zookeeper.connect", zookeeper);
> > props.put("group.id", groupId);
> > props.put("zookeeper.session.timeout.ms", "500");
> > props.put("zookeeper.sync.time.ms", "250");
> > props.put("auto.commit.interval.ms", "1000");
> > consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(
> > props));
> > this.topic = topic;
> > }
> >
> > public void testConsumer() {
> >     Map<String, Integer> topicCount = new HashMap<String, Integer>();
> >     // Define single thread for topic
> >     topicCount.put(topic, new Integer(1));
> >     Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
> > consumer
> >              .createMessageStreams(topicCount);
> >     List<KafkaStream<byte[], byte[]>> streams =
> consumerStreams.get(topic);
> >     for (final KafkaStream stream : streams) {
> >         ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
> >         while (consumerIte.hasNext())
> >             System.out.println("Message from Single Topic :: "
> >                        + new String(consumerIte.next().message()));
> >         }
> >     if (consumer != null)
> >         consumer.shutdown();
> > }
> >
> > public static void main(String[] args) {
> >     String topic = "test-topic";
> >     TestKafkaConsumer simpleHLConsumer = new
> > TestKafkaConsumer("linux157:2181", "testgroup22", topic);
> >     simpleHLConsumer.testConsumer();
> >
> > }
> >
> > }
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to