Hi Team!!
I am working on a POC for my project, where I want to replace RabbitMQ with
Kafka
I am using the 0.9.0.1 version of kafka on my Ubuntu 15.10 version.I have been
able to setup the basic setup, where from the console producer I can send
messages and from the console consumer, I can consume them as well.But, when I
try to create a java program to send the messages to the same topic, I get an
exception stacktrace in my IDE console and my console consumer shows the same
message 4 times.
The exception that I get is the following:-Exception in thread "main"
kafka.common.FailedToSendMessageException: Failed to send messages after 3
tries.
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.bhargo.main.MainClass.publishMessage(MainClass.java:47)
at com.bhargo.main.MainClass.main(MainClass.java:58)
I am not sure what the issue is, I have made sure that server.properties file
that is used to startup my broker has the correct
configurations:-zookeeper.connect=crazybox:2181advertised.port=9092advertised.host.name=crazyboxhost.name=crazyboxport=9092
On running the command hostname , I get the result as "crazybox" and hostname
-i, gives me 127.0.0.1
I am attaching the code for your reference.Please have a look once.
Any help here is much appreciated.
Thanks,Bhargo
package com.main;
import java.util.Date;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class MainClass {
static Properties props = new Properties();
static Producer createProducer() {
props.put("zookeeper.connect", "crazybox:2181");
props.put("metadata.broker.list", "crazybox:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
// props.put("partitioner.class", "kafka.producer.Partitioner");
props.put("request.required.acks", "1");
ProducerConfig producerConfig = new ProducerConfig(props);
return new Producer<String, String>(producerConfig);
}
private static void publishMessage(Producer<String, String> producer, String topic, int messageCount) {
for (int mCount = 0; mCount < messageCount; mCount++) {
String runtime = new Date().toString();
String msg = "Message Publishing Time - " + runtime;
System.out.println(msg);
// Creates a KeyedMessage instance
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg);
// Publish the message
producer.send(data);
}
// Close producer connection with broker.
producer.close();
}
public static void main(String[] args) {
// TODO Auto-generated method stub
Producer<String, String> producer = createProducer();
publishMessage(producer, "kafkatopic", 3);
System.out.println("sent");
}
}