Hi all, I'm publishing to a remote Kafka server and trying to consume messages from that same remote server (Kafka v 0.90.1). Publishing works fine, but consuming does not.

*Publisher*

    package org.test;

    import java.io.IOException;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class Producer {
        private void generateMessgaes() throws IOException {
            String topic = "MY_TOPIC";
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka.xx.com:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("serializer.class", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = null;
            try {
                producer = new KafkaProducer<>(props);
                for (int i = 0; i < 10; i++) {
                    producer.send(new ProducerRecord<String, String>(topic, "test msg"));
                    System.out.println("producing---");
                }
            } catch (Throwable e) {
                e.printStackTrace();
                System.out.println("Error in publishing messages to the topic : " + topic);
            } finally {
                producer.close();
            }
        }

        public static void main(String[] args) throws IOException {
            Producer producer = new Producer();
            producer.generateMessgaes();
            System.out.println("$$$$$");
        }
    }

I can see the "producing---" and "$$$$$" prints. But when I try to consume, I do not see the "polling msges" print messages; it gets stuck at poll(timeout). Any clue?
(I have asked this question before; I am asking again :( )

*Consumer*

    package org.test;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class Listener {
        public void start() throws CoreException {
            String topic = "MY_TOPIC";
            List<String> topics = Arrays.asList(topic);
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka.xx.com:9092");
            props.put("enable.auto.commit", true);
            props.put("receive.buffer.bytes", 262144);
            props.put("consumer.timeout.ms", 10000);
            props.put("session.timeout.ms", 7000);
            props.put("heartbeat.interval.ms", 1000);
            props.put("auto.offset.reset", "earliest");
            props.put("group.id", "test");
            props.put("fetch.min.bytes", 1);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("serializer.class", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(topics);
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    System.out.println("polling msges : " + records.count());
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("kafka record : " + record.value());
                    }
                }
            } catch (Throwable e) {
                e.printStackTrace();
                System.out.println("eror in polling");
            } finally {
                consumer.close();
            }
        }

        public static void main(String args[]) throws CoreException {
            Listener listener = new Listener();
            listener.start();
        }
    }

Thanks
--
-Ratha
http://vvratha.blogspot.com/