[ https://issues.apache.org/jira/browse/KAFKA-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ewen Cheslack-Postava updated KAFKA-4832: ----------------------------------------- Fix Version/s: (was: 0.8.2.2) Removing invalid fix version; we'll need to revisit when this should be fixed/released. > kafka producer send Async message to the wrong IP cannot to stop > producer.close() > --------------------------------------------------------------------------------- > > Key: KAFKA-4832 > URL: https://issues.apache.org/jira/browse/KAFKA-4832 > Project: Kafka > Issue Type: Bug > Components: producer > Affects Versions: 0.8.2.2 > Environment: JDK8 Eclipse Mars Win7 > Reporter: Wang Hong > > 1. I tried to send messages asynchronously to a wrong IP, in a loop of 1400 messages, for 10 batches. > 2. I use the javaapi Kafka producer, wrapped in a factory. > 3. For each of the 10 batches I call Producer.Connected() and Producer.Closed(). > 4. I know that I am sending the messages to a wrong IP, but I noticed that the terminal was > blocking. It cannot close normally. > The function looks like this: > public static void go(int s) throws Exception { > KafkaService kf = new KafkaServiceImpl();//init properties > for (int i = 0; i < 1400; i++) { > String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + > i; > System.out.println(msg); > kf.push(msg); //producer.send() > } > kf.closeProducerFactory();//producer.closed() > System.out.println(s); > Thread.sleep(1000); > } > kf.closeProducerFactory() calls producer.close(), > but the async send keeps waiting for the Kafka server, because I gave it a wrong IP. > I think that waiting for such a long time will cause problems for the whole system, because it occupies > resources. > Another problem occurs when I send Kafka messages to the correct IP using Runnable > and thread pools; everything else is set up correctly, and I use the same loop as in the example above. > It fails with an error that says it waited for 3 tries. > I also configured > advertised.host.name=xxx.xxx.xxx.xxx > advertised.port=9092 > Now I think it may not be able to handle so much concurrent volume at one time. > Our system runs at over 1000 TPS. > Thank you. 
> Resource Code part: > package kafka.baseconfig; > import java.util.Properties; > import com.travelsky.util.ConFile; > import kafka.javaapi.producer.Producer; > import kafka.producer.KeyedMessage; > import kafka.producer.ProducerConfig; > /** > * kafka工厂模式 > * > * 1.替代Producer方法.//多线程效率不适合. > * 2.使用三部: > * ProducerFactory fac = new ProducerFactory(); > * fac.openProducer(); ->初始化对象 > * fac.push(msg); ->发消息主体 > * fac.closeProducer(); ->关闭对象 > * @author 王宏 > * > */ > public class ProducerFactory { > protected Producer<String, String> producer = null; > protected ConFile conf = null; > private Properties props = new Properties(); > private String topic = null; > { > try { > conf = new ConFile("KafkaProperties.conf"); > topic = conf.getString("topic"); > if (conf == null) { > throw new Exception("kafka配置文件有问题"); > } > } catch (Exception e) { > e.printStackTrace(); > } > } > > /** > * 发送消息方法 > * @param msg > */ > public void push(String msg) { > if (producer == null) { > throw new RuntimeException("producer实例为空"); > } > KeyedMessage<String, String> messageForSend = new > KeyedMessage<String, String>(topic, msg); > producer.send(messageForSend); > } > > /** > * 打开生产者 > */ > public void openProducer() { > props.put("serializer.class", "kafka.serializer.StringEncoder"); > props.put("metadata.broker.list", > conf.getString("kafkaserverurl")); > // 异步发送 > props.put("producer.type", conf.getString("synctype")); > // 每次发送多少条 > props.put("batch.num.messages", conf.getString("batchmsgnums")); > > // > props.put("request.required.acks", "1"); > // > props.put("queue.enqueue.timeout.ms", "1"); > // > props.put("request.timeout.ms", "1"); > // > props.put("timeout.ms", "1"); > // > props.put("reconnect.backoff.ms", "1"); > // > props.put("retry.backoff.ms", "1"); > // > props.put("message.send.max.retries", "1"); > // > props.put("retry.backoff.ms", "1"); > // > props.put("linger.ms", "1"); > // > props.put("max.block.ms", "1"); > // > props.put("metadata.fetch.timeout.ms", "1"); 
> // > props.put("metadata.max.age.ms", "1"); > // > props.put("metrics.sample.window.ms ", "1"); > producer = new Producer<String, String>(new > ProducerConfig(props)); > if (producer == null) { > throw new RuntimeException("kafka producer 打开失败"); > } > } > /** > * 关闭生产对象 > */ > public void closeProducer() { > if (producer != null) { > producer.close(); > } > } > /** > * 判断producer是否开启 > * @return > */ > public boolean isOpenProduer() { > return producer != null; > } > } > package kafka.service.impl; > import kafka.baseconfig.ProducerFactory; > import kafka.service.KafkaService; > public class KafkaServiceImpl implements KafkaService { > private ProducerFactory factory = null; > > public KafkaServiceImpl() { > factory = new ProducerFactory(); > factory.openProducer(); > } > > /** > * 往卡呼卡灌装数据并且可以修改topic > * @param msg 数据 > * @param topic 发送的主题 > * > * @Deprecated 這個方法已經過期.不建議使用. > */ > @Override > @Deprecated > public void push(String msg) throws Exception { > //new Producer(msg).start(); > if (factory.isOpenProduer()) { > factory.push(msg); > }else { > throw new RuntimeException("factory沒有初始化"); > } > } > > /** > * 过期方法 > * > * @param msg > * @param topic > * @throws Exception > */ > @Override > public void push(String msg, String topic) throws Exception { > //new Producer(msg, topic).start(); > if (factory.isOpenProduer()) { > factory.push(msg); > }else { > throw new RuntimeException("factory沒有初始化"); > } > } > > /** > * 释放资源. 
> */ > @Override > public void closeProducerFactory()throws Exception{ > if (factory.isOpenProduer()) { > factory.closeProducer(); > } > } > } > public static void main(String[] args) throws Exception { > long l = System.currentTimeMillis(); > for (int i = 0; i < 10; i++) { > go(i); > } > System.out.println(System.currentTimeMillis() - l); > } > public static void go(int s) throws Exception { > for (int i = 0; i < 1400; i++) { > KafkaService kf = new KafkaServiceImpl(); > String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + > i; > System.out.println(msg); > kf.push(msg); > kf.closeProducerFactory(); > } > System.out.println(s); > Thread.sleep(1000); > } -- This message was sent by Atlassian JIRA (v6.3.15#6346)