lucasbru commented on code in PR #18669: URL: https://github.com/apache/kafka/pull/18669#discussion_r1933937741
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. + */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) Review Comment: Sure. More generally, are you going for 100% coverage, or what is the motivation to test these cases specifically? I would attempt to get the simple code like this through as much as possible on the first try, but you always have many such comments. I try to test what I think is useful to test, but I guess we are never aligned on this? If you are aiming for 100% coverage that'd be good to know. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. + */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. 
+ * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata + ) { + StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); + newPartitionMetadata.forEach((topicName, topicMetadata) -> { + List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. 
+ if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> + partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(racks.stream().sorted().toList()) + ) + ); + } + partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition)); + value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(topicMetadata.id()) + .setTopicName(topicMetadata.name()) + .setNumPartitions(topicMetadata.numPartitions()) + .setPartitionMetadata(partitionMetadata) + ); + }); + + value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName)); + + return CoordinatorRecord.record( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + value, + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupEpochRecord( + String groupId, + int newGroupEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupMetadataValue() + .setEpoch(newGroupEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord( + String groupId, + String memberId, + TasksTuple assignment + ) { + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = new ArrayList<>(assignment.activeTasks().size()); Review Comment: Done ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. 
+ */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata + ) { + StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); + newPartitionMetadata.forEach((topicName, topicMetadata) -> { + List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. Review Comment: Done ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. 
+ */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata Review Comment: I'm puzzled, where would you see the benefit? 
There is a practice of checking for `null` in constructors, because it helps detect problems earlier (the null pointer exception from accessing a null field can come much later). Here, we'd add the check exactly where the null pointer exception would be thrown anyway. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. 
+ */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata + ) { + StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); + newPartitionMetadata.forEach((topicName, topicMetadata) -> { + List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. + if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> + partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(racks.stream().sorted().toList()) + ) + ); + } + partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition)); + value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(topicMetadata.id()) + .setTopicName(topicMetadata.name()) + .setNumPartitions(topicMetadata.numPartitions()) + .setPartitionMetadata(partitionMetadata) + ); + }); + + value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName)); + + return CoordinatorRecord.record( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + value, + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupEpochRecord( + String groupId, + int newGroupEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupMetadataValue() + .setEpoch(newGroupEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord( + String groupId, + String memberId, + TasksTuple assignment Review Comment: See above. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. 
+ */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata + ) { + StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); + newPartitionMetadata.forEach((topicName, topicMetadata) -> { + List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. + if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> + partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(racks.stream().sorted().toList()) + ) + ); + } + partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition)); + value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(topicMetadata.id()) + .setTopicName(topicMetadata.name()) + .setNumPartitions(topicMetadata.numPartitions()) + .setPartitionMetadata(partitionMetadata) + ); + }); + + value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName)); + + return CoordinatorRecord.record( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + value, + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupEpochRecord( + String groupId, + int newGroupEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupMetadataValue() + .setEpoch(newGroupEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord( + String groupId, + String memberId, + TasksTuple assignment + ) { + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = new ArrayList<>(assignment.activeTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.activeTasks().entrySet()) { + activeTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = new ArrayList<>(assignment.standbyTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.standbyTasks().entrySet()) { + standbyTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + 
standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = new ArrayList<>(assignment.warmupTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.warmupTasks().entrySet()) { + warmupTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMemberValue() + .setActiveTasks(activeTaskIds) + .setStandbyTasks(standbyTaskIds) + .setWarmupTasks(warmupTaskIds), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMember tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord( + String groupId, + int assignmentEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMetadataValue() + .setAssignmentEpoch(assignmentEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord( + String groupId, + StreamsGroupMember member Review Comment: See above ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. 
+ */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata + ) { + StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); + newPartitionMetadata.forEach((topicName, topicMetadata) -> { + List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. + if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> + partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(racks.stream().sorted().toList()) + ) + ); + } + partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition)); + value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(topicMetadata.id()) + .setTopicName(topicMetadata.name()) + .setNumPartitions(topicMetadata.numPartitions()) + .setPartitionMetadata(partitionMetadata) + ); + }); + + value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName)); + + return CoordinatorRecord.record( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + value, + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupEpochRecord( + String groupId, + int newGroupEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupMetadataValue() + .setEpoch(newGroupEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord( + String groupId, + String memberId, + TasksTuple assignment + ) { + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = new ArrayList<>(assignment.activeTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.activeTasks().entrySet()) { + activeTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = new ArrayList<>(assignment.standbyTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.standbyTasks().entrySet()) { + standbyTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + 
standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = new ArrayList<>(assignment.warmupTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.warmupTasks().entrySet()) { + warmupTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMemberValue() + .setActiveTasks(activeTaskIds) + .setStandbyTasks(standbyTaskIds) + .setWarmupTasks(warmupTaskIds), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMember tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord( + String groupId, + int assignmentEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMetadataValue() + .setAssignmentEpoch(assignmentEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord( Review Comment: Done ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java: ########## @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class StreamsCoordinatorRecordHelpersTest { + + @Test + public void testNewStreamsGroupMemberRecord() { + StreamsGroupMember member = new StreamsGroupMember.Builder("member-id") + 
.setRackId("rack-id") Review Comment: Done. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. 
+ */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata + ) { + StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); + newPartitionMetadata.forEach((topicName, topicMetadata) -> { + List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. + if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> + partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(racks.stream().sorted().toList()) + ) + ); + } + partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition)); + value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(topicMetadata.id()) + .setTopicName(topicMetadata.name()) + .setNumPartitions(topicMetadata.numPartitions()) + .setPartitionMetadata(partitionMetadata) + ); + }); + + value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName)); + + return CoordinatorRecord.record( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + value, + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupEpochRecord( + String groupId, + int newGroupEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupMetadataValue() + .setEpoch(newGroupEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord( + String groupId, + String memberId, + TasksTuple assignment + ) { + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = new ArrayList<>(assignment.activeTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.activeTasks().entrySet()) { + activeTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = new ArrayList<>(assignment.standbyTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.standbyTasks().entrySet()) { + standbyTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + 
standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = new ArrayList<>(assignment.warmupTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.warmupTasks().entrySet()) { + warmupTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMemberValue() + .setActiveTasks(activeTaskIds) + .setStandbyTasks(standbyTaskIds) + .setWarmupTasks(warmupTaskIds), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMember tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord( + String groupId, + int assignmentEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMetadataValue() + .setAssignmentEpoch(assignmentEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupCurrentMemberAssignmentKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupCurrentMemberAssignmentValue() + .setMemberEpoch(member.memberEpoch()) + .setPreviousMemberEpoch(member.previousMemberEpoch()) + .setState(member.state().value()) + .setActiveTasks(toTaskIds(member.assignedTasks().activeTasks())) + .setStandbyTasks(toTaskIds(member.assignedTasks().standbyTasks())) + .setWarmupTasks(toTaskIds(member.assignedTasks().warmupTasks())) + .setActiveTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().activeTasks())) + .setStandbyTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().standbyTasks())) + .setWarmupTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().warmupTasks())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupCurrentMemberAssignment tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupCurrentAssignmentTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupCurrentMemberAssignmentKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> toTaskIds( + Map<String, Set<Integer>> tasks + ) { + List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new ArrayList<>(tasks.size()); + tasks.forEach((subtopologyId, partitions) -> + taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() + .setSubtopologyId(subtopologyId) + .setPartitions(partitions.stream().sorted().toList())) + ); + taskIds.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId)); + return taskIds; + } + + /** + * Creates a StreamsTopology record. + * + * @param groupId The consumer group id. + * @param topology The new topology. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId, + StreamsGroupHeartbeatRequestData.Topology topology) { + return newStreamsGroupTopologyRecord(groupId, convertToStreamsGroupTopologyRecord(topology)); + } + + /** + * Creates a StreamsTopology record. + * + * @param groupId The consumer group id. + * @param value The encoded topology record value. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId, StreamsGroupTopologyValue value) { Review Comment: We use the topology value record to configure the topology, so we can do it both when we get it from the heartbeat and when we get it from the record. So in the heartbeat handler, we want to have the `CoordinatorRecord` for storing the topology and the `StreamsGroupTopologyValue` record for configuring the topology. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. + */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. 
+ * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata + ) { + StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); + newPartitionMetadata.forEach((topicName, topicMetadata) -> { + List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. 
+ if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> + partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(racks.stream().sorted().toList()) + ) + ); + } + partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition)); + value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(topicMetadata.id()) + .setTopicName(topicMetadata.name()) + .setNumPartitions(topicMetadata.numPartitions()) + .setPartitionMetadata(partitionMetadata) + ); + }); + + value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName)); + + return CoordinatorRecord.record( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + value, + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupEpochRecord( + String groupId, + int newGroupEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupMetadataValue() + .setEpoch(newGroupEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord( + String groupId, + String memberId, + TasksTuple assignment + ) { + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = new ArrayList<>(assignment.activeTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.activeTasks().entrySet()) { + activeTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = new ArrayList<>(assignment.standbyTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.standbyTasks().entrySet()) { + standbyTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = new ArrayList<>(assignment.warmupTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.warmupTasks().entrySet()) { + warmupTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId), + new 
ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMemberValue() + .setActiveTasks(activeTaskIds) + .setStandbyTasks(standbyTaskIds) + .setWarmupTasks(warmupTaskIds), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMember tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord( + String groupId, + int assignmentEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMetadataValue() + .setAssignmentEpoch(assignmentEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupCurrentMemberAssignmentKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupCurrentMemberAssignmentValue() + .setMemberEpoch(member.memberEpoch()) + .setPreviousMemberEpoch(member.previousMemberEpoch()) + .setState(member.state().value()) + .setActiveTasks(toTaskIds(member.assignedTasks().activeTasks())) + .setStandbyTasks(toTaskIds(member.assignedTasks().standbyTasks())) + .setWarmupTasks(toTaskIds(member.assignedTasks().warmupTasks())) + .setActiveTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().activeTasks())) + .setStandbyTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().standbyTasks())) + .setWarmupTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().warmupTasks())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupCurrentMemberAssignment tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupCurrentAssignmentTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupCurrentMemberAssignmentKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> toTaskIds( + Map<String, Set<Integer>> tasks + ) { + List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new ArrayList<>(tasks.size()); + tasks.forEach((subtopologyId, partitions) -> + taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() + .setSubtopologyId(subtopologyId) + .setPartitions(partitions.stream().sorted().toList())) + ); + taskIds.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId)); + return taskIds; + } + + /** + * Creates a StreamsTopology record. + * + * @param groupId The consumer group id. + * @param topology The new topology. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId, + StreamsGroupHeartbeatRequestData.Topology topology) { Review Comment: See above ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java: ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import 
org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class contains helper methods to create records stored in the __consumer_offsets topic. + */ +public class StreamsCoordinatorRecordHelpers { + + private StreamsCoordinatorRecordHelpers() { + } + + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId().orElse(null)) + .setInstanceId(member.instanceId().orElse(null)) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyEpoch(member.topologyEpoch()) + .setProcessId(member.processId()) + .setUserEndpoint(member.userEndpoint().orElse(null)) + .setClientTags(member.clientTags().entrySet().stream().map(e -> + new StreamsGroupMemberMetadataValue.KeyValue() + .setKey(e.getKey()) + .setValue(e.getValue()) + ).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMemberMetadata tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata record. + * + * @param groupId The streams group id. + * @param newPartitionMetadata The partition metadata. 
+ * @return The record. + */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord( + String groupId, + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata + ) { + StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); + newPartitionMetadata.forEach((topicName, topicMetadata) -> { + List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. + if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> + partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(racks.stream().sorted().toList()) + ) + ); + } + partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition)); + value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(topicMetadata.id()) + .setTopicName(topicMetadata.name()) + .setNumPartitions(topicMetadata.numPartitions()) + .setPartitionMetadata(partitionMetadata) + ); + }); + + value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName)); + + return CoordinatorRecord.record( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + value, + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupPartitionMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupPartitionMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupEpochRecord( + String groupId, + int newGroupEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupMetadataValue() + .setEpoch(newGroupEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord( + String groupId, + String memberId, + TasksTuple assignment + ) { + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = new ArrayList<>(assignment.activeTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.activeTasks().entrySet()) { + activeTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = new ArrayList<>(assignment.standbyTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.standbyTasks().entrySet()) { + standbyTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + 
standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = new ArrayList<>(assignment.warmupTasks().size()); + for (Map.Entry<String, Set<Integer>> entry : assignment.warmupTasks().entrySet()) { + warmupTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue().stream().sorted().toList()) + ); + } + warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId)); + + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMemberValue() + .setActiveTasks(activeTaskIds) + .setStandbyTasks(standbyTaskIds) + .setWarmupTasks(warmupTaskIds), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMember tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord( + String groupId, + int assignmentEpoch + ) { + return CoordinatorRecord.record( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMetadataValue() + .setAssignmentEpoch(assignmentEpoch), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupTargetAssignmentMetadata tombstone. + * + * @param groupId The streams group id. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochTombstoneRecord( + String groupId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId) + ); + } + + public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord( + String groupId, + StreamsGroupMember member + ) { + return CoordinatorRecord.record( + new StreamsGroupCurrentMemberAssignmentKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + new ApiMessageAndVersion( + new StreamsGroupCurrentMemberAssignmentValue() + .setMemberEpoch(member.memberEpoch()) + .setPreviousMemberEpoch(member.previousMemberEpoch()) + .setState(member.state().value()) + .setActiveTasks(toTaskIds(member.assignedTasks().activeTasks())) + .setStandbyTasks(toTaskIds(member.assignedTasks().standbyTasks())) + .setWarmupTasks(toTaskIds(member.assignedTasks().warmupTasks())) + .setActiveTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().activeTasks())) + .setStandbyTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().standbyTasks())) + .setWarmupTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().warmupTasks())), + (short) 0 + ) + ); + } + + /** + * Creates a StreamsGroupCurrentMemberAssignment tombstone. + * + * @param groupId The streams group id. + * @param memberId The streams group member id. + * @return The record. 
+ */ + public static CoordinatorRecord newStreamsGroupCurrentAssignmentTombstoneRecord( + String groupId, + String memberId + ) { + return CoordinatorRecord.tombstone( + new StreamsGroupCurrentMemberAssignmentKey() + .setGroupId(groupId) + .setMemberId(memberId) + ); + } + + private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> toTaskIds( + Map<String, Set<Integer>> tasks + ) { + List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new ArrayList<>(tasks.size()); + tasks.forEach((subtopologyId, partitions) -> + taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() + .setSubtopologyId(subtopologyId) + .setPartitions(partitions.stream().sorted().toList())) + ); + taskIds.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId)); + return taskIds; + } + + /** + * Creates a StreamsTopology record. + * + * @param groupId The consumer group id. + * @param topology The new topology. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId, + StreamsGroupHeartbeatRequestData.Topology topology) { + return newStreamsGroupTopologyRecord(groupId, convertToStreamsGroupTopologyRecord(topology)); + } + + /** + * Creates a StreamsTopology record. + * + * @param groupId The consumer group id. + * @param value The encoded topology record value. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId, StreamsGroupTopologyValue value) { + return CoordinatorRecord.record( + new StreamsGroupTopologyKey() + .setGroupId(groupId), + new ApiMessageAndVersion(value, (short) 0) + ); + } + + /** + * Encodes subtopologies from the Heartbeat RPC to a StreamsTopology record value. + * + * @param topology The new topology + * @return The record value. 
+ */ + public static StreamsGroupTopologyValue convertToStreamsGroupTopologyRecord(StreamsGroupHeartbeatRequestData.Topology topology) { + StreamsGroupTopologyValue value = new StreamsGroupTopologyValue(); + value.setEpoch(topology.epoch()); + topology.subtopologies().forEach(subtopology -> { + List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics = Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org