philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1384016549


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) {
     @Override
     public void unsubscribe() {
         fetchBuffer.retainAll(Collections.emptySet());
-        subscriptions.unsubscribe();
+        UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent();
+        applicationEventHandler.add(unsubscribeApplicationEvent);
+        unsubscribeApplicationEvent.future().whenComplete((result, error) -> {
+            if (error != null) {
+                // Callback failed - Keeping same exception message thrown by the legacy consumer
+                throw new KafkaException("User rebalance callback throws an error", error);

Review Comment:
   I think unsubscribe() actually blocks on the callback invocation and throws 
if the callback fails.  Instead of putting the logic in whenComplete, it seems 
like we should wait until the callback completes and then throw if needed.  I 
assume we want to maintain this behavior for the async consumer.
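
   To illustrate the concern: an exception thrown inside a `whenComplete` 
action is captured by the stage that `whenComplete` returns, so it never 
reaches the caller of unsubscribe().  A rough, standalone sketch of the 
blocking alternative, using only `java.util.concurrent`.  
`BlockingUnsubscribeSketch`, `SketchKafkaException`, `awaitUnsubscribe` and 
the timeout parameter are hypothetical stand-ins, not names from this PR, and 
the timeout handling is an assumption:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingUnsubscribeSketch {

    /** Hypothetical stand-in for org.apache.kafka.common.KafkaException. */
    static class SketchKafkaException extends RuntimeException {
        SketchKafkaException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Blocks the calling thread until the future completes, then rethrows any
     * failure on that same thread. The timeout is an assumption of this sketch.
     */
    static void awaitUnsubscribe(CompletableFuture<Void> future, long timeoutMs) {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // get() wraps the original failure; unwrap it so the cause is the callback error.
            throw new SketchKafkaException("User rebalance callback throws an error", e.getCause());
        } catch (TimeoutException e) {
            throw new SketchKafkaException("Timed out waiting for unsubscribe to complete", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new SketchKafkaException("Interrupted while waiting for unsubscribe", e);
        }
    }

    public static void main(String[] args) {
        // Simulates the background thread reporting a failed rebalance callback.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("callback failed"));

        // Throwing inside whenComplete does not propagate to this thread;
        // the error only lives in the returned stage.
        CompletableFuture<Void> chained = failed.whenComplete((result, error) -> {
            if (error != null) {
                throw new SketchKafkaException("never seen by the caller", error);
            }
        });
        System.out.println("chained failed silently: " + chained.isCompletedExceptionally());

        // Blocking on the future surfaces the error to the caller.
        try {
            awaitUnsubscribe(failed, 1000);
        } catch (SketchKafkaException e) {
            System.out.println("caller saw: " + e.getMessage());
        }
    }
}
```

   In the PR this would amount to replacing the `whenComplete` block with a 
blocking wait on `unsubscribeApplicationEvent.future()`; how the wait is 
bounded (if at all) is left open here.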



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
