cadonna commented on code in PR #18809: URL: https://github.com/apache/kafka/pull/18809#discussion_r1952406373
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3138,6 +3274,45 @@ private void replaceMember( )); } + /** + * Fences a member from a streams group and maybe downgrade the streams group to a classic group. + * + * @param group The group. + * @param member The member. + * @param response The response of the CoordinatorResult. + * + * @return The CoordinatorResult to be applied. + */ + private <T> CoordinatorResult<T, CoordinatorRecord> streamsGroupFenceMember( + StreamsGroup group, + StreamsGroupMember member, + T response + ) { + List<CoordinatorRecord> records = new ArrayList<>(); + removeStreamsMember(records, group.groupId(), member.memberId()); + + // We bump the group epoch. + int groupEpoch = group.groupEpoch() + 1; + records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch)); + + cancelTimers(group.groupId(), member.memberId()); + + return new CoordinatorResult<>(records, response); + } + + /** + * Write tombstones for the member. The order matters here. + * + * @param records The list of records to append the member assignment tombstone records. + * @param groupId The group id. + * @param memberId The member id. + */ + private void removeStreamsMember(List<CoordinatorRecord> records, String groupId, String memberId) { Review Comment: nit: Why not return a list of records? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3329,6 +3550,27 @@ private void scheduleShareGroupSessionTimeout( ); } + /** + * Schedules (or reschedules) the session timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + * @param sessionTimeoutMs The session timeout. + */ + private void scheduleStreamsGroupSessionTimeout( Review Comment: This is only used in `scheduleStreamsGroupSessionTimeout(groupId, memberId)`. Will this be used somewhere else in future? 
Why not call `timer.schedule()` directly in `scheduleStreamsGroupSessionTimeout(groupId, memberId)`? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ########## @@ -830,6 +844,385 @@ public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() { verify(groupMetadataManager, times(1)).replay(key, null); } + @Test + public void testReplayStreamsGroupMetadata() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager, + Time.SYSTEM, + new MockCoordinatorTimer<>(Time.SYSTEM), + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard + ); + + StreamsGroupMetadataKey key = new StreamsGroupMetadataKey(); + StreamsGroupMetadataValue value = new StreamsGroupMetadataValue(); + + coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.record( + key, + new ApiMessageAndVersion(value, (short) 0) + )); + + verify(groupMetadataManager, times(1)).replay(key, value); Review Comment: You could drop `times(1)` since it is the default. I am fine either way. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3385,6 +3627,49 @@ private void scheduleConsumerGroupRebalanceTimeout( }); } + /** + * Schedules a rebalance timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + * @param memberEpoch The member epoch. + * @param rebalanceTimeoutMs The rebalance timeout. 
+ */ + private void scheduleStreamsGroupRebalanceTimeout( + String groupId, + String memberId, + int memberEpoch, + int rebalanceTimeoutMs + ) { + String key = streamsGroupRebalanceTimeoutKey(groupId, memberId); + timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { + try { + StreamsGroup group = streamsGroup(groupId); + StreamsGroupMember member = group.getOrMaybeCreateMember(memberId, false); + + if (member.memberEpoch() == memberEpoch) { + log.info("[GroupId {}] Member {} fenced from the group because " + + "it failed to transition from epoch {} within {}ms.", + groupId, memberId, memberEpoch, rebalanceTimeoutMs); + + return streamsGroupFenceMember(group, member, null); + } else { + log.debug("[GroupId {}] Ignoring rebalance timeout for {} because the member " + + "left the epoch {}.", groupId, memberId, memberEpoch); Review Comment: What does "leave the epoch" mean? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -15135,6 +15284,229 @@ public void testReplayConsumerGroupCurrentMemberAssignmentTombstone() { assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.consumerGroup("bar")); } + @Test + public void testReplayStreamsGroupMemberMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setClientId("clientid") + .setClientHost("clienthost") + .setRackId("rackid") + .setInstanceId("instanceid") + .setRebalanceTimeoutMs(1000) + .setTopologyEpoch(10) + .setProcessId("processid") + .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999)) + .setClientTags(Collections.singletonMap("key", "value")) + .build(); + + // The group and the member are created if they do not exist. 
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo", member)); + assertEquals(member, context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member", false)); + } + + @Test + public void testReplayStreamsGroupMemberMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group still exists but the member is already gone. Replaying the + // StreamsGroupMemberMetadata tombstone should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1")); + assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", false)); + + // The group may not exist at all. Replaying the StreamsGroupMemberMetadata tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar", "m1")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("bar")); + } + + @Test + public void testReplayStreamsGroupMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); + assertEquals(10, context.groupMetadataManager.streamsGroup("foo").groupEpoch()); + } + + @Test + public void testReplayStreamsGroupMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupMetadata tombstone + // should be a no-op. 
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupPartitionMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> metadata = Map.of( + "bar", + new org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), "bar", 10) + ); + + // The group is created if it does not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo", metadata)); + assertEquals(metadata, context.groupMetadataManager.streamsGroup("foo").partitionMetadata()); + } + + @Test + public void testReplayStreamsGroupPartitionMetadataTombstone() { Review Comment: Could you please also add a test that removes an existing group? What about all the `IllegalStateException`s that are thrown in the method under test? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3138,6 +3274,45 @@ private void replaceMember( )); } + /** + * Fences a member from a streams group and maybe downgrade the streams group to a classic group. Review Comment: Is the classic part true for Streams? 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -15135,6 +15284,229 @@ public void testReplayConsumerGroupCurrentMemberAssignmentTombstone() { assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.consumerGroup("bar")); } + @Test + public void testReplayStreamsGroupMemberMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setClientId("clientid") + .setClientHost("clienthost") + .setRackId("rackid") + .setInstanceId("instanceid") + .setRebalanceTimeoutMs(1000) + .setTopologyEpoch(10) + .setProcessId("processid") + .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999)) + .setClientTags(Collections.singletonMap("key", "value")) + .build(); + + // The group and the member are created if they do not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo", member)); + assertEquals(member, context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member", false)); + } + + @Test + public void testReplayStreamsGroupMemberMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group still exists but the member is already gone. Replaying the + // StreamsGroupMemberMetadata tombstone should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1")); + assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", false)); + + // The group may not exist at all. Replaying the StreamsGroupMemberMetadata tombstone + // should be a no-op. 
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar", "m1")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("bar")); + } + + @Test + public void testReplayStreamsGroupMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); + assertEquals(10, context.groupMetadataManager.streamsGroup("foo").groupEpoch()); + } + + @Test + public void testReplayStreamsGroupMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupMetadata tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupPartitionMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> metadata = Map.of( + "bar", + new org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), "bar", 10) + ); + + // The group is created if it does not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo", metadata)); + assertEquals(metadata, context.groupMetadataManager.streamsGroup("foo").partitionMetadata()); + } + + @Test + public void testReplayStreamsGroupPartitionMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupPartitionMetadata tombstone + // should be a no-op. 
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMember() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. + final TasksTuple tasks = + new TasksTuple( + TaskAssignmentTestUtil.mkTasksPerSubtopology( + TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)), + TaskAssignmentTestUtil.mkTasksPerSubtopology( + TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)), + TaskAssignmentTestUtil.mkTasksPerSubtopology( + TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8)) + ); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo", "m1", tasks)); + assertEquals(tasks.activeTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks()); + assertEquals(tasks.standbyTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks()); + assertEquals(tasks.warmupTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks()); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMemberTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupTargetAssignmentMember tombstone + // should be a no-op. 
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo", "m1")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord("foo", 10)); + assertEquals(10, context.groupMetadataManager.streamsGroup("foo").assignmentEpoch()); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupTargetAssignmentMetadata tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupCurrentMemberAssignment() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS) + .setAssignedTasks(new TasksTuple( + TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)), + TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)), + TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8)) + )) + .setTasksPendingRevocation(TasksTuple.EMPTY) + .build(); + + // The group and the 
member are created if they do not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord("bar", member)); + assertEquals(member, context.groupMetadataManager.streamsGroup("bar").getOrMaybeCreateMember("member", false)); + } + + @Test + public void testReplayStreamsGroupCurrentMemberAssignmentTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group still exists, but the member is already gone. Replaying the + // StreamsGroupCurrentMemberAssignment tombstone should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo", "m1")); + assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", false)); + + // The group may not exist at all. Replaying the StreamsGroupCurrentMemberAssignment tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("bar", "m1")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("bar")); + } + + @Test + public void testReplayStreamsGroupTopology() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + StreamsGroupTopologyValue topology = new StreamsGroupTopologyValue() + .setEpoch(12) + .setSubtopologies( + List.of( + new StreamsGroupTopologyValue.Subtopology() + .setSubtopologyId("subtopology-1") + .setSourceTopics(List.of("source-topic")) + .setRepartitionSinkTopics(List.of("sink-topic")) + ) + ); + + // The group and the topology are created if they do not exist. 
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("bar", topology)); + final Optional<StreamsTopology> actualTopology = context.groupMetadataManager.streamsGroup("bar").topology(); + assertTrue(actualTopology.isPresent(), "topology should be set"); + assertEquals(topology.epoch(), actualTopology.get().topologyEpoch()); + assertEquals(topology.subtopologies().size(), actualTopology.get().subtopologies().size()); + assertEquals( + topology.subtopologies().iterator().next(), + actualTopology.get().subtopologies().values().iterator().next() + ); + } + + @Test + public void testReplayStreamsGroupTopologyTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() Review Comment: Could you please add a test for when a group exists? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -760,6 +859,43 @@ ConsumerGroup getOrMaybeCreatePersistedConsumerGroup( } } + /** + * The method should be called on the replay path. + * Gets or maybe creates a streams group and updates the groups map if a new group is created. + * + * @param groupId The group id. + * @param createIfNotExists A boolean indicating whether the group should be + * created if it does not exist. + * + * @return A StreamsGroup. + * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or + * if the group is not a streams group. + * @throws IllegalStateException if the group does not have the expected type. + * Package private for testing. + */ + StreamsGroup getOrMaybeCreatePersistedStreamsGroup( Review Comment: why is this method package-private? It is not used outside of this class. 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -15135,6 +15284,229 @@ public void testReplayConsumerGroupCurrentMemberAssignmentTombstone() { assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.consumerGroup("bar")); } + @Test + public void testReplayStreamsGroupMemberMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setClientId("clientid") + .setClientHost("clienthost") + .setRackId("rackid") + .setInstanceId("instanceid") + .setRebalanceTimeoutMs(1000) + .setTopologyEpoch(10) + .setProcessId("processid") + .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999)) + .setClientTags(Collections.singletonMap("key", "value")) + .build(); + + // The group and the member are created if they do not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo", member)); + assertEquals(member, context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member", false)); + } + + @Test + public void testReplayStreamsGroupMemberMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group still exists but the member is already gone. Replaying the + // StreamsGroupMemberMetadata tombstone should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1")); + assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", false)); + + // The group may not exist at all. Replaying the StreamsGroupMemberMetadata tombstone + // should be a no-op. 
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar", "m1")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("bar")); + } + + @Test + public void testReplayStreamsGroupMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); + assertEquals(10, context.groupMetadataManager.streamsGroup("foo").groupEpoch()); + } + + @Test + public void testReplayStreamsGroupMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupMetadata tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupPartitionMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> metadata = Map.of( + "bar", + new org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), "bar", 10) + ); + + // The group is created if it does not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo", metadata)); + assertEquals(metadata, context.groupMetadataManager.streamsGroup("foo").partitionMetadata()); + } + + @Test + public void testReplayStreamsGroupPartitionMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupPartitionMetadata tombstone + // should be a no-op. 
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMember() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. + final TasksTuple tasks = + new TasksTuple( + TaskAssignmentTestUtil.mkTasksPerSubtopology( + TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)), + TaskAssignmentTestUtil.mkTasksPerSubtopology( + TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)), + TaskAssignmentTestUtil.mkTasksPerSubtopology( + TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8)) + ); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo", "m1", tasks)); + assertEquals(tasks.activeTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks()); + assertEquals(tasks.standbyTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks()); + assertEquals(tasks.warmupTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks()); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMemberTombstone() { Review Comment: Could you please add a test for when a group exists? 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ########## @@ -830,6 +844,385 @@ public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() { verify(groupMetadataManager, times(1)).replay(key, null); } + @Test + public void testReplayStreamsGroupMetadata() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager, + Time.SYSTEM, + new MockCoordinatorTimer<>(Time.SYSTEM), + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard + ); + Review Comment: nit: I would delete this line so that the call under test is more easily identifiable (also in the other tests). Just a proposal. Feel free to ignore if you want. ```suggestion ``` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3962,7 +4359,167 @@ public void replay( } removeGroup(groupId); } + } + + /** + * Replays StreamsGroupMemberMetadataKey/Value to update the hard state of + * the streams group. + * It updates the subscription part of the member or deletes the member. + * + * @param key A StreamsGroupMemberMetadataKey key. + * @param value A StreamsGroupMemberMetadataValue record. 
+ */ + public void replay( + StreamsGroupMemberMetadataKey key, + StreamsGroupMemberMetadataValue value + ) { + String groupId = key.groupId(); + String memberId = key.memberId(); + + StreamsGroup streamsGroup; + try { + streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, value != null); + } catch (GroupIdNotFoundException ex) { + // If the group does not exist and a tombstone is replayed, we can ignore it. + return; + } + + if (value != null) { + StreamsGroupMember oldMember = streamsGroup.getOrMaybeCreateMember(memberId, true); + streamsGroup.updateMember(new StreamsGroupMember.Builder(oldMember) + .updateWith(value) + .build()); Review Comment: I find it confusing that the Streams group object creates a member without adding it to its members and the member needs to be added with `updateMember()`. I know that it is done this way for the other groups. Creating a member outside of the group should not be a concern of the group. You do not need to change anything. Just a thought that I had for discussion. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -15135,6 +15284,229 @@ public void testReplayConsumerGroupCurrentMemberAssignmentTombstone() { assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.consumerGroup("bar")); } + @Test + public void testReplayStreamsGroupMemberMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setClientId("clientid") + .setClientHost("clienthost") + .setRackId("rackid") + .setInstanceId("instanceid") + .setRebalanceTimeoutMs(1000) + .setTopologyEpoch(10) + .setProcessId("processid") + .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999)) + .setClientTags(Collections.singletonMap("key", "value")) + .build(); + + // The group and the member are created if they do not exist. 
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo", member)); + assertEquals(member, context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member", false)); + } + + @Test + public void testReplayStreamsGroupMemberMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group still exists but the member is already gone. Replaying the + // StreamsGroupMemberMetadata tombstone should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1")); + assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", false)); + + // The group may not exist at all. Replaying the StreamsGroupMemberMetadata tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar", "m1")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("bar")); + } + + @Test + public void testReplayStreamsGroupMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); + assertEquals(10, context.groupMetadataManager.streamsGroup("foo").groupEpoch()); + } + + @Test + public void testReplayStreamsGroupMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupMetadata tombstone + // should be a no-op. 
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupPartitionMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> metadata = Map.of( + "bar", + new org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), "bar", 10) + ); + + // The group is created if it does not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo", metadata)); + assertEquals(metadata, context.groupMetadataManager.streamsGroup("foo").partitionMetadata()); + } + + @Test + public void testReplayStreamsGroupPartitionMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupPartitionMetadata tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMember() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. 
+ final TasksTuple tasks = + new TasksTuple( + TaskAssignmentTestUtil.mkTasksPerSubtopology( + TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)), + TaskAssignmentTestUtil.mkTasksPerSubtopology( + TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)), + TaskAssignmentTestUtil.mkTasksPerSubtopology( + TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8)) + ); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo", "m1", tasks)); + assertEquals(tasks.activeTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks()); + assertEquals(tasks.standbyTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks()); + assertEquals(tasks.warmupTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks()); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMemberTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupTargetAssignmentMember tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo", "m1")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. 
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord("foo", 10)); + assertEquals(10, context.groupMetadataManager.streamsGroup("foo").assignmentEpoch()); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupTargetAssignmentMetadata tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupCurrentMemberAssignment() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS) + .setAssignedTasks(new TasksTuple( + TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)), + TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)), + TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8)) + )) + .setTasksPendingRevocation(TasksTuple.EMPTY) + .build(); + + // The group and the member are created if they do not exist. 
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord("bar", member)); + assertEquals(member, context.groupMetadataManager.streamsGroup("bar").getOrMaybeCreateMember("member", false)); + } + + @Test + public void testReplayStreamsGroupCurrentMemberAssignmentTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() Review Comment: Could you please add a test for when a group exists? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -687,6 +717,75 @@ ConsumerGroup getOrMaybeCreateConsumerGroup( } } + /** + * Gets or maybe creates a streams group without updating the groups map. + * The group will be materialized during the replay. + * + * @param groupId The group id. + * @param createIfNotExists A boolean indicating whether the group should be + * created if it does not exist or is an empty classic group. + * + * @return A StreamsGroup. + * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or + * if the group is not a streams group. + * + * Package private for testing. + */ + StreamsGroup getOrMaybeCreateStreamsGroup( + String groupId, + boolean createIfNotExists + ) throws GroupIdNotFoundException { + Group group = groups.get(groupId); + + if (group == null && !createIfNotExists) { + throw new GroupIdNotFoundException(String.format("Streams group %s not found.", groupId)); + } + + if (group == null) { + return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics); + } else { + if (group.type() == STREAMS) { + return (StreamsGroup) group; + } else { + throw new GroupIdNotFoundException(String.format("Group %s is not a streams group.", groupId)); + } + } + } + + /** + * Gets a streams group by committed offset. + * + * @param groupId The group id. + * @param committedOffset A specified committed offset corresponding to this shard. + * + * @return A StreamsGroup. 
+ * @throws GroupIdNotFoundException if the group does not exist or is not a streams group. + */ + public StreamsGroup streamsGroup( Review Comment: Why is this method `public`? It is not used outside of this class. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -15135,6 +15284,229 @@ public void testReplayConsumerGroupCurrentMemberAssignmentTombstone() { assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.consumerGroup("bar")); } + @Test + public void testReplayStreamsGroupMemberMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setClientId("clientid") + .setClientHost("clienthost") + .setRackId("rackid") + .setInstanceId("instanceid") + .setRebalanceTimeoutMs(1000) + .setTopologyEpoch(10) + .setProcessId("processid") + .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999)) + .setClientTags(Collections.singletonMap("key", "value")) + .build(); + + // The group and the member are created if they do not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo", member)); + assertEquals(member, context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member", false)); + } + + @Test + public void testReplayStreamsGroupMemberMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group still exists but the member is already gone. Replaying the + // StreamsGroupMemberMetadata tombstone should be a no-op. 
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1")); + assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", false)); + + // The group may not exist at all. Replaying the StreamsGroupMemberMetadata tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar", "m1")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("bar")); + } + + @Test + public void testReplayStreamsGroupMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); + assertEquals(10, context.groupMetadataManager.streamsGroup("foo").groupEpoch()); + } + + @Test + public void testReplayStreamsGroupMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupMetadata tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupPartitionMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> metadata = Map.of( + "bar", + new org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), "bar", 10) + ); + + // The group is created if it does not exist. 
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo", metadata)); + assertEquals(metadata, context.groupMetadataManager.streamsGroup("foo").partitionMetadata()); + } + + @Test + public void testReplayStreamsGroupPartitionMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the StreamsGroupPartitionMetadata tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMember() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. + final TasksTuple tasks = + new TasksTuple( + TaskAssignmentTestUtil.mkTasksPerSubtopology( + TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)), + TaskAssignmentTestUtil.mkTasksPerSubtopology( + TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)), + TaskAssignmentTestUtil.mkTasksPerSubtopology( + TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8)) + ); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo", "m1", tasks)); + assertEquals(tasks.activeTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks()); + assertEquals(tasks.standbyTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks()); + assertEquals(tasks.warmupTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks()); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMemberTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The 
group may not exist at all. Replaying the StreamsGroupTargetAssignmentMember tombstone + // should be a no-op. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo", "m1")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo")); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord("foo", 10)); + assertEquals(10, context.groupMetadataManager.streamsGroup("foo").assignmentEpoch()); + } + + @Test + public void testReplayStreamsGroupTargetAssignmentMetadataTombstone() { Review Comment: Could you please add a test for when a group exists? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org