AndrewJSchofield commented on code in PR #19026:
URL: https://github.com/apache/kafka/pull/19026#discussion_r1973313677
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -463,6 +472,19 @@ GroupMetadataManager build() {
      */
     private final ShareGroupPartitionAssignor shareGroupAssignor;
 
+    /**
+     * A record class to hold the value representing ShareGroupStatePartitionMetadata for the TimelineHashmap
+     * keyed on share group id.
+     *
+     * @param initializedTopics Map of set of partition ids keyed on the topic id.
+     * @param deletingTopics Map of topic names keyed on topic id.
+     */
+    private record ShareGroupStatePartitionMetadataInfo(

Review Comment:
   I don't understand why we need the topic name in one case, but not the other.

   Option 1:
   ```
   Map<Uuid, Set<Integer>> initializedTopics
   Set<Uuid> deletingTopics
   ```

   Option 2:
   ```
   Map<Uuid, Map.Entry<String, Set<Integer>>> initializedTopics
   Map<Uuid, String> deletingTopics
   ```

   I suspect that option 1 would be sufficient. What am I missing?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2190,10 +2213,10 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
      * @param clientHost The client host.
      * @param subscribedTopicNames The list of subscribed topic names from the request or null.
      *
-     * @return A Result containing the ShareGroupHeartbeat response and
-     * a list of records to update the state machine.
+     * @return A Result containing a pair of ShareGroupHeartbeat response and maybe InitializeShareGroupStateParameters
+     * and a list of records to update the state machine.
      */
-    private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> shareGroupHeartbeat(
+    private CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> shareGroupHeartbeat(

Review Comment:
   I am tempted to create a `Pair<K,V>` in the common module. There are other places I've seen where we need a pair.

   Absolutely no need for you to address that in this PR. I understand what you're doing here.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4741,6 +4889,50 @@ public void replay(
         }
     }
 
+    /**
+     * Replays ShareGroupStatePartitionMetadataKey/Value to update the hard state of
+     * the share group.
+     *
+     * @param key A ShareGroupStatePartitionMetadataKey key.
+     * @param value A ShareGroupStatePartitionMetadataValue record.
+     */
+    public void replay(
+        ShareGroupStatePartitionMetadataKey key,
+        ShareGroupStatePartitionMetadataValue value
+    ) {
+        String groupId = key.groupId();
+
+        getOrMaybeCreatePersistedShareGroup(groupId, false);
+
+        // Update timeline structures with info about initialized/deleted topics.
+        if (value == null) {
+            // Tombstone!
+            shareGroupPartitionMetadata.remove(groupId);
+        } else {
+            // Init java record.
+            ShareGroupStatePartitionMetadataInfo record = shareGroupPartitionMetadata.computeIfAbsent(
+                groupId, k -> new ShareGroupStatePartitionMetadataInfo(new HashMap<>(), new HashMap<>())
+            );
+
+            // Remove all topicIds in deleting state from java record.
+            Map<Uuid, String> deleting = new HashMap<>();
+            for (ShareGroupStatePartitionMetadataValue.TopicInfo deletingTopic : value.deletingTopics()) {
+                deleting.put(deletingTopic.topicId(), deletingTopic.topicName());
+            }
+            deleting.forEach((tId, tName) -> record.initializedTopics.remove(tId));

Review Comment:
   Looks to me like the initialized topics list is not initialized yet.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -1066,6 +1086,13 @@ public void replay(
                 );
                 break;
 
+            case SHARE_GROUP_STATE_PARTITION_METADATA:

Review Comment:
   Please move this case to follow `SHARE_GROUP_CURRENT_MEMBER_ASSIGNMENT`.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3981,6 +4105,30 @@ public CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> sha
             request.subscribedTopicNames());
     }
 
+    /**
+     * Handles an initialize share group state request. This is usually part of
+     * shareGroupHeartbeat code flow.
+     * @param groupId The group id corresponding to the share group whose share partitions have been initialized.
+     * @param topicPartitionMap Map representing topic partition data to be added to the share state partition metadata.
+     *
+     * @return A Result containing ShareGroupStatePartitionMetadata records and Void response.
+     */
+    public CoordinatorResult<Void, CoordinatorRecord> initializeShareGroupState(
+        String groupId,
+        Map<Uuid, Map.Entry<String, List<Integer>>> topicPartitionMap
+    ) {
+        // Should be present
+        ShareGroup group = (ShareGroup) groups.get(groupId);
+        if (topicPartitionMap == null || topicPartitionMap.isEmpty()) {
+            return new CoordinatorResult<>(List.of(), null);
+        }
+
+        return new CoordinatorResult<>(
+            List.of(newShareGroupPartitionMetadataRecord(group.groupId(), topicPartitionMap, Map.of())),

Review Comment:
   This is incorrect in the case where the number of partitions has increased. Let's say that T1 has increased from 1 to 4 partitions. The `topicPartitionMap` will contain `{topicId, {"T1", [1,2,3]}}`. However, the `ShareGroupPartitionMetadataRecord` should contain the complete list of partitions, not just the new ones. Because the `__consumer_offsets` topic is compacted and these records are keyed by group ID, only one such record can be expected to remain present. As a result, the most recent record needs to contain the most up-to-date list of all initialized topic-partitions.

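To illustrate the last review comment (the record written for a group must carry the complete set of initialized partitions, because compaction keeps only the latest record per key), here is a minimal sketch of merging the already-initialized partitions with the newly initialized ones before building the record. The class `InitializedPartitionsMerge` and the method `mergeInitialized` are hypothetical names for illustration, and `java.util.UUID` stands in for Kafka's `Uuid`; none of this is code from the PR.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

// Hypothetical helper, not part of the PR: shows one way to build the full
// per-topic partition set that a compacted, group-keyed record would need.
public class InitializedPartitionsMerge {

    /**
     * Returns, per topic, the union of the partitions already recorded as
     * initialized and the partitions that were just initialized.
     * Neither input map is modified.
     */
    public static Map<UUID, Set<Integer>> mergeInitialized(
        Map<UUID, Set<Integer>> alreadyInitialized,
        Map<UUID, List<Integer>> newlyInitialized
    ) {
        Map<UUID, Set<Integer>> merged = new HashMap<>();
        // Start from a copy of what the previous record contained.
        alreadyInitialized.forEach((topicId, partitions) ->
            merged.put(topicId, new TreeSet<>(partitions)));
        // Add the new partitions, creating the topic entry if it is new.
        newlyInitialized.forEach((topicId, partitions) ->
            merged.computeIfAbsent(topicId, id -> new TreeSet<>()).addAll(partitions));
        return merged;
    }

    public static void main(String[] args) {
        // T1 grows from 1 to 4 partitions: partition 0 is already initialized,
        // partitions 1, 2 and 3 are the newly initialized ones.
        UUID t1 = UUID.randomUUID();
        Map<UUID, Set<Integer>> existing = Map.of(t1, Set.of(0));
        Map<UUID, List<Integer>> added = Map.of(t1, List.of(1, 2, 3));
        System.out.println(mergeInitialized(existing, added).get(t1)); // prints [0, 1, 2, 3]
    }
}
```

In the PR's terms, the merged map (rather than `topicPartitionMap` alone) would presumably be what gets passed when creating the new record, so that the surviving record after compaction still lists partition 0.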