[ https://issues.apache.org/jira/browse/KAFKA-2060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bill Sobel resolved KAFKA-2060. ------------------------------- Resolution: Fixed Fix Version/s: 0.9.0.2 Per notes this was fixed with the later Kafka libs > Async onCompletion callback may not be called > --------------------------------------------- > > Key: KAFKA-2060 > URL: https://issues.apache.org/jira/browse/KAFKA-2060 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 0.8.1.2 > Environment: All > Reporter: Bill Sobel > Priority: Critical > Labels: easyfix > Fix For: 0.9.0.2 > > Original Estimate: 1h > Remaining Estimate: 1h > > The 'done' function in RecordBatch.java attempts to enumerate and call each > onCompletion() callback. However the call to thunk.future.get() can throw an > exception. When this occurs the callback is not invoked. This appears to be > the only place where a callback per async send would not occur and the > callback orphaned. > The call to thunk.future.get() appears to need to occur in its own try/catch > and then the onCompletion called with the results if it doesn't throw an > exception or thunk.callback.onCompletion(null, recordException) if it does. > e.g. > /** > * Complete the request > * > * @param baseOffset The base offset of the messages assigned by the > server > * @param exception The exception that occurred (or null if the request > was successful) > */ > public void done(long baseOffset, RuntimeException exception) { > this.produceFuture.done(topicPartition, baseOffset, exception); > log.trace("Produced messages to topic-partition {} with base offset > offset {} and error: {}.", > topicPartition, > baseOffset, > exception); > // execute callbacks > for (int i = 0; i < this.thunks.size(); i++) { > try { > Thunk thunk = this.thunks.get(i); > if (exception == null) { > RecordMetadata rc = null; > try { > rc = thunk.future.get(); > } > catch(Exception recordException) { > thunk.callback.onCompletion(null, > recordException); > } > if(rc != null) { > thunk.callback.onCompletion(rc, null); > } > } > else { > thunk.callback.onCompletion(null, exception); > } > } catch (Exception e) { > log.error("Error executing user-provided callback on message > for topic-partition {}:", topicPartition, e); > } > } > } -- This message was sent by Atlassian JIRA (v6.3.15#6346)