When I list the partition info with

List<PartitionInfo> info = consumer.partitionsFor(topic);

it returns:

[Partition(topic = MY_TOPIC, partition = 0, leader = 1012, replicas = [1012,], isr = [1012,]]

So my communication with the broker looks fine. I can see more than 1000 messages for the topic using Kafka Tool (a UI tool for viewing topics, consumers, ...). But I cannot figure out what keeps my consumer's poll() from returning the messages.

Can anyone figure out my issue?

On 4 April 2016 at 10:58, Ratha v <vijayara...@gmail.com> wrote:

> Hi all;
> I'm publishing to a remote Kafka server and trying to consume messages from
> that remote server. (Kafka v 0.90.1)
> Publishing works fine, but consuming does not.
>
> Publisher
>
> package org.test;
>
> import java.io.IOException;
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class Producer {
>
>     private void generateMessgaes() throws IOException {
>         String topic = "MY_TOPIC";
>
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "kafka.xx.com:9092");
>         props.put("acks", "all");
>         props.put("retries", 0);
>         props.put("batch.size", 16384);
>         props.put("linger.ms", 1);
>         props.put("buffer.memory", 33554432);
>         props.put("key.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("value.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("serializer.class",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>
>         KafkaProducer<String, String> producer = null;
>         try {
>             producer = new KafkaProducer<>(props);
>             for (int i = 0; i < 10; i++) {
>                 producer.send(new ProducerRecord<String, String>(topic, "test msg"));
>                 System.out.println("producing---");
>             }
>         } catch (Throwable e) {
>             e.printStackTrace();
>             System.out.println("Error in publishing messages to the topic : " + topic);
>         } finally {
>             producer.close();
>         }
>     }
>
>     public static void main(String[] args) throws IOException {
>         Producer producer = new Producer();
>         producer.generateMessgaes();
>         System.out.println("$$$$$");
>     }
> }
>
> I can see the "producing---" and "$$$$$" prints. But when I try to consume, I
> do not see the "polling msges" print messages. It gets stuck at poll(timeout).
>
> Any clue? (I have asked this question before, asking again :()
>
> Consumer
>
> package org.test;
>
> import java.util.Arrays;
> import java.util.List;
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
> public class Listener {
>
>     public void start() throws CoreException {
>
>         String topic = "MY_TOPIC";
>         List<String> topics = Arrays.asList(topic);
>
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "kafka.xx.com:9092");
>         props.put("enable.auto.commit", true);
>         props.put("receive.buffer.bytes", 262144);
>         props.put("consumer.timeout.ms", 10000);
>         props.put("session.timeout.ms", 7000);
>         props.put("heartbeat.interval.ms", 1000);
>         props.put("auto.offset.reset", "earliest");
>         props.put("group.id", "test");
>         props.put("fetch.min.bytes", 1);
>         props.put("key.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("serializer.class",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>
>         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>         consumer.subscribe(topics);
>
>         try {
>             while (true) {
>                 ConsumerRecords<String, String> records = consumer.poll(100);
>                 System.out.println("polling msges : " + records.count());
>                 for (ConsumerRecord<String, String> record : records) {
>                     System.out.println("kafka record : " + record.value());
>                 }
>             }
>         } catch (Throwable e) {
>             e.printStackTrace();
>             System.out.println("eror in polling");
>         } finally {
>             consumer.close();
>         }
>     }
>
>     public static void main(String args[]) throws CoreException {
>         Listener listener = new Listener();
>         listener.start();
>     }
> }
>
> Thanks
> --
> -Ratha
> http://vvratha.blogspot.com/

--
-Ratha
http://vvratha.blogspot.com/
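
A successful partitionsFor() call shows only that the bootstrap address answered a metadata request. To actually fetch, the consumer then connects to the host and port that the broker advertises for itself, and that address can differ from the bootstrap one. Below is a minimal sketch of a plain TCP check that could be run from the consumer machine. Whether this is the cause here is unconfirmed. The class name BrokerReachability and its methods are hypothetical helpers, not part of the Kafka client, and the advertised address of broker 1012 is not shown in this thread, so the default below simply reuses the bootstrap address and should be replaced with the advertised one.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of kafka-clients: checks whether a TCP
// connection to a given "host:port" can be opened from this machine.
public class BrokerReachability {

    /** Splits "host:port" into an (unresolved) socket address. */
    public static InetSocketAddress parse(String hostPort) {
        int idx = hostPort.lastIndexOf(':');
        if (idx <= 0 || idx == hostPort.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got: " + hostPort);
        }
        String host = hostPort.substring(0, idx);
        int port = Integer.parseInt(hostPort.substring(idx + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    /** Returns true if a TCP connection can be opened within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Resolves the host name here; an unresolvable name fails with
            // UnknownHostException, which is an IOException.
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: defaults to the bootstrap address from the post; pass the
        // broker's advertised host:port as the first argument instead.
        String target = args.length > 0 ? args[0] : "kafka.xx.com:9092";
        InetSocketAddress addr = parse(target);
        boolean ok = canConnect(addr.getHostString(), addr.getPort(), 3000);
        System.out.println(target + (ok ? " is reachable" : " is NOT reachable"));
    }
}
```

If the advertised address turns out not to be reachable or resolvable from the consumer machine, that would be consistent with metadata calls succeeding while poll() never returns records, but a reachable address does not rule out other causes.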