cadonna commented on code in PR #18669: URL: https://github.com/apache/kafka/pull/18669#discussion_r1930394290
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. + */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) Review Comment: Could you please add tests for the else-branch? Here and elsewhere. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. + */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata + ) { + StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); + newPartitionMetadata.forEach((topicName, topicMetadata) -> { + List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. Review Comment: Is this comment needed? Could you please also add a unit test for this case? The test should be enough documentation for the code. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. + */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata + ) { + StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); + newPartitionMetadata.forEach((topicName, topicMetadata) -> { + List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. + if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> + partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(racks.stream().sorted().toList()) + ) + ); + } + partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition)); + value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(topicMetadata.id()) + .setTopicName(topicMetadata.name()) + .setNumPartitions(topicMetadata.numPartitions()) + .setPartitionMetadata(partitionMetadata) + ); + }); + + value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName)); + + return CoordinatorRecord.record( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + value, + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupEpochRecord( + String groupId, + int newGroupEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupMetadataValue() + .setEpoch(newGroupEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord( + String groupId, + String memberId, + TasksTuple assignment Review Comment: Should we check for `null`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. + */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata + ) { + StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); + newPartitionMetadata.forEach((topicName, topicMetadata) -> { + List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. + if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> + partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(racks.stream().sorted().toList()) + ) + ); + } + partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition)); + value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(topicMetadata.id()) + .setTopicName(topicMetadata.name()) + .setNumPartitions(topicMetadata.numPartitions()) + .setPartitionMetadata(partitionMetadata) + ); + }); + + value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName)); + + return CoordinatorRecord.record( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + value, + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupEpochRecord( + String groupId, + int newGroupEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupMetadataValue() + .setEpoch(newGroupEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord( + String groupId, + String memberId, + TasksTuple assignment + ) { + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = new ArrayList<>(assignment.activeTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.activeTasks().entrySet()) { + activeTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = new ArrayList<>(assignment.standbyTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.standbyTasks().entrySet()) { + standbyTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = new ArrayList<>(assignment.warmupTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.warmupTasks().entrySet()) { + warmupTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMemberValue() + .setActiveTasks(activeTaskIds) + .setStandbyTasks(standbyTaskIds) + .setWarmupTasks(warmupTaskIds), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMember tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord( + String groupId, + int assignmentEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMetadataValue() + .setAssignmentEpoch(assignmentEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord( + String groupId, + StreamsGroupMember member Review Comment: Should we check for `null`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. + */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata + ) { + StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); + newPartitionMetadata.forEach((topicName, topicMetadata) -> { + List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. + if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> + partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(racks.stream().sorted().toList()) + ) + ); + } + partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition)); + value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(topicMetadata.id()) + .setTopicName(topicMetadata.name()) + .setNumPartitions(topicMetadata.numPartitions()) + .setPartitionMetadata(partitionMetadata) + ); + }); + + value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName)); + + return CoordinatorRecord.record( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + value, + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupEpochRecord( + String groupId, + int newGroupEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupMetadataValue() + .setEpoch(newGroupEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord( + String groupId, + String memberId, + TasksTuple assignment + ) { + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = new ArrayList<>(assignment.activeTasks().size()); Review Comment: Could you add a test for each task type where the list of task IDs is empty? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. + */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata + ) { + StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); + newPartitionMetadata.forEach((topicName, topicMetadata) -> { + List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. + if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> + partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(racks.stream().sorted().toList()) + ) + ); + } + partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition)); + value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(topicMetadata.id()) + .setTopicName(topicMetadata.name()) + .setNumPartitions(topicMetadata.numPartitions()) + .setPartitionMetadata(partitionMetadata) + ); + }); + + value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName)); + + return CoordinatorRecord.record( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + value, + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupEpochRecord( + String groupId, + int newGroupEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupMetadataValue() + .setEpoch(newGroupEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord( + String groupId, + String memberId, + TasksTuple assignment + ) { + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = new ArrayList<>(assignment.activeTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.activeTasks().entrySet()) { + activeTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = new ArrayList<>(assignment.standbyTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.standbyTasks().entrySet()) { + standbyTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = new ArrayList<>(assignment.warmupTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.warmupTasks().entrySet()) { + warmupTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMemberValue() + .setActiveTasks(activeTaskIds) + .setStandbyTasks(standbyTaskIds) + .setWarmupTasks(warmupTaskIds), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMember tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord( + String groupId, + int assignmentEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMetadataValue() + .setAssignmentEpoch(assignmentEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupCurrentMemberAssignmentKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupCurrentMemberAssignmentValue() + .setMemberEpoch(member.memberEpoch()) + .setPreviousMemberEpoch(member.previousMemberEpoch()) + .setState(member.state().value()) + .setActiveTasks(toTaskIds(member.assignedTasks().activeTasks())) + .setStandbyTasks(toTaskIds(member.assignedTasks().standbyTasks())) + .setWarmupTasks(toTaskIds(member.assignedTasks().warmupTasks())) + .setActiveTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().activeTasks())) + .setStandbyTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().standbyTasks())) + .setWarmupTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().warmupTasks())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupCurrentMemberAssignment tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupCurrentAssignmentTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupCurrentMemberAssignmentKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> toTaskIds( + Map<String, Set<Integer>> tasks + ) { + List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new ArrayList<>(tasks.size()); + tasks.forEach((subtopologyId, partitions) -> + taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() + .setSubtopologyId(subtopologyId) + .setPartitions(partitions.stream().sorted().toList())) + ); + taskIds.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId)); + return taskIds; + } + + /** + * Creates a StreamsTopology record. + * + * @param groupId The consumer group id. + * @param topology The new topology. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId, + StreamsGroupHeartbeatRequestData.Topology topology) { Review Comment: Should we check for `null`? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java: ########## @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class StreamsCoordinatorRecordHelpersTest { + + @Test + public void testNewStreamsGroupMemberRecord() { + StreamsGroupMember member = new StreamsGroupMember.Builder("member-id") + .setRackId("rack-id") Review Comment: nit: all of those strings are used many times. Maybe it makes sense to use a constant. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. + */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata + ) { + StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); + newPartitionMetadata.forEach((topicName, topicMetadata) -> { + List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. + if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> + partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(racks.stream().sorted().toList()) + ) + ); + } + partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition)); + value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(topicMetadata.id()) + .setTopicName(topicMetadata.name()) + .setNumPartitions(topicMetadata.numPartitions()) + .setPartitionMetadata(partitionMetadata) + ); + }); + + value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName)); + + return CoordinatorRecord.record( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + value, + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupEpochRecord( + String groupId, + int newGroupEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupMetadataValue() + .setEpoch(newGroupEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord( + String groupId, + String memberId, + TasksTuple assignment + ) { + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = new ArrayList<>(assignment.activeTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.activeTasks().entrySet()) { + activeTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = new ArrayList<>(assignment.standbyTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.standbyTasks().entrySet()) { + standbyTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = new ArrayList<>(assignment.warmupTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.warmupTasks().entrySet()) { + warmupTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMemberValue() + .setActiveTasks(activeTaskIds) + .setStandbyTasks(standbyTaskIds) + .setWarmupTasks(warmupTaskIds), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMember tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord( + String groupId, + int assignmentEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMetadataValue() + .setAssignmentEpoch(assignmentEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupCurrentMemberAssignmentKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupCurrentMemberAssignmentValue() + .setMemberEpoch(member.memberEpoch()) + .setPreviousMemberEpoch(member.previousMemberEpoch()) + .setState(member.state().value()) + .setActiveTasks(toTaskIds(member.assignedTasks().activeTasks())) + .setStandbyTasks(toTaskIds(member.assignedTasks().standbyTasks())) + .setWarmupTasks(toTaskIds(member.assignedTasks().warmupTasks())) + .setActiveTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().activeTasks())) + .setStandbyTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().standbyTasks())) + .setWarmupTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().warmupTasks())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupCurrentMemberAssignment tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupCurrentAssignmentTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupCurrentMemberAssignmentKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> toTaskIds( + Map<String, Set<Integer>> tasks + ) { + List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new ArrayList<>(tasks.size()); + tasks.forEach((subtopologyId, partitions) -> + taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() + .setSubtopologyId(subtopologyId) + .setPartitions(partitions.stream().sorted().toList())) + ); + taskIds.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId)); + return taskIds; + } + + /** + * Creates a StreamsTopology record. + * + * @param groupId The consumer group id. + * @param topology The new topology. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId, + StreamsGroupHeartbeatRequestData.Topology topology) { + return newStreamsGroupTopologyRecord(groupId, convertToStreamsGroupTopologyRecord(topology)); + } + + /** + * Creates a StreamsTopology record. + * + * @param groupId The consumer group id. + * @param value The encoded topology record value. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId, StreamsGroupTopologyValue value) { + return CoordinatorRecord.record( + new StreamsGroupTopologyKey() + .setGroupId(groupId), + new ApiMessageAndVersion(value, (short) 0) + ); + } + + /** + * Encodes subtopologies from the Heartbeat RPC to a StreamsTopology record value. + * + * @param topology The new topology + * @return The record value. + */ + public static StreamsGroupTopologyValue convertToStreamsGroupTopologyRecord(StreamsGroupHeartbeatRequestData.Topology topology) { + StreamsGroupTopologyValue value = new StreamsGroupTopologyValue(); + value.setEpoch(topology.epoch()); + topology.subtopologies().forEach(subtopology -> { + List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics = Review Comment: Maybe add tests for empty repartition topics, changelog topics, etc. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. + */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata Review Comment: Should we check for `null`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. + */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata + ) { + StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); + newPartitionMetadata.forEach((topicName, topicMetadata) -> { + List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. + if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> + partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(racks.stream().sorted().toList()) + ) + ); + } + partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition)); + value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(topicMetadata.id()) + .setTopicName(topicMetadata.name()) + .setNumPartitions(topicMetadata.numPartitions()) + .setPartitionMetadata(partitionMetadata) + ); + }); + + value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName)); + + return CoordinatorRecord.record( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + value, + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupEpochRecord( + String groupId, + int newGroupEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupMetadataValue() + .setEpoch(newGroupEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord( + String groupId, + String memberId, + TasksTuple assignment + ) { + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = new ArrayList<>(assignment.activeTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.activeTasks().entrySet()) { + activeTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = new ArrayList<>(assignment.standbyTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.standbyTasks().entrySet()) { + standbyTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = new ArrayList<>(assignment.warmupTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.warmupTasks().entrySet()) { + warmupTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMemberValue() + .setActiveTasks(activeTaskIds) + .setStandbyTasks(standbyTaskIds) + .setWarmupTasks(warmupTaskIds), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMember tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord( + String groupId, + int assignmentEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMetadataValue() + .setAssignmentEpoch(assignmentEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord( Review Comment: Could you please add a test with empty assignment? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. + */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata + ) { + StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); + newPartitionMetadata.forEach((topicName, topicMetadata) -> { + List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. + if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> + partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(racks.stream().sorted().toList()) + ) + ); + } + partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition)); + value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(topicMetadata.id()) + .setTopicName(topicMetadata.name()) + .setNumPartitions(topicMetadata.numPartitions()) + .setPartitionMetadata(partitionMetadata) + ); + }); + + value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName)); + + return CoordinatorRecord.record( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + value, + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupEpochRecord( + String groupId, + int newGroupEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupMetadataValue() + .setEpoch(newGroupEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord( + String groupId, + String memberId, + TasksTuple assignment + ) { + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = new ArrayList<>(assignment.activeTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.activeTasks().entrySet()) { + activeTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = new ArrayList<>(assignment.standbyTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.standbyTasks().entrySet()) { + standbyTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = new ArrayList<>(assignment.warmupTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.warmupTasks().entrySet()) { + warmupTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMemberValue() + .setActiveTasks(activeTaskIds) + .setStandbyTasks(standbyTaskIds) + .setWarmupTasks(warmupTaskIds), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMember tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord( + String groupId, + int assignmentEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMetadataValue() + .setAssignmentEpoch(assignmentEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupCurrentMemberAssignmentKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupCurrentMemberAssignmentValue() + .setMemberEpoch(member.memberEpoch()) + .setPreviousMemberEpoch(member.previousMemberEpoch()) + .setState(member.state().value()) + .setActiveTasks(toTaskIds(member.assignedTasks().activeTasks())) + .setStandbyTasks(toTaskIds(member.assignedTasks().standbyTasks())) + .setWarmupTasks(toTaskIds(member.assignedTasks().warmupTasks())) + .setActiveTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().activeTasks())) + .setStandbyTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().standbyTasks())) + .setWarmupTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().warmupTasks())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupCurrentMemberAssignment tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupCurrentAssignmentTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupCurrentMemberAssignmentKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> toTaskIds( + Map<String, Set<Integer>> tasks + ) { + List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new ArrayList<>(tasks.size()); + tasks.forEach((subtopologyId, partitions) -> + taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() + .setSubtopologyId(subtopologyId) + .setPartitions(partitions.stream().sorted().toList())) + ); + taskIds.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId)); + return taskIds; + } + + /** + * Creates a StreamsTopology record. + * + * @param groupId The consumer group id. + * @param topology The new topology. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId, + StreamsGroupHeartbeatRequestData.Topology topology) { + return newStreamsGroupTopologyRecord(groupId, convertToStreamsGroupTopologyRecord(topology)); + } + + /** + * Creates a StreamsTopology record. + * + * @param groupId The consumer group id. + * @param value The encoded topology record value. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId, StreamsGroupTopologyValue value) { Review Comment: Why do we also need this public method? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org