Hi,

I see the following scenario:

1. I send messages under some topic X and can see the log folder on the Kafka broker named X-0 (the zeroth partition), with the files xxx.log and xxx.index under it. So I guess this part is fine.
2. Then I fire up the consumer for topic X, and it is able to find two streams (mapping to the two partitions I have defined).

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = null;
    Iterator<String> it = topicCountMap.keySet().iterator();
    int threadNumber = 0;
    while (it.hasNext()) {
        String topic = it.next();
        streams = consumerMap.get(topic);
        for (KafkaStream stream : streams) {
            System.out.println("threadNo =" + threadNumber + " for topic = " + topic);
            new ConsumerThreadRunnable(stream, threadNumber, topic);
            threadNumber++;
        }
    }

However, I don't get any messages in the ConsumerThreadRunnable here:

    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        byte[] nextMessageByteArray = it.next().message();
    }

If I start the consumer first and then restart the producer thread, sending the messages for topic X, then the consumer is able to receive the messages.

From the Kafka docs, the high-level consumer thread does long polling until a message is available.

What am I doing wrong? Any ideas for getting around the problem?

Thanks!

--
Kind Regards,
Shafaq