lianetm commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1419165645
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -971,12 +989,59 @@ private CompletableFuture<Void>
invokeOnPartitionsLostCallback(Set<TopicPartitio
// behaviour.
Optional<ConsumerRebalanceListener> listener =
subscriptions.rebalanceListener();
if (!partitionsLost.isEmpty() && listener.isPresent()) {
- throw new UnsupportedOperationException("User-defined callbacks
not supported yet");
+ return
enqueueConsumerRebalanceListenerCallback(ON_PARTITIONS_LOST, partitionsLost);
} else {
return CompletableFuture.completedFuture(null);
}
}
+ /**
+ * Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to
trigger the execution of the
+ * appropriate {@link ConsumerRebalanceListener} {@link
ConsumerRebalanceListenerMethodName method} on the
+ * application thread.
+ *
+ * <p/>
+ *
+ * Because the reconciliation process (run in the background thread) will
be blocked by the application thread
+ * until it completes this, we need to provide a {@link CompletableFuture}
by which to remember where we left off.
+ *
+ * @param methodName Callback method that needs to be executed on the
application thread
+ * @param partitions Partitions to supply to the callback method
+ * @return Future that will be chained within the rest of the
reconciliation logic
+ */
+ private CompletableFuture<Void>
enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName
methodName,
+
Set<TopicPartition> partitions) {
+ SortedSet<TopicPartition> sortedPartitions = new
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+ sortedPartitions.addAll(partitions);
+ CompletableBackgroundEvent<Void> event = new
ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
+ backgroundEventHandler.add(event);
+ log.debug("The event to trigger the {} method execution was enqueued
successfully", methodName.fullyQualifiedMethodName());
+ return event.future();
+ }
+
+ @Override
+ public void
consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerCallbackCompletedEvent
event) {
+ ConsumerRebalanceListenerMethodName methodName = event.methodName();
+ Optional<KafkaException> error = event.error();
+ CompletableFuture<Void> future = event.future();
+
+ if (error.isPresent()) {
+ String message = error.get().getMessage();
+ log.warn(
+ "The {} method completed with an error ({}); signaling to
continue to the next phase of rebalance",
+ methodName.fullyQualifiedMethodName(),
+ message
+ );
+ } else {
+ log.debug(
+ "The {} method completed successfully; signaling to continue
to the next phase of rebalance",
+ methodName.fullyQualifiedMethodName()
+ );
+ }
+
+ future.complete(null);
Review Comment:
I agree we're missing the error here. Here goes the full picture as I see it:
When callbacks fail it means the client was not able to take the new
assignment, so 2 things need to happen (one in the background, one in the
foreground):
1. The state machine needs to know the error (expected
[here](https://github.com/apache/kafka/blob/964e73178b5b8363cd2685ce6872905ef0c04dee/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java#L697))
to abort the reconciliation and not send the ACK to the broker. We would
achieve this by propagating the error here.
2. the client should get the error: the ongoing `consumer.poll` loop should
get interrupted with a `KafkaException` indicating that the callbacks failed
(this is the behaviour in the legacy coordinator). I expect we should do that
when processing the `CallbacksCompletedEvent` in the app thread
[here](https://github.com/apache/kafka/blob/3f7074355b8e593b4216e7ca50b305fee0c4fac7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java#L226)
but seems we're only notifying the state machine, so I may be missing
something but seems we are not propagating the failure to the user either?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]