lucasbru commented on code in PR #18669:
URL: https://github.com/apache/kafka/pull/18669#discussion_r1933937741


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in the 
__consumer_offsets topic.
+ */
+public class StreamsCoordinatorRecordHelpers {
+
+    private StreamsCoordinatorRecordHelpers() {
+    }
+
+    public static CoordinatorRecord newStreamsGroupMemberRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(member.memberId()),
+            new ApiMessageAndVersion(
+                new StreamsGroupMemberMetadataValue()
+                    .setRackId(member.rackId().orElse(null))

Review Comment:
   Sure. 
   
   More generally, are you going for 100% coverage, or what is the motivation 
to test these cases specifically? I would attempt to get the simple code like 
this through as much as possible on the first try, but you always have many 
such comments. I try to test what I think is useful to test, but I guess we are 
never aligned on this? If you are aiming for 100% coverage that'd be good to 
know.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in the 
__consumer_offsets topic.
+ */
+public class StreamsCoordinatorRecordHelpers {
+
+    private StreamsCoordinatorRecordHelpers() {
+    }
+
+    public static CoordinatorRecord newStreamsGroupMemberRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(member.memberId()),
+            new ApiMessageAndVersion(
+                new StreamsGroupMemberMetadataValue()
+                    .setRackId(member.rackId().orElse(null))
+                    .setInstanceId(member.instanceId().orElse(null))
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                    .setTopologyEpoch(member.topologyEpoch())
+                    .setProcessId(member.processId())
+                    .setUserEndpoint(member.userEndpoint().orElse(null))
+                    
.setClientTags(member.clientTags().entrySet().stream().map(e ->
+                        new StreamsGroupMemberMetadataValue.KeyValue()
+                            .setKey(e.getKey())
+                            .setValue(e.getValue())
+                    
).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMemberMetadata tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata record.
+     *
+     * @param groupId              The streams group id.
+     * @param newPartitionMetadata The partition metadata.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord(
+        String groupId,
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
newPartitionMetadata
+    ) {
+        StreamsGroupPartitionMetadataValue value = new 
StreamsGroupPartitionMetadataValue();
+        newPartitionMetadata.forEach((topicName, topicMetadata) -> {
+            List<StreamsGroupPartitionMetadataValue.PartitionMetadata> 
partitionMetadata = new ArrayList<>();
+            // If the partition rack information map is empty, store an empty 
list in the record.
+            if (!topicMetadata.partitionRacks().isEmpty()) {
+                topicMetadata.partitionRacks().forEach((partition, racks) ->
+                    partitionMetadata.add(new 
StreamsGroupPartitionMetadataValue.PartitionMetadata()
+                        .setPartition(partition)
+                        .setRacks(racks.stream().sorted().toList())
+                    )
+                );
+            }
+            
partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition));
+            value.topics().add(new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
+                .setTopicId(topicMetadata.id())
+                .setTopicName(topicMetadata.name())
+                .setNumPartitions(topicMetadata.numPartitions())
+                .setPartitionMetadata(partitionMetadata)
+            );
+        });
+
+        
value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName));
+
+        return CoordinatorRecord.record(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                value,
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupPartitionMetadataTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupEpochRecord(
+        String groupId,
+        int newGroupEpoch
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                new StreamsGroupMetadataValue()
+                    .setEpoch(newGroupEpoch),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord(
+        String groupId,
+        String memberId,
+        TasksTuple assignment
+    ) {
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = 
new ArrayList<>(assignment.activeTasks().size());

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in the 
__consumer_offsets topic.
+ */
+public class StreamsCoordinatorRecordHelpers {
+
+    private StreamsCoordinatorRecordHelpers() {
+    }
+
+    public static CoordinatorRecord newStreamsGroupMemberRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(member.memberId()),
+            new ApiMessageAndVersion(
+                new StreamsGroupMemberMetadataValue()
+                    .setRackId(member.rackId().orElse(null))
+                    .setInstanceId(member.instanceId().orElse(null))
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                    .setTopologyEpoch(member.topologyEpoch())
+                    .setProcessId(member.processId())
+                    .setUserEndpoint(member.userEndpoint().orElse(null))
+                    
.setClientTags(member.clientTags().entrySet().stream().map(e ->
+                        new StreamsGroupMemberMetadataValue.KeyValue()
+                            .setKey(e.getKey())
+                            .setValue(e.getValue())
+                    
).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMemberMetadata tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata record.
+     *
+     * @param groupId              The streams group id.
+     * @param newPartitionMetadata The partition metadata.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord(
+        String groupId,
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
newPartitionMetadata
+    ) {
+        StreamsGroupPartitionMetadataValue value = new 
StreamsGroupPartitionMetadataValue();
+        newPartitionMetadata.forEach((topicName, topicMetadata) -> {
+            List<StreamsGroupPartitionMetadataValue.PartitionMetadata> 
partitionMetadata = new ArrayList<>();
+            // If the partition rack information map is empty, store an empty 
list in the record.

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in the 
__consumer_offsets topic.
+ */
+public class StreamsCoordinatorRecordHelpers {
+
+    private StreamsCoordinatorRecordHelpers() {
+    }
+
+    public static CoordinatorRecord newStreamsGroupMemberRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(member.memberId()),
+            new ApiMessageAndVersion(
+                new StreamsGroupMemberMetadataValue()
+                    .setRackId(member.rackId().orElse(null))
+                    .setInstanceId(member.instanceId().orElse(null))
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                    .setTopologyEpoch(member.topologyEpoch())
+                    .setProcessId(member.processId())
+                    .setUserEndpoint(member.userEndpoint().orElse(null))
+                    
.setClientTags(member.clientTags().entrySet().stream().map(e ->
+                        new StreamsGroupMemberMetadataValue.KeyValue()
+                            .setKey(e.getKey())
+                            .setValue(e.getValue())
+                    
).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMemberMetadata tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata record.
+     *
+     * @param groupId              The streams group id.
+     * @param newPartitionMetadata The partition metadata.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord(
+        String groupId,
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
newPartitionMetadata

Review Comment:
   I'm puzzled, where would you see the benefit? There is a practice of 
checking for `null` in constructors, because it helps detecting problems 
earlier (the null pointer exception when accessing a null field can come much 
later). Here, we'd add the check exactly where the null pointer exception would 
fly as well. 



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in the 
__consumer_offsets topic.
+ */
+public class StreamsCoordinatorRecordHelpers {
+
+    private StreamsCoordinatorRecordHelpers() {
+    }
+
+    public static CoordinatorRecord newStreamsGroupMemberRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(member.memberId()),
+            new ApiMessageAndVersion(
+                new StreamsGroupMemberMetadataValue()
+                    .setRackId(member.rackId().orElse(null))
+                    .setInstanceId(member.instanceId().orElse(null))
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                    .setTopologyEpoch(member.topologyEpoch())
+                    .setProcessId(member.processId())
+                    .setUserEndpoint(member.userEndpoint().orElse(null))
+                    
.setClientTags(member.clientTags().entrySet().stream().map(e ->
+                        new StreamsGroupMemberMetadataValue.KeyValue()
+                            .setKey(e.getKey())
+                            .setValue(e.getValue())
+                    
).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMemberMetadata tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata record.
+     *
+     * @param groupId              The streams group id.
+     * @param newPartitionMetadata The partition metadata.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord(
+        String groupId,
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
newPartitionMetadata
+    ) {
+        StreamsGroupPartitionMetadataValue value = new 
StreamsGroupPartitionMetadataValue();
+        newPartitionMetadata.forEach((topicName, topicMetadata) -> {
+            List<StreamsGroupPartitionMetadataValue.PartitionMetadata> 
partitionMetadata = new ArrayList<>();
+            // If the partition rack information map is empty, store an empty 
list in the record.
+            if (!topicMetadata.partitionRacks().isEmpty()) {
+                topicMetadata.partitionRacks().forEach((partition, racks) ->
+                    partitionMetadata.add(new 
StreamsGroupPartitionMetadataValue.PartitionMetadata()
+                        .setPartition(partition)
+                        .setRacks(racks.stream().sorted().toList())
+                    )
+                );
+            }
+            
partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition));
+            value.topics().add(new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
+                .setTopicId(topicMetadata.id())
+                .setTopicName(topicMetadata.name())
+                .setNumPartitions(topicMetadata.numPartitions())
+                .setPartitionMetadata(partitionMetadata)
+            );
+        });
+
+        
value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName));
+
+        return CoordinatorRecord.record(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                value,
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupPartitionMetadataTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupEpochRecord(
+        String groupId,
+        int newGroupEpoch
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                new StreamsGroupMetadataValue()
+                    .setEpoch(newGroupEpoch),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord(
+        String groupId,
+        String memberId,
+        TasksTuple assignment

Review Comment:
   See above.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in the 
__consumer_offsets topic.
+ */
+public class StreamsCoordinatorRecordHelpers {
+
+    private StreamsCoordinatorRecordHelpers() {
+    }
+
+    public static CoordinatorRecord newStreamsGroupMemberRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(member.memberId()),
+            new ApiMessageAndVersion(
+                new StreamsGroupMemberMetadataValue()
+                    .setRackId(member.rackId().orElse(null))
+                    .setInstanceId(member.instanceId().orElse(null))
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                    .setTopologyEpoch(member.topologyEpoch())
+                    .setProcessId(member.processId())
+                    .setUserEndpoint(member.userEndpoint().orElse(null))
+                    
.setClientTags(member.clientTags().entrySet().stream().map(e ->
+                        new StreamsGroupMemberMetadataValue.KeyValue()
+                            .setKey(e.getKey())
+                            .setValue(e.getValue())
+                    
).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMemberMetadata tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata record.
+     *
+     * @param groupId              The streams group id.
+     * @param newPartitionMetadata The partition metadata.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord(
+        String groupId,
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
newPartitionMetadata
+    ) {
+        StreamsGroupPartitionMetadataValue value = new 
StreamsGroupPartitionMetadataValue();
+        newPartitionMetadata.forEach((topicName, topicMetadata) -> {
+            List<StreamsGroupPartitionMetadataValue.PartitionMetadata> 
partitionMetadata = new ArrayList<>();
+            // If the partition rack information map is empty, store an empty 
list in the record.
+            if (!topicMetadata.partitionRacks().isEmpty()) {
+                topicMetadata.partitionRacks().forEach((partition, racks) ->
+                    partitionMetadata.add(new 
StreamsGroupPartitionMetadataValue.PartitionMetadata()
+                        .setPartition(partition)
+                        .setRacks(racks.stream().sorted().toList())
+                    )
+                );
+            }
+            
partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition));
+            value.topics().add(new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
+                .setTopicId(topicMetadata.id())
+                .setTopicName(topicMetadata.name())
+                .setNumPartitions(topicMetadata.numPartitions())
+                .setPartitionMetadata(partitionMetadata)
+            );
+        });
+
+        
value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName));
+
+        return CoordinatorRecord.record(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                value,
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupPartitionMetadataTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupEpochRecord(
+        String groupId,
+        int newGroupEpoch
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                new StreamsGroupMetadataValue()
+                    .setEpoch(newGroupEpoch),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord(
+        String groupId,
+        String memberId,
+        TasksTuple assignment
+    ) {
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = 
new ArrayList<>(assignment.activeTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.activeTasks().entrySet()) {
+            activeTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = 
new ArrayList<>(assignment.standbyTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.standbyTasks().entrySet()) {
+            standbyTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = 
new ArrayList<>(assignment.warmupTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.warmupTasks().entrySet()) {
+            warmupTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+
+        return CoordinatorRecord.record(
+            new StreamsGroupTargetAssignmentMemberKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId),
+            new ApiMessageAndVersion(
+                new StreamsGroupTargetAssignmentMemberValue()
+                    .setActiveTasks(activeTaskIds)
+                    .setStandbyTasks(standbyTaskIds)
+                    .setWarmupTasks(warmupTaskIds),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupTargetAssignmentMember tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupTargetAssignmentTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupTargetAssignmentMemberKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+
+    public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord(
+        String groupId,
+        int assignmentEpoch
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupTargetAssignmentMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                new StreamsGroupTargetAssignmentMetadataValue()
+                    .setAssignmentEpoch(assignmentEpoch),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupTargetAssignmentMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupTargetAssignmentEpochTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupTargetAssignmentMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord(
+        String groupId,
+        StreamsGroupMember member

Review Comment:
   See above



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in the 
__consumer_offsets topic.
+ */
+public class StreamsCoordinatorRecordHelpers {
+
+    private StreamsCoordinatorRecordHelpers() {
+    }
+
+    public static CoordinatorRecord newStreamsGroupMemberRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(member.memberId()),
+            new ApiMessageAndVersion(
+                new StreamsGroupMemberMetadataValue()
+                    .setRackId(member.rackId().orElse(null))
+                    .setInstanceId(member.instanceId().orElse(null))
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                    .setTopologyEpoch(member.topologyEpoch())
+                    .setProcessId(member.processId())
+                    .setUserEndpoint(member.userEndpoint().orElse(null))
+                    
.setClientTags(member.clientTags().entrySet().stream().map(e ->
+                        new StreamsGroupMemberMetadataValue.KeyValue()
+                            .setKey(e.getKey())
+                            .setValue(e.getValue())
+                    
).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMemberMetadata tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata record.
+     *
+     * @param groupId              The streams group id.
+     * @param newPartitionMetadata The partition metadata.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord(
+        String groupId,
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
newPartitionMetadata
+    ) {
+        StreamsGroupPartitionMetadataValue value = new 
StreamsGroupPartitionMetadataValue();
+        newPartitionMetadata.forEach((topicName, topicMetadata) -> {
+            List<StreamsGroupPartitionMetadataValue.PartitionMetadata> 
partitionMetadata = new ArrayList<>();
+            // If the partition rack information map is empty, store an empty 
list in the record.
+            if (!topicMetadata.partitionRacks().isEmpty()) {
+                topicMetadata.partitionRacks().forEach((partition, racks) ->
+                    partitionMetadata.add(new 
StreamsGroupPartitionMetadataValue.PartitionMetadata()
+                        .setPartition(partition)
+                        .setRacks(racks.stream().sorted().toList())
+                    )
+                );
+            }
+            
partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition));
+            value.topics().add(new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
+                .setTopicId(topicMetadata.id())
+                .setTopicName(topicMetadata.name())
+                .setNumPartitions(topicMetadata.numPartitions())
+                .setPartitionMetadata(partitionMetadata)
+            );
+        });
+
+        
value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName));
+
+        return CoordinatorRecord.record(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                value,
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupPartitionMetadataTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupEpochRecord(
+        String groupId,
+        int newGroupEpoch
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                new StreamsGroupMetadataValue()
+                    .setEpoch(newGroupEpoch),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord(
+        String groupId,
+        String memberId,
+        TasksTuple assignment
+    ) {
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = 
new ArrayList<>(assignment.activeTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.activeTasks().entrySet()) {
+            activeTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = 
new ArrayList<>(assignment.standbyTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.standbyTasks().entrySet()) {
+            standbyTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = 
new ArrayList<>(assignment.warmupTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.warmupTasks().entrySet()) {
+            warmupTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+
+        return CoordinatorRecord.record(
+            new StreamsGroupTargetAssignmentMemberKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId),
+            new ApiMessageAndVersion(
+                new StreamsGroupTargetAssignmentMemberValue()
+                    .setActiveTasks(activeTaskIds)
+                    .setStandbyTasks(standbyTaskIds)
+                    .setWarmupTasks(warmupTaskIds),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupTargetAssignmentMember tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupTargetAssignmentTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupTargetAssignmentMemberKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+
+    public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord(
+        String groupId,
+        int assignmentEpoch
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupTargetAssignmentMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                new StreamsGroupTargetAssignmentMetadataValue()
+                    .setAssignmentEpoch(assignmentEpoch),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupTargetAssignmentMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupTargetAssignmentEpochTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupTargetAssignmentMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord(

Review Comment:
   Done



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class StreamsCoordinatorRecordHelpersTest {
+
+    @Test
+    public void testNewStreamsGroupMemberRecord() {
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member-id")
+            .setRackId("rack-id")

Review Comment:
   Done.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in the 
__consumer_offsets topic.
+ */
+public class StreamsCoordinatorRecordHelpers {
+
+    private StreamsCoordinatorRecordHelpers() {
+    }
+
+    public static CoordinatorRecord newStreamsGroupMemberRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(member.memberId()),
+            new ApiMessageAndVersion(
+                new StreamsGroupMemberMetadataValue()
+                    .setRackId(member.rackId().orElse(null))
+                    .setInstanceId(member.instanceId().orElse(null))
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                    .setTopologyEpoch(member.topologyEpoch())
+                    .setProcessId(member.processId())
+                    .setUserEndpoint(member.userEndpoint().orElse(null))
+                    
.setClientTags(member.clientTags().entrySet().stream().map(e ->
+                        new StreamsGroupMemberMetadataValue.KeyValue()
+                            .setKey(e.getKey())
+                            .setValue(e.getValue())
+                    
).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMemberMetadata tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata record.
+     *
+     * @param groupId              The streams group id.
+     * @param newPartitionMetadata The partition metadata.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord(
+        String groupId,
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
newPartitionMetadata
+    ) {
+        StreamsGroupPartitionMetadataValue value = new 
StreamsGroupPartitionMetadataValue();
+        newPartitionMetadata.forEach((topicName, topicMetadata) -> {
+            List<StreamsGroupPartitionMetadataValue.PartitionMetadata> 
partitionMetadata = new ArrayList<>();
+            // If the partition rack information map is empty, store an empty 
list in the record.
+            if (!topicMetadata.partitionRacks().isEmpty()) {
+                topicMetadata.partitionRacks().forEach((partition, racks) ->
+                    partitionMetadata.add(new 
StreamsGroupPartitionMetadataValue.PartitionMetadata()
+                        .setPartition(partition)
+                        .setRacks(racks.stream().sorted().toList())
+                    )
+                );
+            }
+            
partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition));
+            value.topics().add(new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
+                .setTopicId(topicMetadata.id())
+                .setTopicName(topicMetadata.name())
+                .setNumPartitions(topicMetadata.numPartitions())
+                .setPartitionMetadata(partitionMetadata)
+            );
+        });
+
+        
value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName));
+
+        return CoordinatorRecord.record(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                value,
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupPartitionMetadataTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupEpochRecord(
+        String groupId,
+        int newGroupEpoch
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                new StreamsGroupMetadataValue()
+                    .setEpoch(newGroupEpoch),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord(
+        String groupId,
+        String memberId,
+        TasksTuple assignment
+    ) {
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = 
new ArrayList<>(assignment.activeTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.activeTasks().entrySet()) {
+            activeTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = 
new ArrayList<>(assignment.standbyTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.standbyTasks().entrySet()) {
+            standbyTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = 
new ArrayList<>(assignment.warmupTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.warmupTasks().entrySet()) {
+            warmupTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+
+        return CoordinatorRecord.record(
+            new StreamsGroupTargetAssignmentMemberKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId),
+            new ApiMessageAndVersion(
+                new StreamsGroupTargetAssignmentMemberValue()
+                    .setActiveTasks(activeTaskIds)
+                    .setStandbyTasks(standbyTaskIds)
+                    .setWarmupTasks(warmupTaskIds),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupTargetAssignmentMember tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupTargetAssignmentTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupTargetAssignmentMemberKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+
+    public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord(
+        String groupId,
+        int assignmentEpoch
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupTargetAssignmentMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                new StreamsGroupTargetAssignmentMetadataValue()
+                    .setAssignmentEpoch(assignmentEpoch),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupTargetAssignmentMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupTargetAssignmentEpochTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupTargetAssignmentMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupCurrentMemberAssignmentKey()
+                .setGroupId(groupId)
+                .setMemberId(member.memberId()),
+            new ApiMessageAndVersion(
+                new StreamsGroupCurrentMemberAssignmentValue()
+                    .setMemberEpoch(member.memberEpoch())
+                    .setPreviousMemberEpoch(member.previousMemberEpoch())
+                    .setState(member.state().value())
+                    
.setActiveTasks(toTaskIds(member.assignedTasks().activeTasks()))
+                    
.setStandbyTasks(toTaskIds(member.assignedTasks().standbyTasks()))
+                    
.setWarmupTasks(toTaskIds(member.assignedTasks().warmupTasks()))
+                    
.setActiveTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().activeTasks()))
+                    
.setStandbyTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().standbyTasks()))
+                    
.setWarmupTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().warmupTasks())),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupCurrentMemberAssignment tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupCurrentAssignmentTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupCurrentMemberAssignmentKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+    private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> 
toTaskIds(
+        Map<String, Set<Integer>> tasks
+    ) {
+        List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new 
ArrayList<>(tasks.size());
+        tasks.forEach((subtopologyId, partitions) ->
+            taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                .setSubtopologyId(subtopologyId)
+                .setPartitions(partitions.stream().sorted().toList()))
+        );
+        
taskIds.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId));
+        return taskIds;
+    }
+
+    /**
+     * Creates a StreamsTopology record.
+     *
+     * @param groupId  The consumer group id.
+     * @param topology The new topology.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupTopologyRecord(String 
groupId,
+                                                                  
StreamsGroupHeartbeatRequestData.Topology topology) {
+        return newStreamsGroupTopologyRecord(groupId, 
convertToStreamsGroupTopologyRecord(topology));
+    }
+
+    /**
+     * Creates a StreamsTopology record.
+     *
+     * @param groupId The consumer group id.
+     * @param value   The encoded topology record value.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupTopologyRecord(String 
groupId, StreamsGroupTopologyValue value) {

Review Comment:
   We use the topology value record to configure the topology - so we can do it 
both when we get it from the heartbeat and when we get it from the record. So 
in the heartbeat handler, we want to have the `coordinatorrecord` for storing 
the topology and the `StreamsGroupTopologyValue` record for configuring the 
topology. 



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in the 
__consumer_offsets topic.
+ */
+public class StreamsCoordinatorRecordHelpers {
+
+    private StreamsCoordinatorRecordHelpers() {
+    }
+
+    public static CoordinatorRecord newStreamsGroupMemberRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(member.memberId()),
+            new ApiMessageAndVersion(
+                new StreamsGroupMemberMetadataValue()
+                    .setRackId(member.rackId().orElse(null))
+                    .setInstanceId(member.instanceId().orElse(null))
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                    .setTopologyEpoch(member.topologyEpoch())
+                    .setProcessId(member.processId())
+                    .setUserEndpoint(member.userEndpoint().orElse(null))
+                    
.setClientTags(member.clientTags().entrySet().stream().map(e ->
+                        new StreamsGroupMemberMetadataValue.KeyValue()
+                            .setKey(e.getKey())
+                            .setValue(e.getValue())
+                    
).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMemberMetadata tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata record.
+     *
+     * @param groupId              The streams group id.
+     * @param newPartitionMetadata The partition metadata.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord(
+        String groupId,
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
newPartitionMetadata
+    ) {
+        StreamsGroupPartitionMetadataValue value = new 
StreamsGroupPartitionMetadataValue();
+        newPartitionMetadata.forEach((topicName, topicMetadata) -> {
+            List<StreamsGroupPartitionMetadataValue.PartitionMetadata> 
partitionMetadata = new ArrayList<>();
+            // If the partition rack information map is empty, store an empty 
list in the record.
+            if (!topicMetadata.partitionRacks().isEmpty()) {
+                topicMetadata.partitionRacks().forEach((partition, racks) ->
+                    partitionMetadata.add(new 
StreamsGroupPartitionMetadataValue.PartitionMetadata()
+                        .setPartition(partition)
+                        .setRacks(racks.stream().sorted().toList())
+                    )
+                );
+            }
+            
partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition));
+            value.topics().add(new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
+                .setTopicId(topicMetadata.id())
+                .setTopicName(topicMetadata.name())
+                .setNumPartitions(topicMetadata.numPartitions())
+                .setPartitionMetadata(partitionMetadata)
+            );
+        });
+
+        
value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName));
+
+        return CoordinatorRecord.record(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                value,
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupPartitionMetadataTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupEpochRecord(
+        String groupId,
+        int newGroupEpoch
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                new StreamsGroupMetadataValue()
+                    .setEpoch(newGroupEpoch),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord(
+        String groupId,
+        String memberId,
+        TasksTuple assignment
+    ) {
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = 
new ArrayList<>(assignment.activeTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.activeTasks().entrySet()) {
+            activeTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = 
new ArrayList<>(assignment.standbyTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.standbyTasks().entrySet()) {
+            standbyTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = 
new ArrayList<>(assignment.warmupTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.warmupTasks().entrySet()) {
+            warmupTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+
+        return CoordinatorRecord.record(
+            new StreamsGroupTargetAssignmentMemberKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId),
+            new ApiMessageAndVersion(
+                new StreamsGroupTargetAssignmentMemberValue()
+                    .setActiveTasks(activeTaskIds)
+                    .setStandbyTasks(standbyTaskIds)
+                    .setWarmupTasks(warmupTaskIds),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupTargetAssignmentMember tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupTargetAssignmentTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupTargetAssignmentMemberKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+
+    public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord(
+        String groupId,
+        int assignmentEpoch
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupTargetAssignmentMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                new StreamsGroupTargetAssignmentMetadataValue()
+                    .setAssignmentEpoch(assignmentEpoch),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupTargetAssignmentMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupTargetAssignmentEpochTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupTargetAssignmentMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupCurrentMemberAssignmentKey()
+                .setGroupId(groupId)
+                .setMemberId(member.memberId()),
+            new ApiMessageAndVersion(
+                new StreamsGroupCurrentMemberAssignmentValue()
+                    .setMemberEpoch(member.memberEpoch())
+                    .setPreviousMemberEpoch(member.previousMemberEpoch())
+                    .setState(member.state().value())
+                    
.setActiveTasks(toTaskIds(member.assignedTasks().activeTasks()))
+                    
.setStandbyTasks(toTaskIds(member.assignedTasks().standbyTasks()))
+                    
.setWarmupTasks(toTaskIds(member.assignedTasks().warmupTasks()))
+                    
.setActiveTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().activeTasks()))
+                    
.setStandbyTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().standbyTasks()))
+                    
.setWarmupTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().warmupTasks())),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupCurrentMemberAssignment tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupCurrentAssignmentTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupCurrentMemberAssignmentKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+    private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> 
toTaskIds(
+        Map<String, Set<Integer>> tasks
+    ) {
+        List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new 
ArrayList<>(tasks.size());
+        tasks.forEach((subtopologyId, partitions) ->
+            taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                .setSubtopologyId(subtopologyId)
+                .setPartitions(partitions.stream().sorted().toList()))
+        );
+        
taskIds.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId));
+        return taskIds;
+    }
+
+    /**
+     * Creates a StreamsTopology record.
+     *
+     * @param groupId  The consumer group id.
+     * @param topology The new topology.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupTopologyRecord(String 
groupId,
+                                                                  
StreamsGroupHeartbeatRequestData.Topology topology) {

Review Comment:
   See above



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in the 
__consumer_offsets topic.
+ */
+public class StreamsCoordinatorRecordHelpers {
+
+    private StreamsCoordinatorRecordHelpers() {
+    }
+
+    public static CoordinatorRecord newStreamsGroupMemberRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(member.memberId()),
+            new ApiMessageAndVersion(
+                new StreamsGroupMemberMetadataValue()
+                    .setRackId(member.rackId().orElse(null))
+                    .setInstanceId(member.instanceId().orElse(null))
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                    .setTopologyEpoch(member.topologyEpoch())
+                    .setProcessId(member.processId())
+                    .setUserEndpoint(member.userEndpoint().orElse(null))
+                    
.setClientTags(member.clientTags().entrySet().stream().map(e ->
+                        new StreamsGroupMemberMetadataValue.KeyValue()
+                            .setKey(e.getKey())
+                            .setValue(e.getValue())
+                    
).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMemberMetadata tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata record.
+     *
+     * @param groupId              The streams group id.
+     * @param newPartitionMetadata The partition metadata.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord(
+        String groupId,
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
newPartitionMetadata
+    ) {
+        StreamsGroupPartitionMetadataValue value = new 
StreamsGroupPartitionMetadataValue();
+        newPartitionMetadata.forEach((topicName, topicMetadata) -> {
+            List<StreamsGroupPartitionMetadataValue.PartitionMetadata> 
partitionMetadata = new ArrayList<>();
+            // If the partition rack information map is empty, store an empty 
list in the record.
+            if (!topicMetadata.partitionRacks().isEmpty()) {
+                topicMetadata.partitionRacks().forEach((partition, racks) ->
+                    partitionMetadata.add(new 
StreamsGroupPartitionMetadataValue.PartitionMetadata()
+                        .setPartition(partition)
+                        .setRacks(racks.stream().sorted().toList())
+                    )
+                );
+            }
+            
partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition));
+            value.topics().add(new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
+                .setTopicId(topicMetadata.id())
+                .setTopicName(topicMetadata.name())
+                .setNumPartitions(topicMetadata.numPartitions())
+                .setPartitionMetadata(partitionMetadata)
+            );
+        });
+
+        
value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName));
+
+        return CoordinatorRecord.record(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                value,
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupPartitionMetadataTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupEpochRecord(
+        String groupId,
+        int newGroupEpoch
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                new StreamsGroupMetadataValue()
+                    .setEpoch(newGroupEpoch),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord(
+        String groupId,
+        String memberId,
+        TasksTuple assignment
+    ) {
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = 
new ArrayList<>(assignment.activeTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.activeTasks().entrySet()) {
+            activeTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = 
new ArrayList<>(assignment.standbyTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.standbyTasks().entrySet()) {
+            standbyTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = 
new ArrayList<>(assignment.warmupTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.warmupTasks().entrySet()) {
+            warmupTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+
+        return CoordinatorRecord.record(
+            new StreamsGroupTargetAssignmentMemberKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId),
+            new ApiMessageAndVersion(
+                new StreamsGroupTargetAssignmentMemberValue()
+                    .setActiveTasks(activeTaskIds)
+                    .setStandbyTasks(standbyTaskIds)
+                    .setWarmupTasks(warmupTaskIds),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupTargetAssignmentMember tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupTargetAssignmentTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupTargetAssignmentMemberKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+
+    public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord(
+        String groupId,
+        int assignmentEpoch
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupTargetAssignmentMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                new StreamsGroupTargetAssignmentMetadataValue()
+                    .setAssignmentEpoch(assignmentEpoch),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupTargetAssignmentMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupTargetAssignmentEpochTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupTargetAssignmentMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        return CoordinatorRecord.record(
+            new StreamsGroupCurrentMemberAssignmentKey()
+                .setGroupId(groupId)
+                .setMemberId(member.memberId()),
+            new ApiMessageAndVersion(
+                new StreamsGroupCurrentMemberAssignmentValue()
+                    .setMemberEpoch(member.memberEpoch())
+                    .setPreviousMemberEpoch(member.previousMemberEpoch())
+                    .setState(member.state().value())
+                    
.setActiveTasks(toTaskIds(member.assignedTasks().activeTasks()))
+                    
.setStandbyTasks(toTaskIds(member.assignedTasks().standbyTasks()))
+                    
.setWarmupTasks(toTaskIds(member.assignedTasks().warmupTasks()))
+                    
.setActiveTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().activeTasks()))
+                    
.setStandbyTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().standbyTasks()))
+                    
.setWarmupTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().warmupTasks())),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupCurrentMemberAssignment tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupCurrentAssignmentTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupCurrentMemberAssignmentKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+    private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> 
toTaskIds(
+        Map<String, Set<Integer>> tasks
+    ) {
+        List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new 
ArrayList<>(tasks.size());
+        tasks.forEach((subtopologyId, partitions) ->
+            taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                .setSubtopologyId(subtopologyId)
+                .setPartitions(partitions.stream().sorted().toList()))
+        );
+        
taskIds.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId));
+        return taskIds;
+    }
+
+    /**
+     * Creates a StreamsTopology record.
+     *
+     * @param groupId  The consumer group id.
+     * @param topology The new topology.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupTopologyRecord(String 
groupId,
+                                                                  
StreamsGroupHeartbeatRequestData.Topology topology) {
+        return newStreamsGroupTopologyRecord(groupId, 
convertToStreamsGroupTopologyRecord(topology));
+    }
+
+    /**
+     * Creates a StreamsTopology record.
+     *
+     * @param groupId The consumer group id.
+     * @param value   The encoded topology record value.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupTopologyRecord(String 
groupId, StreamsGroupTopologyValue value) {
+        return CoordinatorRecord.record(
+            new StreamsGroupTopologyKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(value, (short) 0)
+        );
+    }
+
+    /**
+     * Encodes subtopologies from the Heartbeat RPC to a StreamsTopology 
record value.
+     *
+     * @param topology The new topology
+     * @return The record value.
+     */
+    public static StreamsGroupTopologyValue 
convertToStreamsGroupTopologyRecord(StreamsGroupHeartbeatRequestData.Topology 
topology) {
+        StreamsGroupTopologyValue value = new StreamsGroupTopologyValue();
+        value.setEpoch(topology.epoch());
+        topology.subtopologies().forEach(subtopology -> {
+            List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics =

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to