cadonna commented on code in PR #18551: URL: https://github.com/apache/kafka/pull/18551#discussion_r1935731250
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java: ########## @@ -0,0 +1,1249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent; +import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent; +import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackCompletedEvent; +import org.apache.kafka.clients.consumer.internals.metrics.ConsumerRebalanceMetricsManager; +import org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest; +import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; 
+import org.apache.kafka.common.utils.Time; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Tracks the state of a single member in relationship to a group. + * <p/> + * Responsible for: + * <ul> + * <li>Keeping member state</li> + * <li>Keeping assignment for the member</li> + * <li>Reconciling assignment, for example if tasks need to be revoked before other tasks can be assigned</li> + * <li>Calling the assignment and revocation callbacks on the Streams client</li> + * </ul> + */ +public class StreamsMembershipManager implements RequestManager { + + /** + * A data structure to represent the current task assignment, and current target task assignment of a member in a + * streams group. + * <p/> + * Besides the assigned tasks, it contains a local epoch that is bumped whenever the assignment changes, to ensure + * that two assignments with the same tasks but different local epochs are not considered equal. 
+ */ + private static class LocalAssignment { + public static final long NONE_EPOCH = -1; + public static final LocalAssignment NONE = new LocalAssignment( + NONE_EPOCH, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + public final long localEpoch; + public final Map<String, SortedSet<Integer>> activeTasks; + public final Map<String, SortedSet<Integer>> standbyTasks; + public final Map<String, SortedSet<Integer>> warmupTasks; + + public LocalAssignment(final long localEpoch, + final Map<String, SortedSet<Integer>> activeTasks, + final Map<String, SortedSet<Integer>> standbyTasks, + final Map<String, SortedSet<Integer>> warmupTasks) { + this.localEpoch = localEpoch; + this.activeTasks = activeTasks; + this.standbyTasks = standbyTasks; + this.warmupTasks = warmupTasks; + if (localEpoch == NONE_EPOCH && + (!activeTasks.isEmpty() || !standbyTasks.isEmpty() || !warmupTasks.isEmpty())) { + throw new IllegalArgumentException("Local epoch must be set if tasks are assigned."); + } + } + + Optional<LocalAssignment> updateWith(final Map<String, SortedSet<Integer>> activeTasks, + final Map<String, SortedSet<Integer>> standbyTasks, + final Map<String, SortedSet<Integer>> warmupTasks) { + if (localEpoch != NONE_EPOCH && + activeTasks.equals(this.activeTasks) && + standbyTasks.equals(this.standbyTasks) && + warmupTasks.equals(this.warmupTasks)) { + return Optional.empty(); + } + + long nextLocalEpoch = localEpoch + 1; + return Optional.of(new LocalAssignment(nextLocalEpoch, activeTasks, standbyTasks, warmupTasks)); + } + + @Override + public String toString() { + return "LocalAssignment{" + + "localEpoch=" + localEpoch + + ", activeTasks=" + activeTasks + + ", standbyTasks=" + standbyTasks + + ", warmupTasks=" + warmupTasks + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + LocalAssignment that = (LocalAssignment) o; + return localEpoch == 
that.localEpoch && + Objects.equals(activeTasks, that.activeTasks) && + Objects.equals(standbyTasks, that.standbyTasks) && + Objects.equals(warmupTasks, that.warmupTasks); + } + + @Override + public int hashCode() { + return Objects.hash(localEpoch, activeTasks, standbyTasks, warmupTasks); + } + } + + /** + * TopicPartition comparator based on topic name and partition. + */ + static final Utils.TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = new Utils.TopicPartitionComparator(); + + private final Logger log; + + /** + * The processor that handles events of the Streams rebalance protocol. + * For example, requests for invocation of assignment/revocation callbacks. + */ + private final StreamsRebalanceEventsProcessor streamsRebalanceEventsProcessor; + + /** + * Data needed to participate in the Streams rebalance protocol. + */ + private final StreamsRebalanceData streamsRebalanceData; + + /** + * Subscription state object holding the current assignment the member has for the topology + * of the Streams application. + */ + private final SubscriptionState subscriptionState; + + /** + * Current state of this member as part of the consumer group, as defined in {@link MemberState} + */ + private MemberState state; + + /** + * Group ID of the Streams group the member will be part of, provided when creating the current + * membership manager. + */ + private final String groupId; + + /** + * Member ID generated by the consumer at startup, which is unique within the group and remains consistent + * for the entire lifetime of the process. This ID acts as an incarnation identifier for the consumer process + * and does not reset or change, even if the consumer leaves and rejoins the group. + * The Member ID remains the same until the process is completely stopped or terminated. + */ + private final String memberId = Uuid.randomUuid().toString(); + + /** + * Group instance ID to be used by a static member, provided when creating the current membership manager. 
+ */ + private final Optional<String> groupInstanceId = Optional.empty(); + + /** + * Current epoch of the member. It will be set to 0 by the member, and provided to the server + * on the heartbeat request, to join the group. It will be then maintained by the server, + * incremented as the member reconciles and acknowledges the assignments it receives. It will + * be reset to 0 if the member gets fenced. + */ + private int memberEpoch = 0; + + /** + * If the member is currently leaving the group after a call to {@link #leaveGroup()} or + * {@link #leaveGroupOnClose()}, this will have a future that will complete when the ongoing leave operation + * completes (callbacks executed and heartbeat request to leave is sent out). This will be empty if the + * member is not leaving. + */ + private Optional<CompletableFuture<Void>> leaveGroupInProgress = Optional.empty(); + + /** + * Future that will complete when a stale member completes releasing its assignment after + * leaving the group due to poll timer expired. Used to make sure that the member rejoins + * when the timer is reset, only when it completes releasing its assignment. + */ + private CompletableFuture<Void> staleMemberAssignmentRelease; + + /** + * If there is a reconciliation running (callbacks). + * This will be true if {@link #maybeReconcile()} has been triggered + * after receiving a heartbeat response, or a metadata update. + */ + private boolean reconciliationInProgress; + + /** + * True if a reconciliation is in progress and the member rejoined the group since the start + * of the reconciliation. Used to know that the reconciliation in progress should be + * interrupted and not be applied. + */ + private boolean rejoinedWhileReconciliationInProgress; + + /** + * Registered listeners that will be notified whenever the member epoch gets updated + * (valid values received from the broker, or values cleared due to member leaving + * the group, getting fenced or failing). 
+ */ + private final List<MemberStateListener> stateUpdatesListeners = new ArrayList<>(); + + /** + * Tasks received in the last target assignment, together with its local epoch. + * + * This member variable is reassigned every time a new assignment is received. + * It is equal to LocalAssignment.NONE whenever we are not in a group. + */ + private LocalAssignment targetAssignment = LocalAssignment.NONE; + + /** + * Assignment that the member received from the server and successfully processed, together with + * its local epoch. + * + * This is equal to LocalAssignment.NONE when we are not in a group, or we haven't reconciled any assignment yet. + */ + private LocalAssignment currentAssignment = LocalAssignment.NONE; + + /** + * AtomicBoolean to track whether the subscription is updated. + * If it's true and subscription state is UNSUBSCRIBED, the next {@link #onConsumerPoll()} will change member state to JOINING. + */ + private final AtomicBoolean subscriptionUpdated = new AtomicBoolean(false); + + /** + * Measures successful rebalance latency and number of failed rebalances. + */ + private final RebalanceMetricsManager metricsManager; + + private final Time time; + + /** + * True if the poll timer has expired, signaled by a call to + * {@link #transitionToSendingLeaveGroup(boolean)} with dueToExpiredPollTimer param true. This + * will be used to determine that the member should transition to STALE after leaving the + * group, to release its assignment and wait for the timer to be reset. + */ + private boolean isPollTimerExpired; + + /** + * Constructs the Streams membership manager. + * + * @param groupId The ID of the group. + * @param streamsRebalanceEventsProcessor The processor that handles Streams rebalance events like requests for + * invocation of assignment/revocation callbacks. + * @param streamsRebalanceData Data needed to participate in the Streams rebalance protocol. + * @param subscriptionState The subscription state of the member. 
+ * @param logContext The log context. + * @param time The time. + * @param metrics The metrics. + */ + public StreamsMembershipManager(final String groupId, + final StreamsRebalanceEventsProcessor streamsRebalanceEventsProcessor, + final StreamsRebalanceData streamsRebalanceData, + final SubscriptionState subscriptionState, + final LogContext logContext, + final Time time, + final Metrics metrics) { + log = logContext.logger(StreamsMembershipManager.class); + this.state = MemberState.UNSUBSCRIBED; + this.groupId = groupId; + this.streamsRebalanceEventsProcessor = streamsRebalanceEventsProcessor; + this.streamsRebalanceData = streamsRebalanceData; + this.subscriptionState = subscriptionState; + metricsManager = new ConsumerRebalanceMetricsManager(metrics); + this.time = time; + } + + /** + * @return Group ID of the group the member is part of (or wants to be part of). + */ + public String groupId() { + return groupId; + } + + /** + * @return Member ID that is generated at startup and remains unchanged for the entire lifetime of the process. + */ + public String memberId() { + return memberId; + } + + /** + * @return Instance ID used by the member when joining the group. If non-empty, it will indicate that + * this is a static member. + */ + public Optional<String> groupInstanceId() { + return groupInstanceId; + } + + /** + * @return Current epoch of the member, maintained by the server. + */ + public int memberEpoch() { + return memberEpoch; + } + + /** + * @return Current state of this member in relationship to a group, as defined in + * {@link MemberState}. + */ + public MemberState state() { + return state; + } + + /** + * @return True if the member is preparing to leave the group (waiting for callbacks), or + * leaving (sending last heartbeat). This is used to skip proactively leaving the group when + * the poll timer expires. 
+ */ + public boolean isLeavingGroup() { + return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING; + } + + private boolean isNotInGroup() { + return state == MemberState.UNSUBSCRIBED || + state == MemberState.FENCED || + state == MemberState.FATAL || + state == MemberState.STALE; + } + + /** + * Register a new listener that will be invoked whenever the member state changes, or a new + * member ID or epoch is received. + * + * @param listener Listener to invoke. + */ + public void registerStateListener(MemberStateListener listener) { + Objects.requireNonNull(listener, "State updates listener cannot be null"); + for (MemberStateListener registeredListener : stateUpdatesListeners) { + if (registeredListener == listener) { + throw new IllegalArgumentException("Listener is already registered."); + } + } + stateUpdatesListeners.add(listener); + } + + /** + * Call all listeners that are registered to get notified when the member epoch is updated. + * This also includes the member ID in the notification. If the member fails or leaves + * the group, this will be invoked with empty epoch. + */ + private void notifyEpochChange(Optional<Integer> epoch) { + stateUpdatesListeners.forEach(stateListener -> stateListener.onMemberEpochUpdated(epoch, memberId)); + } + + /** + * Transition to the {@link MemberState#JOINING} state, indicating that the member will + * try to join the group on the next heartbeat request. This is expected to be invoked when + * the user calls the subscribe API, or when the member wants to rejoin after getting fenced. + * Visible for testing. 
+ */ + private void transitionToJoining() { + if (state == MemberState.FATAL) { + log.warn("No action taken to join the group with the updated subscription because " + + "the member is in FATAL state"); + return; + } + if (reconciliationInProgress) { + rejoinedWhileReconciliationInProgress = true; + } + resetEpoch(); + transitionTo(MemberState.JOINING); + clearCurrentTaskAssignment(); + } + + /** + * Reset member epoch to the value required for the leave group heartbeat request, and + * transition to the {@link MemberState#LEAVING} state so that a heartbeat request is sent + * out with it. + * + * @param dueToExpiredPollTimer True if the leave group is due to an expired poll timer. This + * will indicate that the member must remain STALE after leaving, + * until it releases its assignment and the timer is reset. + */ + private void transitionToSendingLeaveGroup(boolean dueToExpiredPollTimer) { + if (state == MemberState.FATAL) { + log.warn("Member {} with epoch {} won't send leave group request because it is in " + + "FATAL state", memberId, memberEpoch); + return; + } + if (state == MemberState.UNSUBSCRIBED) { + log.warn("Member {} won't send leave group request because it is already out of the group.", + memberId); + return; + } + + if (dueToExpiredPollTimer) { + isPollTimerExpired = true; + // Briefly transition through prepare leaving. The member does not have to release + // any assignment before sending the leave group given that it is stale. It will invoke + // onAllTasksLost after sending the leave group on the STALE state. + transitionTo(MemberState.PREPARE_LEAVING); + } + finalizeLeaving(); + transitionTo(MemberState.LEAVING); + } + + private void finalizeLeaving() { + updateMemberEpoch(StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH); + clearCurrentTaskAssignment(); + } + + /** + * Transition to STALE to release assignments because the member has left the group due to + * expired poll timer. This will trigger the onAllTasksLost callback. 
Once the callback + * completes, the member will remain stale until the poll timer is reset by an application + * poll event. See {@link #maybeRejoinStaleMember()}. + */ + private void transitionToStale() { + transitionTo(MemberState.STALE); + + final CompletableFuture<Void> onAllTasksLostCallbackExecution = + streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation(); + staleMemberAssignmentRelease = onAllTasksLostCallbackExecution.whenComplete((result, error) -> { + if (error != null) { + log.error("Task revocation callback invocation failed " + + "after member left group due to expired poll timer.", error); + } + clearTaskAndPartitionAssignment(); + log.debug("Member {} sent leave group heartbeat and released its assignment. It will remain " + + "in {} state until the poll timer is reset, and it will then rejoin the group", + memberId, MemberState.STALE); + }); + } + + /** + * Transition the member to the FATAL state and update the member info as required. This is + * invoked when un-recoverable errors occur (ex. when the heartbeat returns a non-retriable + * error) + */ + public void transitionToFatal() { + MemberState previousState = state; + transitionTo(MemberState.FATAL); + log.error("Member {} with epoch {} transitioned to fatal state", memberId, memberEpoch); + notifyEpochChange(Optional.empty()); + + if (previousState == MemberState.UNSUBSCRIBED) { + log.debug("Member {} with epoch {} got fatal error from the broker but it already " + + "left the group, so onAllTasksLost callback won't be triggered.", memberId, memberEpoch); + return; + } + + if (previousState == MemberState.LEAVING || previousState == MemberState.PREPARE_LEAVING) { + log.info("Member {} with epoch {} was leaving the group with state {} when it got a " + + "fatal error from the broker. 
It will discard the ongoing leave and remain in " + + "fatal state.", memberId, memberEpoch, previousState); + maybeCompleteLeaveInProgress(); + return; + } + + CompletableFuture<Void> onAllTasksLostCallbackExecuted = streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation(); + onAllTasksLostCallbackExecuted.whenComplete((result, error) -> { + if (error != null) { + log.error("onAllTasksLost callback invocation failed while releasing assignment " + + "after member failed with fatal error.", error); + } + clearTaskAndPartitionAssignment(); + }); + } + + /** + * Notify when the heartbeat request is skipped. + * Transition out of the {@link MemberState#LEAVING} state even if the heartbeat was not sent. + * This will ensure that the member is not blocked on {@link MemberState#LEAVING} (best + * effort to send the request, without any response handling or retry logic) + */ + public void onHeartbeatRequestSkipped() { + if (state == MemberState.LEAVING) { + log.warn("Heartbeat to leave group cannot be sent (most probably due to coordinator " + + "not known/available). Member {} with epoch {} will transition to {}.", + memberId, memberEpoch, MemberState.UNSUBSCRIBED); + transitionTo(MemberState.UNSUBSCRIBED); + maybeCompleteLeaveInProgress(); + } + } + + /** + * Update the member state, setting it to the nextState only if it is a valid transition. + * + * @throws IllegalStateException If transitioning from the member {@link #state} to the + * nextState is not allowed as defined in {@link MemberState}. 
+ */ + private void transitionTo(MemberState nextState) { + if (!state.equals(nextState) && !nextState.getPreviousValidStates().contains(state)) { + throw new IllegalStateException(String.format("Invalid state transition from %s to %s", + state, nextState)); + } + + if (isCompletingRebalance(state, nextState)) { + metricsManager.recordRebalanceEnded(time.milliseconds()); + } + if (isStartingRebalance(state, nextState)) { + metricsManager.recordRebalanceStarted(time.milliseconds()); + } + + log.info("Member {} with epoch {} transitioned from {} to {}.", memberId, memberEpoch, state, nextState); + this.state = nextState; + } + + private static boolean isCompletingRebalance(MemberState currentState, MemberState nextState) { + return currentState == MemberState.RECONCILING && + (nextState == MemberState.STABLE || nextState == MemberState.ACKNOWLEDGING); + } + + private static boolean isStartingRebalance(MemberState currentState, MemberState nextState) { + return currentState != MemberState.RECONCILING && nextState == MemberState.RECONCILING; + } + + private void resetEpoch() { + updateMemberEpoch(StreamsGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH); + } + + private void updateMemberEpoch(int newEpoch) { + boolean newEpochReceived = this.memberEpoch != newEpoch; + this.memberEpoch = newEpoch; + if (newEpochReceived) { + if (memberEpoch > 0) { + notifyEpochChange(Optional.of(memberEpoch)); + } else { + notifyEpochChange(Optional.empty()); + } + } + } + + /** + * Discard assignments received that have not been reconciled yet (waiting for metadata + * or the next reconciliation loop). + */ + private void clearCurrentTaskAssignment() { + currentAssignment = LocalAssignment.NONE; + } + + /** + * Clear the assigned partitions in the member subscription, pending assignments and metadata cache. 
+ */ + private void clearTaskAndPartitionAssignment() { + subscriptionState.assignFromSubscribed(Collections.emptySet()); + currentAssignment = LocalAssignment.NONE; + targetAssignment = LocalAssignment.NONE; + } + + /** + * @return True if the member should not send heartbeats, which is the case when it is in a + * state where it is not an active member of the group. + */ + public boolean shouldSkipHeartbeat() { + return isNotInGroup(); + } + + /** + * @return True if the member should send heartbeat to the coordinator without waiting for + * the interval. + */ + public boolean shouldHeartbeatNow() { + return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING || state == MemberState.JOINING; + } + + /** + * Set {@link #subscriptionUpdated} to true to indicate that the subscription has been updated. + * The next {@link #onConsumerPoll()} will join the group with the updated subscription, if the member is not part of it yet. + * If the member is already part of the group, this will only ensure that the updated subscription + * is included in the next heartbeat request. + * <p/> + * Note that the list of topics of the subscription is taken from the shared subscription state. + */ + public void onSubscriptionUpdated() { Review Comment: Actually, we do subscribe: https://github.com/apache/kafka/blob/ddfadf72ada052b22ea905f63cec5cc47b0ae65a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1046 The async consumer passes the subscription back to the network thread by means of an event. Processing that event calls `onSubscriptionUpdated()`. In the future, we could consider removing the explicit subscription and letting the `StreamsRebalanceData` or `StreamsRebalanceEventsProcessor` subscribe depending on the topology. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org