lianetm commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1415963877


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -979,12 +1026,124 @@ private CompletableFuture<Void> 
invokeOnPartitionsLostCallback(Set<TopicPartitio
         // behaviour.
         Optional<ConsumerRebalanceListener> listener = 
subscriptions.rebalanceListener();
         if (!partitionsLost.isEmpty() && listener.isPresent()) {
-            throw new UnsupportedOperationException("User-defined callbacks 
not supported yet");
+            return enqueueConsumerRebalanceListenerCallback(onPartitionsLost, 
partitionsLost);
         } else {
             return CompletableFuture.completedFuture(null);
         }
     }
 
+    /**
+     * Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to 
trigger the execution of the
+     * appropriate {@link ConsumerRebalanceListener} {@link 
ConsumerRebalanceListenerMethodName method} on the
+     * application thread.
+     *
+     * <p/>
+     *
+     * This method is essentially "giving" the baton from the background 
thread to the application thread for
+     * processing of the reconciliation logic. It will "receive" the "baton" 
back via the
+     * {@link 
#consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerMethodName,
 Optional)} method.
+     *
+     * <p/>
+     *
+     * Because the reconciliation process (run in the background thread) will 
be blocked by the application thread
+     * until it completes this, we need to leave a {@link 
ConsumerRebalanceListenerCallbackBreadcrumb breadcrumb}
+     * by which to remember where we left off.
+     *
+     * @param methodName Callback method that needs to be executed on the 
application thread
+     * @param partitions Partitions to supply to the callback method
+     * @return Future that will be chained within the rest of the 
reconciliation logic
+     */
+    private CompletableFuture<Void> 
enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName 
methodName,
+                                                                             
Set<TopicPartition> partitions) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ConsumerRebalanceListenerCallbackBreadcrumb newBreadcrumb = new 
ConsumerRebalanceListenerCallbackBreadcrumb(
+            methodName,
+            future
+        );
+
+        if (breadcrumbRef.compareAndSet(null, newBreadcrumb)) {

Review Comment:
   Agree. During the reconciliation the state machine ensures that only one 
callback is triggered at a time. But member could leave (triggers 
`onPartitionsRevoked`), or get fenced (triggering `onPartitionsLost`) at any 
time (ex. during a reconciliation). Thus we could have multiple callbacks being 
executed at the same time. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to