Igor Khomenko created KAFKA-1913:
------------------------------------
Summary: App hangs when producer.send is called with a wrong Kafka broker IP
Key: KAFKA-1913
URL: https://issues.apache.org/jira/browse/KAFKA-1913
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.8.1.1
Environment: OS X 10.10.1, Java 7
Reporter: Igor Khomenko
Assignee: Jun Rao
Fix For: 0.8.1.2
I have the following test code to check the Kafka functionality:
{code}
package com.company;

import kafka.common.FailedToSendMessageException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Date;
import java.util.Properties;

public class Main {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.9.3:9092");
        props.put("serializer.class", "com.company.KafkaMessageSerializer");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        // The first is the type of the partition key, the second the type of the message.
        Producer<String, String> messagesProducer = new Producer<String, String>(config);

        // Send
        String topicName = "my_messages";
        String message = "hello world";
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topicName, message);

        try {
            System.out.println(new Date() + ": sending...");
            messagesProducer.send(data);
            System.out.println(new Date() + ": sent");
        } catch (FailedToSendMessageException e) {
            System.out.println("e: " + e);
            e.printStackTrace();
        } catch (Exception exc) {
            System.out.println("e: " + exc);
            exc.printStackTrace();
        }
    }
}
{code}
{code}
package com.company;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

/**
 * Created by igorkhomenko on 2/2/15.
 */
public class KafkaMessageSerializer implements Encoder<String> {
    public KafkaMessageSerializer(VerifiableProperties verifiableProperties) {
        /* This constructor must be present: the producer instantiates the
           encoder by class name and passes it a VerifiableProperties. */
    }

    @Override
    public byte[] toBytes(String entity) {
        return doCustomSerialization(entity);
    }

    private byte[] doCustomSerialization(String entity) {
        // Uses the platform default charset.
        return entity.getBytes();
    }
}
{code}
The code is also available on GitHub: https://github.com/soulfly/Kafka-java-producer
The application simply hangs on the following line:
{code}
messagesProducer.send(data)
{code}
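One way to keep the application responsive while this is investigated might be to run the blocking call on a worker thread and wait for it with a timeout. This is only a sketch: `BoundedSend` and `runWithTimeout` are hypothetical names, not part of the Kafka API, and the `Runnable` stands in for the `messagesProducer.send(data)` call.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Kafka: bounds how long the caller waits
// for a blocking task such as producer.send(...).
class BoundedSend {

    /**
     * Runs the task on a daemon worker thread and waits at most timeoutMs.
     * Returns true if the task finished in time, false if the wait timed out.
     * An exception thrown by the task is rethrown wrapped in ExecutionException.
     */
    static boolean runWithTimeout(Runnable blockingTask, long timeoutMs) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "bounded-send");
                t.setDaemon(true); // a stuck call must not keep the JVM alive
                return t;
            }
        });
        Future<?> future = executor.submit(blockingTask);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Interrupts the worker; a thread blocked in a socket call may not react.
            future.cancel(true);
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

With the producer above, the call would look like `BoundedSend.runWithTimeout(new Runnable() { public void run() { messagesProducer.send(data); } }, 5000)`. This bounds only the caller's wait; the worker thread may stay blocked until the underlying call returns.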
When I changed the broker list to
{code}
props.put("metadata.broker.list", "localhost:9092");
{code}
then I got an exception:
{code}
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
{code}
so that case behaves as expected.
Why does it hang with the wrong broker list? Any ideas?
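To narrow this down, a plain TCP probe with an explicit connect timeout could show how each address behaves at the socket level. In general, a reachable host with a closed port refuses the connection immediately, while an address that never answers keeps a connect attempt waiting until some timeout expires. Whether that difference is what causes the hang here is an assumption, and `BrokerProbe` and `canConnect` are hypothetical names used only for this sketch.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper, not part of Kafka.
class BrokerProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers "connection refused" as well as SocketTimeoutException.
            return false;
        }
    }
}
```

For the two addresses above, `BrokerProbe.canConnect("192.168.9.3", 9092, 3000)` and `BrokerProbe.canConnect("localhost", 9092, 3000)` can be timed to see which one fails fast and which one waits for the full timeout.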