[ 
https://issues.apache.org/jira/browse/KAFKA-1659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14320673#comment-14320673
 ] 

Guozhang Wang commented on KAFKA-1659:
--------------------------------------

One big use case for this function call would be preserving ordering in the 
producer: say we have called send() for messages 1, 2 and 3. When we get an 
error in the callback for message 2, we do not want the producer to continue 
sending message 3 after the callback finishes, and one way to prevent that 
would be to abort the producer in the callback. Of course, an even better 
solution would be an API such as drop() that cleans up the buffered messages 
for the corresponding topic/partition.
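
To make the drop() idea concrete, here is a minimal sketch of a buffer keyed by topic/partition. SketchBuffer and its methods are hypothetical names used only for illustration; this is not the producer's actual record buffer, and the records are plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-partition buffer; not a real Kafka class. */
final class SketchBuffer {
    // One FIFO queue of pending records per topic/partition.
    private final Map<String, Deque<String>> queues = new HashMap<>();

    synchronized void append(String topic, int partition, String record) {
        queues.computeIfAbsent(key(topic, partition), k -> new ArrayDeque<>()).add(record);
    }

    /** Discards everything still buffered for one topic/partition and returns it. */
    synchronized List<String> drop(String topic, int partition) {
        Deque<String> q = queues.remove(key(topic, partition));
        return q == null ? new ArrayList<>() : new ArrayList<>(q);
    }

    /** Number of records still buffered for one topic/partition. */
    synchronized int pending(String topic, int partition) {
        Deque<String> q = queues.get(key(topic, partition));
        return q == null ? 0 : q.size();
    }

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }
}
```

With something like this, the error callback for message 2 could call drop() for its own topic/partition so that message 3 is never sent, while other partitions keep their buffered records.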

If we are pursuing the abort() function, note that it may be called in the 
callback, which runs on the io-thread itself, so killing the io-thread right 
away may not work appropriately.

An alternative approach would be to add an aborted flag to Sender, and have:

{code}
public void abort() {
  this.sender.abort();
  this.close();
}
{code}

and in the Sender class, after the main loop, check whether aborted == true: 
if so, clean up the buffer and return immediately; otherwise flush the buffer 
and return.
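
The flag check described above could look roughly like the sketch below. SketchSender is a hypothetical stand-in, not the real Sender class: the buffer is a simple queue of strings, the "network" is an in-memory list, and the method names initiateClose() and sent() are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the producer's Sender; not the real Kafka class. */
final class SketchSender implements Runnable {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final List<String> sent = new ArrayList<>(); // stands in for the network
    private volatile boolean running = true;
    private volatile boolean aborted = false;

    void append(String record) { buffer.add(record); }

    /** Normal shutdown request: buffered records are still flushed. */
    void initiateClose() { running = false; }

    /** Abort request: buffered records are discarded instead of flushed. */
    void abort() {
        aborted = true;
        running = false;
    }

    @Override
    public void run() {
        // Main loop: keep sending until a close or abort is requested.
        while (running) {
            try {
                String record = buffer.poll(10, TimeUnit.MILLISECONDS);
                if (record != null) send(record);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        // After the main loop: clean up on abort, otherwise flush what is left.
        if (aborted) {
            buffer.clear();
        } else {
            String record;
            while ((record = buffer.poll()) != null) send(record);
        }
    }

    private synchronized void send(String record) { sent.add(record); }

    synchronized List<String> sent() { return new ArrayList<>(sent); }

    int buffered() { return buffer.size(); }
}
```

Because abort() only sets flags, it is safe to call from a callback running on the io-thread: the callback returns, the loop condition fails on the next iteration, and the thread exits on its own without being killed.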

> Ability to cleanly abort the KafkaProducer
> ------------------------------------------
>
>                 Key: KAFKA-1659
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1659
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, producer 
>    Affects Versions: 0.8.2.0
>            Reporter: Andrew Stein
>            Assignee: Jun Rao
>             Fix For: 0.8.3
>
>
> I would like the ability to "abort" the Java Client's KafkaProducer. This 
> includes stopping the writing of buffered records.
> The motivation for this is described 
> [here|http://mail-archives.apache.org/mod_mbox/kafka-dev/201409.mbox/%3CCAOk4UxB7BJm6HSgLXrR01sksB2dOC3zdt0NHaKHz1EALR6%3DCTQ%40mail.gmail.com%3E].
> A sketch of this method is:
> {code}
> public void abort() {
>         try {
>             ioThread.interrupt();
>             ioThread.stop(new ThreadDeath());
>         } catch (IllegalAccessException e) {
>         }
> }
> {code}
> but of course it is preferable to stop the {{ioThread}} by cooperation, 
> rather than use the deprecated {{Thread.stop(new ThreadDeath())}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
