kirktrue commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1412693090
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -979,12 +1026,124 @@ private CompletableFuture<Void>
invokeOnPartitionsLostCallback(Set<TopicPartitio
// behaviour.
Optional<ConsumerRebalanceListener> listener =
subscriptions.rebalanceListener();
if (!partitionsLost.isEmpty() && listener.isPresent()) {
- throw new UnsupportedOperationException("User-defined callbacks
not supported yet");
+ return enqueueConsumerRebalanceListenerCallback(onPartitionsLost,
partitionsLost);
} else {
return CompletableFuture.completedFuture(null);
}
}
+ /**
+ * Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to
trigger the execution of the
+ * appropriate {@link ConsumerRebalanceListener} {@link
ConsumerRebalanceListenerMethodName method} on the
+ * application thread.
+ *
+ * <p/>
+ *
+ * This method is essentially "giving" the baton from the background
thread to the application thread for
+ * processing of the reconciliation logic. It will "receive" the "baton"
back via the
+ * {@link
#consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerMethodName,
Optional)} method.
+ *
+ * <p/>
+ *
+ * Because the reconciliation process (run in the background thread) will
be blocked by the application thread
+ * until it completes this, we need to leave a {@link
ConsumerRebalanceListenerCallbackBreadcrumb breadcrumb}
+ * by which to remember where we left off.
+ *
+ * @param methodName Callback method that needs to be executed on the
application thread
+ * @param partitions Partitions to supply to the callback method
+ * @return Future that will be chained within the rest of the
reconciliation logic
+ */
+ private CompletableFuture<Void>
enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName
methodName,
+
Set<TopicPartition> partitions) {
+ if (breadcrumb != null) {
+ // In this case, there was already an existing breadcrumb, so we
need to report the matter back to the user.
+ String s = "An internal error occurred; an attempt to schedule the
" +
+ methodName + " method for execution during rebalancing
failed because " +
+ breadcrumb.methodName + " was already scheduled";
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.completeExceptionally(new KafkaException(s));
+ return future;
+ }
+
+ // This is the happy path—there isn't an existing breadcrumb, so we
can schedule our new event
+ // without hesitation.
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ breadcrumb = new
ConsumerRebalanceListenerCallbackBreadcrumb(methodName, future);
+ SortedSet<TopicPartition> sortedPartitions = new
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+ sortedPartitions.addAll(partitions);
+ BackgroundEvent event = new
ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
+ backgroundEventHandler.add(event);
+ log.debug("The event to trigger the {} method execution was enqueued
successfully", methodName);
+
+ return future;
+ }
+
+ /**
+ * Signals that a {@link ConsumerRebalanceListener} callback has
completed. This is invoked when the
+ * application thread has completed the callback and has submitted a
+ * {@link ConsumerRebalanceListenerCallbackCompletedEvent} to the network
I/O thread. At this point, we
+ * notify the state machine that it's complete so that it can move to the
next appropriate step of the
+ * rebalance process.
+ *
+ * <p/>
+ *
+ * This method is "receiving" the baton back from the application thread
after having "given" it to the
+ * application thread via the
+ * {@link
#enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName,
Set)} method.
+ *
+ * @param methodName Method name of the callback that was executed
+ * @param error Optional error that was thrown by the callback, captured,
and forwarded here
+ */
+ @Override
+ public void
consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerMethodName
methodName,
+
Optional<KafkaException> error) {
+ if (breadcrumb == null) {
+ // In this case, we're somehow completing a callback for which we
don't have a recorded breadcrumb.
+ // Because of that, we don't have a Future that can be completed,
so we're left having to report it
+ // back to the user asynchronously.
+ String s = "An internal error occurred; the " + methodName + "
method was executed " +
+ "during rebalancing, but there was no record of it being
scheduled";
+ backgroundEventHandler.add(new ErrorBackgroundEvent(new
KafkaException(s)));
+ return;
+ }
Review Comment:
Reviewer note: Any suggestions for a better way to handle this (hopefully
unlikely) case are welcome.
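
For discussion, here is a minimal sketch of the "give the baton / receive the baton" handoff described in the Javadoc above. It is not the PR's implementation: `CallbackHandoffSketch`, its `MethodName` enum, `enqueue`, and `completed` are hypothetical names, the event queue is left out, and the mismatch check is an assumption of mine. The idea it illustrates is one possible answer to the question above: have the completion method report whether a matching breadcrumb was found, so that the caller decides how to surface the problem.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the handoff in the diff above.
// None of these names are the PR's actual classes.
class CallbackHandoffSketch {

    enum MethodName { ON_PARTITIONS_REVOKED, ON_PARTITIONS_ASSIGNED, ON_PARTITIONS_LOST }

    // Remembers which callback is in flight and the future to complete when it returns.
    private static final class Breadcrumb {
        final MethodName methodName;
        final CompletableFuture<Void> future;

        Breadcrumb(MethodName methodName, CompletableFuture<Void> future) {
            this.methodName = methodName;
            this.future = future;
        }
    }

    // Assumption: only one thread touches this field in the sketch.
    private Breadcrumb breadcrumb;

    // "Give" the baton: fail fast if another callback is still outstanding.
    CompletableFuture<Void> enqueue(MethodName methodName) {
        if (breadcrumb != null) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException(
                methodName + " could not be scheduled because "
                    + breadcrumb.methodName + " is still pending"));
            return failed;
        }
        CompletableFuture<Void> future = new CompletableFuture<>();
        breadcrumb = new Breadcrumb(methodName, future);
        return future;
    }

    // "Receive" the baton: returns false when there is no matching breadcrumb,
    // leaving it to the caller to decide how to report that.
    boolean completed(MethodName methodName, Optional<RuntimeException> error) {
        Breadcrumb current = breadcrumb;
        if (current == null || current.methodName != methodName) {
            return false;
        }
        breadcrumb = null;
        if (error.isPresent()) {
            current.future.completeExceptionally(error.get());
        } else {
            current.future.complete(null);
        }
        return true;
    }
}
```

With this shape, a caller that gets `false` back could still enqueue an error event as the diff does, or log and ignore it; the sketch takes no position on which is better.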
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -650,7 +693,7 @@ boolean reconcile() {
"\tCurrent owned partitions: {}\n" +
"\tAdded partitions (assigned - owned): {}\n" +
"\tRevoked partitions (owned - assigned): {}\n",
- assignedTopicIdPartitions,
+ assignedTopicPartitions,
Review Comment:
Reviewer note: I made this change so that all of the output uses
`TopicPartition`, rather than this one value being a `TopicIdPartition`.
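
To illustrate the intent, here is a minimal sketch of rendering every value in the message through the same topic-partition view. The classes `SimpleTopicPartition` and `SimpleTopicIdPartition` and the helper `forLogging` are hypothetical stand-ins written for this example, not Kafka's own types, and the `topic-partition` string format is an assumption of the sketch.

```java
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

// Hypothetical stand-ins for illustration only; not the Kafka classes.
class LogOutputSketch {

    static final class SimpleTopicPartition {
        final String topic;
        final int partition;

        SimpleTopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    static final class SimpleTopicIdPartition {
        final UUID topicId;
        final SimpleTopicPartition topicPartition;

        SimpleTopicIdPartition(UUID topicId, SimpleTopicPartition topicPartition) {
            this.topicId = topicId;
            this.topicPartition = topicPartition;
        }
    }

    // Drops the topic ID so the value prints the same way as the other
    // partition collections in the same message. Input order is preserved.
    static List<String> forLogging(List<SimpleTopicIdPartition> assigned) {
        return assigned.stream()
            .map(p -> p.topicPartition.toString())
            .collect(Collectors.toList());
    }
}
```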