lucasbru commented on code in PR #18809:
URL: https://github.com/apache/kafka/pull/18809#discussion_r1952918550


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15135,6 +15284,229 @@ public void 
testReplayConsumerGroupCurrentMemberAssignmentTombstone() {
         assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.consumerGroup("bar"));
     }
 
+    @Test
+    public void testReplayStreamsGroupMemberMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setClientId("clientid")
+            .setClientHost("clienthost")
+            .setRackId("rackid")
+            .setInstanceId("instanceid")
+            .setRebalanceTimeoutMs(1000)
+            .setTopologyEpoch(10)
+            .setProcessId("processid")
+            .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999))
+            .setClientTags(Collections.singletonMap("key", "value"))
+            .build();
+
+        // The group and the member are created if they do not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo",
 member));
+        assertEquals(member, 
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member",
 false));
+    }
+
+    @Test
+    public void testReplayStreamsGroupMemberMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group still exists but the member is already gone. Replaying the
+        // StreamsGroupMemberMetadata tombstone should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
 "m1"));
+        assertThrows(UnknownMemberIdException.class, () -> 
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", 
false));
+
+        // The group may not exist at all. Replaying the 
StreamsGroupMemberMetadata tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar",
 "m1"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("bar"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10));
+        assertEquals(10, 
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
+    }
+
+    @Test
+    public void testReplayStreamsGroupMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the StreamsGroupMetadata 
tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupPartitionMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
metadata = Map.of(
+            "bar",
+            new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), 
"bar", 10)
+        );
+
+        // The group is created if it does not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo",
 metadata));
+        assertEquals(metadata, 
context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
+    }
+
+    @Test
+    public void testReplayStreamsGroupPartitionMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the 
StreamsGroupPartitionMetadata tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMember() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        final TasksTuple tasks =
+            new TasksTuple(
+                TaskAssignmentTestUtil.mkTasksPerSubtopology(
+                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
+                TaskAssignmentTestUtil.mkTasksPerSubtopology(
+                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
+                TaskAssignmentTestUtil.mkTasksPerSubtopology(
+                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
+            );
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo",
 "m1", tasks));
+        assertEquals(tasks.activeTasks(), 
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks());
+        assertEquals(tasks.standbyTasks(), 
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks());
+        assertEquals(tasks.warmupTasks(), 
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks());
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMemberTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the 
StreamsGroupTargetAssignmentMember tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo",
 "m1"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord("foo",
 10));
+        assertEquals(10, 
context.groupMetadataManager.streamsGroup("foo").assignmentEpoch());
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the 
StreamsGroupTargetAssignmentMetadata tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupCurrentMemberAssignment() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS)
+            .setAssignedTasks(new TasksTuple(
+                
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
 0, 1, 2)),
+                
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
 3, 4, 5)),
+                
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
 6, 7, 8))
+            ))
+            .setTasksPendingRevocation(TasksTuple.EMPTY)
+            .build();
+
+        // The group and the member are created if they do not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord("bar",
 member));
+        assertEquals(member, 
context.groupMetadataManager.streamsGroup("bar").getOrMaybeCreateMember("member",
 false));
+    }
+
+    @Test
+    public void testReplayStreamsGroupCurrentMemberAssignmentTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group still exists, but the member is already gone. Replaying 
the
+        // StreamsGroupCurrentMemberAssignment tombstone should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo",
 "m1"));
+        assertThrows(UnknownMemberIdException.class, () -> 
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", 
false));
+
+        // The group may not exist at all. Replaying the 
StreamsGroupCurrentMemberAssignment tombstone
+        // should be a no-op.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("bar",
 "m1"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("bar"));
+    }
+
+    @Test
+    public void testReplayStreamsGroupTopology() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        StreamsGroupTopologyValue topology = new StreamsGroupTopologyValue()
+            .setEpoch(12)
+            .setSubtopologies(
+                List.of(
+                    new StreamsGroupTopologyValue.Subtopology()
+                        .setSubtopologyId("subtopology-1")
+                        .setSourceTopics(List.of("source-topic"))
+                        .setRepartitionSinkTopics(List.of("sink-topic"))
+                )
+            );
+
+        // The group and the topology are created if they do not exist.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("bar",
 topology));
+        final Optional<StreamsTopology> actualTopology = 
context.groupMetadataManager.streamsGroup("bar").topology();
+        assertTrue(actualTopology.isPresent(), "topology should be set");
+        assertEquals(topology.epoch(), actualTopology.get().topologyEpoch());
+        assertEquals(topology.subtopologies().size(), 
actualTopology.get().subtopologies().size());
+        assertEquals(
+            topology.subtopologies().iterator().next(),
+            actualTopology.get().subtopologies().values().iterator().next()
+        );
+    }
+
+    @Test
+    public void testReplayStreamsGroupTopologyTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to