cadonna commented on code in PR #18551:
URL: https://github.com/apache/kafka/pull/18551#discussion_r1937615571


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java:
##########
@@ -0,0 +1,1249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackCompletedEvent;
+import 
org.apache.kafka.clients.consumer.internals.metrics.ConsumerRebalanceMetricsManager;
+import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Tracks the state of a single member in relationship to a group.
+ * <p/>
+ * Responsible for:
+ * <ul>
+ *   <li>Keeping member state</li>
+ *   <li>Keeping assignment for the member</li>
+ *   <li>Reconciling assignment, for example if tasks need to be revoked 
before other tasks can be assigned</li>
+ *   <li>Calling the assignment and revocation callbacks on the Streams 
client</li>
+ * </ul>
+ */
+public class StreamsMembershipManager implements RequestManager {
+
+    /**
+     * A data structure to represent the current task assignment, and current 
target task assignment of a member in a
+     * streams group.
+     * <p/>
+     * Besides the assigned tasks, it contains a local epoch that is bumped 
whenever the assignment changes, to ensure
+     * that two assignments with the same tasks but different local epochs are 
not considered equal.
+     */
+    private static class LocalAssignment {
+        public static final long NONE_EPOCH = -1;
+        public static final LocalAssignment NONE = new LocalAssignment(
+            NONE_EPOCH,
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap()
+        );
+
+        public final long localEpoch;
+        public final Map<String, SortedSet<Integer>> activeTasks;
+        public final Map<String, SortedSet<Integer>> standbyTasks;
+        public final Map<String, SortedSet<Integer>> warmupTasks;
+
+        public LocalAssignment(final long localEpoch,
+                               final Map<String, SortedSet<Integer>> 
activeTasks,
+                               final Map<String, SortedSet<Integer>> 
standbyTasks,
+                               final Map<String, SortedSet<Integer>> 
warmupTasks) {
+            this.localEpoch = localEpoch;
+            this.activeTasks = activeTasks;
+            this.standbyTasks = standbyTasks;
+            this.warmupTasks = warmupTasks;
+            if (localEpoch == NONE_EPOCH &&
+                    (!activeTasks.isEmpty() || !standbyTasks.isEmpty() || 
!warmupTasks.isEmpty())) {
+                throw new IllegalArgumentException("Local epoch must be set if 
tasks are assigned.");
+            }
+        }
+
+        Optional<LocalAssignment> updateWith(final Map<String, 
SortedSet<Integer>> activeTasks,
+                                             final Map<String, 
SortedSet<Integer>> standbyTasks,
+                                             final Map<String, 
SortedSet<Integer>> warmupTasks) {
+            if (localEpoch != NONE_EPOCH &&
+                    activeTasks.equals(this.activeTasks) &&
+                    standbyTasks.equals(this.standbyTasks) &&
+                    warmupTasks.equals(this.warmupTasks)) {
+                return Optional.empty();
+            }
+
+            long nextLocalEpoch = localEpoch + 1;
+            return Optional.of(new LocalAssignment(nextLocalEpoch, 
activeTasks, standbyTasks, warmupTasks));
+        }
+
+        @Override
+        public String toString() {
+            return "LocalAssignment{" +
+                "localEpoch=" + localEpoch +
+                ", activeTasks=" + activeTasks +
+                ", standbyTasks=" + standbyTasks +
+                ", warmupTasks=" + warmupTasks +
+                '}';
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            LocalAssignment that = (LocalAssignment) o;
+            return localEpoch == that.localEpoch &&
+                Objects.equals(activeTasks, that.activeTasks) &&
+                Objects.equals(standbyTasks, that.standbyTasks) &&
+                Objects.equals(warmupTasks, that.warmupTasks);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(localEpoch, activeTasks, standbyTasks, 
warmupTasks);
+        }
+    }
+
+    /**
+     * TopicPartition comparator based on topic name and partition.
+     */
+    static final Utils.TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = 
new Utils.TopicPartitionComparator();
+
+    private final Logger log;
+
+    /**
+     * The processor that handles events of the Streams rebalance protocol.
+     * For example, requests for invocation of assignment/revocation callbacks.
+     */
+    private final StreamsRebalanceEventsProcessor 
streamsRebalanceEventsProcessor;
+
+    /**
+     * Data needed to participate in the Streams rebalance protocol.
+     */
+    private final StreamsRebalanceData streamsRebalanceData;
+
+    /**
+     * Subscription state object holding the current assignment the member has 
for the topology
+     * of the Streams application.
+     */
+    private final SubscriptionState subscriptionState;
+
+    /**
+     * Current state of this member as part of the Streams group, as defined in {@link MemberState}.
+     */
+    private MemberState state;
+
+    /**
+     * Group ID of the Streams group the member will be part of, provided when 
creating the current
+     * membership manager.
+     */
+    private final String groupId;
+
+    /**
+     * Member ID generated by the consumer at startup, which is unique within 
the group and remains consistent
+     * for the entire lifetime of the process. This ID acts as an incarnation 
identifier for the consumer process
+     * and does not reset or change, even if the consumer leaves and rejoins 
the group.
+     * The Member ID remains the same until the process is completely stopped 
or terminated.
+     */
+    private final String memberId = Uuid.randomUuid().toString();
+
+    /**
+     * Group instance ID to be used by a static member, provided when creating 
the current membership manager.
+     */
+    private final Optional<String> groupInstanceId = Optional.empty();
+
+    /**
+     * Current epoch of the member. It will be set to 0 by the member, and 
provided to the server
+     * on the heartbeat request, to join the group. It will be then maintained 
by the server,
+     * incremented as the member reconciles and acknowledges the assignments 
it receives. It will
+     * be reset to 0 if the member gets fenced.
+     */
+    private int memberEpoch = 0;
+
+    /**
+     * If the member is currently leaving the group after a call to {@link 
#leaveGroup()} or
+     * {@link #leaveGroupOnClose()}, this will have a future that will 
complete when the ongoing leave operation
+     * completes (callbacks executed and heartbeat request to leave is sent 
out). This will be empty if the
+     * member is not leaving.
+     */
+    private Optional<CompletableFuture<Void>> leaveGroupInProgress = 
Optional.empty();
+
+    /**
+     * Future that will complete when a stale member completes releasing its 
assignment after
+     * leaving the group due to an expired poll timer. Used to make sure that the member rejoins
+     * when the timer is reset, only when it completes releasing its 
assignment.
+     */
+    private CompletableFuture<Void> staleMemberAssignmentRelease;
+
+    /**
+     * If there is a reconciliation running (callbacks).
+     * This will be true if {@link #maybeReconcile()} has been triggered
+     * after receiving a heartbeat response, or a metadata update.
+     */
+    private boolean reconciliationInProgress;
+
+    /**
+     * True if a reconciliation is in progress and the member rejoined the 
group since the start
+     * of the reconciliation. Used to know that the reconciliation in progress 
should be
+     * interrupted and not be applied.
+     */
+    private boolean rejoinedWhileReconciliationInProgress;
+
+    /**
+     * Registered listeners that will be notified whenever the member epoch 
gets updated
+     * (valid values received from the broker, or values cleared due to member 
leaving
+     * the group, getting fenced or failing).
+     */
+    private final List<MemberStateListener> stateUpdatesListeners = new 
ArrayList<>();
+
+    /**
+     * Tasks received in the last target assignment, together with its local 
epoch.
+     *
+     * This member variable is reassigned every time a new assignment is 
received.
+     * It is equal to LocalAssignment.NONE whenever we are not in a group.
+     */
+    private LocalAssignment targetAssignment = LocalAssignment.NONE;
+
+    /**
+     * Assignment that the member received from the server and successfully 
processed, together with
+     * its local epoch.
+     *
+     * This is equal to LocalAssignment.NONE when we are not in a group, or we 
haven't reconciled any assignment yet.
+     */
+    private LocalAssignment currentAssignment = LocalAssignment.NONE;
+
+    /**
+     * AtomicBoolean to track whether the subscription is updated.
+     * If it is true and the member state is UNSUBSCRIBED, the next {@link #onConsumerPoll()} will change the member state to JOINING.
+     */
+    private final AtomicBoolean subscriptionUpdated = new AtomicBoolean(false);
+
+    /**
+     * Measures successful rebalance latency and number of failed rebalances.
+     */
+    private final RebalanceMetricsManager metricsManager;
+
+    private final Time time;
+
+    /**
+     * True if the poll timer has expired, signaled by a call to
+     * {@link #transitionToSendingLeaveGroup(boolean)} with 
dueToExpiredPollTimer param true. This
+     * will be used to determine that the member should transition to STALE 
after leaving the
+     * group, to release its assignment and wait for the timer to be reset.
+     */
+    private boolean isPollTimerExpired;
+
+    /**
+     * Constructs the Streams membership manager.
+     *
+     * @param groupId                           The ID of the group.
+     * @param streamsRebalanceEventsProcessor   The processor that handles 
Streams rebalance events like requests for
+     *                                          invocation of 
assignment/revocation callbacks.
+     * @param streamsRebalanceData              Data needed to participate in 
the Streams rebalance protocol.
+     * @param subscriptionState                 The subscription state of the 
member.
+     * @param logContext                        The log context.
+     * @param time                              The time.
+     * @param metrics                           The metrics.
+     */
+    public StreamsMembershipManager(final String groupId,
+                                    final StreamsRebalanceEventsProcessor 
streamsRebalanceEventsProcessor,
+                                    final StreamsRebalanceData 
streamsRebalanceData,
+                                    final SubscriptionState subscriptionState,
+                                    final LogContext logContext,
+                                    final Time time,
+                                    final Metrics metrics) {
+        log = logContext.logger(StreamsMembershipManager.class);
+        this.state = MemberState.UNSUBSCRIBED;
+        this.groupId = groupId;
+        this.streamsRebalanceEventsProcessor = streamsRebalanceEventsProcessor;
+        this.streamsRebalanceData = streamsRebalanceData;
+        this.subscriptionState = subscriptionState;
+        metricsManager = new ConsumerRebalanceMetricsManager(metrics);
+        this.time = time;
+    }
+
+    /**
+     * @return Group ID of the group the member is part of (or wants to be 
part of).
+     */
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * @return Member ID that is generated at startup and remains unchanged 
for the entire lifetime of the process.
+     */
+    public String memberId() {
+        return memberId;
+    }
+
+    /**
+     * @return Instance ID used by the member when joining the group. If 
non-empty, it will indicate that
+     * this is a static member.
+     */
+    public Optional<String> groupInstanceId() {
+        return groupInstanceId;
+    }
+
+    /**
+     * @return Current epoch of the member, maintained by the server.
+     */
+    public int memberEpoch() {
+        return memberEpoch;
+    }
+
+    /**
+     * @return Current state of this member in relationship to a group, as 
defined in
+     * {@link MemberState}.
+     */
+    public MemberState state() {
+        return state;
+    }
+
+    /**
+     * @return True if the member is preparing to leave the group (waiting for 
callbacks), or
+     * leaving (sending last heartbeat). This is used to skip proactively 
leaving the group when
+     * the poll timer expires.
+     */
+    public boolean isLeavingGroup() {
+        return state == MemberState.PREPARE_LEAVING || state == 
MemberState.LEAVING;
+    }
+
+    private boolean isNotInGroup() {
+        return state == MemberState.UNSUBSCRIBED ||
+            state == MemberState.FENCED ||
+            state == MemberState.FATAL ||
+            state == MemberState.STALE;
+    }
+
+    /**
+     * Register a new listener that will be invoked whenever the member state 
changes, or a new
+     * member ID or epoch is received.
+     *
+     * @param listener Listener to invoke.
+     */
+    public void registerStateListener(MemberStateListener listener) {
+        Objects.requireNonNull(listener, "State updates listener cannot be 
null");
+        for (MemberStateListener registeredListener : stateUpdatesListeners) {
+            if (registeredListener == listener) {
+                throw new IllegalArgumentException("Listener is already 
registered.");
+            }
+        }
+        stateUpdatesListeners.add(listener);
+    }
+
+    /**
+     * Call all listeners that are registered to get notified when the member 
epoch is updated.
+     * This also includes the member ID in the notification. If the member 
fails or leaves
+     * the group, this will be invoked with empty epoch.
+     */
+    private void notifyEpochChange(Optional<Integer> epoch) {
+        stateUpdatesListeners.forEach(stateListener -> 
stateListener.onMemberEpochUpdated(epoch, memberId));
+    }
+
+    /**
+     * Transition to the {@link MemberState#JOINING} state, indicating that 
the member will
+     * try to join the group on the next heartbeat request. This is expected 
to be invoked when
+     * the user calls the subscribe API, or when the member wants to rejoin 
after getting fenced.
+     * Visible for testing.
+     */
+    private void transitionToJoining() {
+        if (state == MemberState.FATAL) {
+            log.warn("No action taken to join the group with the updated 
subscription because " +
+                "the member is in FATAL state");
+            return;
+        }
+        if (reconciliationInProgress) {
+            rejoinedWhileReconciliationInProgress = true;
+        }
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+        clearCurrentTaskAssignment();
+    }
+
+    /**
+     * Reset the member epoch to the value required for the leave group heartbeat request, and
+     * transition to the {@link MemberState#LEAVING} state so that a heartbeat 
request is sent
+     * out with it.
+     *
+     * @param dueToExpiredPollTimer True if the leave group is due to an 
expired poll timer. This
+     *                              will indicate that the member must remain 
STALE after leaving,
+     *                              until it releases its assignment and the 
timer is reset.
+     */
+    private void transitionToSendingLeaveGroup(boolean dueToExpiredPollTimer) {
+        if (state == MemberState.FATAL) {
+            log.warn("Member {} with epoch {} won't send leave group request 
because it is in " +
+                "FATAL state", memberId, memberEpoch);
+            return;
+        }
+        if (state == MemberState.UNSUBSCRIBED) {
+            log.warn("Member {} won't send leave group request because it is 
already out of the group.",
+                memberId);
+            return;
+        }
+
+        if (dueToExpiredPollTimer) {
+            isPollTimerExpired = true;
+            // Briefly transition through prepare leaving. The member does not have to release
+            // any assignment before sending the leave group request given that it is stale. It will
+            // invoke onAllTasksLost after sending the leave group request, once it is in the STALE state.
+            transitionTo(MemberState.PREPARE_LEAVING);
+        }
+        finalizeLeaving();
+        transitionTo(MemberState.LEAVING);
+    }
+
+    private void finalizeLeaving() {
+        
updateMemberEpoch(StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH);
+        clearCurrentTaskAssignment();
+    }
+
+    /**
+     * Transition to STALE to release assignments because the member has left 
the group due to
+     * expired poll timer. This will trigger the onAllTasksLost callback. Once 
the callback
+     * completes, the member will remain stale until the poll timer is reset 
by an application
+     * poll event. See {@link #maybeRejoinStaleMember()}.
+     */
+    private void transitionToStale() {
+        transitionTo(MemberState.STALE);
+
+        final CompletableFuture<Void> onAllTasksLostCallbackExecution =
+            
streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation();
+        staleMemberAssignmentRelease = 
onAllTasksLostCallbackExecution.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("Task revocation callback invocation failed " +
+                    "after member left group due to expired poll timer.", 
error);
+            }
+            clearTaskAndPartitionAssignment();
+            log.debug("Member {} sent leave group heartbeat and released its 
assignment. It will remain " +
+                    "in {} state until the poll timer is reset, and it will 
then rejoin the group",
+                memberId, MemberState.STALE);
+        });
+    }
+
+    /**
+     * Transition the member to the FATAL state and update the member info as 
required. This is
+     * invoked when unrecoverable errors occur (e.g., when the heartbeat returns a non-retriable
+     * error).
+     */
+    public void transitionToFatal() {
+        MemberState previousState = state;
+        transitionTo(MemberState.FATAL);
+        log.error("Member {} with epoch {} transitioned to fatal state", 
memberId, memberEpoch);
+        notifyEpochChange(Optional.empty());
+
+        if (previousState == MemberState.UNSUBSCRIBED) {
+            log.debug("Member {} with epoch {} got fatal error from the broker 
but it already " +
+                "left the group, so onAllTasksLost callback won't be 
triggered.", memberId, memberEpoch);
+            return;
+        }
+
+        if (previousState == MemberState.LEAVING || previousState == 
MemberState.PREPARE_LEAVING) {
+            log.info("Member {} with epoch {} was leaving the group with state 
{} when it got a " +
+                "fatal error from the broker. It will discard the ongoing 
leave and remain in " +
+                "fatal state.", memberId, memberEpoch, previousState);
+            maybeCompleteLeaveInProgress();
+            return;
+        }
+
+        CompletableFuture<Void> onAllTasksLostCallbackExecuted = 
streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation();
+        onAllTasksLostCallbackExecuted.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onAllTasksLost callback invocation failed while 
releasing assignment " +
+                    "after member failed with fatal error.", error);
+            }
+            clearTaskAndPartitionAssignment();
+        });
+    }
+
+    /**
+     * Notify when the heartbeat request is skipped.
+     * Transition out of the {@link MemberState#LEAVING} state even if the 
heartbeat was not sent.
+     * This will ensure that the member is not blocked on {@link 
MemberState#LEAVING} (best
+     * effort to send the request, without any response handling or retry 
logic)
+     */
+    public void onHeartbeatRequestSkipped() {
+        if (state == MemberState.LEAVING) {
+            log.warn("Heartbeat to leave group cannot be sent (most probably 
due to coordinator " +
+                    "not known/available). Member {} with epoch {} will 
transition to {}.",
+                memberId, memberEpoch, MemberState.UNSUBSCRIBED);
+            transitionTo(MemberState.UNSUBSCRIBED);
+            maybeCompleteLeaveInProgress();
+        }
+    }
+
+    /**
+     * Update the member state, setting it to the nextState only if it is a 
valid transition.
+     *
+     * @throws IllegalStateException If transitioning from the member {@link 
#state} to the
+     *                               nextState is not allowed as defined in 
{@link MemberState}.
+     */
+    private void transitionTo(MemberState nextState) {
+        if (!state.equals(nextState) && 
!nextState.getPreviousValidStates().contains(state)) {
+            throw new IllegalStateException(String.format("Invalid state 
transition from %s to %s",
+                state, nextState));
+        }
+
+        if (isCompletingRebalance(state, nextState)) {
+            metricsManager.recordRebalanceEnded(time.milliseconds());
+        }
+        if (isStartingRebalance(state, nextState)) {
+            metricsManager.recordRebalanceStarted(time.milliseconds());
+        }
+
+        log.info("Member {} with epoch {} transitioned from {} to {}.", 
memberId, memberEpoch, state, nextState);
+        this.state = nextState;
+    }
+
+    private static boolean isCompletingRebalance(MemberState currentState, 
MemberState nextState) {
+        return currentState == MemberState.RECONCILING &&
+            (nextState == MemberState.STABLE || nextState == 
MemberState.ACKNOWLEDGING);
+    }
+
+    private static boolean isStartingRebalance(MemberState currentState, 
MemberState nextState) {
+        return currentState != MemberState.RECONCILING && nextState == 
MemberState.RECONCILING;
+    }
+
+    private void resetEpoch() {
+        
updateMemberEpoch(StreamsGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH);
+    }
+
+    private void updateMemberEpoch(int newEpoch) {
+        boolean newEpochReceived = this.memberEpoch != newEpoch;
+        this.memberEpoch = newEpoch;
+        if (newEpochReceived) {
+            if (memberEpoch > 0) {
+                notifyEpochChange(Optional.of(memberEpoch));
+            } else {
+                notifyEpochChange(Optional.empty());
+            }
+        }
+    }
+
+    /**
+     * Reset the current task assignment, i.e., the assignment that the member last processed
+     * successfully, to {@link LocalAssignment#NONE}. The target assignment is not modified.
+     */
+    private void clearCurrentTaskAssignment() {
+        currentAssignment = LocalAssignment.NONE;
+    }
+
+    /**
+     * Clear the assigned partitions in the member subscription, and reset both the current and the target task assignment.
+     */
+    private void clearTaskAndPartitionAssignment() {
+        subscriptionState.assignFromSubscribed(Collections.emptySet());
+        currentAssignment = LocalAssignment.NONE;
+        targetAssignment = LocalAssignment.NONE;
+    }
+
+    /**
+     * @return True if the member should not send heartbeats, which is the 
case when it is in a
+     * state where it is not an active member of the group.
+     */
+    public boolean shouldSkipHeartbeat() {
+        return isNotInGroup();
+    }
+
+    /**
+     * @return True if the member should send heartbeat to the coordinator 
without waiting for
+     * the interval.
+     */
+    public boolean shouldHeartbeatNow() {
+        return state == MemberState.ACKNOWLEDGING || state == 
MemberState.LEAVING || state == MemberState.JOINING;
+    }
+
+    /**
+     * Set {@link #subscriptionUpdated} to true to indicate that the 
subscription has been updated.
+     * The next {@link #onConsumerPoll()} will join the group with the updated 
subscription, if the member is not part of it yet.
+     * If the member is already part of the group, this will only ensure that 
the updated subscription
+     * is included in the next heartbeat request.
+     * <p/>
+     * Note that the list of topics of the subscription is taken from the shared subscription state.

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to