Hi,
   I see the following scenario:

1. I send messages to some topic X and can see a log folder named X-0
(the zeroth partition) on the Kafka broker, containing the files xxx.log
and xxx.index. So I guess this part is fine.

2. Then I fire up the consumer for topic X, and it is able to find two
streams (mapping to the two partitions I have defined).


        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = null;
        Iterator<String> it = topicCountMap.keySet().iterator();
        int threadNumber = 0;
        while (it.hasNext()) {
            String topic = it.next();
            streams = consumerMap.get(topic);
            for (KafkaStream<byte[], byte[]> stream : streams) {
                System.out.println("threadNo =" + threadNumber
                        + "  for topic = " + topic);
                new ConsumerThreadRunnable(stream, threadNumber, topic);
                threadNumber++;
            }
        }

However, I don't get any messages in the ConsumerThreadRunnable here:

        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        while (it.hasNext()) {
            byte[] nextMessageByteArray = it.next().message();
        }

If I start the consumer first and then restart the producer thread, sending
the messages for topic X, then the consumer is able to receive the messages.

From the Kafka docs, the high-level consumer thread does long polling until
a message is available.

What am I doing wrong? Any ideas on how to get around the problem?
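
One setting that may be worth checking is auto.offset.reset. This is an
assumption, since the consumer configuration is not shown above. With the
0.8-era high-level consumer, a group with no committed offset starts from the
largest offset by default, so messages produced before the consumer started
would be skipped. A minimal sketch of the properties follows; the class name,
method name, ZooKeeper address and group id are hypothetical placeholders.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Builds properties for the old (0.8-era) high-level consumer.
    // zkConnect and groupId are placeholders supplied by the caller.
    static Properties buildConsumerProps(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        // Assumption: the group has no committed offset yet. "smallest" asks
        // the consumer to start from the earliest available message instead
        // of the default "largest" (only messages produced after startup).
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerProps("localhost:2181", "example-group");
        System.out.println("auto.offset.reset = "
                + props.getProperty("auto.offset.reset"));
    }
}
```

These properties would be passed to the consumer config when creating the
connector. The setting only applies when the group has no valid committed
offset, so trying it with a fresh group id may be the simplest check.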

thanks!

-- 
Kind Regards,
Shafaq
