dajac commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1589061058
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -10921,6 +10822,1146 @@ public void testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() { assertTrue(classicGroup.isInState(PREPARING_REBALANCE)); } + @Test + public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build())) + .withConsumerGroupMaxSize(1) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> context.sendClassicGroupJoin(request)); + assertEquals("The consumer group has reached its maximum capacity of 1 members.", ex.getMessage()); + } + + @Test + public void testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin")) + .build())) + .build(); + + JoinGroupRequestData requestWithEmptyProtocols = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + 
.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithEmptyProtocols)); + + JoinGroupRequestData requestWithInvalidProtocolType = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("connect") + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithInvalidProtocolType)); + } + + @Test + public void testJoiningConsumerGroupWithNewDynamicMember() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + for (short version = ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) { + String memberId = Uuid.randomUuid().toString(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 
0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList(), + version)) + .build(); + + // The first round of join request gets the new member id. + GroupMetadataManagerTestContext.JoinResult firstJoinResult = context.sendClassicGroupJoin( + request, + true + ); + assertTrue(firstJoinResult.records.isEmpty()); + // Simulate a successful write to the log. + firstJoinResult.appendFuture.complete(null); + + assertTrue(firstJoinResult.joinFuture.isDone()); + assertEquals(Errors.MEMBER_ID_REQUIRED.code(), firstJoinResult.joinFuture.get().errorCode()); + String newMemberId = firstJoinResult.joinFuture.get().memberId(); + assertNotEquals("", newMemberId); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(newMemberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0) + ))); + } + } + )); + + JoinGroupRequestData secondRequest = new JoinGroupRequestData() + .setGroupId(request.groupId()) + .setMemberId(newMemberId) + .setProtocolType(request.protocolType()) + .setProtocols(request.protocols()) + .setSessionTimeoutMs(request.sessionTimeoutMs()) + .setRebalanceTimeoutMs(request.rebalanceTimeoutMs()) + .setReason(request.reason()); + + // Send second join group request for a new dynamic member with the new member id. 
+ GroupMetadataManagerTestContext.JoinResult secondJoinResult = context.sendClassicGroupJoin( + secondRequest, + true + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setAssignedPartitions(assignor.targetPartitions(newMemberId)) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, assignor.targetPartitions(memberId)), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, assignor.targetPartitions(newMemberId)), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), secondJoinResult.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 5), secondJoinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), secondJoinResult.records.subList(5, 7)); + + secondJoinResult.appendFuture.complete(null); + assertTrue(secondJoinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + 
secondJoinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + } + + @Test + public void testJoiningConsumerGroupFailingToPersistRecords() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + String memberId = Uuid.randomUuid().toString(); + String newMemberId = Uuid.randomUuid().toString(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(newMemberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1) + ))); + } + } + )); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + context.commit(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(newMemberId) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + 
Collections.singletonList(fooTopicName), + Collections.emptyList())) + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + + // Simulate a failed write to the log. + joinResult.appendFuture.completeExceptionally(new NotLeaderOrFollowerException()); + context.rollback(); + + context.assertNoSessionTimeout(groupId, newMemberId); + context.assertNoSyncTimeout(groupId, newMemberId); + assertThrows( + UnknownMemberIdException.class, + () -> context.groupMetadataManager.consumerGroup(groupId).getOrMaybeCreateMember(newMemberId, false) + ); + } + + @Test + public void testJoiningConsumerGroupWithNewStaticMember() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + // An empty target assignment is used as the new member id is unknown before calling join group. + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); Review Comment: We may be able to use it here too. 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -10921,6 +10822,1146 @@ public void testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() { assertTrue(classicGroup.isInState(PREPARING_REBALANCE)); } + @Test + public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build())) + .withConsumerGroupMaxSize(1) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> context.sendClassicGroupJoin(request)); + assertEquals("The consumer group has reached its maximum capacity of 1 members.", ex.getMessage()); + } + + @Test + public void testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin")) + .build())) + .build(); + + JoinGroupRequestData requestWithEmptyProtocols = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + 
.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithEmptyProtocols)); + + JoinGroupRequestData requestWithInvalidProtocolType = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("connect") + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithInvalidProtocolType)); + } + + @Test + public void testJoiningConsumerGroupWithNewDynamicMember() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + for (short version = ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) { + String memberId = Uuid.randomUuid().toString(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 
0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList(), + version)) + .build(); + + // The first round of join request gets the new member id. + GroupMetadataManagerTestContext.JoinResult firstJoinResult = context.sendClassicGroupJoin( + request, + true + ); + assertTrue(firstJoinResult.records.isEmpty()); + // Simulate a successful write to the log. + firstJoinResult.appendFuture.complete(null); + + assertTrue(firstJoinResult.joinFuture.isDone()); + assertEquals(Errors.MEMBER_ID_REQUIRED.code(), firstJoinResult.joinFuture.get().errorCode()); + String newMemberId = firstJoinResult.joinFuture.get().memberId(); + assertNotEquals("", newMemberId); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(newMemberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0) + ))); + } + } + )); + + JoinGroupRequestData secondRequest = new JoinGroupRequestData() + .setGroupId(request.groupId()) + .setMemberId(newMemberId) + .setProtocolType(request.protocolType()) + .setProtocols(request.protocols()) + .setSessionTimeoutMs(request.sessionTimeoutMs()) + .setRebalanceTimeoutMs(request.rebalanceTimeoutMs()) + .setReason(request.reason()); + + // Send second join group request for a new dynamic member with the new member id. 
+ GroupMetadataManagerTestContext.JoinResult secondJoinResult = context.sendClassicGroupJoin( + secondRequest, + true + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setAssignedPartitions(assignor.targetPartitions(newMemberId)) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, assignor.targetPartitions(memberId)), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, assignor.targetPartitions(newMemberId)), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), secondJoinResult.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 5), secondJoinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), secondJoinResult.records.subList(5, 7)); + + secondJoinResult.appendFuture.complete(null); + assertTrue(secondJoinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + 
secondJoinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + } + + @Test + public void testJoiningConsumerGroupFailingToPersistRecords() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + String memberId = Uuid.randomUuid().toString(); + String newMemberId = Uuid.randomUuid().toString(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(newMemberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1) + ))); + } + } + )); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + context.commit(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(newMemberId) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + 
Collections.singletonList(fooTopicName), + Collections.emptyList())) + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + + // Simulate a failed write to the log. + joinResult.appendFuture.completeExceptionally(new NotLeaderOrFollowerException()); + context.rollback(); + + context.assertNoSessionTimeout(groupId, newMemberId); + context.assertNoSyncTimeout(groupId, newMemberId); + assertThrows( + UnknownMemberIdException.class, + () -> context.groupMetadataManager.consumerGroup(groupId).getOrMaybeCreateMember(newMemberId, false) + ); + } + + @Test + public void testJoiningConsumerGroupWithNewStaticMember() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + // An empty target assignment is used as the new member id is unknown before calling join group. 
+ assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList())) + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + + // Simulate a successful write to log. 
+ joinResult.appendFuture.complete(null); + String newMemberId = joinResult.joinFuture.get().memberId(); + assertNotEquals("", newMemberId); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), joinResult.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 5), joinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), joinResult.records.subList(5, 7)); + + assertTrue(joinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, request.sessionTimeoutMs()); + 
context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + + @Test + public void testJoiningConsumerGroupReplacingExistingStaticMember() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + // An empty target assignment is used as the new member id is unknown before calling join group. + + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); Review Comment: In this case, I think that we could actually have an assignor which returns the assigned partitions that it receives. This would avoid this incoherent state. What do you think? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -10921,6 +10822,1146 @@ public void testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() { assertTrue(classicGroup.isInState(PREPARING_REBALANCE)); } + @Test + public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build())) + .withConsumerGroupMaxSize(1) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> context.sendClassicGroupJoin(request)); + assertEquals("The consumer group has reached its maximum 
capacity of 1 members.", ex.getMessage()); + } + + @Test + public void testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin")) + .build())) + .build(); + + JoinGroupRequestData requestWithEmptyProtocols = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithEmptyProtocols)); + + JoinGroupRequestData requestWithInvalidProtocolType = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("connect") + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithInvalidProtocolType)); + } + + @Test + public void testJoiningConsumerGroupWithNewDynamicMember() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + for (short version = ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) { + String memberId = Uuid.randomUuid().toString(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext 
context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList(), + version)) + .build(); + + // The first round of join request gets the new member id. + GroupMetadataManagerTestContext.JoinResult firstJoinResult = context.sendClassicGroupJoin( + request, + true + ); + assertTrue(firstJoinResult.records.isEmpty()); + // Simulate a successful write to the log. 
+ firstJoinResult.appendFuture.complete(null); + + assertTrue(firstJoinResult.joinFuture.isDone()); + assertEquals(Errors.MEMBER_ID_REQUIRED.code(), firstJoinResult.joinFuture.get().errorCode()); + String newMemberId = firstJoinResult.joinFuture.get().memberId(); + assertNotEquals("", newMemberId); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(newMemberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0) + ))); + } + } + )); + + JoinGroupRequestData secondRequest = new JoinGroupRequestData() + .setGroupId(request.groupId()) + .setMemberId(newMemberId) + .setProtocolType(request.protocolType()) + .setProtocols(request.protocols()) + .setSessionTimeoutMs(request.sessionTimeoutMs()) + .setRebalanceTimeoutMs(request.rebalanceTimeoutMs()) + .setReason(request.reason()); + + // Send second join group request for a new dynamic member with the new member id. 
+ GroupMetadataManagerTestContext.JoinResult secondJoinResult = context.sendClassicGroupJoin( + secondRequest, + true + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setAssignedPartitions(assignor.targetPartitions(newMemberId)) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, assignor.targetPartitions(memberId)), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, assignor.targetPartitions(newMemberId)), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), secondJoinResult.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 5), secondJoinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), secondJoinResult.records.subList(5, 7)); + + secondJoinResult.appendFuture.complete(null); + assertTrue(secondJoinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + 
secondJoinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + } + + @Test + public void testJoiningConsumerGroupFailingToPersistRecords() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + String memberId = Uuid.randomUuid().toString(); + String newMemberId = Uuid.randomUuid().toString(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(newMemberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1) + ))); + } + } + )); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + context.commit(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(newMemberId) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + 
Collections.singletonList(fooTopicName), + Collections.emptyList())) + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + + // Simulate a failed write to the log. + joinResult.appendFuture.completeExceptionally(new NotLeaderOrFollowerException()); + context.rollback(); + + context.assertNoSessionTimeout(groupId, newMemberId); + context.assertNoSyncTimeout(groupId, newMemberId); + assertThrows( + UnknownMemberIdException.class, + () -> context.groupMetadataManager.consumerGroup(groupId).getOrMaybeCreateMember(newMemberId, false) + ); + } + + @Test + public void testJoiningConsumerGroupWithNewStaticMember() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + // An empty target assignment is used as the new member id is unknown before calling join group. 
+ assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList())) + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + + // Simulate a successful write to log. 
+ joinResult.appendFuture.complete(null); + String newMemberId = joinResult.joinFuture.get().memberId(); + assertNotEquals("", newMemberId); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), joinResult.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 5), joinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), joinResult.records.subList(5, 7)); + + assertTrue(joinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, request.sessionTimeoutMs()); + 
context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + + @Test + public void testJoiningConsumerGroupReplacingExistingStaticMember() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + // An empty target assignment is used as the new member id is unknown before calling join group. + + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(Collections.singletonList(fooTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Collections.singletonList(fooTopicName), + Collections.emptyList())) 
+ .build(); + + // The static member joins with UNKNOWN_MEMBER_ID. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin( + request, + true + ); + + // Simulate a successful write to log. + joinResult.appendFuture.complete(null); + String newMemberId = joinResult.joinFuture.get().memberId(); + assertNotEquals("", newMemberId); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(0) + .setPreviousMemberEpoch(0) + .setInstanceId(instanceId) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Collections.singletonList(fooTopicName)) + .setRebalanceTimeoutMs(500) + .setSupportedClassicProtocols(request.protocols()) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build(); + + List<Record> expectedRecords = Arrays.asList( + // Remove the old static member. + RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId), + RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId), + RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId), + + // Create the new static member. 
+ RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords, joinResult.records); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(0) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + + @Test + public void testJoiningConsumerGroupWithExistingStaticMemberAndNewSubscription() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + Uuid zarTopicId = Uuid.randomUuid(); + String zarTopicName = "zar"; + + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + put(zarTopicName, new 
TopicMetadata(zarTopicId, zarTopicName, 1, mkMapOfPartitionRacks(1))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .withAssignmentEpoch(10)) + .build(); + ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId); + group.setMetadataRefreshDeadline(Long.MAX_VALUE, 11); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(zarTopicId, 0) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0), + mkTopicAssignment(fooTopicId, 1) + ))); + } + } + )); + + // Member 1 rejoins with a new subscription list. 
+ JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName, zarTopicName), + Collections.emptyList())) + .build(); + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setState(MemberState.UNREVOKED_PARTITIONS) + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName, zarTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(barTopicId, 0))) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName, zarTopicName), + Collections.emptyList())) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupEpochRecord(groupId, 11), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(zarTopicId, 0))), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, mkAssignment( + mkTopicAssignment(barTopicId, 0), + mkTopicAssignment(fooTopicId, 1))), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 2), joinResult.records.subList(0, 2)); + assertUnorderedListEquals(expectedRecords.subList(2, 4), joinResult.records.subList(2, 4)); + assertRecordsEquals(expectedRecords.subList(4, 6), 
joinResult.records.subList(4, 6)); + + joinResult.appendFuture.complete(null); + assertEquals( + new JoinGroupResponseData() + .setMemberId(memberId1) + .setGenerationId(10) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + context.assertSessionTimeout(groupId, memberId1, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, memberId1, request.rebalanceTimeoutMs()); + } + + @Test + public void testStaticMemberJoiningConsumerGroupWithUnknownInstanceId() throws Exception { + String groupId = "group-id"; + String instanceId = "instance-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String fooTopicName = "foo"; + String barTopicName = "bar"; + + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = + GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)) + ); + // Set up a ConsumerGroup with no static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setSupportedClassicProtocols(protocols) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .build())) + .build(); + + // The member joins with an instance id. 
+ JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withGroupInstanceId(instanceId) + .withProtocols(protocols) + .build(); + + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupJoin(request)); + } + + @Test + public void testStaticMemberJoiningConsumerGroupWithUnmatchedMemberId() throws Exception { + String groupId = "group-id"; + String instanceId = "instance-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String fooTopicName = "foo"; + String barTopicName = "bar"; + + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = + GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)) + ); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setSupportedClassicProtocols(protocols) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .build())) + .build(); + + // The member joins with the same instance id and a different member id. 
+ JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(Uuid.randomUuid().toString()) + .withGroupInstanceId(instanceId) + .withProtocols(protocols) + .build(); + + assertThrows(FencedInstanceIdException.class, () -> context.sendClassicGroupJoin(request)); + } + + @Test + public void testReconciliationInJoiningConsumerGroupWithEagerProtocol() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + Uuid zarTopicId = Uuid.randomUuid(); + String zarTopicName = "zar"; + + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) + 
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(barTopicName, 0)))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .withAssignmentEpoch(10)) + .build(); + ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId); + group.setMetadataRefreshDeadline(Long.MAX_VALUE, 11); + + // Prepare the new target assignment. + // Member 1 will need to revoke bar-0, and member 2 will need to revoke foo-1. + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(zarTopicId, 0) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0) + ))); + } + } + )); + + // Member 1 rejoins with a new subscription list and transitions to UNREVOKED_PARTITIONS, because its + // assignedPartitions still contains bar-0 though all of its partitions have actually been revoked. Review Comment: I am glad that we added those tests because this does not seem to be correct. When an eager protocol is used, all the partitions are revoked prior to re-joining the group. Hence, there is no need to put the revoked partitions into the partitions pending revocation set.
The reconciliation state machine does not handle this correctly. I played a bit with it and I think that we could do something like [this](https://github.com/apache/kafka/commit/bbe9c1fde2ba5b25dc66220971c21e80c9cb67a8). What do you think? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -10921,6 +10822,1146 @@ public void testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() { assertTrue(classicGroup.isInState(PREPARING_REBALANCE)); } + @Test + public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build())) + .withConsumerGroupMaxSize(1) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> context.sendClassicGroupJoin(request)); + assertEquals("The consumer group has reached its maximum capacity of 1 members.", ex.getMessage()); + } + + @Test + public void testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + 
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin")) + .build())) + .build(); + + JoinGroupRequestData requestWithEmptyProtocols = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithEmptyProtocols)); + + JoinGroupRequestData requestWithInvalidProtocolType = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("connect") + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithInvalidProtocolType)); + } + + @Test + public void testJoiningConsumerGroupWithNewDynamicMember() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + for (short version = ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) { + String memberId = Uuid.randomUuid().toString(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + 
.withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList(), + version)) + .build(); + + // The first round of join request gets the new member id. + GroupMetadataManagerTestContext.JoinResult firstJoinResult = context.sendClassicGroupJoin( + request, + true + ); + assertTrue(firstJoinResult.records.isEmpty()); + // Simulate a successful write to the log. + firstJoinResult.appendFuture.complete(null); + + assertTrue(firstJoinResult.joinFuture.isDone()); + assertEquals(Errors.MEMBER_ID_REQUIRED.code(), firstJoinResult.joinFuture.get().errorCode()); + String newMemberId = firstJoinResult.joinFuture.get().memberId(); + assertNotEquals("", newMemberId); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(newMemberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0) + ))); + } + } + )); + + JoinGroupRequestData secondRequest = new JoinGroupRequestData() + .setGroupId(request.groupId()) + .setMemberId(newMemberId) + .setProtocolType(request.protocolType()) + .setProtocols(request.protocols()) + .setSessionTimeoutMs(request.sessionTimeoutMs()) + .setRebalanceTimeoutMs(request.rebalanceTimeoutMs()) + .setReason(request.reason()); + + // Send second join group request for a new dynamic member with the new member id. 
+ GroupMetadataManagerTestContext.JoinResult secondJoinResult = context.sendClassicGroupJoin( + secondRequest, + true + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setAssignedPartitions(assignor.targetPartitions(newMemberId)) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, assignor.targetPartitions(memberId)), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, assignor.targetPartitions(newMemberId)), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), secondJoinResult.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 5), secondJoinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), secondJoinResult.records.subList(5, 7)); + + secondJoinResult.appendFuture.complete(null); + assertTrue(secondJoinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + 
secondJoinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + } + + @Test + public void testJoiningConsumerGroupFailingToPersistRecords() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + String memberId = Uuid.randomUuid().toString(); + String newMemberId = Uuid.randomUuid().toString(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(newMemberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1) + ))); + } + } + )); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + context.commit(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(newMemberId) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + 
Collections.singletonList(fooTopicName), + Collections.emptyList())) + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + + // Simulate a failed write to the log. + joinResult.appendFuture.completeExceptionally(new NotLeaderOrFollowerException()); + context.rollback(); + + context.assertNoSessionTimeout(groupId, newMemberId); + context.assertNoSyncTimeout(groupId, newMemberId); + assertThrows( + UnknownMemberIdException.class, + () -> context.groupMetadataManager.consumerGroup(groupId).getOrMaybeCreateMember(newMemberId, false) + ); Review Comment: nit: We could use `ConsumerGroup#hasMember`. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -10921,6 +10822,1146 @@ public void testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() { assertTrue(classicGroup.isInState(PREPARING_REBALANCE)); } + @Test + public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build())) + .withConsumerGroupMaxSize(1) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> context.sendClassicGroupJoin(request)); + assertEquals("The consumer group has reached its maximum capacity of 1 members.", ex.getMessage()); + } + + @Test + public void testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() { + String groupId = 
"group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin")) + .build())) + .build(); + + JoinGroupRequestData requestWithEmptyProtocols = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithEmptyProtocols)); + + JoinGroupRequestData requestWithInvalidProtocolType = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("connect") + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithInvalidProtocolType)); + } + + @Test + public void testJoiningConsumerGroupWithNewDynamicMember() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + for (short version = ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) { + String memberId = Uuid.randomUuid().toString(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + 
.addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList(), + version)) + .build(); + + // The first round of join request gets the new member id. + GroupMetadataManagerTestContext.JoinResult firstJoinResult = context.sendClassicGroupJoin( + request, + true + ); + assertTrue(firstJoinResult.records.isEmpty()); + // Simulate a successful write to the log. 
+ firstJoinResult.appendFuture.complete(null); + + assertTrue(firstJoinResult.joinFuture.isDone()); + assertEquals(Errors.MEMBER_ID_REQUIRED.code(), firstJoinResult.joinFuture.get().errorCode()); + String newMemberId = firstJoinResult.joinFuture.get().memberId(); + assertNotEquals("", newMemberId); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(newMemberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0) + ))); + } + } + )); + + JoinGroupRequestData secondRequest = new JoinGroupRequestData() + .setGroupId(request.groupId()) + .setMemberId(newMemberId) + .setProtocolType(request.protocolType()) + .setProtocols(request.protocols()) + .setSessionTimeoutMs(request.sessionTimeoutMs()) + .setRebalanceTimeoutMs(request.rebalanceTimeoutMs()) + .setReason(request.reason()); + + // Send second join group request for a new dynamic member with the new member id. 
+ GroupMetadataManagerTestContext.JoinResult secondJoinResult = context.sendClassicGroupJoin( + secondRequest, + true + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setAssignedPartitions(assignor.targetPartitions(newMemberId)) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, assignor.targetPartitions(memberId)), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, assignor.targetPartitions(newMemberId)), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), secondJoinResult.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 5), secondJoinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), secondJoinResult.records.subList(5, 7)); + + secondJoinResult.appendFuture.complete(null); + assertTrue(secondJoinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + 
secondJoinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + } + + @Test + public void testJoiningConsumerGroupFailingToPersistRecords() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + String memberId = Uuid.randomUuid().toString(); + String newMemberId = Uuid.randomUuid().toString(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(newMemberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1) + ))); + } + } + )); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + context.commit(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(newMemberId) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + 
Collections.singletonList(fooTopicName), + Collections.emptyList())) + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + + // Simulate a failed write to the log. + joinResult.appendFuture.completeExceptionally(new NotLeaderOrFollowerException()); + context.rollback(); + + context.assertNoSessionTimeout(groupId, newMemberId); + context.assertNoSyncTimeout(groupId, newMemberId); + assertThrows( + UnknownMemberIdException.class, + () -> context.groupMetadataManager.consumerGroup(groupId).getOrMaybeCreateMember(newMemberId, false) + ); + } + + @Test + public void testJoiningConsumerGroupWithNewStaticMember() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + // An empty target assignment is used as the new member id is unknown before calling join group. 
+ assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList())) + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + + // Simulate a successful write to log. 
+ joinResult.appendFuture.complete(null); + String newMemberId = joinResult.joinFuture.get().memberId(); + assertNotEquals("", newMemberId); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), joinResult.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 5), joinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), joinResult.records.subList(5, 7)); + + assertTrue(joinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, request.sessionTimeoutMs()); + 
context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + + @Test + public void testJoiningConsumerGroupReplacingExistingStaticMember() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + // An empty target assignment is used as the new member id is unknown before calling join group. + + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(Collections.singletonList(fooTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Collections.singletonList(fooTopicName), + Collections.emptyList())) 
+ .build(); + + // The static member joins with UNKNOWN_MEMBER_ID. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin( + request, + true + ); + + // Simulate a successful write to log. + joinResult.appendFuture.complete(null); + String newMemberId = joinResult.joinFuture.get().memberId(); + assertNotEquals("", newMemberId); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(0) + .setPreviousMemberEpoch(0) + .setInstanceId(instanceId) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Collections.singletonList(fooTopicName)) + .setRebalanceTimeoutMs(500) + .setSupportedClassicProtocols(request.protocols()) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build(); + + List<Record> expectedRecords = Arrays.asList( + // Remove the old static member. + RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId), + RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId), + RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId), + + // Create the new static member. 
+ RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords, joinResult.records); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(0) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + + @Test + public void testJoiningConsumerGroupWithExistingStaticMemberAndNewSubscription() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + Uuid zarTopicId = Uuid.randomUuid(); + String zarTopicName = "zar"; + + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + put(zarTopicName, new 
TopicMetadata(zarTopicId, zarTopicName, 1, mkMapOfPartitionRacks(1))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .withAssignmentEpoch(10)) + .build(); + ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId); + group.setMetadataRefreshDeadline(Long.MAX_VALUE, 11); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(zarTopicId, 0) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0), + mkTopicAssignment(fooTopicId, 1) + ))); + } + } + )); + + // Member 1 rejoins with a new subscription list. 
+ JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName, zarTopicName), + Collections.emptyList())) + .build(); + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setState(MemberState.UNREVOKED_PARTITIONS) + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName, zarTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(barTopicId, 0))) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName, zarTopicName), + Collections.emptyList())) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupEpochRecord(groupId, 11), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(zarTopicId, 0))), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, mkAssignment( + mkTopicAssignment(barTopicId, 0), + mkTopicAssignment(fooTopicId, 1))), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 2), joinResult.records.subList(0, 2)); + assertUnorderedListEquals(expectedRecords.subList(2, 4), joinResult.records.subList(2, 4)); + assertRecordsEquals(expectedRecords.subList(4, 6), 
joinResult.records.subList(4, 6)); + + joinResult.appendFuture.complete(null); + assertEquals( + new JoinGroupResponseData() + .setMemberId(memberId1) + .setGenerationId(10) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + context.assertSessionTimeout(groupId, memberId1, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, memberId1, request.rebalanceTimeoutMs()); + } + + @Test + public void testStaticMemberJoiningConsumerGroupWithUnknownInstanceId() throws Exception { + String groupId = "group-id"; + String instanceId = "instance-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String fooTopicName = "foo"; + String barTopicName = "bar"; + + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = + GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)) + ); + // Set up a ConsumerGroup with no static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setSupportedClassicProtocols(protocols) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .build())) + .build(); + + // The member joins with an instance id. 
+ JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withGroupInstanceId(instanceId) + .withProtocols(protocols) + .build(); + + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupJoin(request)); + } + + @Test + public void testStaticMemberJoiningConsumerGroupWithUnmatchedMemberId() throws Exception { + String groupId = "group-id"; + String instanceId = "instance-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String fooTopicName = "foo"; + String barTopicName = "bar"; + + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = + GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)) + ); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setSupportedClassicProtocols(protocols) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .build())) + .build(); + + // The member joins with the same instance id and a different member id. 
+ JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(Uuid.randomUuid().toString()) + .withGroupInstanceId(instanceId) + .withProtocols(protocols) + .build(); + + assertThrows(FencedInstanceIdException.class, () -> context.sendClassicGroupJoin(request)); + } + + @Test + public void testReconciliationInJoiningConsumerGroupWithEagerProtocol() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + Uuid zarTopicId = Uuid.randomUuid(); + String zarTopicName = "zar"; + + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) + 
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(barTopicName, 0)))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .withAssignmentEpoch(10)) + .build(); + ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId); + group.setMetadataRefreshDeadline(Long.MAX_VALUE, 11); + + // Prepare the new target assignment. + // Member 1 will need to revoke bar-0, and member 2 will need to revoke foo-1. + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(zarTopicId, 0) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0) + ))); + } + } + )); + + // Member 1 rejoins with a new subscription list and transitions to UNREVOKED_PARTITIONS, because its + // assignedPartitions still contains bar-0 though all of its partitions have actually been revoked. 
+ JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName, zarTopicName), + Collections.emptyList())) + .build(); + GroupMetadataManagerTestContext.JoinResult joinResult1 = context.sendClassicGroupJoin(request); + + ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setState(MemberState.UNREVOKED_PARTITIONS) + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName, zarTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(barTopicId, 0))) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName, zarTopicName), + Collections.emptyList())) + .build(); + + List<Record> expectedRecords1 = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember1), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(zarTopicId, 0))), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, mkAssignment( + mkTopicAssignment(barTopicId, 0))), + 
RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember1) + ); + assertEquals(expectedRecords1.size(), joinResult1.records.size()); + assertRecordsEquals(expectedRecords1.subList(0, 3), joinResult1.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords1.subList(3, 5), joinResult1.records.subList(3, 5)); + assertRecordsEquals(expectedRecords1.subList(5, 7), joinResult1.records.subList(5, 7)); + + assertEquals(expectedMember1.state(), group.getOrMaybeCreateMember(memberId1, false).state()); + + joinResult1.appendFuture.complete(null); + assertEquals( + new JoinGroupResponseData() + .setMemberId(memberId1) + .setGenerationId(10) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult1.joinFuture.get() + ); + context.assertSessionTimeout(groupId, memberId1, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, memberId1, request.rebalanceTimeoutMs()); + + // Member 1 rejoins to transition from UNREVOKED_PARTITIONS to UNRELEASED_PARTITIONS. 
+ GroupMetadataManagerTestContext.JoinResult joinResult2 = context.sendClassicGroupJoin(request); + + ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(expectedMember1) + .setMemberEpoch(11) + .setState(MemberState.UNRELEASED_PARTITIONS) + .setPartitionsPendingRevocation(Collections.emptyMap()) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(zarTopicId, 0))) + .build(); + + assertRecordsEquals( + Collections.singletonList(RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember2)), + joinResult2.records + ); + assertEquals(expectedMember2.state(), group.getOrMaybeCreateMember(memberId1, false).state()); + + joinResult2.appendFuture.complete(null); + assertEquals( + new JoinGroupResponseData() + .setMemberId(memberId1) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult2.joinFuture.get() + ); + context.assertSessionTimeout(groupId, memberId1, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, memberId1, request.rebalanceTimeoutMs()); + + // Member 2 rejoins to transition to UNREVOKED_PARTITIONS and confirm revoking foo-1. + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(10) + ); + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(10) + .setTopicPartitions(Collections.emptyList()) + ); + + // Member 1 rejoins to transition from UNRELEASED_PARTITIONS to STABLE. 
+ GroupMetadataManagerTestContext.JoinResult joinResult3 = context.sendClassicGroupJoin(request); + ConsumerGroupMember expectedMember3 = new ConsumerGroupMember.Builder(expectedMember2) + .setState(MemberState.STABLE) + .setPreviousMemberEpoch(11) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(zarTopicId, 0))) + .build(); + + assertRecordsEquals( + Collections.singletonList(RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3)), + joinResult3.records + ); + assertEquals(expectedMember3.state(), group.getOrMaybeCreateMember(memberId1, false).state()); + + joinResult3.appendFuture.complete(null); + assertEquals( + new JoinGroupResponseData() + .setMemberId(memberId1) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult3.joinFuture.get() + ); + context.assertSessionTimeout(groupId, memberId1, request.sessionTimeoutMs()); + context.assertSyncTimeout(groupId, memberId1, request.rebalanceTimeoutMs()); + } + + @Test + public void testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() throws Exception { + String groupId = "group-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + Uuid zarTopicId = Uuid.randomUuid(); + String zarTopicName = "zar"; + + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new 
HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(barTopicName, 0)))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0), + mkTopicAssignment(barTopicId, 0))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .withAssignmentEpoch(10)) + .build(); + ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId); + group.setMetadataRefreshDeadline(Long.MAX_VALUE, 11); + + // Prepare the new target assignment. + // Member 1 will need to revoke bar-0, and member 2 will need to revoke foo-1. 
+ assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(zarTopicId, 0) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0) + ))); + } + } + )); + + // Member 1 rejoins with a new subscription list and transitions to UNREVOKED_PARTITIONS, because its + // assignedPartitions still contains bar-0 though all of its partitions have actually been revoked. Review Comment: nit: This comment seems to be incorrect. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org