lucasbru commented on code in PR #19219:
URL: https://github.com/apache/kafka/pull/19219#discussion_r2020722085
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15294,221 +15314,2860 @@ public void testShareGroupStates() {
     }

     @Test
-    public void testConsumerGroupDynamicConfigs() {
-        String groupId = "fooup";
-        // Use a static member id as it makes the test easier.
+    public void testStreamsHeartbeatRequestValidation() {
         String memberId = Uuid.randomUuid().toString();
-
-        Uuid fooTopicId = Uuid.randomUuid();
-        String fooTopicName = "foo";
-
-        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
         GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
-            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
-                .addRacks()
-                .build())
             .build();
+        Exception ex;

-        assignor.prepareGroupAssignment(new GroupAssignment(
-            Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
-                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
-            )))
-        ));
+        // MemberId must be present in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()));
+        assertEquals("MemberId can't be empty.", ex.getMessage());

-        // Session timer is scheduled on first heartbeat.
-        CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result =
-            context.consumerGroupHeartbeat(
-                new ConsumerGroupHeartbeatRequestData()
-                    .setGroupId(groupId)
-                    .setMemberId(memberId)
-                    .setMemberEpoch(0)
-                    .setRebalanceTimeoutMs(90000)
-                    .setSubscribedTopicNames(List.of("foo"))
-                    .setTopicPartitions(List.of()));
-        assertEquals(1, result.response().memberEpoch());
+        // MemberId can't be all whitespaces.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(" ")));
+        assertEquals("MemberId can't be empty.", ex.getMessage());

-        // Verify heartbeat interval
-        assertEquals(5000, result.response().heartbeatIntervalMs());
+        // GroupId must be present in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)));
+        assertEquals("GroupId can't be empty.", ex.getMessage());

-        // Verify that there is a session time.
-        context.assertSessionTimeout(groupId, memberId, 45000);
+        // GroupId can't be all whitespaces.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)
+                .setGroupId(" ")));
+        assertEquals("GroupId can't be empty.", ex.getMessage());

-        // Advance time.
-        assertEquals(
-            List.of(),
-            context.sleep(result.response().heartbeatIntervalMs())
-        );
+        // RebalanceTimeoutMs must be present in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)
+                .setGroupId("foo")
+                .setMemberEpoch(0)));
+        assertEquals("RebalanceTimeoutMs must be provided in first request.", ex.getMessage());

-        // Dynamic update group config
-        Properties newGroupConfig = new Properties();
-        newGroupConfig.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 50000);
-        newGroupConfig.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
-        context.updateGroupConfig(groupId, newGroupConfig);
+        // ActiveTasks must be present and empty in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)));
+        assertEquals("ActiveTasks must be empty when (re-)joining.", ex.getMessage());

-        // Session timer is rescheduled on second heartbeat.
-        result = context.consumerGroupHeartbeat(
-            new ConsumerGroupHeartbeatRequestData()
-                .setGroupId(groupId)
+        // StandbyTasks must be present and empty in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
                 .setMemberId(memberId)
-                .setMemberEpoch(result.response().memberEpoch()));
-        assertEquals(1, result.response().memberEpoch());
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setActiveTasks(List.of())));
+        assertEquals("StandbyTasks must be empty when (re-)joining.", ex.getMessage());

-        // Verify heartbeat interval
-        assertEquals(10000, result.response().heartbeatIntervalMs());
+        // WarmupTasks must be present and empty in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())));
+        assertEquals("WarmupTasks must be empty when (re-)joining.", ex.getMessage());
+
+        // Topology must be present in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())));
+        assertEquals("Topology must be non-null when (re-)joining.", ex.getMessage());

-        // Verify that there is a session time.
-        context.assertSessionTimeout(groupId, memberId, 50000);
+        // InstanceId must be non-empty if provided in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setInstanceId("")));
+        assertEquals("InstanceId can't be empty.", ex.getMessage());

-        // Advance time.
-        assertEquals(
-            List.of(),
-            context.sleep(result.response().heartbeatIntervalMs())
-        );
+        // RackId must be non-empty if provided in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setRackId("")));
+        assertEquals("RackId can't be empty.", ex.getMessage());

-        // Session timer is cancelled on leave.
-        result = context.consumerGroupHeartbeat(
-            new ConsumerGroupHeartbeatRequestData()
-                .setGroupId(groupId)
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId("foo")
                 .setMemberId(memberId)
-                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
-        assertEquals(LEAVE_GROUP_MEMBER_EPOCH, result.response().memberEpoch());
+                .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(new StreamsGroupHeartbeatRequestData.Topology())
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())));
+        assertEquals("InstanceId can't be null.", ex.getMessage());

-        // Verify that there are no timers.
-        context.assertNoSessionTimeout(groupId, memberId);
-        context.assertNoRebalanceTimeout(groupId, memberId);
+        // Member epoch cannot be < -2
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)
+                .setGroupId("foo")
+                .setMemberEpoch(-3)
+                .setRebalanceTimeoutMs(1500)
+        ));
+        assertEquals("MemberEpoch is -3, but must be greater than or equal to -2.", ex.getMessage());
+
+        // Topology must not be present in the later requests (epoch != 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)
+                .setGroupId("foo")
+                .setMemberEpoch(1)
+                .setRebalanceTimeoutMs(1500)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())
+                .setTopology(new StreamsGroupHeartbeatRequestData.Topology())
+        ));
+        assertEquals("Topology can only be provided when (re-)joining.", ex.getMessage());
+
+        // Topology must not contain changelog topics with fixed partition numbers
+        StreamsInvalidTopologyException topoEx = assertThrows(StreamsInvalidTopologyException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())
+                .setTopology(new StreamsGroupHeartbeatRequestData.Topology().setSubtopologies(
+                    List.of(
+                        new StreamsGroupHeartbeatRequestData.Subtopology()
+                            .setStateChangelogTopics(
+                                List.of(
+                                    new StreamsGroupHeartbeatRequestData.TopicInfo()
+                                        .setName("changelog_topic_with_fixed_partition")
+                                        .setPartitions(3)
+                                )
+                            )
+                    )
+                ))
+        ));
+        assertEquals("Changelog topic changelog_topic_with_fixed_partition must have an undefined partition count, but it is set to 3.",
+            topoEx.getMessage());
     }

     @Test
-    public void testShareGroupDynamicConfigs() {
+    public void testUnknownStreamsGroupId() {
         String groupId = "fooup";
-        // Use a static member id as it makes the test easier.
         String memberId = Uuid.randomUuid().toString();

-        Uuid fooTopicId = Uuid.randomUuid();
-        String fooTopicName = "foo";
-
-        MockPartitionAssignor assignor = new MockPartitionAssignor("simple");
         GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
-            .withShareGroupAssignor(assignor)
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
-                .addRacks()
-                .build())
             .build();

-        assignor.prepareGroupAssignment(new GroupAssignment(
-            Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
-                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
-            )))
-        ));
-
-        // Session timer is scheduled on first heartbeat.
-        CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result =
-            context.shareGroupHeartbeat(
-                new ShareGroupHeartbeatRequestData()
+        GroupIdNotFoundException e = assertThrows(GroupIdNotFoundException.class, () ->
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
                     .setGroupId(groupId)
                     .setMemberId(memberId)
-                    .setMemberEpoch(0)
-                    .setSubscribedTopicNames(List.of("foo")));
-        assertEquals(1, result.response().memberEpoch());
-
-        // Verify heartbeat interval
-        assertEquals(5000, result.response().heartbeatIntervalMs());
-
-        // Verify that there is a session time.
-        context.assertSessionTimeout(groupId, memberId, 45000);
-
-        // Advance time.
-        assertEquals(
-            List.of(),
-            context.sleep(result.response().heartbeatIntervalMs())
-        );
-
-        // Dynamic update group config
-        Properties newGroupConfig = new Properties();
-        newGroupConfig.put(SHARE_SESSION_TIMEOUT_MS_CONFIG, 50000);
-        newGroupConfig.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
-        context.updateGroupConfig(groupId, newGroupConfig);
-
-        // Session timer is rescheduled on second heartbeat.
-        result = context.shareGroupHeartbeat(
-            new ShareGroupHeartbeatRequestData()
-                .setGroupId(groupId)
-                .setMemberId(memberId)
-                .setMemberEpoch(result.response().memberEpoch()));
-        assertEquals(1, result.response().memberEpoch());
+                    .setMemberEpoch(100) // Epoch must be > 0.
+                    .setRebalanceTimeoutMs(1500)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of())));
+        assertEquals("Streams group fooup not found.", e.getMessage());
+    }

-        // Verify heartbeat interval
-        assertEquals(10000, result.response().heartbeatIntervalMs());
+    @Test
+    public void testUnknownMemberIdJoinsStreamsGroup() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        Topology topology = new Topology();

-        // Verify that there is a session time.
-        context.assertSessionTimeout(groupId, memberId, 50000);
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .build();

-        // Advance time.
-        assertEquals(
-            List.of(),
-            context.sleep(result.response().heartbeatIntervalMs())
-        );
+        assignor.prepareGroupAssignment(Map.of(memberId, TasksTuple.EMPTY));

-        // Session timer is cancelled on leave.
-        result = context.shareGroupHeartbeat(
-            new ShareGroupHeartbeatRequestData()
+        // A first member joins to create the group.
+        context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
                 .setGroupId(groupId)
                 .setMemberId(memberId)
-                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
-        assertEquals(LEAVE_GROUP_MEMBER_EPOCH, result.response().memberEpoch());
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));

-        // Verify that there are no timers.
-        context.assertNoSessionTimeout(groupId, memberId);
-        context.assertNoRebalanceTimeout(groupId, memberId);
+        // The second member is rejected because the member id is unknown and
+        // the member epoch is not zero.
+        final String memberId2 = Uuid.randomUuid().toString();
+        UnknownMemberIdException e = assertThrows(UnknownMemberIdException.class, () ->
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId2)
+                    .setMemberEpoch(1)
+                    .setRebalanceTimeoutMs(1500)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of())));
+        assertEquals(String.format("Member %s is not a member of group %s.", memberId2, groupId), e.getMessage());
     }

     @Test
-    public void testReplayConsumerGroupMemberMetadata() {
+    public void testStreamsGroupMemberEpochValidation() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
             .build();
+        assignor.prepareGroupAssignment(Map.of(memberId, TasksTuple.EMPTY));

-        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
-            .setClientId("clientid")
-            .setClientHost("clienthost")
-            .setServerAssignorName("range")
-            .setRackId("rackid")
-            .setSubscribedTopicNames(List.of("foo"))
+        StreamsGroupMember member = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 1, 2, 3)))
             .build();

-        // The group and the member are created if they do not exist.
-        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord("foo", member));
-        assertEquals(member, context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("member", false));
-    }
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, member));

-    @Test
-    public void testReplayConsumerGroupMemberMetadataTombstone() {
-        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
-            .build();
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 100));

-        // The group still exists but the member is already gone. Replaying the
-        // ConsumerGroupMemberMetadata tombstone should be a no-op.
-        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10));
-        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo", "m1"));
-        assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("m1", false));
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology));

-        // The group may not exist at all. Replaying the ConsumerGroupMemberMetadata tombstone
-        // should a no-op.
-        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("bar", "m1"));
-        assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.consumerGroup("bar"));
-    }
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
+            TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 1, 2, 3)
+            )));

-    @Test
-    public void testReplayConsumerGroupMetadata() {
-        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
-            .build();
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 100));

-        // The group is created if it does not exist.
-        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10));
-        assertEquals(10, context.groupMetadataManager.consumerGroup("foo").groupEpoch());
-    }
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, member));
+
+        // Member epoch is greater than the expected epoch.
+        FencedMemberEpochException e1 = assertThrows(FencedMemberEpochException.class, () ->
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(200)
+                    .setRebalanceTimeoutMs(1500)));
+        assertEquals("The streams group member has a greater member epoch (200) than the one known by the group coordinator (100). " +
+            "The member must abandon all its partitions and rejoin.", e1.getMessage());
+
+        // Member epoch is smaller than the expected epoch.
+        FencedMemberEpochException e2 = assertThrows(FencedMemberEpochException.class, () ->
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(50)
+                    .setRebalanceTimeoutMs(1500)));
+        assertEquals("The streams group member has a smaller member epoch (50) than the one known by the group coordinator (100). " +
+            "The member must abandon all its partitions and rejoin.", e2.getMessage());
+
+        // Member joins with previous epoch but without providing tasks.
+        FencedMemberEpochException e3 = assertThrows(FencedMemberEpochException.class, () ->
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(99)
+                    .setRebalanceTimeoutMs(1500)));
+        assertEquals("The streams group member has a smaller member epoch (99) than the one known by the group coordinator (100). " +
+            "The member must abandon all its partitions and rejoin.", e3.getMessage());
+
+        // Member joins with previous epoch and has a subset of the owned tasks.
+        // This is accepted as the response with the bumped epoch may have been lost.
+        // In this case, we provide back the correct epoch to the member.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(99)
+                .setRebalanceTimeoutMs(1500)
+                .setActiveTasks(List.of(new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(subtopology1)
+                    .setPartitions(List.of(1, 2))))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+        assertEquals(100, result.response().data().memberEpoch());
+    }
+
+    @Test
+    public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+
+        // Create a context with one streams group containing two members.
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG, 2)
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2),
+                        TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1)))
+                    .build())
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
+                        TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
+                    .build())
+                .withTargetAssignment(memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2),
+                    TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1)))
+                .withTargetAssignment(memberId2, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
+                    TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
+                .withTargetAssignmentEpoch(10)
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+                .withPartitionMetadata(Map.of(
+                    fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
+                    barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
+                ))
+            )
+            .build();
+
+        assertThrows(GroupMaxSizeReachedException.class, () ->
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId3)
+                    .setMemberEpoch(0)
+                    .setProcessId("process-id")
+                    .setRebalanceTimeoutMs(1500)
+                    .setTopology(topology)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of())
+            ));
+    }
+
+    @Test
+    public void testMemberJoinsEmptyStreamsGroup() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+            TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+            TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
+        )));
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.streamsGroup(groupId));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setProcessId("process-id")
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1, 2, 3, 4, 5)),
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology2)
+                        .setPartitions(List.of(0, 1, 2))
+                ))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+                TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)))
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
+                fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
+                barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
+            )),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
+                TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+                    TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
+                )),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .build();
+
+        // Member joins the streams group.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setProcessId(DEFAULT_PROCESS_ID)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertEquals(
+            Map.of(),
+            result.response().creatableTopics()
+        );
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())
+                .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status()
+                    .setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
+                    .setStatusDetail("Source topics bar are missing."))),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
+                Map.of(
+                    fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
+                )
+            ),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testStreamsGroupMemberJoiningWithMissingInternalTopic() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology()
+                .setSubtopologyId(subtopology1)
+                .setSourceTopics(List.of(fooTopicName))
+                .setStateChangelogTopics(List.of(new TopicInfo().setName(barTopicName)))
+            )
+        );
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .build();
+
+        // Member joins the streams group.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setProcessId(DEFAULT_PROCESS_ID)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertEquals(
+            Map.of(barTopicName,
+                new CreatableTopic()
+                    .setName(barTopicName)
+                    .setNumPartitions(6)
+                    .setReplicationFactor((short) -1)
+            ),
+            result.response().creatableTopics()
+        );
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())
+                .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status()
+                    .setStatusCode(Status.MISSING_INTERNAL_TOPICS.code())
+                    .setStatusDetail("Internal topics are missing: [bar]"))),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
+                fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
+            )),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testStreamsGroupMemberJoiningWithIncorrectlyPartitionedTopic() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology()
+                .setSubtopologyId(subtopology1)
+                .setSourceTopics(List.of(fooTopicName, barTopicName))
+                .setCopartitionGroups(List.of(new CopartitionGroup().setSourceTopics(List.of((short) 0, (short) 1))))
+            )
+        );
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .build();
+
+        // Member joins the streams group.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setProcessId(DEFAULT_PROCESS_ID)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertEquals(
+            Map.of(),
+            result.response().creatableTopics()
+        );
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())
+                .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status()
+                    .setStatusCode(Status.INCORRECTLY_PARTITIONED_TOPICS.code())
+                    .setStatusDetail("Following topics do not have the same number of partitions: [{bar=3, foo=6}]"))),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
+                fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
+                barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
+            )),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testStreamsGroupMemberJoiningWithStaleTopology() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+        Topology topology0 = new Topology().setEpoch(0).setSubtopologies(List.of(
+            new Subtopology()
+                .setSubtopologyId(subtopology1)
+                .setSourceTopics(List.of(fooTopicName))
+            )
+        );
+        Topology topology1 = new Topology().setEpoch(1).setSubtopologies(List.of(
+            new Subtopology()
+                .setSubtopologyId(subtopology1)
+                .setSourceTopics(List.of(fooTopicName, barTopicName))
+            )
+        );
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withStreamsGroup(
+                new StreamsGroupBuilder(groupId, 10)
+                    .withTopology(StreamsTopology.fromHeartbeatRequest(topology1))
+            )
+            .build();
+
+        assignor.prepareGroupAssignment(new org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment(Map.of(
+            memberId, org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment.empty()
+        )));
+
+        // Member joins the streams group.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology0)
+                .setProcessId(DEFAULT_PROCESS_ID)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertEquals(
+            Map.of(),
+            result.response().creatableTopics()
+        );
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())
+                .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status()
+                    .setStatusCode(Status.STALE_TOPOLOGY.code())
+                    .setStatusDetail("The member's topology epoch 0 is behind the group's topology epoch 1."))),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
+                fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
+                barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
+            )),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testStreamsUpdatingMemberMetadataTriggersNewTargetAssignment() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
+                    .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+                    .build())
+                .withTargetAssignment(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+                .withTargetAssignmentEpoch(10)
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+                .withPartitionMetadata(Map.of(
+                    fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
+                    barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
+                ))
+            )
+            .build();
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+                TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
+            ))
+        );
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(10)
+                .setProcessId("process-id2")
+        );
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1, 2, 3, 4, 5)),
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology2)
+                        .setPartitions(List.of(0, 1, 2))
+                ))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+                TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)))
+            .setProcessId("process-id2")
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
+                TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+                    TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
+                )),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testStreamsUpdatingPartitionMetadataTriggersNewTargetAssignment() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
+                    .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+                    .build())
+                .withTargetAssignment(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+                .withTargetAssignmentEpoch(10)
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+            )
+            .build();
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+                TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
+            ))
+        );
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(10)
+        );
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1, 2, 3, 4, 5)),
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology2)
+                        .setPartitions(List.of(0, 1, 2))
+                ))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+                TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)))
+            .setProcessId("process-id2")
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
+                fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
+                barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
+            )),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
+                TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+                    TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
+                )),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testStreamsNewJoiningMemberTriggersNewTargetAssignment() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+                    .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2),
+                        TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1)))
+                    .build())
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
+                        TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
+                    .build())
+                .withTargetAssignment(memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2),
+                    TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1)))
+                .withTargetAssignment(memberId2, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
+                    TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
+                .withTargetAssignmentEpoch(10)
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+                .withPartitionMetadata(Map.of(
+                    fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
+                    barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
+                ))
+            )
+            .build();
+
+        assignor.prepareGroupAssignment(Map.of(
+            memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1),
+                TaskAssignmentTestUtil.mkTasks(subtopology2, 0)
+            ),
+            memberId2, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 2, 3),
+                TaskAssignmentTestUtil.mkTasks(subtopology2, 1)
+            ),
+            memberId3, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 4, 5),
+                TaskAssignmentTestUtil.mkTasks(subtopology2, 2)
+            )
+        ));
+
+        // Member 3 joins the streams group.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId3)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setProcessId(DEFAULT_PROCESS_ID)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+    }
+
+    @Test
+    public void testStreamsLeavingMemberBumpsGroupEpoch() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2),
+                        TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1)))
+                    .build())
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
+                        TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
+                    .build())
+                .withTargetAssignment(memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2),
+                    TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1)))
+                .withTargetAssignment(memberId2, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
+                    TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
+                .withTargetAssignmentEpoch(10))
+            .build();
+
+        // Member 2 leaves the streams group.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setRebalanceTimeoutMs(1500)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
+            result.response().data()
+        );
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId2),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId2),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId2),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .build())
+            .build();
+
+        // Prepare new assignment for the group.
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result;
+
+        // A full response should be sent back on joining.
+        result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1))))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+        // Otherwise, a partial response should be sent back.
+        result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(result.response().data().memberEpoch()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000),
+            result.response().data()
+        );
+    }
+
+    @Test
+    public void testStreamsReconciliationProcess() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2),
+                        TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1)))
+                    .build())
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
+                        TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
+                    .build())
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+                .withTargetAssignment(memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2),
+                    TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1)))
+                .withTargetAssignment(memberId2, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
+                    TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
+                .withTargetAssignmentEpoch(10))
+            .build();
+
+        // Prepare new assignment for the group.
+        assignor.prepareGroupAssignment(Map.of(
+            memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1),
+                TaskAssignmentTestUtil.mkTasks(subtopology2, 0)
+            ),
+            memberId2, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 2, 3),
+                TaskAssignmentTestUtil.mkTasks(subtopology2, 2)
+            ),
+            memberId3, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 4, 5),
+                TaskAssignmentTestUtil.mkTasks(subtopology2, 1)
+            )
+        ));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result;
+
+        // Members in the group are in Stable state.
+ assertEquals(org.apache.kafka.coordinator.group.streams.MemberState.STABLE, context.streamsGroupMemberState(groupId, memberId1)); + assertEquals(org.apache.kafka.coordinator.group.streams.MemberState.STABLE, context.streamsGroupMemberState(groupId, memberId2)); + assertEquals(StreamsGroup.StreamsGroupState.NOT_READY, context.streamsGroupState(groupId)); + + // Member 3 joins the group. This triggers the computation of a new target assignment + // for the group. Member 3 does not get any assigned tasks yet because they are + // all owned by other members. However, it transitions to epoch 11 and the + // Unreleased Tasks state. + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId3) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1500) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId3) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); + + // We only check the last record as the subscription/target assignment updates are + // already covered by other tests. + assertRecordEquals( + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId3) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .build()), + result.records().get(result.records().size() - 1) + ); + + assertEquals(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS, + context.streamsGroupMemberState(groupId, memberId3)); + assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, context.streamsGroupState(groupId)); + + // Member 1 heartbeats. It remains at epoch 10 but transitions to Unrevoked Tasks + // state until it acknowledges the revocation of its tasks. The response contains the new + // assignment without the tasks that must be revoked. 
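The "new assignment without the tasks that must be revoked" in the heartbeat below is plain set arithmetic over task ids. A minimal sketch, assuming task ids encoded as `"subtopology-partition"` strings (the coordinator works on richer structures):

```java
import java.util.HashSet;
import java.util.Set;

public class RevocationMathSketch {
    // Tasks the member may keep right away: intersection of owned and target.
    static Set<String> keep(Set<String> owned, Set<String> target) {
        Set<String> kept = new HashSet<>(owned);
        kept.retainAll(target);
        return kept;
    }

    // Tasks pending revocation: owned but no longer in the target.
    static Set<String> pendingRevocation(Set<String> owned, Set<String> target) {
        Set<String> revoked = new HashSet<>(owned);
        revoked.removeAll(target);
        return revoked;
    }

    public static void main(String[] args) {
        // Member 1's situation in the test (set iteration order may vary).
        Set<String> owned = Set.of("1-0", "1-1", "1-2", "2-0", "2-1");
        Set<String> target = Set.of("1-0", "1-1", "2-0");
        System.out.println(keep(owned, target));              // 1-0, 1-1, 2-0
        System.out.println(pendingRevocation(owned, target)); // 1-2, 2-1
    }
}
```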
+ result = context.streamsGroupHeartbeat(new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(10)); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(10) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(0, 1)), + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology2) + .setPartitions(List.of(0)) + )) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); + + assertRecordsEquals(List.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId1) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.UNREVOKED_TASKS) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1), + TaskAssignmentTestUtil.mkTasks(subtopology2, 0))) + .setTasksPendingRevocation(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 2), + TaskAssignmentTestUtil.mkTasks(subtopology2, 1))) + .build())), + result.records() + ); + + assertEquals(org.apache.kafka.coordinator.group.streams.MemberState.UNREVOKED_TASKS, + context.streamsGroupMemberState(groupId, memberId1)); + assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, context.streamsGroupState(groupId)); + + // Member 2 heartbeats. It remains at epoch 10 but transitions to Unrevoked Tasks + // state until it acknowledges the revocation of its tasks. The response contains the new + // assignment without the tasks that must be revoked. + result = context.streamsGroupHeartbeat(new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(10)); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(10) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(3)), + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology2) + .setPartitions(List.of(2)) + )) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); + + assertRecordsEquals(List.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId2) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.UNREVOKED_TASKS) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 3), + TaskAssignmentTestUtil.mkTasks(subtopology2, 2))) + .setTasksPendingRevocation(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 4, 5))) + .build())), + result.records() + ); + + assertEquals(org.apache.kafka.coordinator.group.streams.MemberState.UNREVOKED_TASKS, + context.streamsGroupMemberState(groupId, memberId2)); + assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, context.streamsGroupState(groupId)); + + // Member 3 heartbeats. The response does not contain any assignment + // because the member is still waiting on other members to revoke tasks. 
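Why member 3 gets nothing back in the heartbeat below: a target task is only grantable once no other member still owns it. A minimal sketch of that gating, under the same hypothetical task-id encoding as above:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UnreleasedTasksSketch {
    // Grantable tasks: the member's target minus anything another member still owns.
    static Set<String> grantable(Set<String> target, Collection<Set<String>> ownedByOthers) {
        Set<String> grant = new HashSet<>(target);
        ownedByOthers.forEach(grant::removeAll);
        return grant;
    }

    public static void main(String[] args) {
        Set<String> member3Target = Set.of("1-4", "1-5", "2-1");
        // Before any revocation: members 1 and 2 still own all of member 3's target.
        System.out.println(grantable(member3Target,
            List.of(Set.of("1-0", "1-1", "1-2", "2-0", "2-1"),
                    Set.of("1-3", "1-4", "1-5", "2-2")))); // empty
        // After member 1's ack: "2-1" is released and can be handed out.
        System.out.println(grantable(member3Target,
            List.of(Set.of("1-0", "1-1", "2-0"),
                    Set.of("1-3", "1-4", "1-5", "2-2")))); // 2-1
    }
}
```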
+ result = context.streamsGroupHeartbeat(new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId3) + .setMemberEpoch(11)); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId3) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000), + result.response().data() + ); + + assertRecordsEquals(List.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId3) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS) + .setMemberEpoch(11) + .setPreviousMemberEpoch(11) + .build())), + result.records() + ); + + assertEquals(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS, + context.streamsGroupMemberState(groupId, memberId3)); + assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, context.streamsGroupState(groupId)); + + // Member 1 acknowledges the revocation of the tasks. It does so by providing the + // tasks that it still owns in the request. This allows it to transition to epoch 11 + // and to the Stable state. + result = context.streamsGroupHeartbeat(new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(10) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(0, 1)), + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(subtopology2) + .setPartitions(List.of(0)) + )) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000), + result.response().data() + ); + + assertRecordsEquals(List.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId1) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1), + TaskAssignmentTestUtil.mkTasks(subtopology2, 0))) + .build())), + result.records() + ); + + assertEquals(org.apache.kafka.coordinator.group.streams.MemberState.STABLE, context.streamsGroupMemberState(groupId, memberId1)); + assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, context.streamsGroupState(groupId)); + + // Member 2 heartbeats but without acknowledging the revocation yet. This is basically a no-op. + result = context.streamsGroupHeartbeat(new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(10)); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(10) + .setHeartbeatIntervalMs(5000), + result.response().data() + ); + + assertEquals(List.of(), result.records()); + assertEquals(org.apache.kafka.coordinator.group.streams.MemberState.UNREVOKED_TASKS, + context.streamsGroupMemberState(groupId, memberId2)); + assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, context.streamsGroupState(groupId)); + + // Member 3 heartbeats. It receives the tasks revoked by member 1 but remains + // in the Unreleased Tasks state because it is still waiting on other tasks.
+ result = context.streamsGroupHeartbeat(new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId3) + .setMemberEpoch(11)); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId3) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology2) + .setPartitions(List.of(1)))) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); + + assertRecordsEquals(List.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId3) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS) + .setMemberEpoch(11) + .setPreviousMemberEpoch(11) + .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology2, 1))) + .build())), + result.records() + ); + + assertEquals(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS, + context.streamsGroupMemberState(groupId, memberId3)); + assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, context.streamsGroupState(groupId)); + + // Member 3 heartbeats. Member 2 has not acknowledged the revocation of its tasks so + // member 3 keeps its current assignment. + result = context.streamsGroupHeartbeat(new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId3) + .setMemberEpoch(11)); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId3) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000), + result.response().data() + ); + + assertEquals(List.of(), result.records()); + assertEquals(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS, + context.streamsGroupMemberState(groupId, memberId3)); + assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, context.streamsGroupState(groupId)); + + // Member 2 acknowledges the revocation of the tasks. It does so by providing the + // tasks that it still owns in the request. This allows it to transition to epoch 11 + // and to the Stable state.
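The acknowledgment in the heartbeat below is implicit rather than a dedicated field: when the tasks a member reports owning are all inside its target assignment, the revocation is considered complete and the member is bumped to the new epoch. A minimal sketch of that check (hypothetical helper, same task-id encoding as above):

```java
import java.util.Set;

public class RevocationAckSketch {
    // Revocation is done once nothing the member reports owning lies outside its target.
    static boolean revocationComplete(Set<String> reportedOwned, Set<String> target) {
        return target.containsAll(reportedOwned);
    }

    public static void main(String[] args) {
        Set<String> member2Target = Set.of("1-2", "1-3", "2-2");
        System.out.println(revocationComplete(Set.of("1-3", "1-4", "1-5", "2-2"), member2Target)); // false: 1-4, 1-5 still owned
        System.out.println(revocationComplete(Set.of("1-3", "2-2"), member2Target));               // true: epoch 10 -> 11
    }
}
```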
+ result = context.streamsGroupHeartbeat(new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(10) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(3)), + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(subtopology2) + .setPartitions(List.of(2)) + )) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + ); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(2, 3)), + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology2) + .setPartitions(List.of(2)) + )) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); + + assertRecordsEquals(List.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId2) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 2, 3), + TaskAssignmentTestUtil.mkTasks(subtopology2, 2))) + .build())), + result.records() + ); + + assertEquals(org.apache.kafka.coordinator.group.streams.MemberState.STABLE, context.streamsGroupMemberState(groupId, memberId2)); + assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, context.streamsGroupState(groupId)); + + // Member 3 heartbeats to acknowledge its current assignment. It receives all its tasks and + // transitions to Stable state. + result = context.streamsGroupHeartbeat(new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId3) + .setMemberEpoch(11) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(subtopology2) + .setPartitions(List.of(1)))) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId3) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(4, 5)), + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology2) + .setPartitions(List.of(1)))) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); + + assertRecordsEquals(List.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId3) + .setMemberEpoch(11) + .setPreviousMemberEpoch(11) + .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 4, 5), + TaskAssignmentTestUtil.mkTasks(subtopology2, 1))) + .build())), + result.records() + ); + + assertEquals(org.apache.kafka.coordinator.group.streams.MemberState.STABLE, context.streamsGroupMemberState(groupId, memberId3)); + assertEquals(StreamsGroup.StreamsGroupState.STABLE, context.streamsGroupState(groupId)); + } + + @Test + public void testStreamsStreamsGroupStates() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + String subtopology2 = 
"subtopology2"; + String barTopicName = "bar"; + Uuid barTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)), + new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)) + .build(); + + assertEquals(StreamsGroup.StreamsGroupState.EMPTY, context.streamsGroupState(groupId)); + + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId1) + .build())); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11)); + + assertEquals(StreamsGroupState.NOT_READY, context.streamsGroupState(groupId)); + + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, + Map.of( + fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6), + barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3) + ) + )); + + assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, context.streamsGroupState(groupId)); + + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId1, + TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 1, 2, 3)))); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11)); + + assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, context.streamsGroupState(groupId)); + + context.replay( + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId1) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.UNREVOKED_TASKS) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedTasks( + TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 1, 2, 3))) + .build())); + + assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, context.streamsGroupState(groupId)); + + context.replay( + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId1) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedTasks( + TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 1, 2, 3))) + .build())); + + assertEquals(StreamsGroup.StreamsGroupState.STABLE, context.streamsGroupState(groupId)); + } + + @Test + public void testStreamsTaskAssignorExceptionOnRegularHeartbeat() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + String subtopology2 = "subtopology2"; + String barTopicName = "bar"; + Uuid barTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)), + new 
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName)) + )); + + TaskAssignor assignor = mock(TaskAssignor.class); + when(assignor.name()).thenReturn("sticky"); + when(assignor.assign(any(), any())).thenThrow(new TaskAssignorException("Assignment failed.")); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .build()) + .build(); + + // Member 1 joins the streams group. The request fails because the + // target assignment computation failed. + assertThrows(UnknownServerException.class, () -> + context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1500) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()))); + } + + @Test + public void testStreamsPartitionMetadataRefreshedAfterGroupIsLoaded() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .withStreamsGroup(new StreamsGroupBuilder(groupId, 10) + .withMember(streamsGroupMemberBuilderWithDefaults(memberId) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))) + .build()) + .withTopology(StreamsTopology.fromHeartbeatRequest(topology)) + .withTargetAssignment(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))) + .withTargetAssignmentEpoch(10) + .withPartitionMetadata( + // foo only has 3 partitions stored in the metadata but foo has + // 6 partitions in the metadata image. + Map.of(fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 3)) + )) + .build(); + + // The metadata refresh flag should be true. + StreamsGroup streamsGroup = context.groupMetadataManager + .streamsGroup(groupId); + assertTrue(streamsGroup.hasMetadataExpired(context.time.milliseconds())); + + // Prepare the assignment result. + assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5) + ))); + + // Heartbeat. + CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(10)); + + // The member gets tasks 3, 4 and 5 assigned.
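What drives this epoch bump (10 to 11) in the assertions that follow is the divergence between the partition counts the group recorded and those in the metadata image. A minimal sketch of that comparison, assuming a flat topic-to-partition-count map (hypothetical helper name):

```java
import java.util.Map;

public class MetadataDivergenceSketch {
    // The group stores the partition counts its current assignment was computed
    // from; a mismatch with the metadata image forces a new group epoch and a
    // fresh target assignment over the new counts.
    static boolean needsRefresh(Map<String, Integer> storedPartitions,
                                Map<String, Integer> imagePartitions) {
        return !storedPartitions.equals(imagePartitions);
    }

    public static void main(String[] args) {
        Map<String, Integer> stored = Map.of("foo", 3); // what the group recorded
        Map<String, Integer> image = Map.of("foo", 6);  // what the image now says
        System.out.println(needsRefresh(stored, image)); // true: epoch 10 -> 11, 6 tasks assigned
    }
}
```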
+ assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(0, 1, 2, 3, 4, 5)) + )) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); + + StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5))) + .build(); + + List<CoordinatorRecord> expectedRecords = List.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, + Map.of(fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)) + ), + StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, + TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5) + )), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11), + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + + // Check next refresh time. + assertFalse(streamsGroup.hasMetadataExpired(context.time.milliseconds())); + assertEquals(context.time.milliseconds() + Integer.MAX_VALUE, streamsGroup.metadataRefreshDeadline().deadlineMs); + assertEquals(11, streamsGroup.metadataRefreshDeadline().epoch); + } + + @Test + public void testStreamsPartitionMetadataRefreshedAgainAfterWriteFailure() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .withStreamsGroup(new StreamsGroupBuilder(groupId, 10) + .withMember(streamsGroupMemberBuilderWithDefaults(memberId) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))) + .build()) + .withTopology(StreamsTopology.fromHeartbeatRequest(topology)) + .withTargetAssignment(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))) + .withTargetAssignmentEpoch(10) + .withPartitionMetadata( + // foo only has 3 partitions stored in the metadata but foo has + // 6 partitions in the metadata image. + Map.of(fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 3)) + )) + .build(); + + // The metadata refresh flag should be true.
+ StreamsGroup streamsGroup = context.groupMetadataManager + .streamsGroup(groupId); + assertTrue(streamsGroup.hasMetadataExpired(context.time.milliseconds())); + + // Prepare the assignment result. + assignor.prepareGroupAssignment( + Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5) + )) + ); + + // Heartbeat. + context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(10)); + + // The metadata refresh flag is set to a future time. + assertFalse(streamsGroup.hasMetadataExpired(context.time.milliseconds())); + assertEquals(context.time.milliseconds() + Integer.MAX_VALUE, streamsGroup.metadataRefreshDeadline().deadlineMs); + assertEquals(11, streamsGroup.metadataRefreshDeadline().epoch); + + // Rollback the uncommitted changes. This does not roll back the metadata flag + // because it is not using a timeline data structure. + context.rollback(); + + // However, the next heartbeat should detect the divergence based on the epoch and trigger + // a metadata refresh. + CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(10)); + + // The member gets tasks 3, 4 and 5 assigned. + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(0, 1, 2, 3, 4, 5)) + )) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); + + StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5))) + .setTasksPendingRevocation(TasksTuple.EMPTY) + .build(); + + List<CoordinatorRecord> expectedRecords = List.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, + Map.of(fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)) + ), + StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, + TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5) + )), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11), + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + + // Check next refresh time.
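The rollback comment above is the crux of this test: committed group state lives in timeline-backed structures and is undone by the rollback, while the refresh deadline is a plain field that survives, so only the epoch stored next to the deadline catches the divergence checked in the refresh-deadline assertions that follow. A minimal sketch of that guard, assuming hypothetical field names (`deadlineMs`, `deadlineEpoch`):

```java
public class RefreshDeadlineSketch {
    // Plain fields: unlike timeline-backed group state, these are NOT undone
    // when uncommitted coordinator writes are rolled back.
    private long deadlineMs = Long.MIN_VALUE;
    private int deadlineEpoch = -1;

    void markRefreshed(long nowMs, long refreshIntervalMs, int groupEpoch) {
        deadlineMs = nowMs + refreshIntervalMs;
        deadlineEpoch = groupEpoch;
    }

    // The epoch clause is the safety net: after a rollback the group epoch
    // reverts (11 -> 10) while these fields do not, so the mismatch forces a
    // fresh metadata check on the next heartbeat.
    boolean hasMetadataExpired(long nowMs, int groupEpoch) {
        return nowMs >= deadlineMs || groupEpoch != deadlineEpoch;
    }

    public static void main(String[] args) {
        RefreshDeadlineSketch deadline = new RefreshDeadlineSketch();
        deadline.markRefreshed(1_000, Integer.MAX_VALUE, 11);
        System.out.println(deadline.hasMetadataExpired(1_000, 11)); // false
        System.out.println(deadline.hasMetadataExpired(1_000, 10)); // true: rolled-back epoch diverges
    }
}
```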
+ assertFalse(streamsGroup.hasMetadataExpired(context.time.milliseconds())); + assertEquals(context.time.milliseconds() + Integer.MAX_VALUE, streamsGroup.metadataRefreshDeadline().deadlineMs); + assertEquals(11, streamsGroup.metadataRefreshDeadline().epoch); + } + + @Test + public void testStreamsSessionTimeoutLifecycle() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .build(); + + assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5) + ))); + + // Session timer is scheduled on first heartbeat. + CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = + context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(90000) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + assertEquals(1, result.response().data().memberEpoch()); + + // Verify that there is a session timeout. + context.assertSessionTimeout(groupId, memberId, 45000); + + // Advance time. + assertEquals( + List.of(), + context.sleep(result.response().data().heartbeatIntervalMs()) + ); + + // Session timer is rescheduled on second heartbeat. + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(result.response().data().memberEpoch())); + assertEquals(1, result.response().data().memberEpoch()); + + // Verify that there is a session timeout. + context.assertSessionTimeout(groupId, memberId, 45000); + + // Advance time. + assertEquals( + List.of(), + context.sleep(result.response().data().heartbeatIntervalMs()) + ); + + // Session timer is cancelled on leave. + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)); + assertEquals(LEAVE_GROUP_MEMBER_EPOCH, result.response().data().memberEpoch()); + + // Verify that there are no timers.
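The timer lifecycle verified by the assertions below (schedule on first heartbeat, push back on each subsequent one, cancel on leave, fence on expiry) boils down to a deadline map keyed by group and member. A minimal sketch with hypothetical names; the key format is illustrative, not the coordinator's actual one:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SessionTimerSketch {
    private final Map<String, Long> deadlines = new HashMap<>();

    // Scheduled on the first heartbeat, pushed back on every subsequent one.
    void onHeartbeat(String groupId, String memberId, long nowMs, long sessionTimeoutMs) {
        deadlines.put(key(groupId, memberId), nowMs + sessionTimeoutMs);
    }

    // Cancelled when the member leaves (memberEpoch == LEAVE_GROUP_MEMBER_EPOCH).
    void onLeave(String groupId, String memberId) {
        deadlines.remove(key(groupId, memberId));
    }

    // Members whose deadline passed get fenced (tombstones + group epoch bump).
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        deadlines.entrySet().removeIf(e -> {
            if (e.getValue() <= nowMs) {
                expired.add(e.getKey());
                return true;
            }
            return false;
        });
        return expired;
    }

    private static String key(String groupId, String memberId) {
        return "session-timeout-" + groupId + "-" + memberId;
    }

    public static void main(String[] args) {
        SessionTimerSketch timers = new SessionTimerSketch();
        timers.onHeartbeat("fooup", "m1", 0, 45_000);
        timers.onHeartbeat("fooup", "m1", 5_000, 45_000); // rescheduled: deadline now 50_000
        System.out.println(timers.expire(45_000)); // empty
        System.out.println(timers.expire(50_001)); // [session-timeout-fooup-m1]
    }
}
```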
+ context.assertNoSessionTimeout(groupId, memberId); + context.assertNoRebalanceTimeout(groupId, memberId); + } + + @Test + public void testStreamsSessionTimeoutExpiration() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .build(); + + assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5) + ))); + + // Session timer is scheduled on first heartbeat. + CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = + context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(90000) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + assertEquals(1, result.response().data().memberEpoch()); + + // Verify that there is a session timeout. + context.assertSessionTimeout(groupId, memberId, 45000); + + // Advance time past the session timeout. + List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = context.sleep(45000 + 1); + + // Verify the expired timeout. + assertEquals( + List.of(new ExpiredTimeout<Void, CoordinatorRecord>( + groupSessionTimeoutKey(groupId, memberId), + new CoordinatorResult<>( + List.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId), + StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId), + StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 2) + ) + ) + )), + timeouts + ); + + // Verify that there are no timers. + context.assertNoSessionTimeout(groupId, memberId); + context.assertNoRebalanceTimeout(groupId, memberId); + } + + @Test + public void testStreamsRebalanceTimeoutLifecycle() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 3) + .build()) + .build(); + + assignor.prepareGroupAssignment(Map.of(memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2) + ))); + + // Member 1 joins the group.
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = + context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(12000) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(0, 1, 2)))) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); + + assertEquals( + List.of(), + context.sleep(result.response().data().heartbeatIntervalMs()) + ); + + // Prepare next assignment. + assignor.prepareGroupAssignment(Map.of( + memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1) + ), + memberId2, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 2) + ) + )); + + // Member 2 joins the group. + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(90000) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(2) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); + + assertEquals( + List.of(), + context.sleep(result.response().data().heartbeatIntervalMs()) + ); + + // Member 1 heartbeats and transitions to unrevoked tasks. The rebalance timeout + // is scheduled. + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(1) + .setRebalanceTimeoutMs(12000)); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(0, 1)))) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); + + // Verify that there is a revocation timeout. Keep a reference + // to the timeout for later. + ScheduledTimeout<Void, CoordinatorRecord> scheduledTimeout = + context.assertRebalanceTimeout(groupId, memberId1, 12000); + + assertEquals( + List.of(), + context.sleep(result.response().data().heartbeatIntervalMs()) + ); + + // Member 1 acks the revocation. The revocation timeout is cancelled. + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(1) + .setActiveTasks(List.of(new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(0, 1)))) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(2) + .setHeartbeatIntervalMs(5000), + result.response().data() + );
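Firing the previously captured timeout below is how the test proves staleness is handled: when a rebalance timeout runs, it re-reads the member's current state and produces nothing if the revocation already completed. A minimal sketch of that guard (hypothetical, not the coordinator's actual operation):

```java
import java.util.List;

public class StaleTimeoutSketch {
    enum MemberState { STABLE, UNREVOKED_TASKS }

    // A fired rebalance timeout re-checks the member's state; if the member is
    // no longer stuck revoking, the timeout is stale and emits no records.
    static List<String> onRebalanceTimeout(MemberState currentState) {
        if (currentState != MemberState.UNREVOKED_TASKS) {
            return List.of(); // stale: reconciliation already completed
        }
        // Otherwise the member is fenced, mirroring session-timeout expiration.
        return List.of("current-assignment tombstone", "target-assignment tombstone",
                       "member tombstone", "group epoch bump");
    }

    public static void main(String[] args) {
        System.out.println(onRebalanceTimeout(MemberState.STABLE));          // []
        System.out.println(onRebalanceTimeout(MemberState.UNREVOKED_TASKS)); // fencing records
    }
}
```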
+ context.assertNoRebalanceTimeout(groupId, memberId1); + + // Execute the scheduled revocation timeout captured earlier to simulate a + // stale timeout. This should be a no-op. + assertEquals(List.of(), scheduledTimeout.operation.generateRecords().records()); + } + + @Test + public void testStreamsRebalanceTimeoutExpiration() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 3) + .build()) + .build(); + + assignor.prepareGroupAssignment( + Map.of(memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))); + + // Member 1 joins the group. + CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = + context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(10000) // Use timeout smaller than session timeout. + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(0, 1, 2)))) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); + + assertEquals( + List.of(), + context.sleep(result.response().data().heartbeatIntervalMs()) + ); + + // Prepare next assignment. + assignor.prepareGroupAssignment(Map.of( + memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1) + ), + memberId2, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 2) + ) + )); + + // Member 2 joins the group. + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(10000) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(2) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); + + assertEquals( + List.of(), + context.sleep(result.response().data().heartbeatIntervalMs()) + ); + + // Member 1 heartbeats and transitions to revoking. The revocation timeout + // is scheduled. 
+ result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(1)); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(0, 1)))) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); + + // Advance time past the revocation timeout. + List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = context.sleep(10000 + 1); + + // Verify the expired timeout. + assertEquals( + List.of(new ExpiredTimeout<Void, CoordinatorRecord>( + groupRebalanceTimeoutKey(groupId, memberId1), + new CoordinatorResult<>( + List.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId1), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId1), + StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId1), + StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 3) + ) + ) + )), + timeouts + ); + + // Verify that there are no timers. + context.assertNoSessionTimeout(groupId, memberId1); + context.assertNoRebalanceTimeout(groupId, memberId1); + } + + @Test + public void testStreamsOnNewMetadataImage() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build(); + String subtopology1 = "subtopology1";

Review Comment: Yes, you are right. I gave the subtopologies different names.