Hi Team!!
I am working on a POC for my project, where I want to replace RabbitMQ with 
Kafka.
I am using Kafka 0.9.0.1 on Ubuntu 15.10. I have the basic setup working: I can 
send messages from the console producer and consume them from the console 
consumer. But when I run a Java program that sends messages to the same topic, 
I get an exception stack trace in my IDE console, and my console consumer shows 
the same message 4 times.
The exception that I get is the following:
Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at com.bhargo.main.MainClass.publishMessage(MainClass.java:47)
    at com.bhargo.main.MainClass.main(MainClass.java:58)
I am not sure what the issue is. I have made sure that the server.properties 
file used to start my broker has the correct configuration:
zookeeper.connect=crazybox:2181
advertised.port=9092
advertised.host.name=crazybox
host.name=crazybox
port=9092
Running the command hostname gives "crazybox", and hostname -i gives 
127.0.0.1.
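The same lookup can also be done from inside the JVM, which is where the producer resolves the broker name. Below is a minimal sketch of such a check using only the standard library; the class name HostCheck and the helper resolvesToLoopback are hypothetical names made up for this illustration and are not part of my project or of Kafka.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper for illustration only; not part of the attached project.
public class HostCheck {

	// Returns true when the given host name or IP literal resolves to a
	// loopback address (127.x.x.x or ::1).
	static boolean resolvesToLoopback(String host) {
		try {
			return InetAddress.getByName(host).isLoopbackAddress();
		} catch (UnknownHostException e) {
			// Surface resolution failures as an unchecked exception.
			throw new IllegalArgumentException("Cannot resolve host: " + host, e);
		}
	}

	public static void main(String[] args) throws UnknownHostException {
		// Pass the name to check as the first argument, e.g. "crazybox".
		String host = args.length > 0 ? args[0] : "localhost";
		InetAddress address = InetAddress.getByName(host);
		System.out.println(host + " -> " + address.getHostAddress()
				+ " (loopback: " + address.isLoopbackAddress() + ")");
	}
}
```

Running it as java HostCheck crazybox on my machine should report the same address that hostname -i shows, assuming the JVM uses the same resolver configuration as the shell.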
I am attaching the code for your reference. Please have a look.
Any help here is much appreciated.
Thanks,
Bhargo
package com.main;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MainClass {

	static Properties props = new Properties();

	static Producer<String, String> createProducer() {
		props.put("zookeeper.connect", "crazybox:2181");
		props.put("metadata.broker.list", "crazybox:9092");
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		// props.put("partitioner.class", "kafka.producer.Partitioner");
		props.put("request.required.acks", "1");
		ProducerConfig producerConfig = new ProducerConfig(props);
		return new Producer<String, String>(producerConfig);
	}

	private static void publishMessage(Producer<String, String> producer, String topic, int messageCount) {
		try {
			for (int mCount = 0; mCount < messageCount; mCount++) {
				String runtime = new Date().toString();
				String msg = "Message	Publishing	Time	-	" + runtime;
				System.out.println(msg);
				// Wrap the payload in a KeyedMessage (no key, so the partition is chosen by the producer)
				KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg);

				// Publish the message
				producer.send(data);
			}
		} finally {
			// Close the producer connection with the broker, even if send() throws.
			producer.close();
		}
	}

	public static void main(String[] args) {
		// Create the producer and publish 3 messages to "kafkatopic"
		Producer<String, String> producer = createProducer();
		publishMessage(producer, "kafkatopic", 3);
		System.out.println("sent");
	}

}
