lucasbru commented on code in PR #18809: URL: https://github.com/apache/kafka/pull/18809#discussion_r1952918550
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -15135,6 +15284,229 @@ public void testReplayConsumerGroupCurrentMemberAssignmentTombstone() { assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.consumerGroup("bar")); } + @Test + public void testReplayStreamsGroupMemberMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setClientId("clientid") + .setClientHost("clienthost") + .setRackId("rackid") + .setInstanceId("instanceid") + .setRebalanceTimeoutMs(1000) + .setTopologyEpoch(10) + .setProcessId("processid") + .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999)) + .setClientTags(Collections.singletonMap("key", "value")) + .build(); + + // The group and the member are created if they do not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo", member)); + assertEquals(member, context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member", false)); + } + + @Test + public void testReplayStreamsGroupMemberMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group still exists but the member is already gone. Replaying the + // StreamsGroupMemberMetadata tombstone should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1")); + assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", false)); + + // The group may not exist at all. Replaying the StreamsGroupMemberMetadata tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar", "m1")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("bar")); + } + + @Test + public void testReplayStreamsGroupMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); + assertEquals(10, context.groupMetadataManager.streamsGroup("foo").groupEpoch()); + } + + @Test + public void testReplayStreamsGroupMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupMetadata tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupPartitionMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> metadata = Map.of( + "bar", + new org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), "bar", 10) + ); + + // The group is created if it does not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo", metadata)); + assertEquals(metadata, context.groupMetadataManager.streamsGroup("foo").partitionMetadata()); + } + + @Test + public void testReplayStreamsGroupPartitionMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupPartitionMetadata tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMember() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. + final TasksTuple tasks = + new TasksTuple( + TaskAssignmentTestUtil.mkTasksPerSubtopology( + TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)), + TaskAssignmentTestUtil.mkTasksPerSubtopology( + TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)), + TaskAssignmentTestUtil.mkTasksPerSubtopology( + TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8)) + ); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo", "m1", tasks)); + assertEquals(tasks.activeTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks()); + assertEquals(tasks.standbyTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks()); + assertEquals(tasks.warmupTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks()); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMemberTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupTargetAssignmentMember tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo", "m1")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord("foo", 10)); + assertEquals(10, context.groupMetadataManager.streamsGroup("foo").assignmentEpoch()); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupTargetAssignmentMetadata tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupCurrentMemberAssignment() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS) + .setAssignedTasks(new TasksTuple( + TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)), + TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)), + TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8)) + )) + .setTasksPendingRevocation(TasksTuple.EMPTY) + .build(); + + // The group and the member are created if they do not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord("bar", member)); + assertEquals(member, context.groupMetadataManager.streamsGroup("bar").getOrMaybeCreateMember("member", false)); + } + + @Test + public void testReplayStreamsGroupCurrentMemberAssignmentTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group still exists, but the member is already gone. Replaying the + // StreamsGroupCurrentMemberAssignment tombstone should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo", "m1")); + assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", false)); + + // The group may not exist at all. Replaying the StreamsGroupCurrentMemberAssignment tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("bar", "m1")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("bar")); + } + + @Test + public void testReplayStreamsGroupTopology() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + StreamsGroupTopologyValue topology = new StreamsGroupTopologyValue() + .setEpoch(12) + .setSubtopologies( + List.of( + new StreamsGroupTopologyValue.Subtopology() + .setSubtopologyId("subtopology-1") + .setSourceTopics(List.of("source-topic")) + .setRepartitionSinkTopics(List.of("sink-topic")) + ) + ); + + // The group and the topology are created if they do not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("bar", topology)); + final Optional<StreamsTopology> actualTopology = context.groupMetadataManager.streamsGroup("bar").topology(); + assertTrue(actualTopology.isPresent(), "topology should be set"); + assertEquals(topology.epoch(), actualTopology.get().topologyEpoch()); + assertEquals(topology.subtopologies().size(), actualTopology.get().subtopologies().size()); + assertEquals( + topology.subtopologies().iterator().next(), + actualTopology.get().subtopologies().values().iterator().next() + ); + } + + @Test + public void testReplayStreamsGroupTopologyTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() Review Comment: Dome -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org