Hi Guozhang,

I specifically do not want to do a get() on every future. That is
equivalent to going synchronous, which is a performance killer for us.

What I am looking for is some way to tell a producer to forget about queued
messages. I have done some more testing on my solution and it needs some
work. That said, the reliance on a thread named
"kafka-producer-network-thread" is something I don't like -- too much
knowledge of the KafkaProducer's internals.
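For illustration, the direction I am leaning toward is to stop handing new
records to the producer as soon as the first failure is reported, instead of
killing the I/O thread. A rough sketch follows; the GuardedProducer wrapper
and its names are my own invention, not part of the KafkaProducer API, and it
only prevents new sends -- it does not discard records already sitting in the
producer's buffer.

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical wrapper: once any send fails, stop offering new records so the
// failure does not keep piling more messages into the accumulator.
public class GuardedProducer {
    private final KafkaProducer<byte[], byte[]> producer;
    private final AtomicBoolean failed = new AtomicBoolean(false);

    public GuardedProducer(KafkaProducer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    public void trySend(ProducerRecord<byte[], byte[]> record) {
        if (failed.get()) {
            return; // Kafka looks unhealthy; drop the record and let the server keep working.
        }
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    failed.set(true); // stop accepting new records on the first failure
                }
            }
        });
    }
}

The part this sketch does not solve is discarding what has already been
queued inside the producer.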
Hoping for other suggestions...

Andrew Stein

On Fri, Sep 19, 2014 at 2:31 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Andrew,
>
> I think you would want a sync producer for your use case? You can try to
> call get() on the returned metadata future of the send() call instead of
> using a callback; the pattern is something like:
>
>     for (message in messages)
>         producer.send(message).get()
>
> The get() call will block until the message response has been received,
> and will throw an exception if the response is a failure; you can then
> catch the exception and pause the producer.
>
> Guozhang
>
> On Thu, Sep 18, 2014 at 6:30 PM, Andrew Stein <
> andrew.st...@quantumretail.com> wrote:
>
> > I am trying to understand the best practices for working with the new
> > (0.8.2) Producer interface.
> >
> > We have a process in a large server that writes a lot of data to Kafka.
> > However, this data is not mission critical. When a problem arises
> > writing to Kafka, most specifically network issues, but also full
> > Producer buffers, we want the server to continue working, but to stop
> > sending data to Kafka, allowing other tasks to continue. The issue I
> > have is handling messages that have been "sent" to the producer but are
> > waiting to go to Kafka. These messages remain long after my processing
> > is over, timing out, writing to the logs, and preventing me from moving
> > forward. I am looking for some way to tell the client to stop
> > forwarding messages to Kafka.
> >
> > This is what I have so far:
> >
> > class ErrorCallback implements Callback {
> >     @Override
> >     public void onCompletion(RecordMetadata metadata, Exception exception) {
> >         if (exception == null) { // The message was sent.
> >             return;
> >         }
> >
> >         stopProducerSendAndClose();
> >         String threadName = Thread.currentThread().getName();
> >         if (!threadName.equals("kafka-producer-network-thread")) {
> >             // Some of the callbacks happen on my thread -- nothing to do.
> >         } else { // We are in KafkaProducer's ioThread ==> commit suicide.
> >             Thread.currentThread().interrupt();
> >             throw new ThreadDeath(); // Cannot throw an Exception, as it
> >                                      // will just be caught and logged.
> >         }
> >     }
> > }
> >
> > My question is, is this the correct approach, or is there some other
> > way to stop sending messages (short of going synchronous)?
> >
> > Andrew Stein
>
>
> --
> -- Guozhang
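For completeness, the synchronous pattern Guozhang describes above would
look roughly like the sketch below with the new producer API; pauseProducing()
is just a stand-in for however the application stops its Kafka output.

import java.util.List;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class SyncSendExample {
    // Stand-in for whatever "pause the producer" means in the application.
    static void pauseProducing() { /* application-specific */ }

    static void sendAll(KafkaProducer<byte[], byte[]> producer,
                        List<ProducerRecord<byte[], byte[]>> records) {
        for (ProducerRecord<byte[], byte[]> record : records) {
            try {
                producer.send(record).get(); // blocks until the broker has responded
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (ExecutionException e) {
                pauseProducing(); // a failed send surfaces here as an exception
                return;
            }
        }
    }
}

Every record costs a full round trip before the next one is handed to the
producer, which is exactly the cost that rules this out for us.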