philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1384076305


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) {
     @Override
     public void unsubscribe() {
         fetchBuffer.retainAll(Collections.emptySet());
-        subscriptions.unsubscribe();
+        UnsubscribeApplicationEvent unsubscribeApplicationEvent = new 
UnsubscribeApplicationEvent();
+        applicationEventHandler.add(unsubscribeApplicationEvent);
+        unsubscribeApplicationEvent.future().whenComplete((result, error) -> {
+            if (error != null) {
+                // Callback failed - Keeping same exception message thrown by 
the legacy consumer
+                throw new KafkaException("User rebalance callback throws an 
error", error);

Review Comment:
   I think this KafkaException might not be in the right place because the 
rebalance listener needs to be invoked on the mainthread, probably before 
sending out the event.  I wonder if we could just remove this whenComplete and 
rely on the background thread to log the failures during the leave group event. 
 If there's a fatal exception being thrown there, it seems the sensible way is 
to enqueue to the BackgroundEventQueue and handle in the poll.  wdyt?
   
   In the current code path, I think only exceptions can only be thrown in 
`onLeavePrepare`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to