Hi,

If you are using the Scala producer then yes, it will drop messages. It will retry up to message.send.max.retries times and then throw a FailedToSendMessageException. This is caught in the ProducerSendThread and logged; you'd see something like: "Error in handling batch of 10 events ..."
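To illustrate why the caller never sees the failure, here is a rough model of the retry-then-drop behaviour described above. It is a simplified sketch, not Kafka source code: RetryThenDropModel, SendFailed, handle and sendThreadStep are hypothetical names made up for this example, and the exact attempt counting inside Kafka may differ.

```java
import java.util.List;
import java.util.function.Predicate;

// Simplified model only; none of these names exist in Kafka.
final class RetryThenDropModel {

    /** Hypothetical stand-in for kafka.common.FailedToSendMessageException. */
    static final class SendFailed extends RuntimeException {
        SendFailed(String message) {
            super(message);
        }
    }

    /** Tries the batch up to maxTries times, then gives up by throwing. */
    static void handle(List<String> batch, Predicate<List<String>> trySend, int maxTries) {
        for (int attempt = 0; attempt < maxTries; attempt++) {
            if (trySend.test(batch)) {
                return; // delivered
            }
        }
        throw new SendFailed("Failed to send messages after " + maxTries + " tries.");
    }

    /**
     * Models one step of the background send thread: the exception is caught
     * and logged here, so the thread that called send() is never told.
     * Returns the number of messages that were dropped.
     */
    static int sendThreadStep(List<String> batch, Predicate<List<String>> trySend, int maxTries) {
        try {
            handle(batch, trySend, maxTries);
            return 0;
        } catch (SendFailed e) {
            System.err.println("Error in handling batch of " + batch.size()
                    + " events: " + e.getMessage());
            return batch.size(); // the batch is gone; the queue keeps draining
        }
    }
}
```

Because the failed batch is discarded, the queue keeps draining even while the broker is down, which would be consistent with send() slowing down but not blocking.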
If you don't want to drop messages (who does?) then I suggest using the sync producer and doing your own batching.

Cheers,
Damian

On 8 November 2015 at 08:16, Li Tao <ahumbleco...@gmail.com> wrote:

> Hi, according to my understanding, your scenario does not apply here. Async
> does not mean it buffers messages when the connection is lost (you killed the
> broker). If the connection is down, the producer should detect it as an
> exceptional condition and throw this exception to the application level to
> handle it.
>
> Correct me if I am wrong.
>
> On Sat, Oct 31, 2015 at 4:25 AM, Ilya Goberman <igober...@kcg.com> wrote:
>
> > I am new to kafka and apologize if this is already answered. I am testing
> > a simple async publisher behavior when broker is down. I use kafka version
> > 0.8.2.2.
> >
> > I have set up "queue.buffering.max.messages" to 200 and
> > "queue.enqueue.timeout.ms" set to -1. My understanding is that if
> > "queue.enqueue.timeout.ms" is set to -1, the call to 'producer.send'
> > should block when queue of 200 is reached. But this is not what I am seeing.
> >
> > My publisher has these properties.
> >
> > Properties props = new Properties();
> > props.put("metadata.broker.list", "cno-d-igoberman2:9092");
> > props.put("serializer.class", "kafka.serializer.StringEncoder");
> > props.put("producer.type", "async");
> > props.put("partitioner.class", "com.kcg.kafka.test.SimplePartitioner");
> > props.put("request.required.acks", "1");
> > props.put("queue.buffering.max.messages", "200");
> > props.put("queue.enqueue.timeout.ms", "-1");
> >
> > This is scenario I am testing:
> >
> > 1) start broker.
> >
> > 2) start publishing in a loop.
> >
> > 3) kill broker.
> >
> > At this point my producer keeps calling 'producer.send' without blocking
> > (but slows down considerably). I suspect that messages are lost - but this
> > is not what I want. Is this a known limitation of producers in kafka?
> >
> > Any help in clarifying it will be appreciated. Also, I understand that
> > producers are in the process of being redesigned in the next release. When
> > will it be available? Should I even bother with the current version?
> >
> > Thanks
> >
> > This is what I am seeing in the log:
> >
> > 2015-10-30 14:50:29 INFO SyncProducer:68 - Disconnecting from cno-d-igoberman2:9092
> > 2015-10-30 14:50:29 ERROR Utils$:106 - fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cno-d-igoberman2,port:9092)] failed
> > kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cno-d-igoberman2,port:9092)] failed
> >     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> >     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> >     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> >     at kafka.utils.Utils$.swallow(Utils.scala:172)
> >     at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> >     at kafka.utils.Utils$.swallowError(Utils.scala:45)
> >     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> >     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> >     at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> >     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> >     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > Caused by: java.nio.channels.ClosedChannelException
> >     at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> >     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> >     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> >     at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> >     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> >     ... 12 more
> > 2015-10-30 14:50:29 ERROR DefaultEventHandler:97 - Failed to send requests for topics test with correlation ids in [0,8]
> > 2015-10-30 14:50:29 ERROR ProducerSendThread:103 - Error in handling batch of 5 events
> > kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> >     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> >     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> >     at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> >     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> >     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > 2015-10-30 14:50:29 TRACE ProducerSendThread:36 - Dequeued item for topic test, partition key: 5, data: 1446234628741: 5
> > 2015-10-30 14:50:29 TRACE Producer:36 - Added to send queue an event: KeyedMessage(test,6,6,1446234629741: 6)
> > 2015-10-30 14:50:29 TRACE Producer:36 - Remaining queue size: 200
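P.S. To make the "sync producer plus your own batching" suggestion from the reply at the top a bit more concrete, here is a minimal sketch. SyncBatcher and BatchSink are hypothetical names invented for this example, and the sketch assumes that the blocking send throws when a batch cannot be delivered. In real code the sink would presumably wrap a producer configured with producer.type set to sync.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: buffers messages and sends them in
// batches through a blocking sink, retrying until the batch is accepted.
final class SyncBatcher {

    /** Hypothetical stand-in for a blocking send; assumed to throw on failure. */
    interface BatchSink {
        void send(List<String> batch);
    }

    private final BatchSink sink;
    private final int batchSize;
    private final long backoffMs;
    private final List<String> pending = new ArrayList<>();

    SyncBatcher(BatchSink sink, int batchSize, long backoffMs) {
        this.sink = sink;
        this.batchSize = batchSize;
        this.backoffMs = backoffMs;
    }

    /** Buffers one message; blocks inside flush() once the batch is full. */
    void add(String message) {
        pending.add(message);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Sends the pending batch, retrying until it succeeds, so nothing is dropped. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<String> batch = new ArrayList<>(pending);
        while (true) {
            try {
                sink.send(batch);
                pending.clear();
                return;
            } catch (RuntimeException e) {
                // Broker unavailable (or similar): keep the batch and wait.
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while retrying", ie);
                }
            }
        }
    }
}
```

With this shape the caller really does block while the broker is down, which is the behaviour the original question expected. Retrying a whole batch can deliver some messages more than once, so this is at-least-once at best.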