Guozhang, Thank you very much for your reply. It is good to know that I have not overlooked something simple in the interface.
How do I "open a jira"? The major change that I would like to see is a
"KafkaProducer.abort()" method that closes the producer immediately,
aborting attempts to send any buffered messages. There are other
nice-to-haves (such as "KafkaProducer.close(timeout)" and returning an
abort indicator from the callback).

Thanks once more,
Andrew Stein

On Wed, Sep 24, 2014 at 7:48 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Andrew,
>
> Sorry for the late reply. I will just reply in this thread (I saw you have
> asked in a different thread also).
>
> The way callbacks can be triggered is the following:
>
> 1. The caller thread may throw any ApiException in producer.send(), which
> will be captured to trigger the callback function; this includes exceptions
> such as 1) record too large, 2) metadata refresh timed out, etc. In general,
> any ApiException that causes the produce request not to be sent to the
> broker at all will make the caller thread trigger the callback.
>
> 2. Besides ApiException, a KafkaException can also be thrown in
> producer.send(). This type of exception covers all Kafka logic errors and
> will be re-thrown in the caller thread directly.
>
> 3. When the produce request does get sent to the broker and an error
> response is received, the producer's send thread will trigger the callback
> as well. This includes 1) leader not available, 2) message size too large,
> 3) request timed out, etc.
>
> So from the error handling point of view, where we want to stop future
> sending when some errors are encountered: for case 2) you can catch the
> exception and call producer.close(); for 1) and 3) it will be a bit tricky,
> as you observed, since the callback can be triggered by different threads,
> and you cannot directly call producer.close() in case 3). I cannot think of
> a better way than your current approach either, and forcing the thread to
> die directly will also lose data that is already buffered inside the
> producer.
>
> I think this is a valid question about the new producer's error handling,
> and we can open a jira in order to improve on it.
>
> Guozhang
>
> On Fri, Sep 19, 2014 at 2:17 PM, Andrew Stein <
> andrew.st...@quantumretail.com> wrote:
>
>> Hi Guozhang,
>>
>> So in my callback above, I have been logging the thread. Sometimes it is
>> my thread (that is, the producer thread) and sometimes it is
>> "kafka-producer-network-thread" (that is, the ioThread created by
>> KafkaProducer). I cannot kill my thread, but I do kill
>> "kafka-producer-network-thread".
>>
>> This is in the 0.8.2 code built off git
>> commit c892c08df01c3dfc2d466bd37a6838072d390819
>>
>> Andrew Stein
>>
>> Email: andrew.st...@quantumretail.com
>> Skype: AndrewStein9
>>
>> On Fri, Sep 19, 2014 at 4:12 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>>> I see. For your code, is it possible that the callback can be triggered
>>> outside the producer thread? As I understand it, the callback should
>>> only be called by the producer thread.
>>>
>>> Guozhang
>>>
>>> On Fri, Sep 19, 2014 at 1:47 PM, Andrew Stein <
>>> andrew.st...@quantumretail.com> wrote:
>>>
>>>> Hi Guozhang,
>>>>
>>>> I specifically do not want to do a get() on every future. This is
>>>> equivalent to going "sync"ed, which is a performance killer for us.
>>>>
>>>> What I am looking for is some way to tell a producer to forget about
>>>> queued messages. I have done some more testing on my solution and it
>>>> needs some work. That said, the reliance on a thread named
>>>> "kafka-producer-network-thread" is something I don't like -- too much
>>>> knowledge of the KafkaProducer's internals.
>>>>
>>>> Hoping for other suggestions...
>>>>
>>>> Andrew Stein
>>>>
>>>> On Fri, Sep 19, 2014 at 2:31 PM, Guozhang Wang <wangg...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello Andrew,
>>>>>
>>>>> I think you would want a sync producer for your use case?
>>>>> You can try to call get() on the returned metadata future of the
>>>>> send() call instead of using a callback; the pattern is something
>>>>> like:
>>>>>
>>>>>     for (message in messages)
>>>>>         producer.send(message).get()
>>>>>
>>>>> The get() call will block until the message response has been
>>>>> received, and will throw an exception if the response is "failure";
>>>>> you can then catch the exception and pause the producer.
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Thu, Sep 18, 2014 at 6:30 PM, Andrew Stein <
>>>>> andrew.st...@quantumretail.com> wrote:
>>>>>
>>>>> > I am trying to understand the best practices for working with the
>>>>> > new (0.8.2) Producer interface.
>>>>> >
>>>>> > We have a process in a large server that writes a lot of data to
>>>>> > Kafka. However, this data is not mission critical. When a problem
>>>>> > arises writing to Kafka, most specifically network issues, but also
>>>>> > full Producer buffers, we want the server to continue working, but
>>>>> > to stop sending data to Kafka, allowing other tasks to continue. The
>>>>> > issue I have is handling messages that have been "sent" to the
>>>>> > producer but are waiting to go to Kafka. These messages remain long
>>>>> > after my processing is over, timing out, writing to the logs, and
>>>>> > preventing me from moving forward. I am looking for some way to tell
>>>>> > the client to stop forwarding messages to Kafka.
>>>>> >
>>>>> > This is what I have so far:
>>>>> >
>>>>> > class ErrorCallback implements Callback {
>>>>> >     @Override
>>>>> >     public void onCompletion(RecordMetadata metadata, Exception exception) {
>>>>> >         if (exception == null) { // The message was sent.
>>>>> >             return;
>>>>> >         }
>>>>> >
>>>>> >         stopProducerSendAndClose();
>>>>> >         String threadName = Thread.currentThread().getName();
>>>>> >         if (!threadName.equals("kafka-producer-network-thread")) {
>>>>> >             // Some of the callbacks happen on my thread.
>>>>> >         } else {
>>>>> >             // We are in KafkaProducer's ioThread ==> commit suicide.
>>>>> >             Thread.currentThread().interrupt();
>>>>> >             // Cannot throw an Exception, as it will just be caught and logged.
>>>>> >             throw new ThreadDeath();
>>>>> >         }
>>>>> >     }
>>>>> > }
>>>>> >
>>>>> > My question is: is this the correct approach, or is there some other
>>>>> > way to stop sending messages (short of going "sync"ed)?
>>>>> >
>>>>> > Andrew Stein
>>>>>
>>>>> --
>>>>> -- Guozhang
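
One way to stop future sends after an error, without depending on which thread runs the callback or on the name "kafka-producer-network-thread", is a shared failure flag that the callback sets and the caller checks before each send. The following is only a minimal sketch under stated assumptions: `GuardedSender` and `AsyncSender` are hypothetical names introduced here, and `AsyncSender` is a stand-in for an asynchronous send with a completion callback, not a Kafka API. It does not discard messages already buffered inside the producer, which is the gap the proposed "KafkaProducer.abort()" would address.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

// Hypothetical wrapper (not part of Kafka): stops issuing new sends once
// any completion callback has reported an error.
final class GuardedSender {

    // Hypothetical stand-in for an asynchronous send with a callback.
    // The callback receives (offset, exception); exception is null on success.
    interface AsyncSender {
        void send(String message, BiConsumer<Long, Exception> callback);
    }

    private final AsyncSender sender;

    // AtomicBoolean because the callback may run on the caller's thread
    // or on the producer's I/O thread, as described in the thread above.
    private final AtomicBoolean failed = new AtomicBoolean(false);

    GuardedSender(AsyncSender sender) {
        this.sender = sender;
    }

    // Returns false without sending once an earlier send has failed.
    boolean trySend(String message) {
        if (failed.get()) {
            return false;
        }
        sender.send(message, (offset, exception) -> {
            if (exception != null) {
                failed.set(true); // thread-agnostic: no thread-name check needed
            }
        });
        return true;
    }

    boolean hasFailed() {
        return failed.get();
    }
}
```

The caller would invoke `trySend` in place of a direct send and, once `hasFailed()` returns true, close the producer from its own thread rather than from inside the callback.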