dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1587740243
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -10921,6 +10823,1158 @@ public void testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() { assertTrue(classicGroup.isInState(PREPARING_REBALANCE)); } + @Test + public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build())) + .withConsumerGroupMaxSize(1) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> context.sendClassicGroupJoin(request)); + assertEquals("The consumer group has reached its maximum capacity of 1 members.", ex.getMessage()); + } + + @Test + public void testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin")) + .build())) + .build(); + + JoinGroupRequestData requestWithEmptyProtocols = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + 
.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithEmptyProtocols)); + + JoinGroupRequestData requestWithInvalidProtocolType = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("connect") + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithInvalidProtocolType)); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testJoiningConsumerGroupWithNewDynamicMember(boolean replaySuccessfully) throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + for (short version = ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) { + String memberId = Uuid.randomUuid().toString(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 
0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList(), + version)) + .build(); + + // The first round of join request gets the new member id. + GroupMetadataManagerTestContext.JoinResult firstJoinResult = context.sendClassicGroupJoin( + request, + true + ); + assertTrue(firstJoinResult.records.isEmpty()); + // Simulate a successful write to the log. + firstJoinResult.appendFuture.complete(null); + + assertTrue(firstJoinResult.joinFuture.isDone()); + assertEquals(Errors.MEMBER_ID_REQUIRED.code(), firstJoinResult.joinFuture.get().errorCode()); + String newMemberId = firstJoinResult.joinFuture.get().memberId(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(newMemberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0) + ))); + } + } + )); + + JoinGroupRequestData secondRequest = new JoinGroupRequestData() + .setGroupId(request.groupId()) + .setMemberId(newMemberId) + .setProtocolType(request.protocolType()) + .setProtocols(request.protocols()) + .setSessionTimeoutMs(request.sessionTimeoutMs()) + .setRebalanceTimeoutMs(request.rebalanceTimeoutMs()) + .setReason(request.reason()); + + GroupMetadataManagerTestContext.JoinResult secondJoinResult = context.sendClassicGroupJoin( + secondRequest, + true + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + 
.setRebalanceTimeoutMs(500) + .setAssignedPartitions(assignor.targetPartitions(newMemberId)) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, assignor.targetPartitions(memberId)), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, assignor.targetPartitions(newMemberId)), + + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), secondJoinResult.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 5), secondJoinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), secondJoinResult.records.subList(5, 7)); + + if (replaySuccessfully) { + secondJoinResult.appendFuture.complete(null); + assertTrue(secondJoinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + secondJoinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } else { + secondJoinResult.appendFuture.completeExceptionally(new NotLeaderOrFollowerException()); + assertEquals( + new JoinGroupResponseData() + .setErrorCode(Errors.NOT_COORDINATOR.code()), + 
secondJoinResult.joinFuture.get() + ); + context.assertNoSessionTimeout(groupId, newMemberId); + context.assertNoSyncTimeout(groupId, newMemberId); + } + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testJoiningConsumerGroupWithNewStaticMember(boolean replaySuccessfully) throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + for (short version = ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) { + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id-" + version; + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + 
Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList(), + version)) + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + + if (replaySuccessfully) { + joinResult.appendFuture.complete(null); + String newMemberId = joinResult.joinFuture.get().memberId(); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, Collections.emptyMap()), + + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), joinResult.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 5), joinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), joinResult.records.subList(5, 7)); + + assertTrue(joinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + 
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } else { + joinResult.appendFuture.completeExceptionally(new NotLeaderOrFollowerException()); + assertEquals( + new JoinGroupResponseData() + .setErrorCode(Errors.NOT_COORDINATOR.code()), + joinResult.joinFuture.get() + ); + } + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testJoiningConsumerGroupReplacingExistingStaticMember(boolean replaySuccessfully) throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + for (short version = ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) { + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id-" + version; + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(Collections.singletonList(fooTopicName)) + .setAssignedPartitions(mkAssignment( + 
mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Collections.singletonList(fooTopicName), + Collections.emptyList(), + version)) + .build(); + + // The static member joins with UNKNOWN_MEMBER_ID. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin( + request, + true + ); + + if (replaySuccessfully) { + joinResult.appendFuture.complete(null); + + String newMemberId = joinResult.joinFuture.get().memberId(); + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(0) + .setPreviousMemberEpoch(0) + .setInstanceId(instanceId) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Collections.singletonList(fooTopicName)) + .setRebalanceTimeoutMs(500) + .setSupportedClassicProtocols(request.protocols()) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) Review Comment: Yeah, this is weird. 😢 This is also because we don't know the new member id until the join group request is called. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org