Hi,
If you are using the Scala producer in async mode then yes, it will drop
messages. It will retry up to message.send.max.retries times and then throw
a FailedToSendMessageException. This is caught in the ProducerSendThread and
logged, and the batch is discarded, so you'd see something like:
"Error in handling batch of 10 events ..."

If you don't want to drop messages (who does?) then I suggest using the
sync producer and doing your own batching.
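
To make "doing your own batching" a bit more concrete, here is a rough sketch
of the idea, not a definitive implementation. BatchSender is a hypothetical
interface that I made up for illustration; in a real 0.8.x setup it would wrap
the send call of a producer configured with producer.type=sync, which I am
assuming surfaces the failure to the caller as an exception instead of only
logging it. The class name, batch size and backoff values are likewise
placeholders.

```java
import java.util.ArrayList;
import java.util.List;

class SyncBatchingSketch {

    /** Hypothetical stand-in for a sync producer's send; throws if the batch was not accepted. */
    interface BatchSender {
        void send(List<String> batch) throws Exception;
    }

    private final BatchSender sender;
    private final int batchSize;
    private final long backoffMs;
    private final List<String> pending = new ArrayList<>();

    SyncBatchingSketch(BatchSender sender, int batchSize, long backoffMs) {
        this.sender = sender;
        this.batchSize = batchSize;
        this.backoffMs = backoffMs;
    }

    /** Buffers one message and flushes once the batch is full. */
    void add(String message) throws InterruptedException {
        pending.add(message);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Blocks the caller until the pending batch is accepted; a failed batch is kept, not dropped. */
    void flush() throws InterruptedException {
        while (!pending.isEmpty()) {
            try {
                sender.send(new ArrayList<>(pending));
                pending.clear();
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e) {
                // Send failed (for example, broker down): wait, then retry the same batch.
                Thread.sleep(backoffMs);
            }
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> delivered = new ArrayList<>();
        SyncBatchingSketch batcher =
                new SyncBatchingSketch(batch -> delivered.addAll(batch), 2, 100L);
        batcher.add("first");
        batcher.add("second"); // batch is full here, so this call flushes
        System.out.println("delivered: " + delivered.size() + ", pending: " + batcher.pendingCount());
    }
}
```

The point of the design is that the publishing thread itself blocks in flush()
while the broker is unreachable, which is the back-pressure the original
poster expected from the async queue. The sketch is single-threaded and retries
forever; a real application would probably want a retry limit or a way to
spill to disk.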

Cheers,
Damian

On 8 November 2015 at 08:16, Li Tao <ahumbleco...@gmail.com> wrote:

> Hi, according to my understanding, your scenario does not apply here. Async
> does not mean that it buffers messages when the connection is lost (you
> killed the broker). If the connection is down, the producer should detect
> it as an exceptional condition and throw the exception to the application
> level to handle it.
>
> Correct me if I am wrong.
>
> On Sat, Oct 31, 2015 at 4:25 AM, Ilya Goberman <igober...@kcg.com> wrote:
>
> > I am new to Kafka and apologize if this is already answered. I am testing
> > a simple async publisher's behavior when the broker is down. I use Kafka
> > version 0.8.2.2.
> >
> >
> > I have set "queue.buffering.max.messages" to 200 and
> > "queue.enqueue.timeout.ms" to -1. My understanding is that if
> > "queue.enqueue.timeout.ms" is set to -1, the call to 'producer.send'
> > should block when the queue limit of 200 is reached. But this is not
> > what I am seeing.
> >
> >
> > My publisher has these properties.
> >         Properties props = new Properties();
> >         props.put("metadata.broker.list", "cno-d-igoberman2:9092");
> >         props.put("serializer.class", "kafka.serializer.StringEncoder");
> >         props.put("producer.type", "async");
> >         props.put("partitioner.class", "com.kcg.kafka.test.SimplePartitioner");
> >         props.put("request.required.acks", "1");
> >         props.put("queue.buffering.max.messages", "200");
> >         props.put("queue.enqueue.timeout.ms", "-1");
> >
> >
> > This is the scenario I am testing:
> >
> > 1) start broker.
> >
> > 2) start publishing in a loop.
> >
> > 3) kill broker.
> >
> >
> > At this point my producer keeps calling 'producer.send' without blocking
> > (but slows down considerably). I suspect that messages are lost, but
> > this is not what I want. Is this a known limitation of producers in
> > Kafka?
> >
> > Any help in clarifying this will be appreciated. Also, I understand that
> > producers are being redesigned for the next release. When will it be
> > available? Should I even bother with the current version?
> >
> > Thanks
> >
> >
> > This is what I am seeing in the log:
> >
> >
> > 2015-10-30 14:50:29 INFO  SyncProducer:68 - Disconnecting from cno-d-igoberman2:9092
> > 2015-10-30 14:50:29 ERROR Utils$:106 - fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cno-d-igoberman2,port:9092)] failed
> > kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cno-d-igoberman2,port:9092)] failed
> >     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> >     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> >     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> >     at kafka.utils.Utils$.swallow(Utils.scala:172)
> >     at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> >     at kafka.utils.Utils$.swallowError(Utils.scala:45)
> >     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> >     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> >     at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> >     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> >     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > Caused by: java.nio.channels.ClosedChannelException
> >     at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> >     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> >     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> >     at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> >     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> >     ... 12 more
> > 2015-10-30 14:50:29 ERROR DefaultEventHandler:97 - Failed to send requests for topics test with correlation ids in [0,8]
> > 2015-10-30 14:50:29 ERROR ProducerSendThread:103 - Error in handling batch of 5 events
> > kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> >     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> >     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> >     at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> >     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> >     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > 2015-10-30 14:50:29 TRACE ProducerSendThread:36 - Dequeued item for topic test, partition key: 5, data: 1446234628741: 5
> > 2015-10-30 14:50:29 TRACE Producer:36 - Added to send queue an event: KeyedMessage(test,6,6,1446234629741: 6)
> > 2015-10-30 14:50:29 TRACE Producer:36 - Remaining queue size: 200
