tombentley commented on a change in pull request #9878:
URL: https://github.com/apache/kafka/pull/9878#discussion_r662837733



##########
File path: clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
##########
@@ -247,17 +138,32 @@ public boolean completeExceptionally(Throwable newException) {
      */
     @Override
     public synchronized boolean cancel(boolean mayInterruptIfRunning) {
-        return completeExceptionally(new CancellationException()) || exception instanceof CancellationException;
+        return completableFuture.cancel(mayInterruptIfRunning);
+    }
+
+    // We need to deal with differences between KF's historic API and the API of CF:
+    // CF#get() does not wrap CancellationException in ExecutionException (nor does KF).
+    // CF#get() always wraps the _cause_ of a CompletionException in ExecutionException (which KF does not).
+    //
+    // The semantics for KafkaFuture are that all exceptional completions of the future (via #completeExceptionally() or exceptions from dependants)
+    // manifest as ExecutionException, as observed via both get() and getNow().
+    private void maybeRewrapAndThrow(Throwable cause) {
+        if (cause instanceof CancellationException) {
+            throw (CancellationException) cause;
+        }
     }
 
     /**
      * Waits if necessary for this future to complete, and then returns its result.
      */
     @Override
     public T get() throws InterruptedException, ExecutionException {
-        SingleWaiter<T> waiter = new SingleWaiter<>();
-        addWaiter(waiter);
-        return waiter.await();
+        try {
+            return completableFuture.get();
+        } catch (ExecutionException e) {
+            maybeRewrapAndThrow(e.getCause());
+            throw e;

Review comment:
       I agree it's a bit messy, but I don't think that works. The call site in 
`getNow()` differs from the others:
   
   1. It's catching `CompletionException`, not `ExecutionException`
   2. In the case where `maybeRewrapAndThrow` didn't throw, it needs to wrap the 
cause in an `ExecutionException`, unlike the other call sites, to conform with 
the historical API of KafkaFuture. 
   
   The first would require `maybeRewrapAndThrow()` to have an `Exception`-typed 
parameter, which, being a checked exception, means we'd need either to declare 
`throws Exception`, or we'd need to type cast within `maybeRewrapAndThrow()` 
and be left with an `else` case to deal with. 
   
   The second is difficult to abstract over nicely for the other call sites: 
We'd need to pass a boolean to indicate whether the `ExecutionException` was 
needed.
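
   To make the difference concrete, here is a rough, standalone sketch of the two kinds of call site. It is not the code in this PR: the class `RewrapSketch` and the helpers `getLike`, `getNowLike` and `thrownBy` are hypothetical names invented for illustration, and only `maybeRewrapAndThrow` mirrors the method in the diff above.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class RewrapSketch {

    // Same shape as the helper in the diff: only a cancellation is rethrown as-is.
    static void maybeRewrapAndThrow(Throwable cause) {
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
    }

    // get()-style call site (hypothetical name): CF#get() already throws the
    // checked ExecutionException, so it can be rethrown unchanged.
    static <T> T getLike(CompletableFuture<T> cf) throws InterruptedException, ExecutionException {
        try {
            return cf.get();
        } catch (ExecutionException e) {
            maybeRewrapAndThrow(e.getCause());
            throw e;
        }
    }

    // getNow()-style call site (hypothetical name): CF#getNow() throws the
    // unchecked CompletionException, so the cause has to be wrapped in an
    // ExecutionException here to keep the historical KafkaFuture behaviour.
    static <T> T getNowLike(CompletableFuture<T> cf, T valueIfAbsent) throws ExecutionException {
        try {
            return cf.getNow(valueIfAbsent);
        } catch (CompletionException e) {
            maybeRewrapAndThrow(e.getCause());
            throw new ExecutionException(e.getCause());
        }
    }

    // Test helper (hypothetical): reports which exception type a caller observes.
    static String thrownBy(CompletableFuture<String> cf, boolean useGetNow) {
        try {
            if (useGetNow) {
                getNowLike(cf, "absent");
            } else {
                getLike(cf);
            }
            return "none";
        } catch (Exception e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // A future failed directly via completeExceptionally().
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        // A dependent of a cancelled future: the cancellation reaches the
        // dependent wrapped in a CompletionException.
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> dependent = source.thenApply(s -> s);
        source.cancel(false);

        System.out.println("failed via get():       " + thrownBy(failed, false));
        System.out.println("failed via getNow():    " + thrownBy(failed, true));
        System.out.println("dependent via get():    " + thrownBy(dependent, false));
        System.out.println("dependent via getNow(): " + thrownBy(dependent, true));
    }
}
```

   The two `catch` blocks differ both in the exception type they catch and in what they throw afterwards, which is why folding them into a single helper would need either a broader parameter type or an extra flag.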





