When I list the partition info:

List<PartitionInfo> info = consumer.partitionsFor(topic);

it returns info like:

[Partition(topic = *MY_TOPIC*, partition = 0, leader = 1012, replicas =
[1012,], isr = [1012,]]

My communication with the broker looks fine. I see more than 1000
messages for the topic using Kafka Tool (a UI tool to view topics, consumers, etc.),
but I cannot figure out what blocks my consumer from getting messages in poll().
Can anyone spot my issue?
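
The output above shows only the leader's broker id (1012), not the address
the broker advertises. One further check would be whether that address is
reachable from the consumer machine. Below is a minimal sketch, assuming the
host and port are taken from info.get(0).leader().host() and
info.get(0).leader().port(). The BrokerReachability class and its canConnect
helper are hypothetical names used for illustration, not part of the Kafka
client:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of the Kafka client API.
final class BrokerReachability {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers unresolvable host names, refused connections and timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.out.println("usage: BrokerReachability <host> <port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        System.out.println(host + ":" + port + " reachable: "
                + canConnect(host, port, 3000));
    }
}
```

If this reports the leader's advertised address as unreachable while the
bootstrap address works, that would point at the broker's advertised host
setting rather than at the consumer code. This is only a TCP-level check and
does not prove the consumer can complete a fetch.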

> Hi all,
> I'm publishing to a remote Kafka server and trying to consume messages from
> that remote server (Kafka v 0.90.1).
> Publishing works fine, but consuming does not.
> *Publisher*
> package org.test;
>
> import java.io.IOException;
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class Producer {
>     private void generateMessgaes() throws IOException {
>         String topic = "MY_TOPIC";
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "kafka.xx.com:9092");
>         props.put("acks", "all");
>         props.put("retries", 0);
>         props.put("batch.size", 16384);
>         props.put("linger.ms", 1);
>         props.put("buffer.memory", 33554432);
>         props.put("key.serializer",
>             "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("value.serializer",
>             "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("serializer.class",
>             "org.apache.kafka.common.serialization.StringSerializer");
>         KafkaProducer<String, String> producer = null;
>         try {
>             producer = new KafkaProducer<>(props);
>             for (int i = 0; i < 10; i++) {
>                 producer.send(new ProducerRecord<String, String>(topic, "test msg"));
>                 System.out.println("producing---");
>             }
>         } catch (Throwable e) {
>             e.printStackTrace();
>             System.out.println("Error in publishing messages to the topic : " + topic);
>         } finally {
>             producer.close();
>         }
>     }
>
>     public static void main(String[] args) throws IOException {
>         Producer producer = new Producer();
>         producer.generateMessgaes();
>         System.out.println("$$$$$");
>     }
> }
> I can see the "producing---" and "$$$$$" prints. But when I try to consume, I
> do not see the "polling msges" print; it gets stuck at poll(timeout).
> Any clue? (I have asked this question before; asking again :()
> *Consumer*
> package org.test;
>
> import java.util.Arrays;
> import java.util.List;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
> public class Listener {
>     public void start() throws CoreException {
>         String topic = "MY_TOPIC";
>         List<String> topics = Arrays.asList(topic);
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "kafka.xx.com:9092");
>         props.put("enable.auto.commit", true);
>         props.put("receive.buffer.bytes", 262144);
>         props.put("consumer.timeout.ms", 10000);
>         props.put("session.timeout.ms", 7000);
>         props.put("heartbeat.interval.ms", 1000);
>         props.put("auto.offset.reset", "earliest");
>         props.put("group.id", "test");
>         props.put("fetch.min.bytes", 1);
>         props.put("key.deserializer",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("serializer.class",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>         KafkaConsumer<String, String> consumer =
>             new KafkaConsumer<String, String>(props);
>         consumer.subscribe(topics);
>         try {
>             while (true) {
>                 ConsumerRecords<String, String> records = consumer.poll(100);
>                 System.out.println("polling msges : " + records.count());
>                 for (ConsumerRecord<String, String> record : records) {
>                     System.out.println("kafka record : " + record.value());
>                 }
>             }
>         } catch (Throwable e) {
>             e.printStackTrace();
>             System.out.println("eror in polling");
>         } finally {
>             consumer.close();
>         }
>     }
>
>     public static void main(String args[]) throws CoreException {
>         Listener listener = new Listener();
>         listener.start();
>     }
> }
> Thanks
