[ 
https://issues.apache.org/jira/browse/KAFKA-2060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Sobel resolved KAFKA-2060.
-------------------------------
       Resolution: Fixed
    Fix Version/s: 0.9.0.2

Per the notes, this was fixed in later versions of the Kafka libraries.

> Async onCompletion callback may not be called
> ---------------------------------------------
>
>                 Key: KAFKA-2060
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2060
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.8.1.2
>         Environment: All
>            Reporter: Bill Sobel
>            Priority: Critical
>              Labels: easyfix
>             Fix For: 0.9.0.2
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The done() method in RecordBatch.java iterates over the callbacks and calls 
> each onCompletion(). However, the call to thunk.future.get() can throw an 
> exception, and when it does the callback is not invoked. This appears to be 
> the only place where an async send can fail to trigger its callback, leaving 
> the callback orphaned.
> The call to thunk.future.get() should be wrapped in its own try/catch. If it 
> does not throw, onCompletion is called with the result; if it does throw, 
> thunk.callback.onCompletion(null, recordException) is called instead.
> e.g.
>     /**
>      * Complete the request
>      * 
>      * @param baseOffset The base offset of the messages assigned by the server
>      * @param exception The exception that occurred (or null if the request was successful)
>      */
>     public void done(long baseOffset, RuntimeException exception) {
>         this.produceFuture.done(topicPartition, baseOffset, exception);
>         log.trace("Produced messages to topic-partition {} with base offset {} and error: {}.",
>                   topicPartition,
>                   baseOffset,
>                   exception);
>         // execute callbacks
>         for (int i = 0; i < this.thunks.size(); i++) {
>             try {
>                 Thunk thunk = this.thunks.get(i);
>                 if (exception == null) {
>                     RecordMetadata rc = null;
>                     try {
>                         rc = thunk.future.get();
>                     } catch (Exception recordException) {
>                         thunk.callback.onCompletion(null, recordException);
>                     }
>                     if (rc != null) {
>                         thunk.callback.onCompletion(rc, null);
>                     }
>                 } else {
>                     thunk.callback.onCompletion(null, exception);
>                 }
>             } catch (Exception e) {
>                 log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
>             }
>         }
>     }
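
The pattern proposed above can be tried in isolation with a minimal, self-contained sketch. The types here (CallbackSketch, Thunk, Callback, and a Callable standing in for thunk.future) are hypothetical stand-ins, not the real Kafka classes. The sketch only illustrates that a failing per-record lookup is routed to the callback rather than skipping it, under the assumption that each callback should fire exactly once.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical stand-ins for illustration only; these are NOT the Kafka client classes.
final class CallbackSketch {

    /** Same shape as a producer callback: one of the two arguments is null. */
    interface Callback {
        void onCompletion(String metadata, Exception exception);
    }

    /** Pairs a per-record result lookup (which may throw) with the user's callback. */
    static final class Thunk {
        final Callable<String> future;   // stands in for thunk.future.get()
        final Callback callback;

        Thunk(Callable<String> future, Callback callback) {
            this.future = future;
            this.callback = callback;
        }
    }

    /** Invokes every callback once, even when the per-record lookup throws. */
    static void done(List<Thunk> thunks, RuntimeException batchException) {
        for (Thunk thunk : thunks) {
            String metadata = null;
            Exception error = batchException;
            if (error == null) {
                try {
                    metadata = thunk.future.call();
                } catch (Exception recordException) {
                    // The lookup failed: report it to the callback instead of dropping it.
                    error = recordException;
                }
            }
            try {
                thunk.callback.onCompletion(error == null ? metadata : null, error);
            } catch (Exception e) {
                // A failing user callback must not stop the remaining callbacks.
                System.err.println("Error executing user-provided callback: " + e);
            }
        }
    }
}
```

Calling CallbackSketch.done(thunks, null) with one thunk whose lookup succeeds and one whose lookup throws should reach both callbacks, the second with a null result and the thrown exception.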



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
