dajac commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1414403961
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -979,12 +1026,124 @@ private CompletableFuture<Void>
invokeOnPartitionsLostCallback(Set<TopicPartitio
// behaviour.
Optional<ConsumerRebalanceListener> listener =
subscriptions.rebalanceListener();
if (!partitionsLost.isEmpty() && listener.isPresent()) {
- throw new UnsupportedOperationException("User-defined callbacks
not supported yet");
+ return enqueueConsumerRebalanceListenerCallback(onPartitionsLost,
partitionsLost);
} else {
return CompletableFuture.completedFuture(null);
}
}
+ /**
+ * Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to
trigger the execution of the
+ * appropriate {@link ConsumerRebalanceListener} {@link
ConsumerRebalanceListenerMethodName method} on the
+ * application thread.
+ *
+ * <p/>
+ *
+ * This method is essentially "giving" the baton from the background
thread to the application thread for
+ * processing of the reconciliation logic. It will "receive" the "baton"
back via the
+ * {@link
#consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerMethodName,
Optional)} method.
+ *
+ * <p/>
+ *
+ * Because the reconciliation process (run in the background thread) will
be blocked by the application thread
+ * until it completes this, we need to leave a {@link
ConsumerRebalanceListenerCallbackBreadcrumb breadcrumb}
+ * by which to remember where we left off.
+ *
+ * @param methodName Callback method that needs to be executed on the
application thread
+ * @param partitions Partitions to supply to the callback method
+ * @return Future that will be chained within the rest of the
reconciliation logic
+ */
+ private CompletableFuture<Void>
enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName
methodName,
+
Set<TopicPartition> partitions) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ ConsumerRebalanceListenerCallbackBreadcrumb newBreadcrumb = new
ConsumerRebalanceListenerCallbackBreadcrumb(
+ methodName,
+ future
+ );
+
+ if (breadcrumbRef.compareAndSet(null, newBreadcrumb)) {
Review Comment:
I have doubts about this. My understanding is that you are trying to prevent
the state machine from scheduling multiple callbacks. Is that right?
For context, I think that the current implementation of the state
machine can actually schedule multiple callbacks. It does not do so during the
reconciliation process, but it could happen when the member is fenced or when
the user leaves or unsubscribes. So I think that this won't work. @lianetm, it
would be great if you could look into this as well.
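To make the concern concrete, here is a minimal sketch of a single-slot guard built on `AtomicReference.compareAndSet(null, ...)`. The class and method names are hypothetical stand-ins, not the PR's code, and failing the second future is only an assumption, since the excerpt above does not show what happens when the compare-and-set fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the breadcrumb logic; all names are illustrative only.
class SingleSlotCallbackGuard {

    static final class Breadcrumb {
        final String methodName;
        final CompletableFuture<Void> future;

        Breadcrumb(String methodName, CompletableFuture<Void> future) {
            this.methodName = methodName;
            this.future = future;
        }
    }

    private final AtomicReference<Breadcrumb> breadcrumbRef = new AtomicReference<>();

    // Returns a pending future if the slot was free. If another callback is
    // still pending, the returned future is failed (an assumption of this sketch).
    CompletableFuture<Void> enqueue(String methodName) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (breadcrumbRef.compareAndSet(null, new Breadcrumb(methodName, future))) {
            return future;
        }
        Breadcrumb current = breadcrumbRef.get();
        String pending = current == null ? "unknown" : current.methodName;
        future.completeExceptionally(
            new IllegalStateException("Callback " + pending + " is still pending"));
        return future;
    }

    // Simulates the application thread reporting that the callback finished.
    void complete() {
        Breadcrumb done = breadcrumbRef.getAndSet(null);
        if (done != null) {
            done.future.complete(null);
        }
    }

    public static void main(String[] args) {
        SingleSlotCallbackGuard guard = new SingleSlotCallbackGuard();
        CompletableFuture<Void> revoked = guard.enqueue("onPartitionsRevoked");
        // A second request arrives (e.g. the member is fenced) before the first completes.
        CompletableFuture<Void> lost = guard.enqueue("onPartitionsLost");
        System.out.println("first pending: " + !revoked.isDone());
        System.out.println("second rejected: " + lost.isCompletedExceptionally());
    }
}
```

With a single slot, the second request cannot be scheduled until the first completes, which is the situation described above for fencing or leave/unsubscribe.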
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * Event that signifies that the network I/O thread wants to invoke one of the
callback methods on the
+ * {@link ConsumerRebalanceListener}. This event will be processed by the
application thread when the next
+ * {@link Consumer#poll(Duration)} call is performed by the user. When
processed, the application thread should
+ * invoke the appropriate callback method (based on {@link #methodName()})
with the given partitions.
+ */
+public class ConsumerRebalanceListenerCallbackNeededEvent extends
BackgroundEvent {
+
+ private final ConsumerRebalanceListenerMethodName methodName;
+ private final SortedSet<TopicPartition> partitions;
+
+ public
ConsumerRebalanceListenerCallbackNeededEvent(ConsumerRebalanceListenerMethodName
methodName,
Review Comment:
There is actually a desire to be able to execute multiple callbacks
(e.g. onPartitionsAssigned, onPartitionsRevoked) with a single event. Would it
be possible to handle this here? @lianetm could provide more details.
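One possible shape, as a rough sketch only: the event carries an ordered list of invocations instead of a single method name. Every type below is hypothetical, and partitions are plain strings to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch; none of these types exist in the PR.
class MultiCallbackEventSketch {

    enum MethodName { ON_PARTITIONS_REVOKED, ON_PARTITIONS_ASSIGNED, ON_PARTITIONS_LOST }

    // One callback to run, together with the partitions to pass to it.
    static final class Invocation {
        final MethodName methodName;
        final SortedSet<String> partitions;

        Invocation(MethodName methodName, Collection<String> partitions) {
            this.methodName = methodName;
            this.partitions = Collections.unmodifiableSortedSet(new TreeSet<>(partitions));
        }
    }

    // A single event that carries several callbacks in execution order.
    static final class CallbacksNeededEvent {
        private final List<Invocation> invocations;

        CallbacksNeededEvent(List<Invocation> invocations) {
            this.invocations = Collections.unmodifiableList(new ArrayList<>(invocations));
        }

        List<Invocation> invocations() {
            return invocations;
        }
    }

    // Application-thread side: walk the invocations in order. A real handler
    // would call the user's listener here; this sketch only records the calls.
    static List<String> process(CallbacksNeededEvent event) {
        List<String> calls = new ArrayList<>();
        for (Invocation invocation : event.invocations()) {
            calls.add(invocation.methodName + " " + invocation.partitions);
        }
        return calls;
    }

    public static void main(String[] args) {
        CallbacksNeededEvent event = new CallbacksNeededEvent(Arrays.asList(
            new Invocation(MethodName.ON_PARTITIONS_REVOKED, Arrays.asList("topic-0")),
            new Invocation(MethodName.ON_PARTITIONS_ASSIGNED, Arrays.asList("topic-1"))));
        process(event).forEach(System.out::println);
    }
}
```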
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java:
##########
@@ -66,15 +66,15 @@ Exception invokePartitionsAssigned(final
SortedSet<TopicPartition> assignedParti
throw e;
} catch (Exception e) {
log.error("User provided listener {} failed on invocation of
onPartitionsAssigned for partitions {}",
- listener.getClass().getName(), assignedPartitions, e);
+ listener.get().getClass().getName(),
assignedPartitions, e);
Review Comment:
Should we make this change in a separate PR, since it touches the legacy
consumer as well?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -183,6 +183,10 @@ public NetworkClientDelegate.PollResult poll(long
currentTimeMs) {
return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs,
Collections.singletonList(request));
}
+ public MembershipManager membershipManager() {
+ return membershipManager;
+ }
Review Comment:
Why do we need this?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerMethodName.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+/**
+ * This class just provides a static name for the methods in the {@link
ConsumerRebalanceListener} interface
+ * for a bit more compile time assurance.
+ */
+public enum ConsumerRebalanceListenerMethodName {
+
+ onPartitionsRevoked, onPartitionsAssigned, onPartitionsLost;
Review Comment:
Enum constants should be in all caps, like other constants.
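For instance, something along these lines would keep the listener method names available for logging while following the naming convention. This is only a sketch: the `methodName()` accessor and the wrapper class are assumptions, not part of the PR.

```java
// Sketch only: upper-case constants that still carry the listener method name.
class EnumNamingSketch {

    enum ConsumerRebalanceListenerMethodName {
        ON_PARTITIONS_REVOKED("onPartitionsRevoked"),
        ON_PARTITIONS_ASSIGNED("onPartitionsAssigned"),
        ON_PARTITIONS_LOST("onPartitionsLost");

        private final String methodName;

        ConsumerRebalanceListenerMethodName(String methodName) {
            this.methodName = methodName;
        }

        // Hypothetical accessor: the name of the matching listener method.
        String methodName() {
            return methodName;
        }
    }

    public static void main(String[] args) {
        for (ConsumerRebalanceListenerMethodName name : ConsumerRebalanceListenerMethodName.values()) {
            System.out.println(name + " -> " + name.methodName());
        }
    }
}
```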
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -650,7 +693,7 @@ boolean reconcile() {
"\tCurrent owned partitions: {}\n" +
"\tAdded partitions (assigned - owned): {}\n" +
"\tRevoked partitions (owned - assigned): {}\n",
- assignedTopicIdPartitions,
+ assignedTopicPartitions,
Review Comment:
Hmm... I would keep it as it was, since the topic IDs may be useful. @lianetm,
what do you think?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1376,6 +1425,66 @@ private void subscribeInternal(Collection<String>
topics, Optional<ConsumerRebal
}
}
+ /**
+ * This method can be used by cases where the caller has an event that
needs to both block for completion but
+ * also process background events. For some events, in order to fully
process the associated logic, the
+ * {@link ConsumerNetworkThread background thread} needs assistance from
the application thread to complete.
+ * If the application thread simply blocked on the event after submitting
it, the processing would deadlock.
+ * The logic herein is basically a loop that performs two tasks in each
iteration:
+ *
+ * <ol>
+ * <li>Process background events, if any</li>
+ * <li><em>Briefly</em> wait for {@link CompletableApplicationEvent an
event} to complete</li>
+ * </ol>
+ *
+ * <p/>
+ *
+ * Each iteration gives the application thread an opportunity to process
background events, which may be
+ * necessary to complete the overall processing.
+ *
+ * <p/>
+ *
+ * As an example, take {@link #unsubscribe()}. To start unsubscribing, the
application thread enqueues an
+ * {@link UnsubscribeApplicationEvent} on the application event queue.
That event will eventually trigger the
+ * rebalancing logic in the background thread. Critically, as part of this
rebalancing work, the
+ * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}
callback needs to be invoked. However,
+ * this callback must be executed on the application thread. To achieve
this, the background thread enqueues a
+ * {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background
event queue. That event queue is
+ * periodically queried by the application thread to see if there's work
to be done. When the application thread
+ * sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it is
processed, and then a
+ * {@link ConsumerRebalanceListenerCallbackCompletedEvent} is then
enqueued by the application thread on the
+ * background event queue. Moments later, the background thread will see
that event, process it, and continue
+ * execution of the rebalancing logic. The rebalancing logic cannot
complete until the
+ * {@link ConsumerRebalanceListener} callback is performed.
+ *
+ * @param event Event that contains a {@link CompletableFuture}; it is on
this future that the application thread
+ * will wait for completion
+ * @param timer Overall timer that bounds how long the application thread
will wait for the event to complete
+ * @return {@code true} if the event completed within the timeout, {@code
false} otherwise
+ */
+ private boolean processBackgroundEvents(CompletableApplicationEvent<?>
event, Timer timer) {
+ log.trace("Enqueuing event {} for processing; will wait up to {} ms to
complete", event, timer.remainingMs());
+
+ do {
+ backgroundEventProcessor.process();
Review Comment:
Does this call process all the pending events or only one event?
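To illustrate the two behaviours I am asking about, here is a small sketch using a plain `BlockingQueue`. The class is hypothetical and says nothing about what `backgroundEventProcessor.process()` actually does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical processor; events are plain strings to keep the sketch small.
class BackgroundEventDrainSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> handled = new ArrayList<>();

    void add(String event) {
        queue.add(event);
    }

    // Option 1: handle at most one pending event per call.
    boolean processOne() {
        String event = queue.poll();
        if (event == null) {
            return false;
        }
        handled.add(event);
        return true;
    }

    // Option 2: handle every event that is pending at the time of the call.
    int processAll() {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);
        handled.addAll(batch);
        return batch.size();
    }

    List<String> handled() {
        return handled;
    }

    public static void main(String[] args) {
        BackgroundEventDrainSketch processor = new BackgroundEventDrainSketch();
        processor.add("callback-needed-1");
        processor.add("callback-needed-2");
        processor.add("error-event");
        System.out.println("processed one: " + processor.processOne());
        System.out.println("drained: " + processor.processAll());
        System.out.println("handled in order: " + processor.handled());
    }
}
```

If only one event is handled per iteration, a callback event queued behind other events would wait for several loop iterations before it runs.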