philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383998319


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) {
     @Override
     public void unsubscribe() {
         fetchBuffer.retainAll(Collections.emptySet());
-        subscriptions.unsubscribe();
+        UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent();
+        applicationEventHandler.add(unsubscribeApplicationEvent);
+        unsubscribeApplicationEvent.future().whenComplete((result, error) -> {
+            if (error != null) {
+                // Callback failed - Keeping same exception message thrown by the legacy consumer
+                throw new KafkaException("User rebalance callback throws an error", error);

Review Comment:
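   The current consumer throws a KafkaException from unsubscribe(). I don't
   think throwing inside whenComplete actually propagates to the caller: the
   exception is captured by the future that whenComplete returns, so
   unsubscribe() would still return normally.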
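
A minimal sketch of the behavior in question, using only `java.util.concurrent` and no Kafka classes. The class and method names are hypothetical, and `IllegalStateException` stands in for `KafkaException`. It assumes the event's future is a plain `CompletableFuture` that has completed exceptionally. Blocking with `join()` is shown only as one possible way to surface the error on the calling thread, not as the fix this PR should adopt.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class WhenCompleteThrowDemo {

    // Mirrors the pattern in the diff: throw from inside the whenComplete callback.
    // Returns true only if an exception reaches the caller.
    static boolean throwInsideCallback(CompletableFuture<Void> future) {
        try {
            future.whenComplete((result, error) -> {
                if (error != null) {
                    // Stand-in for KafkaException (assumption for this sketch).
                    throw new IllegalStateException("User rebalance callback throws an error", error);
                }
            });
            // The callback's exception was captured by the CompletableFuture
            // machinery instead of being propagated to this thread.
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    // One possible alternative: wait for the result and rethrow on the calling thread.
    // Returns true only if an exception reaches the caller.
    static boolean throwAfterJoin(CompletableFuture<Void> future) {
        try {
            future.join(); // throws CompletionException if the future failed
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("callback failed"));

        System.out.println("thrown from whenComplete reaches caller: " + throwInsideCallback(failed));
        System.out.println("thrown after join reaches caller: " + throwAfterJoin(failed));
    }
}
```

With the callback approach the caller never sees the exception, so matching the legacy consumer would require observing the future's outcome on the thread that called unsubscribe().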



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
