[ https://issues.apache.org/jira/browse/KAFKA-1913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Igor Khomenko updated KAFKA-1913: --------------------------------- Environment: OS X 10.10.1, Java 7, AWS Linux (was: OS X 10.10.1, Java 7) > App hungs when calls producer.send to wrong IP of Kafka broker > -------------------------------------------------------------- > > Key: KAFKA-1913 > URL: https://issues.apache.org/jira/browse/KAFKA-1913 > Project: Kafka > Issue Type: Bug > Components: producer > Affects Versions: 0.8.1.1 > Environment: OS X 10.10.1, Java 7, AWS Linux > Reporter: Igor Khomenko > Assignee: Jun Rao > Fix For: 0.8.1.2 > > > I have next test code to check the Kafka functionality: > {code} > package com.company; > import kafka.common.FailedToSendMessageException; > import kafka.javaapi.producer.Producer; > import kafka.producer.KeyedMessage; > import kafka.producer.ProducerConfig; > import java.util.Date; > import java.util.Properties; > public class Main { > public static void main(String[] args) { > Properties props = new Properties(); > props.put("metadata.broker.list", "192.168.9.3:9092"); > props.put("serializer.class", "com.company.KafkaMessageSerializer"); > props.put("request.required.acks", "1"); > ProducerConfig config = new ProducerConfig(props); > // The first is the type of the Partition key, the second the type of > the message. > Producer<String, String> messagesProducer = new Producer<String, > String>(config); > // Send > String topicName = "my_messages"; > String message = "hello world"; > KeyedMessage<String, String> data = new KeyedMessage<String, > String>(topicName, message); > try { > System.out.println(new Date() + ": sending..."); > messagesProducer.send(data); > System.out.println(new Date() + ": sent"); > }catch (FailedToSendMessageException e){ > System.out.println("e: " + e); > e.printStackTrace(); > }catch (Exception exc){ > System.out.println("e: " + exc); > exc.printStackTrace(); > } > } > } > {code} > {code} > package com.company; > import kafka.serializer.Encoder; > import kafka.utils.VerifiableProperties; > /** > * Created by igorkhomenko on 2/2/15. > */ > public class KafkaMessageSerializer implements Encoder<String> { > public KafkaMessageSerializer(VerifiableProperties verifiableProperties) { > /* This constructor must be present for successful compile. */ > } > @Override > public byte[] toBytes(String entity) { > byte [] serializedMessage = doCustomSerialization(entity); > return serializedMessage; > } > private byte[] doCustomSerialization(String entity) { > return entity.getBytes(); > } > } > {code} > Here is also GitHub version https://github.com/soulfly/Kafka-java-producer > So it just hungs on next line: > {code} > messagesProducer.send(data) > {code} > When I replaced the brokerlist to > {code} > props.put("metadata.broker.list", "localhost:9092"); > {code} > then I got an exception: > {code} > kafka.common.FailedToSendMessageException: Failed to send messages after 3 > tries. > {code} > so it's okay > Why it hungs with wrong brokerlist? Any ideas? -- This message was sent by Atlassian JIRA (v6.3.4#6332)