lianetm commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1415963877
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -979,12 +1026,124 @@ private CompletableFuture<Void>
invokeOnPartitionsLostCallback(Set<TopicPartitio
// behaviour.
Optional<ConsumerRebalanceListener> listener =
subscriptions.rebalanceListener();
if (!partitionsLost.isEmpty() && listener.isPresent()) {
- throw new UnsupportedOperationException("User-defined callbacks
not supported yet");
+ return enqueueConsumerRebalanceListenerCallback(onPartitionsLost,
partitionsLost);
} else {
return CompletableFuture.completedFuture(null);
}
}
+ /**
+ * Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to
trigger the execution of the
+ * appropriate {@link ConsumerRebalanceListener} {@link
ConsumerRebalanceListenerMethodName method} on the
+ * application thread.
+ *
+ * <p/>
+ *
+ * This method is essentially "giving" the baton from the background
thread to the application thread for
+ * processing of the reconciliation logic. It will "receive" the "baton"
back via the
+ * {@link
#consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerMethodName,
Optional)} method.
+ *
+ * <p/>
+ *
+ * Because the reconciliation process (run in the background thread) will
be blocked by the application thread
+ * until it completes this, we need to leave a {@link
ConsumerRebalanceListenerCallbackBreadcrumb breadcrumb}
+ * by which to remember where we left off.
+ *
+ * @param methodName Callback method that needs to be executed on the
application thread
+ * @param partitions Partitions to supply to the callback method
+ * @return Future that will be chained within the rest of the
reconciliation logic
+ */
+ private CompletableFuture<Void>
enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName
methodName,
+
Set<TopicPartition> partitions) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ ConsumerRebalanceListenerCallbackBreadcrumb newBreadcrumb = new
ConsumerRebalanceListenerCallbackBreadcrumb(
+ methodName,
+ future
+ );
+
+ if (breadcrumbRef.compareAndSet(null, newBreadcrumb)) {
Review Comment:
Agree. During the reconciliation the state machine ensures that only one
callback is triggered at a time. But member could leave (triggers
`onPartitionsRevoked`), or get fenced (triggering `onPartitionsLost`) at any
time (ex. during a reconciliation). Thus we could have multiple callbacks being
executed at the same time.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]