lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395929369
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member got fenced. Member will rejoin the group anyways.", error); + } + subscriptions.assignFromSubscribed(Collections.emptySet()); + transitionToJoining(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override - public void transitionToFailed() { - log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); - transitionTo(MemberState.FAILED); + public void transitionToFatal() { + log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + + // Update epoch to indicate that the member is not in the group anymore, so that the + // onPartitionsLost is called to release assignment. + memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member failed with fatal error.", error); + } + }); + subscriptions.assignFromSubscribed(Collections.emptySet()); + clearPendingAssignmentsAndLocalNamesCache(); + transitionTo(MemberState.FATAL); + } + + /** + * {@inheritDoc} + */ + public void onSubscriptionUpdated() { + if (state == MemberState.UNSUBSCRIBED) { + transitionToJoining(); + } + // TODO: If the member is already part of the group, this should only ensure that the + // updated subscription is included in the next heartbeat request. Review Comment: Agree that the next HB will pick it up based on the interval (also not seeing much need/value in the forced HB). Removed the TODO. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org