cadonna commented on code in PR #18809:
URL: https://github.com/apache/kafka/pull/18809#discussion_r1952406373


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3138,6 +3274,45 @@ private void replaceMember(
         ));
     }
 
+    /**
+     * Fences a member from a streams group and maybe downgrade the streams 
group to a classic group.
+     *
+     * @param group     The group.
+     * @param member    The member.
+     * @param response  The response of the CoordinatorResult.
+     *
+     * @return The CoordinatorResult to be applied.
+     */
+    private <T> CoordinatorResult<T, CoordinatorRecord> 
streamsGroupFenceMember(
+        StreamsGroup group,
+        StreamsGroupMember member,
+        T response
+    ) {
+        List<CoordinatorRecord> records = new ArrayList<>();
+        removeStreamsMember(records, group.groupId(), member.memberId());
+
+        // We bump the group epoch.
+        int groupEpoch = group.groupEpoch() + 1;
+        records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch));
+
+        cancelTimers(group.groupId(), member.memberId());
+
+        return new CoordinatorResult<>(records, response);
+    }
+
+    /**
+     * Write tombstones for the member. The order matters here.
+     *
+     * @param records       The list of records to append the member 
assignment tombstone records.
+     * @param groupId       The group id.
+     * @param memberId      The member id.
+     */
+    private void removeStreamsMember(List<CoordinatorRecord> records, String 
groupId, String memberId) {

Review Comment:
   nit: Why not return a list of records?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3329,6 +3550,27 @@ private void scheduleShareGroupSessionTimeout(
         );
     }
 
+    /**
+     * Schedules (or reschedules) the session timeout for the member.
+     *
+     * @param groupId           The group id.
+     * @param memberId          The member id.
+     * @param sessionTimeoutMs  The session timeout.
+     */
+    private void scheduleStreamsGroupSessionTimeout(

Review Comment:
   This is only used in `scheduleStreamsGroupSessionTimeout(groupId, 
memberId)`. Will this be used somewhere else in the future? Why not call 
`timer.schedule()` directly in `scheduleStreamsGroupSessionTimeout(groupId, 
memberId)`?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -830,6 +844,385 @@ public void 
testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() {
         verify(groupMetadataManager, times(1)).replay(key, null);
     }
 
+    @Test
+    public void testReplayStreamsGroupMetadata() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = 
mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        StreamsGroupMetadataKey key = new StreamsGroupMetadataKey();
+        StreamsGroupMetadataValue value = new StreamsGroupMetadataValue();
+
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.record(
+            key,
+            new ApiMessageAndVersion(value, (short) 0)
+        ));
+
+        verify(groupMetadataManager, times(1)).replay(key, value);

Review Comment:
   You could drop `times(1)` since it is the default. I am fine either way.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3385,6 +3627,49 @@ private void scheduleConsumerGroupRebalanceTimeout(
         });
     }
 
+    /**
+     * Schedules a rebalance timeout for the member.
+     *
+     * @param groupId               The group id.
+     * @param memberId              The member id.
+     * @param memberEpoch           The member epoch.
+     * @param rebalanceTimeoutMs    The rebalance timeout.
+     */
+    private void scheduleStreamsGroupRebalanceTimeout(
+        String groupId,
+        String memberId,
+        int memberEpoch,
+        int rebalanceTimeoutMs
+    ) {
+        String key = streamsGroupRebalanceTimeoutKey(groupId, memberId);
+        timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
+            try {
+                StreamsGroup group = streamsGroup(groupId);
+                StreamsGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+                if (member.memberEpoch() == memberEpoch) {
+                    log.info("[GroupId {}] Member {} fenced from the group 
because " +
+                            "it failed to transition from epoch {} within 
{}ms.",
+                        groupId, memberId, memberEpoch, rebalanceTimeoutMs);
+
+                    return streamsGroupFenceMember(group, member, null);
+                } else {
+                    log.debug("[GroupId {}] Ignoring rebalance timeout for {} 
because the member " +
+                        "left the epoch {}.", groupId, memberId, memberEpoch);

Review Comment:
   What does "leave the epoch" mean?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15135,6 +15284,229 @@ public void 
testReplayConsumerGroupCurrentMemberAssignmentTombstone() {
         assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.consumerGroup("bar"));
     }
 
+    @Test
+    public void testReplayStreamsGroupMemberMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setClientId("clientid")
+            .setClientHost("clienthost")
+            .setRackId("rackid")
+            .setInstanceId("instanceid")
+            .setRebalanceTimeoutMs(1000)
+            .setTopologyEpoch(10)
+            .setProcessId("processid")
+            .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999))
+            .setClientTags(Collections.singletonMap("key", "value"))
+            .build();
+
+        // The group and the member are created if they do not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo",
 member));
+        assertEquals(member, 
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member",
 false));
+    }
+
+    @Test
+    public void testReplayStreamsGroupMemberMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group still exists but the member is already gone. Replaying the
+        // StreamsGroupMemberMetadata tombstone should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
 "m1"));
+        assertThrows(UnknownMemberIdException.class, () -> 
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", 
false));
+
+        // The group may not exist at all. Replaying the 
StreamsGroupMemberMetadata tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar",
 "m1"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("bar"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10));
+        assertEquals(10, 
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
+    }
+
+    @Test
+    public void testReplayStreamsGroupMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the StreamsGroupMetadata 
tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupPartitionMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
metadata = Map.of(
+            "bar",
+            new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), 
"bar", 10)
+        );
+
+        // The group is created if it does not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo",
 metadata));
+        assertEquals(metadata, 
context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
+    }
+
+    @Test
+    public void testReplayStreamsGroupPartitionMetadataTombstone() {

Review Comment:
   Could you please also add a test that removes an existing group?
   
   What about all the `IllegalStateException`s that are thrown in the method 
under test?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3138,6 +3274,45 @@ private void replaceMember(
         ));
     }
 
+    /**
+     * Fences a member from a streams group and maybe downgrade the streams 
group to a classic group.

Review Comment:
   Is the classic part true for Streams?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15135,6 +15284,229 @@ public void 
testReplayConsumerGroupCurrentMemberAssignmentTombstone() {
         assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.consumerGroup("bar"));
     }
 
+    @Test
+    public void testReplayStreamsGroupMemberMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setClientId("clientid")
+            .setClientHost("clienthost")
+            .setRackId("rackid")
+            .setInstanceId("instanceid")
+            .setRebalanceTimeoutMs(1000)
+            .setTopologyEpoch(10)
+            .setProcessId("processid")
+            .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999))
+            .setClientTags(Collections.singletonMap("key", "value"))
+            .build();
+
+        // The group and the member are created if they do not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo",
 member));
+        assertEquals(member, 
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member",
 false));
+    }
+
+    @Test
+    public void testReplayStreamsGroupMemberMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group still exists but the member is already gone. Replaying the
+        // StreamsGroupMemberMetadata tombstone should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
 "m1"));
+        assertThrows(UnknownMemberIdException.class, () -> 
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", 
false));
+
+        // The group may not exist at all. Replaying the 
StreamsGroupMemberMetadata tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar",
 "m1"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("bar"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10));
+        assertEquals(10, 
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
+    }
+
+    @Test
+    public void testReplayStreamsGroupMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the StreamsGroupMetadata 
tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupPartitionMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
metadata = Map.of(
+            "bar",
+            new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), 
"bar", 10)
+        );
+
+        // The group is created if it does not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo",
 metadata));
+        assertEquals(metadata, 
context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
+    }
+
+    @Test
+    public void testReplayStreamsGroupPartitionMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the 
StreamsGroupPartitionMetadata tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMember() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        final TasksTuple tasks =
+            new TasksTuple(
+                TaskAssignmentTestUtil.mkTasksPerSubtopology(
+                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
+                TaskAssignmentTestUtil.mkTasksPerSubtopology(
+                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
+                TaskAssignmentTestUtil.mkTasksPerSubtopology(
+                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
+            );
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo",
 "m1", tasks));
+        assertEquals(tasks.activeTasks(), 
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks());
+        assertEquals(tasks.standbyTasks(), 
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks());
+        assertEquals(tasks.warmupTasks(), 
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks());
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMemberTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the 
StreamsGroupTargetAssignmentMember tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo",
 "m1"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord("foo",
 10));
+        assertEquals(10, 
context.groupMetadataManager.streamsGroup("foo").assignmentEpoch());
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the 
StreamsGroupTargetAssignmentMetadata tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupCurrentMemberAssignment() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS)
+            .setAssignedTasks(new TasksTuple(
+                
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
 0, 1, 2)),
+                
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
 3, 4, 5)),
+                
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
 6, 7, 8))
+            ))
+            .setTasksPendingRevocation(TasksTuple.EMPTY)
+            .build();
+
+        // The group and the member are created if they do not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord("bar",
 member));
+        assertEquals(member, 
context.groupMetadataManager.streamsGroup("bar").getOrMaybeCreateMember("member",
 false));
+    }
+
+    @Test
+    public void testReplayStreamsGroupCurrentMemberAssignmentTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group still exists, but the member is already gone. Replaying 
the
+        // StreamsGroupCurrentMemberAssignment tombstone should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo",
 "m1"));
+        assertThrows(UnknownMemberIdException.class, () -> 
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", 
false));
+
+        // The group may not exist at all. Replaying the 
StreamsGroupCurrentMemberAssignment tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("bar",
 "m1"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("bar"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupTopology() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        StreamsGroupTopologyValue topology = new StreamsGroupTopologyValue()
+            .setEpoch(12)
+            .setSubtopologies(
+                List.of(
+                    new StreamsGroupTopologyValue.Subtopology()
+                        .setSubtopologyId("subtopology-1")
+                        .setSourceTopics(List.of("source-topic"))
+                        .setRepartitionSinkTopics(List.of("sink-topic"))
+                )
+            );
+
+        // The group and the topology are created if they do not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("bar",
 topology));
+        final Optional<StreamsTopology> actualTopology = 
context.groupMetadataManager.streamsGroup("bar").topology();
+        assertTrue(actualTopology.isPresent(), "topology should be set");
+        assertEquals(topology.epoch(), actualTopology.get().topologyEpoch());
+        assertEquals(topology.subtopologies().size(), 
actualTopology.get().subtopologies().size());
+        assertEquals(
+            topology.subtopologies().iterator().next(),
+            actualTopology.get().subtopologies().values().iterator().next()
+        );
+    }
+
+    @Test
+    public void testReplayStreamsGroupTopologyTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()

Review Comment:
   Could you please add a test for when a group exists?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -760,6 +859,43 @@ ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(
         }
     }
 
+    /**
+     * The method should be called on the replay path.
+     * Gets or maybe creates a streams group and updates the groups map if a 
new group is created.
+     *
+     * @param groupId           The group id.
+     * @param createIfNotExists A boolean indicating whether the group should 
be
+     *                          created if it does not exist.
+     *
+     * @return A StreamsGroup.
+     * @throws GroupIdNotFoundException if the group does not exist and 
createIfNotExists is false or
+     *                                  if the group is not a streams group.
+     * @throws IllegalStateException    if the group does not have the 
expected type.
+     * Package private for testing.
+     */
+    StreamsGroup getOrMaybeCreatePersistedStreamsGroup(

Review Comment:
   Why is this method package-private? It is not used outside of this class.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15135,6 +15284,229 @@ public void 
testReplayConsumerGroupCurrentMemberAssignmentTombstone() {
         assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.consumerGroup("bar"));
     }
 
+    @Test
+    public void testReplayStreamsGroupMemberMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setClientId("clientid")
+            .setClientHost("clienthost")
+            .setRackId("rackid")
+            .setInstanceId("instanceid")
+            .setRebalanceTimeoutMs(1000)
+            .setTopologyEpoch(10)
+            .setProcessId("processid")
+            .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999))
+            .setClientTags(Collections.singletonMap("key", "value"))
+            .build();
+
+        // The group and the member are created if they do not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo",
 member));
+        assertEquals(member, 
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member",
 false));
+    }
+
+    @Test
+    public void testReplayStreamsGroupMemberMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group still exists but the member is already gone. Replaying the
+        // StreamsGroupMemberMetadata tombstone should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
 "m1"));
+        assertThrows(UnknownMemberIdException.class, () -> 
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", 
false));
+
+        // The group may not exist at all. Replaying the 
StreamsGroupMemberMetadata tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar",
 "m1"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("bar"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10));
+        assertEquals(10, 
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
+    }
+
+    @Test
+    public void testReplayStreamsGroupMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the StreamsGroupMetadata 
tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupPartitionMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
metadata = Map.of(
+            "bar",
+            new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), 
"bar", 10)
+        );
+
+        // The group is created if it does not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo",
 metadata));
+        assertEquals(metadata, 
context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
+    }
+
+    @Test
+    public void testReplayStreamsGroupPartitionMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the 
StreamsGroupPartitionMetadata tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMember() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        final TasksTuple tasks =
+            new TasksTuple(
+                TaskAssignmentTestUtil.mkTasksPerSubtopology(
+                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
+                TaskAssignmentTestUtil.mkTasksPerSubtopology(
+                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
+                TaskAssignmentTestUtil.mkTasksPerSubtopology(
+                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
+            );
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo",
 "m1", tasks));
+        assertEquals(tasks.activeTasks(), 
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks());
+        assertEquals(tasks.standbyTasks(), 
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks());
+        assertEquals(tasks.warmupTasks(), 
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks());
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMemberTombstone() {

Review Comment:
   Could you please add a test for when a group exists? 
   



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -830,6 +844,385 @@ public void 
testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() {
         verify(groupMetadataManager, times(1)).replay(key, null);
     }
 
+    @Test
+    public void testReplayStreamsGroupMetadata() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = 
mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+

Review Comment:
   nit: I would delete this line so that the call under test is more easily 
identifiable (also in the other tests). Just a proposal. Feel free to ignore if 
you want. 
   ```suggestion
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3962,7 +4359,167 @@ public void replay(
             }
             removeGroup(groupId);
         }
+    }
+
+    /**
+     * Replays StreamsGroupMemberMetadataKey/Value to update the hard state of
+     * the streams group.
+     * It updates the subscription part of the member or deletes the member.
+     *
+     * @param key   A StreamsGroupMemberMetadataKey key.
+     * @param value A StreamsGroupMemberMetadataValue record.
+     */
+    public void replay(
+        StreamsGroupMemberMetadataKey key,
+        StreamsGroupMemberMetadataValue value
+    ) {
+        String groupId = key.groupId();
+        String memberId = key.memberId();
+
+        StreamsGroup streamsGroup;
+        try {
+            streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, 
value != null);
+        } catch (GroupIdNotFoundException ex) {
+            // If the group does not exist and a tombstone is replayed, we can 
ignore it.
+            return;
+        }
+
+        if (value != null) {
+            StreamsGroupMember oldMember = 
streamsGroup.getOrMaybeCreateMember(memberId, true);
+            streamsGroup.updateMember(new StreamsGroupMember.Builder(oldMember)
+                .updateWith(value)
+                .build());

Review Comment:
   I find it confusing that the Streams group object creates a member without 
adding it to its members and the member needs to be added with 
`updateMember()`. I know that it is done this way for the other groups. 
Creating a member outside of the group should not be a concern of the group. 
   
   You do not need to change anything. Just a thought that I had for discussion.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15135,6 +15284,229 @@ public void 
testReplayConsumerGroupCurrentMemberAssignmentTombstone() {
         assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.consumerGroup("bar"));
     }
 
+    @Test
+    public void testReplayStreamsGroupMemberMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setClientId("clientid")
+            .setClientHost("clienthost")
+            .setRackId("rackid")
+            .setInstanceId("instanceid")
+            .setRebalanceTimeoutMs(1000)
+            .setTopologyEpoch(10)
+            .setProcessId("processid")
+            .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999))
+            .setClientTags(Collections.singletonMap("key", "value"))
+            .build();
+
+        // The group and the member are created if they do not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo",
 member));
+        assertEquals(member, 
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member",
 false));
+    }
+
+    @Test
+    public void testReplayStreamsGroupMemberMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group still exists but the member is already gone. Replaying the
+        // StreamsGroupMemberMetadata tombstone should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
 "m1"));
+        assertThrows(UnknownMemberIdException.class, () -> 
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", 
false));
+
+        // The group may not exist at all. Replaying the 
StreamsGroupMemberMetadata tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar",
 "m1"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("bar"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10));
+        assertEquals(10, 
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
+    }
+
+    @Test
+    public void testReplayStreamsGroupMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the StreamsGroupMetadata 
tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupPartitionMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
metadata = Map.of(
+            "bar",
+            new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), 
"bar", 10)
+        );
+
+        // The group is created if it does not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo",
 metadata));
+        assertEquals(metadata, 
context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
+    }
+
+    @Test
+    public void testReplayStreamsGroupPartitionMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the 
StreamsGroupPartitionMetadata tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMember() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        final TasksTuple tasks =
+            new TasksTuple(
+                TaskAssignmentTestUtil.mkTasksPerSubtopology(
+                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
+                TaskAssignmentTestUtil.mkTasksPerSubtopology(
+                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
+                TaskAssignmentTestUtil.mkTasksPerSubtopology(
+                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
+            );
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo",
 "m1", tasks));
+        assertEquals(tasks.activeTasks(), 
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks());
+        assertEquals(tasks.standbyTasks(), 
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks());
+        assertEquals(tasks.warmupTasks(), 
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks());
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMemberTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the 
StreamsGroupTargetAssignmentMember tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo",
 "m1"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord("foo",
 10));
+        assertEquals(10, 
context.groupMetadataManager.streamsGroup("foo").assignmentEpoch());
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the 
StreamsGroupTargetAssignmentMetadata tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupCurrentMemberAssignment() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS)
+            .setAssignedTasks(new TasksTuple(
+                
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
 0, 1, 2)),
+                
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
 3, 4, 5)),
+                
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
 6, 7, 8))
+            ))
+            .setTasksPendingRevocation(TasksTuple.EMPTY)
+            .build();
+
+        // The group and the member are created if they do not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord("bar",
 member));
+        assertEquals(member, 
context.groupMetadataManager.streamsGroup("bar").getOrMaybeCreateMember("member",
 false));
+    }
+
+    @Test
+    public void testReplayStreamsGroupCurrentMemberAssignmentTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()

Review Comment:
   Could you please add a test for replaying the tombstone when the group exists?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -687,6 +717,75 @@ ConsumerGroup getOrMaybeCreateConsumerGroup(
         }
     }
 
+    /**
+     * Gets or maybe creates a streams group without updating the groups map.
+     * The group will be materialized during the replay.
+     *
+     * @param groupId           The group id.
+     * @param createIfNotExists A boolean indicating whether the group should 
be
+     *                          created if it does not exist or is an empty 
classic group.
+     *
+     * @return A StreamsGroup.
+     * @throws GroupIdNotFoundException if the group does not exist and 
createIfNotExists is false or
+     *                                  if the group is not a streams group.
+     *
+     * Package private for testing.
+     */
+    StreamsGroup getOrMaybeCreateStreamsGroup(
+        String groupId,
+        boolean createIfNotExists
+    ) throws GroupIdNotFoundException {
+        Group group = groups.get(groupId);
+
+        if (group == null && !createIfNotExists) {
+            throw new GroupIdNotFoundException(String.format("Streams group %s 
not found.", groupId));
+        }
+
+        if (group == null) {
+            return new StreamsGroup(logContext, snapshotRegistry, groupId, 
metrics);
+        } else {
+            if (group.type() == STREAMS) {
+                return (StreamsGroup) group;
+            } else {
+                throw new GroupIdNotFoundException(String.format("Group %s is 
not a streams group.", groupId));
+            }
+        }
+    }
+    
+    /**
+     * Gets a streams group by committed offset.
+     *
+     * @param groupId           The group id.
+     * @param committedOffset   A specified committed offset corresponding to 
this shard.
+     *
+     * @return A StreamsGroup.
+     * @throws GroupIdNotFoundException if the group does not exist or is not 
a streams group.
+     */
+    public StreamsGroup streamsGroup(

Review Comment:
   Why is this method `public`? It is not used outside of this class.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15135,6 +15284,229 @@ public void 
testReplayConsumerGroupCurrentMemberAssignmentTombstone() {
         assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.consumerGroup("bar"));
     }
 
+    @Test
+    public void testReplayStreamsGroupMemberMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setClientId("clientid")
+            .setClientHost("clienthost")
+            .setRackId("rackid")
+            .setInstanceId("instanceid")
+            .setRebalanceTimeoutMs(1000)
+            .setTopologyEpoch(10)
+            .setProcessId("processid")
+            .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999))
+            .setClientTags(Collections.singletonMap("key", "value"))
+            .build();
+
+        // The group and the member are created if they do not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo",
 member));
+        assertEquals(member, 
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member",
 false));
+    }
+
+    @Test
+    public void testReplayStreamsGroupMemberMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group still exists but the member is already gone. Replaying the
+        // StreamsGroupMemberMetadata tombstone should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
 "m1"));
+        assertThrows(UnknownMemberIdException.class, () -> 
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", 
false));
+
+        // The group may not exist at all. Replaying the 
StreamsGroupMemberMetadata tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar",
 "m1"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("bar"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10));
+        assertEquals(10, 
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
+    }
+
+    @Test
+    public void testReplayStreamsGroupMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the StreamsGroupMetadata 
tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupPartitionMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
metadata = Map.of(
+            "bar",
+            new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), 
"bar", 10)
+        );
+
+        // The group is created if it does not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo",
 metadata));
+        assertEquals(metadata, 
context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
+    }
+
+    @Test
+    public void testReplayStreamsGroupPartitionMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the 
StreamsGroupPartitionMetadata tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMember() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        final TasksTuple tasks =
+            new TasksTuple(
+                TaskAssignmentTestUtil.mkTasksPerSubtopology(
+                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
+                TaskAssignmentTestUtil.mkTasksPerSubtopology(
+                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
+                TaskAssignmentTestUtil.mkTasksPerSubtopology(
+                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
+            );
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo",
 "m1", tasks));
+        assertEquals(tasks.activeTasks(), 
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks());
+        assertEquals(tasks.standbyTasks(), 
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks());
+        assertEquals(tasks.warmupTasks(), 
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks());
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMemberTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the 
StreamsGroupTargetAssignmentMember tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo",
 "m1"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord("foo",
 10));
+        assertEquals(10, 
context.groupMetadataManager.streamsGroup("foo").assignmentEpoch());
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMetadataTombstone() {

Review Comment:
   Could you please add a test for replaying the tombstone when the group exists?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to