AndrewJSchofield commented on code in PR #17941: URL: https://github.com/apache/kafka/pull/17941#discussion_r1862713952
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetaManagerHelper.java: ########## @@ -0,0 +1,1613 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import com.google.re2j.Pattern; +import com.google.re2j.PatternSyntaxException; +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRegularExpression; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnreleasedInstanceIdException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.protocol.Errors; +import 
org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.CoordinatorResult; +import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer; +import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.classic.ClassicGroup; +import org.apache.kafka.coordinator.group.classic.ClassicGroupMember; +import org.apache.kafka.coordinator.group.classic.ClassicGroupState; +import org.apache.kafka.coordinator.group.modern.Assignment; +import org.apache.kafka.coordinator.group.modern.MemberState; +import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.modern.TopicMetadata; +import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.modern.consumer.CurrentAssignmentBuilder; +import org.apache.kafka.image.TopicsImage; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE; +import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR; +import static 
org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR; +import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH; +import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID; +import static org.apache.kafka.coordinator.group.Group.GroupType.CLASSIC; +import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.Utils.assignmentToString; +import static org.apache.kafka.coordinator.group.classic.ClassicGroupMember.EMPTY_ASSIGNMENT; +import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD; +import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPARING_REBALANCE; +import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE; + +public class GroupMetaManagerHelper { Review Comment: I wonder whether this was intended to be 
`GroupMetadataManagerHelper`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ShareGroupMetadataManager.java: ########## @@ -0,0 +1,1136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.ShareGroupDescribeResponseData; +import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.Errors; +import 
org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.CoordinatorResult; +import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.modern.Assignment; +import org.apache.kafka.coordinator.group.modern.ModernGroup; +import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder; +import 
org.apache.kafka.coordinator.group.modern.TopicMetadata; +import org.apache.kafka.coordinator.group.modern.share.ShareGroup; +import org.apache.kafka.coordinator.group.modern.share.ShareGroupAssignmentBuilder; +import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember; +import org.apache.kafka.image.MetadataImage; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.Utils.assignmentToString; +import static org.apache.kafka.coordinator.group.modern.ModernGroupMember.hasAssignedPartitionsChanged; + +public class ShareGroupMetadataManager { + + GroupStore groupStore; + + /** + * The system time. 
+ */ + private final Time time; + + /** + * The system timer. + */ + private final CoordinatorTimer<Void, CoordinatorRecord> timer; + + /** + * The share group partition assignor. + */ + private final ShareGroupPartitionAssignor shareGroupAssignor; + + /** + * The metadata image. + */ + private MetadataImage metadataImage; + + /** + * The share group metadata refresh interval. + */ + private final int shareGroupMetadataRefreshIntervalMs; + + /** + * The maximum number of members allowed in a single share group. + */ + private final int shareGroupMaxSize; + + /** + * The session timeout for share groups. + */ + private final int shareGroupSessionTimeoutMs; + + /** + * The heartbeat interval for share groups. + */ + private final int shareGroupHeartbeatIntervalMs; + + /** + * The log context. + */ + private final LogContext logContext; + + /** + * The logger. + */ + private final Logger log; + + /** + * The group manager. + */ + private final GroupConfigManager groupConfigManager; + + public ShareGroupMetadataManager( + GroupStore groupStore, + ShareGroupPartitionAssignor shareGroupAssignor, + Time time, + CoordinatorTimer<Void, CoordinatorRecord> timer, + int shareGroupMaxSize, + int shareGroupMetadataRefreshIntervalMs, + int shareGroupSessionTimeoutMs, + int shareGroupHeartbeatIntervalMs, + LogContext logContext, + GroupConfigManager groupConfigManager) { + this.groupStore = groupStore; + this.shareGroupAssignor = shareGroupAssignor; + this.time = time; + this.timer = timer; + this.shareGroupMaxSize = shareGroupMaxSize; + this.metadataImage = groupStore.image(); + this.shareGroupMetadataRefreshIntervalMs = shareGroupMetadataRefreshIntervalMs; + this.shareGroupSessionTimeoutMs = shareGroupSessionTimeoutMs; + this.shareGroupHeartbeatIntervalMs = shareGroupHeartbeatIntervalMs; + this.logContext = logContext; + this.log = this.logContext.logger(ShareGroupMetadataManager.class); + this.groupConfigManager = groupConfigManager; + } + + /** + * Handles a ShareGroupDescribe 
request. + * + * @param groupIds The IDs of the groups to describe. + * @param committedOffset A specified committed offset corresponding to this shard. + * + * @return A list containing the ShareGroupDescribeResponseData.DescribedGroup. + */ + public List<ShareGroupDescribeResponseData.DescribedGroup> shareGroupDescribe( + List<String> groupIds, + long committedOffset + ) { + final List<ShareGroupDescribeResponseData.DescribedGroup> describedGroups = new ArrayList<>(); + groupIds.forEach(groupId -> { + try { + describedGroups.add(shareGroup(groupId, committedOffset).asDescribedGroup( + committedOffset, + shareGroupAssignor.name(), + metadataImage.topics() + )); + } catch (GroupIdNotFoundException exception) { + describedGroups.add(new ShareGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) + ); + } + }); + + return describedGroups; + } + + /** + * Gets a share group by committed offset. + * + * @param groupId The group id. + * @param committedOffset A specified committed offset corresponding to this shard. + * + * @return A ConsumerGroup. Review Comment: A ShareGroup. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ShareGroupMetadataManager.java: ########## @@ -0,0 +1,1136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.ShareGroupDescribeResponseData; +import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.CoordinatorResult; +import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.modern.Assignment; +import org.apache.kafka.coordinator.group.modern.ModernGroup; +import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.modern.TopicMetadata; +import org.apache.kafka.coordinator.group.modern.share.ShareGroup; +import org.apache.kafka.coordinator.group.modern.share.ShareGroupAssignmentBuilder; +import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember; +import org.apache.kafka.image.MetadataImage; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE; +import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.Utils.assignmentToString; +import static org.apache.kafka.coordinator.group.modern.ModernGroupMember.hasAssignedPartitionsChanged; + +public class ShareGroupMetadataManager { + + GroupStore groupStore; + + /** + * The system time. + */ + private final Time time; + + /** + * The system timer. + */ + private final CoordinatorTimer<Void, CoordinatorRecord> timer; + + /** + * The share group partition assignor. + */ + private final ShareGroupPartitionAssignor shareGroupAssignor; + + /** + * The metadata image. + */ + private MetadataImage metadataImage; + + /** + * The share group metadata refresh interval. + */ + private final int shareGroupMetadataRefreshIntervalMs; + + /** + * The maximum number of members allowed in a single share group. + */ + private final int shareGroupMaxSize; + + /** + * The session timeout for share groups. + */ + private final int shareGroupSessionTimeoutMs; + + /** + * The heartbeat interval for share groups. + */ + private final int shareGroupHeartbeatIntervalMs; + + /** + * The log context. + */ + private final LogContext logContext; + + /** + * The logger. 
+ */ + private final Logger log; + + /** + * The group manager. Review Comment: Group config manager. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ShareGroupMetadataManager.java: ########## @@ -0,0 +1,1136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.ShareGroupDescribeResponseData; +import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.CoordinatorResult; +import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey; 
+import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.modern.Assignment; +import org.apache.kafka.coordinator.group.modern.ModernGroup; +import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.modern.TopicMetadata; +import org.apache.kafka.coordinator.group.modern.share.ShareGroup; +import org.apache.kafka.coordinator.group.modern.share.ShareGroupAssignmentBuilder; +import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember; +import org.apache.kafka.image.MetadataImage; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; Review Comment: It seems to me that this import is incorrect. In almost all cases, `ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH` is used, and I think actually that should be used in the other one also, meaning that this import is not required. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ClassicGroupMetadataManager.java: ########## @@ -0,0 +1,2500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.CoordinatorNotAvailableException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.IllegalGenerationException; +import org.apache.kafka.common.errors.InconsistentGroupProtocolException; +import org.apache.kafka.common.errors.RebalanceInProgressException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerProtocolSubscription; +import 
org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.message.HeartbeatRequestData; +import org.apache.kafka.common.message.HeartbeatResponseData; +import org.apache.kafka.common.message.JoinGroupRequestData; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.LeaveGroupRequestData; +import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.SyncGroupRequestData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.requests.JoinGroupRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.CoordinatorResult; +import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer; +import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.classic.ClassicGroup; +import org.apache.kafka.coordinator.group.classic.ClassicGroupMember; +import org.apache.kafka.coordinator.group.classic.ClassicGroupState; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; +import org.apache.kafka.coordinator.group.modern.Assignment; +import org.apache.kafka.coordinator.group.modern.MemberState; +import 
org.apache.kafka.coordinator.group.modern.TopicMetadata; +import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE; +import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION; +import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID; +import static org.apache.kafka.coordinator.group.Group.GroupType.CLASSIC; +import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord; +import static org.apache.kafka.coordinator.group.Utils.ofSentinel; +import static org.apache.kafka.coordinator.group.Utils.toConsumerProtocolAssignment; +import static org.apache.kafka.coordinator.group.Utils.toTopicPartitions; +import static org.apache.kafka.coordinator.group.classic.ClassicGroupMember.EMPTY_ASSIGNMENT; +import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD; +import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPARING_REBALANCE; +import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME; + +public class ClassicGroupMetadataManager { + + private final GroupStore groupStore; + private final GroupMetaManagerHelper helper; + + /** + * The maximum number of members allowed in a single classic group. + */ + private final int classicGroupMaxSize; + + /** + * Initial rebalance delay for members joining a classic group. + */ + private final int classicGroupInitialRebalanceDelayMs; + + /** + * The metadata refresh interval. + */ + private final int consumerGroupMetadataRefreshIntervalMs; + + /** + * The timeout used to wait for a new member in milliseconds. + */ + private final int classicGroupNewMemberJoinTimeoutMs; + + /** + * The maximum number of members allowed in a single consumer group. + */ + private final int consumerGroupMaxSize; + + /** + * The coordinator metrics. + */ + private final GroupCoordinatorMetricsShard metrics; + + /** + * The system timer. + */ + private final CoordinatorTimer<Void, CoordinatorRecord> timer; + + /** + * The system time. + */ + private final Time time; + + /** + * The log context. + */ + private final LogContext logContext; + + /** + * The logger. + */ + private final Logger log; + + /** + * The supported consumer group partition assignors keyed by their name. + */ + private final Map<String, ConsumerGroupPartitionAssignor> consumerGroupAssignors; + + /** + * The default consumer group assignor used. + */ + private final ConsumerGroupPartitionAssignor defaultConsumerGroupAssignor; + + /** + * The config indicating whether group protocol upgrade/downgrade is allowed. + */ + private final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy; + + + /** + * An empty result returned to the state machine. 
This means that + * there are no records to append to the log. + * + * Package private for testing. + */ + static final CoordinatorResult<Void, CoordinatorRecord> EMPTY_RESULT = + new CoordinatorResult<>(Collections.emptyList(), CompletableFuture.completedFuture(null), false); + + + public ClassicGroupMetadataManager( + GroupStore groupStore, + GroupCoordinatorMetricsShard metrics, + int classicGroupMaxSize, + int classicGroupInitialRebalanceDelayMs, + int consumerGroupMetadataRefreshIntervalMs, + int classicGroupNewMemberJoinTimeoutMs, + int consumerGroupMaxSize, + CoordinatorTimer<Void, CoordinatorRecord> timer, + Time time, + LogContext logContext, + List<ConsumerGroupPartitionAssignor> consumerGroupAssignors, + ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy) { + + this.groupStore = groupStore; + this.metrics = metrics; + this.classicGroupMaxSize = classicGroupMaxSize; + this.classicGroupInitialRebalanceDelayMs = classicGroupInitialRebalanceDelayMs; + this.consumerGroupMetadataRefreshIntervalMs = consumerGroupMetadataRefreshIntervalMs; + this.classicGroupNewMemberJoinTimeoutMs = classicGroupNewMemberJoinTimeoutMs; + this.consumerGroupMaxSize = consumerGroupMaxSize; + this.timer = timer; + this.time = time; + this.logContext = logContext; + this.log = logContext.logger(ClassicGroupMetadataManager.class); + this.consumerGroupAssignors = consumerGroupAssignors.stream().collect(Collectors.toMap(ConsumerGroupPartitionAssignor::name, Function.identity())); + this.defaultConsumerGroupAssignor = consumerGroupAssignors.get(0); + this.consumerGroupMigrationPolicy = consumerGroupMigrationPolicy; + this.helper = new GroupMetaManagerHelper(groupStore, time, timer, logContext, log, consumerGroupMigrationPolicy, classicGroupMaxSize, classicGroupInitialRebalanceDelayMs); + } + + /** + * Handles a DescribeGroup request. Review Comment: pedantic nit: It's DescribeGroups. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ClassicGroupMetadataManager.java: ########## @@ -0,0 +1,2500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.CoordinatorNotAvailableException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.IllegalGenerationException; +import org.apache.kafka.common.errors.InconsistentGroupProtocolException; +import org.apache.kafka.common.errors.RebalanceInProgressException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerProtocolSubscription; +import 
org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.message.HeartbeatRequestData; +import org.apache.kafka.common.message.HeartbeatResponseData; +import org.apache.kafka.common.message.JoinGroupRequestData; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.LeaveGroupRequestData; +import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.SyncGroupRequestData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.requests.JoinGroupRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.CoordinatorResult; +import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer; +import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.classic.ClassicGroup; +import org.apache.kafka.coordinator.group.classic.ClassicGroupMember; +import org.apache.kafka.coordinator.group.classic.ClassicGroupState; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; +import org.apache.kafka.coordinator.group.modern.Assignment; +import org.apache.kafka.coordinator.group.modern.MemberState; +import 
org.apache.kafka.coordinator.group.modern.TopicMetadata; +import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE; +import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION; +import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID; +import static org.apache.kafka.coordinator.group.Group.GroupType.CLASSIC; +import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord; +import static org.apache.kafka.coordinator.group.Utils.ofSentinel; +import static org.apache.kafka.coordinator.group.Utils.toConsumerProtocolAssignment; +import static org.apache.kafka.coordinator.group.Utils.toTopicPartitions; +import static org.apache.kafka.coordinator.group.classic.ClassicGroupMember.EMPTY_ASSIGNMENT; +import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD; +import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPARING_REBALANCE; +import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME; + +public class ClassicGroupMetadataManager { + + private final GroupStore groupStore; + private final GroupMetaManagerHelper helper; + + /** + * The maximum number of members allowed in a single classic group. + */ + private final int classicGroupMaxSize; + + /** + * Initial rebalance delay for members joining a classic group. + */ + private final int classicGroupInitialRebalanceDelayMs; + + /** + * The metadata refresh interval. + */ + private final int consumerGroupMetadataRefreshIntervalMs; + + /** + * The timeout used to wait for a new member in milliseconds. + */ + private final int classicGroupNewMemberJoinTimeoutMs; + + /** + * The maximum number of members allowed in a single consumer group. + */ + private final int consumerGroupMaxSize; + + /** + * The coordinator metrics. + */ + private final GroupCoordinatorMetricsShard metrics; + + /** + * The system timer. + */ + private final CoordinatorTimer<Void, CoordinatorRecord> timer; + + /** + * The system time. + */ + private final Time time; + + /** + * The log context. + */ + private final LogContext logContext; + + /** + * The logger. + */ + private final Logger log; + + /** + * The supported consumer group partition assignors keyed by their name. + */ + private final Map<String, ConsumerGroupPartitionAssignor> consumerGroupAssignors; + + /** + * The default consumer group assignor used. + */ + private final ConsumerGroupPartitionAssignor defaultConsumerGroupAssignor; + + /** + * The config indicating whether group protocol upgrade/downgrade is allowed. + */ + private final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy; + + + /** + * An empty result returned to the state machine. 
This means that + * there are no records to append to the log. + * + * Package private for testing. + */ + static final CoordinatorResult<Void, CoordinatorRecord> EMPTY_RESULT = + new CoordinatorResult<>(Collections.emptyList(), CompletableFuture.completedFuture(null), false); + + + public ClassicGroupMetadataManager( + GroupStore groupStore, + GroupCoordinatorMetricsShard metrics, + int classicGroupMaxSize, + int classicGroupInitialRebalanceDelayMs, + int consumerGroupMetadataRefreshIntervalMs, + int classicGroupNewMemberJoinTimeoutMs, + int consumerGroupMaxSize, + CoordinatorTimer<Void, CoordinatorRecord> timer, + Time time, + LogContext logContext, + List<ConsumerGroupPartitionAssignor> consumerGroupAssignors, + ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy) { Review Comment: I think this would be a bit more legible with the line break after "policy" so the parenthesis is at the start of the next line. Then the argument declarations will not blend into the assignments in the method body. There are a few other instances in this PR which would also benefit from the same formatting tweak. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMetadataManager.java: ########## @@ -0,0 +1,1356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.CoordinatorResult; +import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer; +import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.classic.ClassicGroup; +import org.apache.kafka.coordinator.group.classic.ClassicGroupState; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; +import org.apache.kafka.coordinator.group.modern.Assignment; +import org.apache.kafka.coordinator.group.modern.MemberState; +import org.apache.kafka.coordinator.group.modern.ModernGroup; +import org.apache.kafka.coordinator.group.modern.TopicMetadata; +import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION; +import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH; +import static org.apache.kafka.coordinator.group.Group.GroupType.CLASSIC; +import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER; +import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord; +import static org.apache.kafka.coordinator.group.Utils.ofSentinel; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.modern.ModernGroupMember.hasAssignedPartitionsChanged; + +public class ConsumerGroupMetadataManager { + + private final GroupStore groupStore; + private final GroupMetaManagerHelper helper; + + /** + * The default consumer group assignor used. + */ + private final ConsumerGroupPartitionAssignor defaultConsumerGroupAssignor; + + /** + * The supported consumer group partition assignors keyed by their name. + */ + private final Map<String, ConsumerGroupPartitionAssignor> consumerGroupAssignors; + + /** + * The metadata refresh interval. + */ + private final int consumerGroupMetadataRefreshIntervalMs; + + /** + * The maximum number of members allowed in a single consumer group. + */ + private final int consumerGroupMaxSize; + + /** + * The default heartbeat interval for consumer groups. 
+ */ + private final int consumerGroupHeartbeatIntervalMs; + + /** + * Initial rebalance delay for members joining a classic group. + */ + private final int classicGroupInitialRebalanceDelayMs; + + /** + * The maximum number of members allowed in a single classic group. + */ + private final int classicGroupMaxSize; + + /** + * The config indicating whether group protocol upgrade/downgrade is allowed. + */ + private final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy; + + /** + * The group manager. + */ + private final GroupConfigManager groupConfigManager; + + /** + * The default session timeout for consumer groups. + */ + private final int consumerGroupSessionTimeoutMs; + + /** + * The coordinator metrics. + */ + private final GroupCoordinatorMetricsShard metrics; + + /** + * The system time. + */ + private final Time time; + + /** + * The system timer. + */ + private final CoordinatorTimer<Void, CoordinatorRecord> timer; + + /** + * The log context. + */ + private final LogContext logContext; + + /** + * The logger. + */ + private final Logger log; + + + public ConsumerGroupMetadataManager( + GroupStore groupStore, + List<ConsumerGroupPartitionAssignor> consumerGroupAssignors, + int consumerGroupMetadataRefreshIntervalMs, + int classicGroupInitialRebalanceDelayMs, + int consumerGroupMaxSize, + int consumerGroupHeartbeatIntervalMs, + int consumerGroupSessionTimeoutMs, + int classicGroupMaxSize, + GroupConfigManager groupConfigManager, + ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy, + LogContext logContext, + GroupCoordinatorMetricsShard metrics, + Time time, + CoordinatorTimer<Void, CoordinatorRecord> timer) { + this.groupStore = groupStore; Review Comment: nit: Probably more legible with a line break before the `) {` so the method body is distinct from the argument declarations. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org