dajac commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1398998763
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ########## @@ -137,45 +140,71 @@ public HeartbeatRequestManager( } /** - * Determines the maximum wait time until the next poll based on the member's state, and creates a heartbeat - * request. + * This will build a heartbeat request if one must be sent, determined based on the member + * state. A heartbeat is sent in the following situations: + * <ol> + * <li>Member is part of the consumer group or wants to join it.</li> + * <li>The heartbeat interval has expired, or the member is in a state that indicates + * that it should heartbeat without waiting for the interval.</li> + * </ol> + * This will also determine the maximum wait time until the next poll based on the member's + * state. * <ol> - * <li>If the member is without a coordinator or is in a failed state, the timer is set to Long.MAX_VALUE, as there's no need to send a heartbeat.</li> - * <li>If the member cannot send a heartbeat due to either exponential backoff, it will return the remaining time left on the backoff timer.</li> - * <li>If the member's heartbeat timer has not expired, It will return the remaining time left on the - * heartbeat timer.</li> + * <li>If the member is without a coordinator or is in a failed state, the timer is set + * to Long.MAX_VALUE, as there's no need to send a heartbeat.</li> + * <li>If the member cannot send a heartbeat due to either exponential backoff, it will + * return the remaining time left on the backoff timer.</li> + * <li>If the member's heartbeat timer has not expired, It will return the remaining time + * left on the heartbeat timer.</li> * <li>If the member can send a heartbeat, the timer is set to the current heartbeat interval.</li> * </ol> + * + * @return {@link PollResult} that includes a heartbeat request if one must be sent, and the + * time to wait until the next poll. 
*/ @Override public NetworkClientDelegate.PollResult poll(long currentTimeMs) { - if (!coordinatorRequestManager.coordinator().isPresent() || !membershipManager.shouldSendHeartbeat()) + if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat()) { + membershipManager.onHeartbeatRequestSkipped(); return NetworkClientDelegate.PollResult.EMPTY; + } + + boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight(); - // TODO: We will need to send a heartbeat response after partitions being revoke. This needs to be - // implemented either with or after the partition reconciliation logic. - if (!heartbeatRequestState.canSendRequest(currentTimeMs)) + if (!heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) { return new NetworkClientDelegate.PollResult(heartbeatRequestState.nextHeartbeatMs(currentTimeMs)); + } - this.heartbeatRequestState.onSendAttempt(currentTimeMs); + heartbeatRequestState.onSendAttempt(currentTimeMs); + membershipManager.onHeartbeatRequestSent(); NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(); return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request)); } private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { - // TODO: We only need to send the rebalanceTimeoutMs field once unless the first request failed. + // TODO: extract this logic for building the ConsumerGroupHeartbeatRequestData to a + // stateful builder (HeartbeatState), that will keep the last data sent, and determine + // the fields that changed and need to be included in the next HB (ex. check + // subscriptionState changed from last sent to include assignment). It should also + // ensure that all fields are sent on failure. 
ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData() .setGroupId(membershipManager.groupId()) .setMemberEpoch(membershipManager.memberEpoch()) - .setMemberId(membershipManager.memberId()) .setRebalanceTimeoutMs(rebalanceTimeoutMs); + if (membershipManager.memberId() != null) { + data.setMemberId(membershipManager.memberId()); + } + membershipManager.groupInstanceId().ifPresent(data::setInstanceId); if (this.subscriptions.hasPatternSubscription()) { // TODO: Pass the string to the GC if server side regex is used. } else { data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription())); + List<ConsumerGroupHeartbeatRequestData.TopicPartitions> topicPartitions = + buildTopicPartitionsList(membershipManager.currentAssignment()); + data.setTopicPartitions(topicPartitions); Review Comment: I think that this should actually be outside of the `else` branch, shouldn't it? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ########## @@ -192,6 +221,26 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { }); } + private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> buildTopicPartitionsList(Set<TopicIdPartition> topicIdPartitions) { + List<ConsumerGroupHeartbeatRequestData.TopicPartitions> result = new ArrayList<>(); + Map<Uuid, List<Integer>> partitionsPerTopicId = new HashMap<>(); + for (TopicIdPartition topicIdPartition : topicIdPartitions) { + Uuid topicId = topicIdPartition.topicId(); + if (!partitionsPerTopicId.containsKey(topicId)) { + partitionsPerTopicId.put(topicId, new ArrayList<>()); + } + partitionsPerTopicId.get(topicId).add(topicIdPartition.partition()); + } + for (Map.Entry<Uuid, List<Integer>> entry : partitionsPerTopicId.entrySet()) { + Uuid topicId = entry.getKey(); + List<Integer> partitions = entry.getValue(); + result.add(new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(topicId) + .setPartitions(partitions)); + } 
Review Comment: nit: If we would use `ConsumerGroupHeartbeatRequestData.TopicPartitions` in the `HashMap` and the `List`, we could skip this step. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ########## @@ -192,6 +221,26 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { }); } + private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> buildTopicPartitionsList(Set<TopicIdPartition> topicIdPartitions) { + List<ConsumerGroupHeartbeatRequestData.TopicPartitions> result = new ArrayList<>(); + Map<Uuid, List<Integer>> partitionsPerTopicId = new HashMap<>(); + for (TopicIdPartition topicIdPartition : topicIdPartitions) { + Uuid topicId = topicIdPartition.topicId(); + if (!partitionsPerTopicId.containsKey(topicId)) { + partitionsPerTopicId.put(topicId, new ArrayList<>()); + } + partitionsPerTopicId.get(topicId).add(topicIdPartition.partition()); Review Comment: nit: We could probably use `computeIfAbsent` to simplify this code. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -168,76 +329,686 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { this.memberId = response.memberId(); this.memberEpoch = response.memberEpoch(); ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); + if (assignment != null) { - setTargetAssignment(assignment); + transitionTo(MemberState.RECONCILING); + replaceUnresolvedAssignmentWithNewAssignment(assignment); + resolveMetadataForUnresolvedAssignment(); + reconcile(); + } else if (allPendingAssignmentsReconciled()) { + transitionTo(MemberState.STABLE); } - maybeTransitionToStable(); + } + + /** + * Overwrite collection of unresolved topic Ids with the new target assignment. 
This will + * effectively achieve the following: + * + * - all topics received in assignment will try to be resolved to find their topic names + * + * - any topic received in a previous assignment that was still unresolved, and that is + * not included in the assignment anymore, will be removed from the unresolved collection. + * This should be the case when a topic is sent in an assignment, deleted right after, and + * removed from the assignment the next time a broker sends one to the member. + * + * @param assignment Target assignment received from the broker. + */ + private void replaceUnresolvedAssignmentWithNewAssignment( + ConsumerGroupHeartbeatResponseData.Assignment assignment) { + assignmentUnresolved.clear(); + assignment.topicPartitions().forEach(topicPartitions -> + assignmentUnresolved.put(topicPartitions.topicId(), topicPartitions.partitions())); } /** * {@inheritDoc} */ @Override public void transitionToFenced() { - resetEpoch(); transitionTo(MemberState.FENCED); + resetEpoch(); + log.debug("Member {} with epoch {} transitioned to {} state. It will release its " + + "assignment and rejoin the group.", memberId, memberEpoch, MemberState.FENCED); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + " after member got fenced. 
Member will rejoin the group anyways.", error); + } + updateSubscription(Collections.emptySet(), true); + transitionToJoining(); + }); } /** * {@inheritDoc} */ @Override - public void transitionToFailed() { - log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); - transitionTo(MemberState.FAILED); + public void transitionToFatal() { + transitionTo(MemberState.FATAL); + log.error("Member {} with epoch {} transitioned to {} state", memberId, memberEpoch, MemberState.FATAL); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member failed with fatal error.", error); + } + updateSubscription(Collections.emptySet(), true); + }); } + /** + * {@inheritDoc} + */ + public void onSubscriptionUpdated() { + if (state == MemberState.UNSUBSCRIBED) { + transitionToJoining(); + } + } + + /** + * Update a new assignment by setting the assigned partitions in the member subscription. + * + * @param assignedPartitions Topic partitions to take as the new subscription assignment + * @param clearAssignments True if the + */ + private void updateSubscription(Collection<TopicPartition> assignedPartitions, + boolean clearAssignments) { + subscriptions.assignFromSubscribed(assignedPartitions); + if (clearAssignments) { + clearPendingAssignmentsAndLocalNamesCache(); + } + } + + /** + * Transition to the {@link MemberState#JOINING} state, indicating that the member will + * try to join the group on the next heartbeat request. This is expected to be invoked when + * the user calls the subscribe API, or when the member wants to rejoin after getting fenced. + * Visible for testing. 
+ */ + void transitionToJoining() { + if (state == MemberState.FATAL) { + log.warn("No action taken to join the group with the updated subscription because " + + "the member is in FATAL state"); + return; + } + resetEpoch(); + transitionTo(MemberState.JOINING); + clearPendingAssignmentsAndLocalNamesCache(); + registerForMetadataUpdates(); + } + + /** + * Register to get notified when the cluster metadata is updated, via the + * {@link #onUpdate(ClusterResource)}. Register only if the manager is not register already. + */ + private void registerForMetadataUpdates() { + if (!isRegisteredForMetadataUpdates) { + this.metadata.addClusterUpdateListener(this); + isRegisteredForMetadataUpdates = true; + } + } + + /** + * {@inheritDoc} + */ @Override - public boolean shouldSendHeartbeat() { - return state() != MemberState.FAILED; + public CompletableFuture<Void> leaveGroup() { + if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) { + // Member is not part of the group. No-op and return completed future to avoid + // unnecessary transitions. + return CompletableFuture.completedFuture(null); + } + + if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) { + // Member already leaving. No-op and return existing leave group future that will + // complete when the ongoing leave operation completes. + return leaveGroupInProgress.get(); + } + + transitionTo(MemberState.PREPARE_LEAVING); + CompletableFuture<Void> leaveResult = new CompletableFuture<>(); + leaveGroupInProgress = Optional.of(leaveResult); + + CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + callbackResult.whenComplete((result, error) -> { + // Clear the subscription, no matter if the callback execution failed or succeeded. 
+ updateSubscription(Collections.emptySet(), true); + + // Transition to ensure that a heartbeat request is sent out to effectively leave the + // group (even in the case where the member had no assignment to release or when the + // callback execution failed.) + transitionToSendingLeaveGroup(); + }); + + // Return future to indicate that the leave group is done when the callbacks + // complete, and the transition to send the heartbeat has been made. + return leaveResult; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * <ul> + * <li>If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe)</li> + * + * <li>If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced .</li> + * </ul> + * + * @return Future that will complete when the callback execution completes. */ - private boolean maybeTransitionToStable() { - if (!hasPendingTargetAssignment()) { - transitionTo(MemberState.STABLE); + private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() { + SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + droppedPartitions.addAll(subscriptions.assignedPartitions()); + + CompletableFuture<Void> callbackResult; + if (droppedPartitions.isEmpty()) { + // No assignment to release + callbackResult = CompletableFuture.completedFuture(null); } else { - transitionTo(MemberState.RECONCILING); + // Release assignment + if (memberEpoch > 0) { + // Member is part of the group. Invoke onPartitionsRevoked. 
+ callbackResult = revokePartitions(droppedPartitions); + } else { + // Member is not part of the group anymore. Invoke onPartitionsLost. + callbackResult = invokeOnPartitionsLostCallback(droppedPartitions); + } + } + return callbackResult; + } + + /** + * Reset member epoch to the value required for the leave the group heartbeat request, and + * transition to the {@link MemberState#LEAVING} state so that a heartbeat + * request is sent out with it. + */ + private void transitionToSendingLeaveGroup() { + memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + currentAssignment = new HashSet<>(); + transitionTo(MemberState.LEAVING); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean shouldHeartbeatNow() { + MemberState state = state(); + return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING; + } + + /** + * {@inheritDoc} + */ + @Override + public void onHeartbeatRequestSent() { + MemberState state = state(); + if (state == MemberState.ACKNOWLEDGING) { + if (allPendingAssignmentsReconciled()) { + transitionTo(MemberState.STABLE); + } else { + log.debug("Member {} with epoch {} transitioned to {} after a heartbeat was sent " + + "to ack a previous reconciliation. New assignments are ready to " + + "be reconciled.", memberId, memberEpoch, MemberState.RECONCILING); + transitionTo(MemberState.RECONCILING); + } + } else if (state == MemberState.LEAVING) { + transitionToUnsubscribed(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void onHeartbeatRequestSkipped() { + if (state == MemberState.LEAVING) { + log.debug("Heartbeat for leaving group could not be sent. 
Member {} with epoch {} will transition to {}.", + memberId, memberEpoch, MemberState.UNSUBSCRIBED); + transitionToUnsubscribed(); + } + } + + private void transitionToUnsubscribed() { + transitionTo(MemberState.UNSUBSCRIBED); + leaveGroupInProgress.get().complete(null); + leaveGroupInProgress = Optional.empty(); + } + + /** + * @return True if there are no assignments waiting to be resolved from metadata or reconciled. + */ + private boolean allPendingAssignmentsReconciled() { + return assignmentUnresolved.isEmpty() && assignmentReadyToReconcile.isEmpty(); + } + + @Override + public boolean shouldSkipHeartbeat() { + MemberState state = state(); + return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL; + } + + /** + * Reconcile the assignment that has been received from the server and for which topic names + * are resolved, kept in the {@link #assignmentReadyToReconcile}. This will commit if needed, + * trigger the callbacks and update the subscription state. Note that only one reconciliation + * can be in progress at a time. If there is already another one in progress when this is + * triggered, it will be no-op, and the assignment will be reconciled on the next + * reconciliation loop. + */ + boolean reconcile() { + // Make copy of the assignment to reconcile as it could change as new assignments or metadata updates are received + SortedSet<TopicIdPartition> assignedTopicIdPartitions = new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR); + assignedTopicIdPartitions.addAll(assignmentReadyToReconcile); + + SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + ownedPartitions.addAll(subscriptions.assignedPartitions()); + + // Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are + // being reconciled. Needed for interactions with the centralized subscription state that + // does not support topic IDs yet, and for the callbacks. 
+ SortedSet<TopicPartition> assignedTopicPartition = toTopicPartitionSet(assignedTopicIdPartitions); + + // Check same assignment. Based on topic names for now, until topic IDs are properly + // supported in the centralized subscription state object. + boolean sameAssignmentReceived = assignedTopicPartition.equals(ownedPartitions); + + if (reconciliationInProgress || sameAssignmentReceived) { + String reason; + if (reconciliationInProgress) { + reason = "Another reconciliation is already in progress. Assignment " + + assignmentReadyToReconcile + " will be handled in the next reconciliation loop."; + } else { + reason = "Target assignment ready to reconcile is equals to the member current assignment."; + } + log.debug("Ignoring reconciliation attempt. " + reason); + return false; + } + + markReconciliationInProgress(); + + // Partitions to assign (not previously owned) + SortedSet<TopicPartition> addedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + addedPartitions.addAll(assignedTopicPartition); + addedPartitions.removeAll(ownedPartitions); + + // Partitions to revoke + SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + revokedPartitions.addAll(ownedPartitions); + revokedPartitions.removeAll(assignedTopicPartition); + + log.info("Updating assignment with\n" + + "\tAssigned partitions: {}\n" + + "\tCurrent owned partitions: {}\n" + + "\tAdded partitions (assigned - owned): {}\n" + + "\tRevoked partitions (owned - assigned): {}\n", + assignedTopicIdPartitions, + ownedPartitions, + addedPartitions, + revokedPartitions + ); + + CompletableFuture<Void> revocationResult; + if (!revokedPartitions.isEmpty()) { + revocationResult = revokePartitions(revokedPartitions); + } else { + revocationResult = CompletableFuture.completedFuture(null); + // Reschedule the auto commit starting from now (new assignment received without any + // revocation). 
+ commitRequestManager.resetAutoCommitTimer(); + } + + // Future that will complete when the full reconciliation process completes (revocation + // and assignment, executed sequentially) + CompletableFuture<Void> reconciliationResult = + revocationResult.thenCompose(__ -> { + boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch; + if (state == MemberState.RECONCILING && !memberHasRejoined) { + // Apply assignment + CompletableFuture<Void> assignResult = assignPartitions(assignedTopicPartition, + addedPartitions); + + // Clear topic names cache only for topics that are not in the subscription anymore + for (TopicPartition tp : revokedPartitions) { + if (!subscriptions.subscription().contains(tp.topic())) { + assignedTopicNamesCache.values().remove(tp.topic()); + } + } + return assignResult; + } else { + log.debug("Revocation callback completed but the member already " + + "transitioned out of the reconciling state for epoch {} into " + + "{} state with epoch {}. Interrupting reconciliation as it's " + + "not relevant anymore,", memberEpochOnReconciliationStart, state, memberEpoch); + String reason = interruptedReconciliationErrorMessage(); + CompletableFuture<Void> res = new CompletableFuture<>(); + res.completeExceptionally(new KafkaException("Interrupting reconciliation" + + " after revocation. " + reason)); + return res; + } + }); + + reconciliationResult.whenComplete((result, error) -> { + markReconciliationCompleted(); + if (error != null) { + // Leaving member in RECONCILING state after callbacks fail. The member + // won't send the ack, and the expectation is that the broker will kick the + // member out of the group after the rebalance timeout expires, leading to a + // RECONCILING -> FENCED transition. 
+ log.error("Reconciliation failed.", error); + } else { + boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch; + if (state == MemberState.RECONCILING && !memberHasRejoined) { + // Make assignment effective on the broker by transitioning to send acknowledge. + transitionTo(MemberState.ACKNOWLEDGING); + + // Make assignment effective on the member group manager + currentAssignment = assignedTopicIdPartitions; + + // Indicate that we completed reconciling a subset of the assignment ready to + // reconcile (new assignments might have been received or discovered in + // metadata) + assignmentReadyToReconcile.removeAll(assignedTopicIdPartitions); + } else { + String reason = interruptedReconciliationErrorMessage(); + log.error("Interrupting reconciliation after partitions assigned callback " + + "completed. " + reason); + } + } + }); + + return true; + } + + /** + * Build set of {@link TopicPartition} from the given set of {@link TopicIdPartition}. + */ + private SortedSet<TopicPartition> toTopicPartitionSet(SortedSet<TopicIdPartition> topicIdPartitions) { + SortedSet<TopicPartition> result = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + topicIdPartitions.forEach(topicIdPartition -> result.add(topicIdPartition.topicPartition())); + return result; + } + + /** + * @return Reason for interrupting a reconciliation progress when callbacks complete. + */ + private String interruptedReconciliationErrorMessage() { + String reason; + if (state != MemberState.RECONCILING) { + reason = "The member already transitioned out of the reconciling state into " + state; + } else { + reason = "The member has re-joined the group."; } - return state.equals(MemberState.STABLE); + return reason; + } + + /** + * Visible for testing. + */ + void markReconciliationInProgress() { + reconciliationInProgress = true; + memberEpochOnReconciliationStart = memberEpoch; + } + + /** + * Visible for testing. 
+ */ + void markReconciliationCompleted() { + reconciliationInProgress = false; } /** - * Take new target assignment received from the server and set it as targetAssignment to be - * processed. Following the consumer group protocol, the server won't send a new target - * member while a previous one hasn't been acknowledged by the member, so this will fail - * if a target assignment already exists. + * Build set of TopicPartition (topic name and partition id) from the target assignment + * received from the broker (topic IDs and list of partitions). + * + * <p> + * This will: * - * @throws IllegalStateException If a target assignment already exists. + * <ol type="1"> + * <li>Try to find topic names in the metadata cache</li> + * <li>For topics not found in metadata, try to find names in the local topic names cache + * (contains topic id and names currently assigned and resolved)</li> + * <li>If there are topics that are not in metadata cache or in the local cached + * of topic names assigned to this member, request a metadata update, and continue + * resolving names as the cache is updated. + * </li> + * </ol> */ - private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) { - if (!targetAssignment.isPresent()) { - log.info("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment); - targetAssignment = Optional.of(newTargetAssignment); + private void resolveMetadataForUnresolvedAssignment() { + // Try to resolve topic names from metadata cache or subscription cache, and move + // assignments from the "unresolved" collection, to the "readyToReconcile" one. 
+ Iterator<Map.Entry<Uuid, List<Integer>>> it = assignmentUnresolved.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<Uuid, List<Integer>> e = it.next(); + Uuid topicId = e.getKey(); + List<Integer> topicPartitions = e.getValue(); + + Optional<String> nameFromMetadata = findTopicNameInGlobalOrLocalCache(topicId); + nameFromMetadata.ifPresent(resolvedTopicName -> { + // Name resolved, so assignment is ready for reconciliation. + SortedSet<TopicIdPartition> topicIdPartitions = + buildAssignedPartitionsWithTopicName(topicId, resolvedTopicName, topicPartitions); + assignmentReadyToReconcile.addAll(topicIdPartitions); Review Comment: I am curious here. Is it better to build a `SortedSet` with all elements and then to add it to `assignmentReadyToReconcile` vs adding to `assignmentReadyToReconcile` directly? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -168,76 +329,686 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { this.memberId = response.memberId(); this.memberEpoch = response.memberEpoch(); ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); + if (assignment != null) { - setTargetAssignment(assignment); + transitionTo(MemberState.RECONCILING); + replaceUnresolvedAssignmentWithNewAssignment(assignment); + resolveMetadataForUnresolvedAssignment(); + reconcile(); + } else if (allPendingAssignmentsReconciled()) { + transitionTo(MemberState.STABLE); } - maybeTransitionToStable(); + } + + /** + * Overwrite collection of unresolved topic Ids with the new target assignment. This will + * effectively achieve the following: + * + * - all topics received in assignment will try to be resolved to find their topic names + * + * - any topic received in a previous assignment that was still unresolved, and that is + * not included in the assignment anymore, will be removed from the unresolved collection. 
+ * This should be the case when a topic is sent in an assignment, deleted right after, and + * removed from the assignment the next time a broker sends one to the member. + * + * @param assignment Target assignment received from the broker. + */ + private void replaceUnresolvedAssignmentWithNewAssignment( + ConsumerGroupHeartbeatResponseData.Assignment assignment) { + assignmentUnresolved.clear(); + assignment.topicPartitions().forEach(topicPartitions -> + assignmentUnresolved.put(topicPartitions.topicId(), topicPartitions.partitions())); } /** * {@inheritDoc} */ @Override public void transitionToFenced() { - resetEpoch(); transitionTo(MemberState.FENCED); + resetEpoch(); + log.debug("Member {} with epoch {} transitioned to {} state. It will release its " + + "assignment and rejoin the group.", memberId, memberEpoch, MemberState.FENCED); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + " after member got fenced. 
Member will rejoin the group anyways.", error); + } + updateSubscription(Collections.emptySet(), true); + transitionToJoining(); + }); } /** * {@inheritDoc} */ @Override - public void transitionToFailed() { - log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); - transitionTo(MemberState.FAILED); + public void transitionToFatal() { + transitionTo(MemberState.FATAL); + log.error("Member {} with epoch {} transitioned to {} state", memberId, memberEpoch, MemberState.FATAL); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member failed with fatal error.", error); + } + updateSubscription(Collections.emptySet(), true); + }); } + /** + * {@inheritDoc} + */ + public void onSubscriptionUpdated() { + if (state == MemberState.UNSUBSCRIBED) { + transitionToJoining(); + } + } + + /** + * Update a new assignment by setting the assigned partitions in the member subscription. + * + * @param assignedPartitions Topic partitions to take as the new subscription assignment + * @param clearAssignments True if the + */ + private void updateSubscription(Collection<TopicPartition> assignedPartitions, + boolean clearAssignments) { + subscriptions.assignFromSubscribed(assignedPartitions); + if (clearAssignments) { + clearPendingAssignmentsAndLocalNamesCache(); + } + } + + /** + * Transition to the {@link MemberState#JOINING} state, indicating that the member will + * try to join the group on the next heartbeat request. This is expected to be invoked when + * the user calls the subscribe API, or when the member wants to rejoin after getting fenced. + * Visible for testing. 
+ */ + void transitionToJoining() { + if (state == MemberState.FATAL) { + log.warn("No action taken to join the group with the updated subscription because " + + "the member is in FATAL state"); + return; + } + resetEpoch(); + transitionTo(MemberState.JOINING); + clearPendingAssignmentsAndLocalNamesCache(); + registerForMetadataUpdates(); + } + + /** + * Register to get notified when the cluster metadata is updated, via the + * {@link #onUpdate(ClusterResource)}. Register only if the manager is not register already. + */ + private void registerForMetadataUpdates() { + if (!isRegisteredForMetadataUpdates) { + this.metadata.addClusterUpdateListener(this); + isRegisteredForMetadataUpdates = true; + } + } + + /** + * {@inheritDoc} + */ @Override - public boolean shouldSendHeartbeat() { - return state() != MemberState.FAILED; + public CompletableFuture<Void> leaveGroup() { + if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) { + // Member is not part of the group. No-op and return completed future to avoid + // unnecessary transitions. + return CompletableFuture.completedFuture(null); + } + + if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) { + // Member already leaving. No-op and return existing leave group future that will + // complete when the ongoing leave operation completes. + return leaveGroupInProgress.get(); + } + + transitionTo(MemberState.PREPARE_LEAVING); + CompletableFuture<Void> leaveResult = new CompletableFuture<>(); + leaveGroupInProgress = Optional.of(leaveResult); + + CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + callbackResult.whenComplete((result, error) -> { + // Clear the subscription, no matter if the callback execution failed or succeeded. 
+ updateSubscription(Collections.emptySet(), true); + + // Transition to ensure that a heartbeat request is sent out to effectively leave the + // group (even in the case where the member had no assignment to release or when the + // callback execution failed.) + transitionToSendingLeaveGroup(); + }); + + // Return future to indicate that the leave group is done when the callbacks + // complete, and the transition to send the heartbeat has been made. + return leaveResult; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * <ul> + * <li>If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe)</li> + * + * <li>If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced .</li> + * </ul> + * + * @return Future that will complete when the callback execution completes. */ - private boolean maybeTransitionToStable() { - if (!hasPendingTargetAssignment()) { - transitionTo(MemberState.STABLE); + private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() { + SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + droppedPartitions.addAll(subscriptions.assignedPartitions()); + + CompletableFuture<Void> callbackResult; + if (droppedPartitions.isEmpty()) { + // No assignment to release + callbackResult = CompletableFuture.completedFuture(null); } else { - transitionTo(MemberState.RECONCILING); + // Release assignment + if (memberEpoch > 0) { + // Member is part of the group. Invoke onPartitionsRevoked. 
+ callbackResult = revokePartitions(droppedPartitions); + } else { + // Member is not part of the group anymore. Invoke onPartitionsLost. + callbackResult = invokeOnPartitionsLostCallback(droppedPartitions); + } + } + return callbackResult; + } + + /** + * Reset member epoch to the value required for the leave the group heartbeat request, and + * transition to the {@link MemberState#LEAVING} state so that a heartbeat + * request is sent out with it. + */ + private void transitionToSendingLeaveGroup() { + memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + currentAssignment = new HashSet<>(); + transitionTo(MemberState.LEAVING); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean shouldHeartbeatNow() { + MemberState state = state(); + return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING; + } + + /** + * {@inheritDoc} + */ + @Override + public void onHeartbeatRequestSent() { + MemberState state = state(); + if (state == MemberState.ACKNOWLEDGING) { + if (allPendingAssignmentsReconciled()) { + transitionTo(MemberState.STABLE); + } else { + log.debug("Member {} with epoch {} transitioned to {} after a heartbeat was sent " + + "to ack a previous reconciliation. New assignments are ready to " + + "be reconciled.", memberId, memberEpoch, MemberState.RECONCILING); + transitionTo(MemberState.RECONCILING); + } + } else if (state == MemberState.LEAVING) { + transitionToUnsubscribed(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void onHeartbeatRequestSkipped() { + if (state == MemberState.LEAVING) { + log.debug("Heartbeat for leaving group could not be sent. 
Member {} with epoch {} will transition to {}.", + memberId, memberEpoch, MemberState.UNSUBSCRIBED); + transitionToUnsubscribed(); + } + } + + private void transitionToUnsubscribed() { + transitionTo(MemberState.UNSUBSCRIBED); + leaveGroupInProgress.get().complete(null); + leaveGroupInProgress = Optional.empty(); + } + + /** + * @return True if there are no assignments waiting to be resolved from metadata or reconciled. + */ + private boolean allPendingAssignmentsReconciled() { + return assignmentUnresolved.isEmpty() && assignmentReadyToReconcile.isEmpty(); + } + + @Override + public boolean shouldSkipHeartbeat() { + MemberState state = state(); + return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL; + } + + /** + * Reconcile the assignment that has been received from the server and for which topic names + * are resolved, kept in the {@link #assignmentReadyToReconcile}. This will commit if needed, + * trigger the callbacks and update the subscription state. Note that only one reconciliation + * can be in progress at a time. If there is already another one in progress when this is + * triggered, it will be no-op, and the assignment will be reconciled on the next + * reconciliation loop. + */ + boolean reconcile() { + // Make copy of the assignment to reconcile as it could change as new assignments or metadata updates are received + SortedSet<TopicIdPartition> assignedTopicIdPartitions = new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR); + assignedTopicIdPartitions.addAll(assignmentReadyToReconcile); + + SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + ownedPartitions.addAll(subscriptions.assignedPartitions()); + + // Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are + // being reconciled. Needed for interactions with the centralized subscription state that + // does not support topic IDs yet, and for the callbacks. 
+ SortedSet<TopicPartition> assignedTopicPartition = toTopicPartitionSet(assignedTopicIdPartitions); + + // Check same assignment. Based on topic names for now, until topic IDs are properly + // supported in the centralized subscription state object. + boolean sameAssignmentReceived = assignedTopicPartition.equals(ownedPartitions); + + if (reconciliationInProgress || sameAssignmentReceived) { + String reason; + if (reconciliationInProgress) { + reason = "Another reconciliation is already in progress. Assignment " + + assignmentReadyToReconcile + " will be handled in the next reconciliation loop."; + } else { + reason = "Target assignment ready to reconcile is equals to the member current assignment."; + } + log.debug("Ignoring reconciliation attempt. " + reason); + return false; + } + + markReconciliationInProgress(); + + // Partitions to assign (not previously owned) + SortedSet<TopicPartition> addedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + addedPartitions.addAll(assignedTopicPartition); + addedPartitions.removeAll(ownedPartitions); + + // Partitions to revoke + SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + revokedPartitions.addAll(ownedPartitions); + revokedPartitions.removeAll(assignedTopicPartition); + + log.info("Updating assignment with\n" + + "\tAssigned partitions: {}\n" + + "\tCurrent owned partitions: {}\n" + + "\tAdded partitions (assigned - owned): {}\n" + + "\tRevoked partitions (owned - assigned): {}\n", + assignedTopicIdPartitions, + ownedPartitions, + addedPartitions, + revokedPartitions + ); + + CompletableFuture<Void> revocationResult; + if (!revokedPartitions.isEmpty()) { + revocationResult = revokePartitions(revokedPartitions); + } else { + revocationResult = CompletableFuture.completedFuture(null); + // Reschedule the auto commit starting from now (new assignment received without any + // revocation). 
+ commitRequestManager.resetAutoCommitTimer(); + } + + // Future that will complete when the full reconciliation process completes (revocation + // and assignment, executed sequentially) + CompletableFuture<Void> reconciliationResult = + revocationResult.thenCompose(__ -> { + boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch; + if (state == MemberState.RECONCILING && !memberHasRejoined) { + // Apply assignment + CompletableFuture<Void> assignResult = assignPartitions(assignedTopicPartition, + addedPartitions); + + // Clear topic names cache only for topics that are not in the subscription anymore + for (TopicPartition tp : revokedPartitions) { + if (!subscriptions.subscription().contains(tp.topic())) { + assignedTopicNamesCache.values().remove(tp.topic()); + } + } + return assignResult; + } else { + log.debug("Revocation callback completed but the member already " + + "transitioned out of the reconciling state for epoch {} into " + + "{} state with epoch {}. Interrupting reconciliation as it's " + + "not relevant anymore,", memberEpochOnReconciliationStart, state, memberEpoch); + String reason = interruptedReconciliationErrorMessage(); + CompletableFuture<Void> res = new CompletableFuture<>(); + res.completeExceptionally(new KafkaException("Interrupting reconciliation" + + " after revocation. " + reason)); + return res; + } + }); + + reconciliationResult.whenComplete((result, error) -> { + markReconciliationCompleted(); + if (error != null) { + // Leaving member in RECONCILING state after callbacks fail. The member + // won't send the ack, and the expectation is that the broker will kick the + // member out of the group after the rebalance timeout expires, leading to a + // RECONCILING -> FENCED transition. 
+ log.error("Reconciliation failed.", error); + } else { + boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch; + if (state == MemberState.RECONCILING && !memberHasRejoined) { + // Make assignment effective on the broker by transitioning to send acknowledge. + transitionTo(MemberState.ACKNOWLEDGING); + + // Make assignment effective on the member group manager + currentAssignment = assignedTopicIdPartitions; + + // Indicate that we completed reconciling a subset of the assignment ready to + // reconcile (new assignments might have been received or discovered in + // metadata) + assignmentReadyToReconcile.removeAll(assignedTopicIdPartitions); + } else { + String reason = interruptedReconciliationErrorMessage(); + log.error("Interrupting reconciliation after partitions assigned callback " + + "completed. " + reason); + } + } + }); + + return true; + } + + /** + * Build set of {@link TopicPartition} from the given set of {@link TopicIdPartition}. + */ + private SortedSet<TopicPartition> toTopicPartitionSet(SortedSet<TopicIdPartition> topicIdPartitions) { + SortedSet<TopicPartition> result = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + topicIdPartitions.forEach(topicIdPartition -> result.add(topicIdPartition.topicPartition())); + return result; + } + + /** + * @return Reason for interrupting a reconciliation progress when callbacks complete. + */ + private String interruptedReconciliationErrorMessage() { + String reason; + if (state != MemberState.RECONCILING) { + reason = "The member already transitioned out of the reconciling state into " + state; + } else { + reason = "The member has re-joined the group."; } - return state.equals(MemberState.STABLE); + return reason; + } + + /** + * Visible for testing. + */ + void markReconciliationInProgress() { + reconciliationInProgress = true; + memberEpochOnReconciliationStart = memberEpoch; + } + + /** + * Visible for testing. 
+ */ + void markReconciliationCompleted() { + reconciliationInProgress = false; } /** - * Take new target assignment received from the server and set it as targetAssignment to be - * processed. Following the consumer group protocol, the server won't send a new target - * member while a previous one hasn't been acknowledged by the member, so this will fail - * if a target assignment already exists. + * Build set of TopicPartition (topic name and partition id) from the target assignment + * received from the broker (topic IDs and list of partitions). + * + * <p> + * This will: * - * @throws IllegalStateException If a target assignment already exists. + * <ol type="1"> + * <li>Try to find topic names in the metadata cache</li> + * <li>For topics not found in metadata, try to find names in the local topic names cache + * (contains topic id and names currently assigned and resolved)</li> + * <li>If there are topics that are not in metadata cache or in the local cached + * of topic names assigned to this member, request a metadata update, and continue + * resolving names as the cache is updated. + * </li> + * </ol> */ - private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) { - if (!targetAssignment.isPresent()) { - log.info("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment); - targetAssignment = Optional.of(newTargetAssignment); + private void resolveMetadataForUnresolvedAssignment() { + // Try to resolve topic names from metadata cache or subscription cache, and move + // assignments from the "unresolved" collection, to the "readyToReconcile" one. 
+ Iterator<Map.Entry<Uuid, List<Integer>>> it = assignmentUnresolved.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<Uuid, List<Integer>> e = it.next(); + Uuid topicId = e.getKey(); + List<Integer> topicPartitions = e.getValue(); + + Optional<String> nameFromMetadata = findTopicNameInGlobalOrLocalCache(topicId); + nameFromMetadata.ifPresent(resolvedTopicName -> { + // Name resolved, so assignment is ready for reconciliation. + SortedSet<TopicIdPartition> topicIdPartitions = + buildAssignedPartitionsWithTopicName(topicId, resolvedTopicName, topicPartitions); + assignmentReadyToReconcile.addAll(topicIdPartitions); + it.remove(); + }); + } + + if (!assignmentUnresolved.isEmpty()) { + log.debug("Topic Ids {} received in target assignment were not found in metadata and " + + "are not currently assigned. Requesting a metadata update now to resolve " + + "topic names.", assignmentUnresolved.keySet()); + metadata.requestUpdate(true); + } + } + + /** + * Look for topic in the global metadata cache. If found, add it to the local cache and + * return it. If not found, look for it in the local metadata cache. Return empty if not + * found in any of the two. + */ + private Optional<String> findTopicNameInGlobalOrLocalCache(Uuid topicId) { + String nameFromMetadataCache = metadata.topicNames().getOrDefault(topicId, null); + if (nameFromMetadataCache != null) { + // Add topic name to local cache, so it can be reused if included in a next target + // assignment if metadata cache not available. + assignedTopicNamesCache.put(topicId, nameFromMetadataCache); + return Optional.of(nameFromMetadataCache); + } else { + // Topic ID was not found in metadata. Check if the topic name is in the local + // cache of topics currently assigned. This will avoid a metadata request in the + // case where the metadata cache may have been flushed right before the + // revocation of a previously assigned topic. 
+ String nameFromSubscriptionCache = assignedTopicNamesCache.getOrDefault(topicId, null); + return Optional.ofNullable(nameFromSubscriptionCache); + } + } + + /** + * Build set of TopicPartition for the partitions included in the heartbeat topicPartitions, + * and using the given topic name. + */ + private SortedSet<TopicIdPartition> buildAssignedPartitionsWithTopicName( + Uuid topicId, + String topicName, + List<Integer> topicPartitions) { + SortedSet<TopicIdPartition> assignedPartitions = + new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR); + topicPartitions.forEach(tp -> { + TopicIdPartition topicIdPartition = new TopicIdPartition( + topicId, + new TopicPartition(topicName, tp)); + assignedPartitions.add(topicIdPartition); + }); + return assignedPartitions; + } + + /** + * Revoke partitions. This will: + * <ul> + * <li>Trigger an async commit offsets request if auto-commit enabled.</li> + * <li>Invoke the onPartitionsRevoked callback if the user has registered it.</li> + * </ul> + * + * This will wait on the commit request to finish before invoking the callback. If the commit + * request fails, this will proceed to invoke the user callbacks anyway, + * returning a future that will complete or fail depending on the callback execution only. + * + * @param revokedPartitions Partitions to revoke. + * @return Future that will complete when the commit request and user callback completes. + */ + private CompletableFuture<Void> revokePartitions(Set<TopicPartition> revokedPartitions) { + log.info("Revoking previously assigned partitions {}", Utils.join(revokedPartitions, ", ")); + + logPausedPartitionsBeingRevoked(revokedPartitions); + + // Mark partitions as pending revocation to stop fetching from the partitions (no new + // fetches sent out, and no in-flight fetches responses processed). 
+ markPendingRevocationToPauseFetching(revokedPartitions); + + // Future that will complete when the revocation completes (including offset commit + // request and user callback execution) Review Comment: nit: I have noticed that most of the comments end with a period but not all of them. It may be good to be consistent. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ########## @@ -17,57 +17,113 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.common.protocol.Errors; import java.util.ArrayList; import java.util.Arrays; import java.util.List; public enum MemberState { /** - * Member has not joined a consumer group yet, or has been fenced and needs to re-join. + * Member has a group id, but it is not subscribed to any topic to receive automatic + * assignments. This will be the state when the member has never subscribed, or when it has + * unsubscribed from all topics. While in this state the member can commit offsets but won't + * be an active member of the consumer group (no heartbeats sent). */ - UNJOINED, + UNSUBSCRIBED, + + /** + * Member is attempting to join a consumer group. While in this state, the member will send + * heartbeat requests on the interval, with epoch 0, until it gets a response with an epoch > 0 + * or a fatal failure. A member transitions to this state when it tries to join the group for + * the first time with a call to subscribe, or when it has been fenced and tries to re-join. + */ + JOINING, /** * Member has received a new target assignment (partitions could have been assigned or - * revoked), and it is processing it. While in this state, the member will - * invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and then make - * the new assignment effective. + * revoked), and it is processing it. 
While in this state, the member will continue to send + * heartbeat on the interval, and reconcile the assignment (it will commit offsets if + * needed, invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and make + * the new assignment effective). Note that while in this state the member may be trying to + * resolve metadata for the target assignment, or triggering commits/callbacks if topic names + * already resolved. */ - // TODO: determine if separate state will be needed for assign/revoke (not for now) RECONCILING, /** - * Member is active in a group (heartbeating) and has processed all assignments received. + * Member has completed reconciling an assignment received, and stays in this state only until + * the next heartbeat request is sent out to acknowledge the assignment to the server. This + * state indicates that the next heartbeat request must be sent without waiting for the + * heartbeat interval to expire. Note that once the ack is sent, the member could go back to + * {@link #RECONCILING} if it still has assignment waiting to be reconciled (assignments + * waiting for metadata, assignments for which metadata was resolved, or new assignments + * received from the broker) + */ + ACKNOWLEDGING, + + /** + * Member is active in a group and has processed all assignments received. While in this + * state, the member will send heartbeats on the interval. */ STABLE, /** - * Member transitions to this state when it receives a - * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or - * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error from the - * broker. This is a recoverable state, where the member - * gives up its partitions by invoking the user callbacks for onPartitionsLost, and then - * transitions to {@link #UNJOINED} to rejoin the group as a new member. 
+ * Member transitions to this state when it receives a {@link Errors#UNKNOWN_MEMBER_ID} or + * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating that it has been + * left out of the group. While in this state, the member will stop sending heartbeats, it + * will give up its partitions by invoking the user callbacks for onPartitionsLost, and then + * transition to {@link #JOINING} to re-join the group as a new member. */ FENCED, /** - * The member failed with an unrecoverable error + * The member transitions to this state after a call to unsubscribe. While in this state, the + * member will stop sending heartbeats, will commit offsets if needed and release its + * assignment (calling user's callback for partitions revoked or lost). When all these + * actions complete, the member will transition out of this state into {@link #LEAVING} to + * effectively leave the group. */ - FAILED; + PREPARE_LEAVING, + /** + * Member has committed offsets and releases its assignment, so it stays in this state until + * the next heartbeat request is sent out with epoch -1 to effectively leave the group. This + * state indicates that the next heartbeat request must be sent without waiting for the + * heartbeat interval to expire. + */ + LEAVING, + + /** + * The member failed with an unrecoverable error received in a heartbeat response. This in an + * unrecoverable state where the member won't send any requests to the broker and cannot + * perform any other transition. 
+ */ + FATAL; + + /** + * Valid state transitions + */ static { - // Valid state transitions - STABLE.previousValidStates = Arrays.asList(UNJOINED, RECONCILING); - RECONCILING.previousValidStates = Arrays.asList(STABLE, UNJOINED); + STABLE.previousValidStates = Arrays.asList(JOINING, ACKNOWLEDGING); + + RECONCILING.previousValidStates = Arrays.asList(STABLE, JOINING, ACKNOWLEDGING); + + ACKNOWLEDGING.previousValidStates = Arrays.asList(RECONCILING); + + FATAL.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING, ACKNOWLEDGING); Review Comment: We still need to conclude on this one. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -168,76 +329,686 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { this.memberId = response.memberId(); this.memberEpoch = response.memberEpoch(); ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); + if (assignment != null) { - setTargetAssignment(assignment); + transitionTo(MemberState.RECONCILING); + replaceUnresolvedAssignmentWithNewAssignment(assignment); + resolveMetadataForUnresolvedAssignment(); + reconcile(); + } else if (allPendingAssignmentsReconciled()) { + transitionTo(MemberState.STABLE); } - maybeTransitionToStable(); + } + + /** + * Overwrite collection of unresolved topic Ids with the new target assignment. This will + * effectively achieve the following: + * + * - all topics received in assignment will try to be resolved to find their topic names + * + * - any topic received in a previous assignment that was still unresolved, and that is + * not included in the assignment anymore, will be removed from the unresolved collection. + * This should be the case when a topic is sent in an assignment, deleted right after, and + * removed from the assignment the next time a broker sends one to the member. + * + * @param assignment Target assignment received from the broker. 
+ */ + private void replaceUnresolvedAssignmentWithNewAssignment( + ConsumerGroupHeartbeatResponseData.Assignment assignment) { + assignmentUnresolved.clear(); + assignment.topicPartitions().forEach(topicPartitions -> + assignmentUnresolved.put(topicPartitions.topicId(), topicPartitions.partitions())); } /** * {@inheritDoc} */ @Override public void transitionToFenced() { - resetEpoch(); transitionTo(MemberState.FENCED); + resetEpoch(); + log.debug("Member {} with epoch {} transitioned to {} state. It will release its " + + "assignment and rejoin the group.", memberId, memberEpoch, MemberState.FENCED); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + " after member got fenced. Member will rejoin the group anyways.", error); + } + updateSubscription(Collections.emptySet(), true); + transitionToJoining(); + }); } /** * {@inheritDoc} */ @Override - public void transitionToFailed() { - log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); - transitionTo(MemberState.FAILED); + public void transitionToFatal() { + transitionTo(MemberState.FATAL); + log.error("Member {} with epoch {} transitioned to {} state", memberId, memberEpoch, MemberState.FATAL); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member failed with fatal error.", error); + } + updateSubscription(Collections.emptySet(), true); + }); } + /** + * {@inheritDoc} + */ + public void onSubscriptionUpdated() { + if (state == MemberState.UNSUBSCRIBED) { + transitionToJoining(); + } + } + + 
/** + * Update a new assignment by setting the assigned partitions in the member subscription. + * + * @param assignedPartitions Topic partitions to take as the new subscription assignment + * @param clearAssignments True if the + */ + private void updateSubscription(Collection<TopicPartition> assignedPartitions, + boolean clearAssignments) { + subscriptions.assignFromSubscribed(assignedPartitions); + if (clearAssignments) { + clearPendingAssignmentsAndLocalNamesCache(); + } + } + + /** + * Transition to the {@link MemberState#JOINING} state, indicating that the member will + * try to join the group on the next heartbeat request. This is expected to be invoked when + * the user calls the subscribe API, or when the member wants to rejoin after getting fenced. + * Visible for testing. + */ + void transitionToJoining() { + if (state == MemberState.FATAL) { + log.warn("No action taken to join the group with the updated subscription because " + + "the member is in FATAL state"); + return; + } + resetEpoch(); + transitionTo(MemberState.JOINING); + clearPendingAssignmentsAndLocalNamesCache(); + registerForMetadataUpdates(); + } + + /** + * Register to get notified when the cluster metadata is updated, via the + * {@link #onUpdate(ClusterResource)}. Register only if the manager is not register already. + */ + private void registerForMetadataUpdates() { + if (!isRegisteredForMetadataUpdates) { + this.metadata.addClusterUpdateListener(this); + isRegisteredForMetadataUpdates = true; + } + } + + /** + * {@inheritDoc} + */ @Override - public boolean shouldSendHeartbeat() { - return state() != MemberState.FAILED; + public CompletableFuture<Void> leaveGroup() { + if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) { + // Member is not part of the group. No-op and return completed future to avoid + // unnecessary transitions. 
+ return CompletableFuture.completedFuture(null); + } + + if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) { + // Member already leaving. No-op and return existing leave group future that will + // complete when the ongoing leave operation completes. + return leaveGroupInProgress.get(); + } + + transitionTo(MemberState.PREPARE_LEAVING); + CompletableFuture<Void> leaveResult = new CompletableFuture<>(); + leaveGroupInProgress = Optional.of(leaveResult); + + CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + callbackResult.whenComplete((result, error) -> { + // Clear the subscription, no matter if the callback execution failed or succeeded. + updateSubscription(Collections.emptySet(), true); + + // Transition to ensure that a heartbeat request is sent out to effectively leave the + // group (even in the case where the member had no assignment to release or when the + // callback execution failed.) + transitionToSendingLeaveGroup(); + }); + + // Return future to indicate that the leave group is done when the callbacks + // complete, and the transition to send the heartbeat has been made. + return leaveResult; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * <ul> + * <li>If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe)</li> + * + * <li>If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced .</li> + * </ul> + * + * @return Future that will complete when the callback execution completes. 
*/ - private boolean maybeTransitionToStable() { - if (!hasPendingTargetAssignment()) { - transitionTo(MemberState.STABLE); + private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() { + SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + droppedPartitions.addAll(subscriptions.assignedPartitions()); + + CompletableFuture<Void> callbackResult; + if (droppedPartitions.isEmpty()) { + // No assignment to release + callbackResult = CompletableFuture.completedFuture(null); } else { - transitionTo(MemberState.RECONCILING); + // Release assignment + if (memberEpoch > 0) { + // Member is part of the group. Invoke onPartitionsRevoked. + callbackResult = revokePartitions(droppedPartitions); + } else { + // Member is not part of the group anymore. Invoke onPartitionsLost. + callbackResult = invokeOnPartitionsLostCallback(droppedPartitions); + } + } + return callbackResult; + } + + /** + * Reset member epoch to the value required for the leave the group heartbeat request, and + * transition to the {@link MemberState#LEAVING} state so that a heartbeat + * request is sent out with it. + */ + private void transitionToSendingLeaveGroup() { + memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + currentAssignment = new HashSet<>(); + transitionTo(MemberState.LEAVING); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean shouldHeartbeatNow() { + MemberState state = state(); + return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING; + } + + /** + * {@inheritDoc} + */ + @Override + public void onHeartbeatRequestSent() { + MemberState state = state(); + if (state == MemberState.ACKNOWLEDGING) { + if (allPendingAssignmentsReconciled()) { + transitionTo(MemberState.STABLE); + } else { + log.debug("Member {} with epoch {} transitioned to {} after a heartbeat was sent " + + "to ack a previous reconciliation. 
New assignments are ready to " + + "be reconciled.", memberId, memberEpoch, MemberState.RECONCILING); + transitionTo(MemberState.RECONCILING); + } + } else if (state == MemberState.LEAVING) { + transitionToUnsubscribed(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void onHeartbeatRequestSkipped() { + if (state == MemberState.LEAVING) { + log.debug("Heartbeat for leaving group could not be sent. Member {} with epoch {} will transition to {}.", + memberId, memberEpoch, MemberState.UNSUBSCRIBED); + transitionToUnsubscribed(); + } + } + + private void transitionToUnsubscribed() { + transitionTo(MemberState.UNSUBSCRIBED); + leaveGroupInProgress.get().complete(null); + leaveGroupInProgress = Optional.empty(); + } + + /** + * @return True if there are no assignments waiting to be resolved from metadata or reconciled. + */ + private boolean allPendingAssignmentsReconciled() { + return assignmentUnresolved.isEmpty() && assignmentReadyToReconcile.isEmpty(); + } + + @Override + public boolean shouldSkipHeartbeat() { + MemberState state = state(); + return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL; + } + + /** + * Reconcile the assignment that has been received from the server and for which topic names + * are resolved, kept in the {@link #assignmentReadyToReconcile}. This will commit if needed, + * trigger the callbacks and update the subscription state. Note that only one reconciliation + * can be in progress at a time. If there is already another one in progress when this is + * triggered, it will be no-op, and the assignment will be reconciled on the next + * reconciliation loop. 
+ */ + boolean reconcile() { + // Make copy of the assignment to reconcile as it could change as new assignments or metadata updates are received + SortedSet<TopicIdPartition> assignedTopicIdPartitions = new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR); + assignedTopicIdPartitions.addAll(assignmentReadyToReconcile); + + SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + ownedPartitions.addAll(subscriptions.assignedPartitions()); + + // Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are + // being reconciled. Needed for interactions with the centralized subscription state that + // does not support topic IDs yet, and for the callbacks. + SortedSet<TopicPartition> assignedTopicPartition = toTopicPartitionSet(assignedTopicIdPartitions); + + // Check same assignment. Based on topic names for now, until topic IDs are properly + // supported in the centralized subscription state object. + boolean sameAssignmentReceived = assignedTopicPartition.equals(ownedPartitions); + + if (reconciliationInProgress || sameAssignmentReceived) { + String reason; + if (reconciliationInProgress) { + reason = "Another reconciliation is already in progress. Assignment " + + assignmentReadyToReconcile + " will be handled in the next reconciliation loop."; + } else { + reason = "Target assignment ready to reconcile is equals to the member current assignment."; + } + log.debug("Ignoring reconciliation attempt. 
" + reason); + return false; + } + + markReconciliationInProgress(); + + // Partitions to assign (not previously owned) + SortedSet<TopicPartition> addedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + addedPartitions.addAll(assignedTopicPartition); + addedPartitions.removeAll(ownedPartitions); + + // Partitions to revoke + SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + revokedPartitions.addAll(ownedPartitions); + revokedPartitions.removeAll(assignedTopicPartition); + + log.info("Updating assignment with\n" + + "\tAssigned partitions: {}\n" + + "\tCurrent owned partitions: {}\n" + + "\tAdded partitions (assigned - owned): {}\n" + + "\tRevoked partitions (owned - assigned): {}\n", + assignedTopicIdPartitions, + ownedPartitions, + addedPartitions, + revokedPartitions + ); + + CompletableFuture<Void> revocationResult; + if (!revokedPartitions.isEmpty()) { + revocationResult = revokePartitions(revokedPartitions); + } else { + revocationResult = CompletableFuture.completedFuture(null); + // Reschedule the auto commit starting from now (new assignment received without any + // revocation). 
+ commitRequestManager.resetAutoCommitTimer(); + } + + // Future that will complete when the full reconciliation process completes (revocation + // and assignment, executed sequentially) + CompletableFuture<Void> reconciliationResult = + revocationResult.thenCompose(__ -> { + boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch; + if (state == MemberState.RECONCILING && !memberHasRejoined) { + // Apply assignment + CompletableFuture<Void> assignResult = assignPartitions(assignedTopicPartition, + addedPartitions); + + // Clear topic names cache only for topics that are not in the subscription anymore + for (TopicPartition tp : revokedPartitions) { + if (!subscriptions.subscription().contains(tp.topic())) { + assignedTopicNamesCache.values().remove(tp.topic()); + } + } Review Comment: nit: Should we move this code into `assignPartitions`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org