Hi Team!

I am working on a POC for my project in which I want to replace RabbitMQ with Kafka. I am using Kafka 0.9.0.1 on Ubuntu 15.10.

I have been able to get the basic setup working: I can send messages from the console producer and consume them from the console consumer. But when I run a Java program to send messages to the same topic, I get an exception stack trace in my IDE console, and my console consumer shows the same message 4 times.

The exception is the following:

Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at com.bhargo.main.MainClass.publishMessage(MainClass.java:47)
    at com.bhargo.main.MainClass.main(MainClass.java:58)

I am not sure what the issue is. I have made sure that the server.properties file used to start my broker has the correct configuration:

zookeeper.connect=crazybox:2181
advertised.port=9092
advertised.host.name=crazybox
host.name=crazybox
port=9092

Running the command hostname returns "crazybox", and hostname -i returns 127.0.0.1.

I am attaching the code for your reference. Please have a look. Any help here is much appreciated.

Thanks,
Bhargo
package com.main;

import java.util.Date;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MainClass {

    static Properties props = new Properties();

    static Producer createProducer() {
        props.put("zookeeper.connect", "crazybox:2181");
        props.put("metadata.broker.list", "crazybox:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // props.put("partitioner.class", "kafka.producer.Partitioner");
        props.put("request.required.acks", "1");
        ProducerConfig producerConfig = new ProducerConfig(props);
        return new Producer<String, String>(producerConfig);
    }

    private static void publishMessage(Producer<String, String> producer, String topic, int messageCount) {
        for (int mCount = 0; mCount < messageCount; mCount++) {
            String runtime = new Date().toString();
            String msg = "Message Publishing Time - " + runtime;
            System.out.println(msg);
            // Creates a KeyedMessage instance
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg);
            // Publish the message
            producer.send(data);
        }
        // Close producer connection with broker.
        producer.close();
    }

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Producer<String, String> producer = createProducer();
        publishMessage(producer, "kafkatopic", 3);
        System.out.println("sent");
    }
}
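
Since the broker is configured by host name and hostname -i reports 127.0.0.1, one thing that might be worth checking from the machine running the Java program is whether "crazybox" resolves there and whether port 9092 accepts a TCP connection. The following is only a minimal diagnostic sketch using the JDK, not part of the original program. The class name BrokerCheck and its helper methods are hypothetical, and a successful connection would not by itself prove that the producer configuration is correct.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

// Hypothetical helper: checks name resolution and TCP reachability of a broker address.
public class BrokerCheck {

    // Returns the IP address the host name resolves to, or null if it does not resolve.
    static String resolve(String host) {
        try {
            return InetAddress.getByName(host).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults mirror the values in the post; override them on the command line.
        String host = args.length > 0 ? args[0] : "crazybox";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + " resolves to " + resolve(host));
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```

Running it with no arguments checks crazybox:9092. Passing a different host and port (for example the address the IDE machine is expected to use) lets the two results be compared.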