jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259054171
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() { // Verify the groups. Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> { - ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); + ConsumerGroup group = context.groupMetadataManager + .getOrMaybeCreateConsumerGroup(groupId, false); assertTrue(group.hasMetadataExpired(context.time.milliseconds())); }); Arrays.asList("group5").forEach(groupId -> { - ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); + ConsumerGroup group = context.groupMetadataManager + .getOrMaybeCreateConsumerGroup(groupId, false); assertFalse(group.hasMetadataExpired(context.time.milliseconds())); }); // Verify image. assertEquals(image, context.groupMetadataManager.image()); } - private <T> void assertUnorderedListEquals( - List<T> expected, - List<T> actual - ) { - assertEquals(new HashSet<>(expected), new HashSet<>(actual)); - } + @Test + public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withGenericGroupMaxSize(10) + .build(); - private void assertResponseEquals( - ConsumerGroupHeartbeatResponseData expected, - ConsumerGroupHeartbeatResponseData actual - ) { - if (!responseEquals(expected, actual)) { - assertionFailure() - .expected(expected) - .actual(actual) - .buildAndThrow(); + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .withReason("exceed max group size") + .build(); + + for (int i = 0; i < 10; i++) { + CompletableFuture<JoinGroupResponseData> responseFuture; + if (i == 0) { + responseFuture = context.sendGenericGroupJoin( + request, + false, + 
false, + new ExpectedGenericGroupResult(Errors.NONE, true) + ); + } else { + responseFuture = context.sendGenericGroupJoin(request); + } + assertFalse(responseFuture.isDone()); } + CompletableFuture<JoinGroupResponseData> responseFuture = context.sendGenericGroupJoin(request); + assertTrue(responseFuture.isDone()); + assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } - private boolean responseEquals( - ConsumerGroupHeartbeatResponseData expected, - ConsumerGroupHeartbeatResponseData actual - ) { - if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; - if (expected.errorCode() != actual.errorCode()) return false; - if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; - if (!Objects.equals(expected.memberId(), actual.memberId())) return false; - if (expected.memberEpoch() != actual.memberEpoch()) return false; - if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; - if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; - // Unordered comparison of the assignments. 
- return responseAssignmentEquals(expected.assignment(), actual.assignment()); - } + @Test + public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { + boolean requiredKnownMemberId = true; + int groupMaxSize = 10; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withGenericGroupMaxSize(groupMaxSize) + .withGenericGroupInitialRebalanceDelayMs(50) + .build(); - private boolean responseAssignmentEquals( - ConsumerGroupHeartbeatResponseData.Assignment expected, - ConsumerGroupHeartbeatResponseData.Assignment actual - ) { - if (expected == actual) return true; - if (expected == null) return false; - if (actual == null) return false; + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); - if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), fromAssignment(actual.pendingTopicPartitions()))) - return false; + // First round of join requests. Generate member ids. All requests will be accepted + // as the group is still Empty. 
+ List<CompletableFuture<JoinGroupResponseData>> responseFutures = new ArrayList<>(); + for (int i = 0; i < groupMaxSize + 1; i++) { + if (i == 0) { + responseFutures.add(context.sendGenericGroupJoin( + request, + requiredKnownMemberId, + false, + new ExpectedGenericGroupResult(Errors.NONE, true) + )); + } else { + responseFutures.add(context.sendGenericGroupJoin(request, requiredKnownMemberId)); + } + } - return Objects.equals(fromAssignment(expected.assignedTopicPartitions()), fromAssignment(actual.assignedTopicPartitions())); + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "group-id", + false + ); + + List<String> memberIds = verifyGenericGroupJoinResponses(responseFutures, 0, Errors.MEMBER_ID_REQUIRED); + assertEquals(groupMaxSize + 1, memberIds.size()); + assertEquals(0, group.size()); + assertTrue(group.isInState(EMPTY)); + assertEquals(groupMaxSize + 1, group.numPendingJoinMembers()); + + // Second round of join requests with the generated member ids. + // One of them will fail, reaching group max size. + responseFutures = new ArrayList<>(); + for (String memberId : memberIds) { + request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(memberId) + .withDefaultProtocolTypeAndProtocols() + .build(); + + responseFutures.add(context.sendGenericGroupJoin(request, requiredKnownMemberId)); + } + + // Advance clock by group initial rebalance delay to complete first inital delayed join. + // This will extend the initial rebalance as new members have joined. + context.timer.advanceClock(50); + // Advance clock by group initial rebalance delay to complete second inital delayed join. + // Since there are no new members that joined since the previous delayed join, + // the join group phase will complete. 
+ context.timer.advanceClock(50); + + verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, Errors.GROUP_MAX_SIZE_REACHED); + assertEquals(groupMaxSize, group.size()); + assertEquals(0, group.numPendingJoinMembers()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + + + // Members that were accepted can rejoin while others are rejected in CompletingRebalance state. + responseFutures = new ArrayList<>(); + for (String memberId : memberIds) { + request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(memberId) + .withDefaultProtocolTypeAndProtocols() + .build(); + + responseFutures.add(context.sendGenericGroupJoin(request, requiredKnownMemberId)); + } + + verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, Errors.GROUP_MAX_SIZE_REACHED); } - private Map<Uuid, Set<Integer>> fromAssignment( - List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assignment - ) { - if (assignment == null) return null; + @Test + public void testDynamicMembersJoinGroupWithMaxSizeAndNotRequiredKnownMember() { + boolean requiredKnownMemberId = false; + int groupMaxSize = 10; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withGenericGroupMaxSize(groupMaxSize) + .withGenericGroupInitialRebalanceDelayMs(50) + .build(); - Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>(); - assignment.forEach(topicPartitions -> { - assignmentMap.put(topicPartitions.topicId(), new HashSet<>(topicPartitions.partitions())); - }); - return assignmentMap; + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + // First round of join requests. This will trigger a rebalance. 
+ List<CompletableFuture<JoinGroupResponseData>> responseFutures = new ArrayList<>(); + responseFutures.add(context.sendGenericGroupJoin( + request, + requiredKnownMemberId, + true, + new ExpectedGenericGroupResult(Errors.NONE, true) + )); + for (int i = 0; i < groupMaxSize; i++) { + responseFutures.add(context.sendGenericGroupJoin(request, requiredKnownMemberId)); + } + + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "group-id", + + false + ); + + assertEquals(groupMaxSize, group.size()); + assertEquals(groupMaxSize, group.numAwaitingJoinResponse()); + assertTrue(group.isInState(PREPARING_REBALANCE)); + + // Advance clock by group initial rebalance delay to complete first inital delayed join. + // This will extend the initial rebalance as new members have joined. + context.timer.advanceClock(50); + // Advance clock by group initial rebalance delay to complete second inital delayed join. + // Since there are no new members that joined since the previous delayed join, + // we will complete the rebalance. + context.timer.advanceClock(50); + + List<String> memberIds = verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, Errors.GROUP_MAX_SIZE_REACHED); + + // Members that were accepted can rejoin while others are rejected in CompletingRebalance state. 
+ responseFutures = new ArrayList<>(); + for (String memberId : memberIds) { + request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(memberId) + .withDefaultProtocolTypeAndProtocols() + .build(); + + responseFutures.add(context.sendGenericGroupJoin(request, requiredKnownMemberId)); + } + + verifyGenericGroupJoinResponses(responseFutures, 10, Errors.GROUP_MAX_SIZE_REACHED); + assertEquals(groupMaxSize, group.size()); + assertEquals(0, group.numAwaitingJoinResponse()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); } - private void assertRecordsEquals( - List<Record> expectedRecords, - List<Record> actualRecords - ) { - try { - assertEquals(expectedRecords.size(), actualRecords.size()); + @Test + public void testStaticMembersJoinGroupWithMaxSize() { + int groupMaxSize = 10; - for (int i = 0; i < expectedRecords.size(); i++) { - Record expectedRecord = expectedRecords.get(i); - Record actualRecord = actualRecords.get(i); - assertRecordEquals(expectedRecord, actualRecord); + List<String> groupInstanceIds = IntStream.range(0, groupMaxSize + 1) + .mapToObj(i -> "instance-id-" + i) + .collect(Collectors.toList()); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withGenericGroupMaxSize(groupMaxSize) + .withGenericGroupInitialRebalanceDelayMs(50) + .build(); + + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + // First round of join requests. The group metadata manager will prepare a rebalance. + List<CompletableFuture<JoinGroupResponseData>> responseFutures = new ArrayList<>(); + for (int i = 0; i < groupMaxSize + 1; i++) { Review Comment: The syntax only permits access to final (temporary) variables. Setting existing variables to new values requires them to be atomic. 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() { // Verify the groups. Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> { - ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); + ConsumerGroup group = context.groupMetadataManager + .getOrMaybeCreateConsumerGroup(groupId, false); assertTrue(group.hasMetadataExpired(context.time.milliseconds())); }); Arrays.asList("group5").forEach(groupId -> { - ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); + ConsumerGroup group = context.groupMetadataManager + .getOrMaybeCreateConsumerGroup(groupId, false); assertFalse(group.hasMetadataExpired(context.time.milliseconds())); }); // Verify image. assertEquals(image, context.groupMetadataManager.image()); } - private <T> void assertUnorderedListEquals( - List<T> expected, - List<T> actual - ) { - assertEquals(new HashSet<>(expected), new HashSet<>(actual)); - } + @Test + public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withGenericGroupMaxSize(10) + .build(); - private void assertResponseEquals( - ConsumerGroupHeartbeatResponseData expected, - ConsumerGroupHeartbeatResponseData actual - ) { - if (!responseEquals(expected, actual)) { - assertionFailure() - .expected(expected) - .actual(actual) - .buildAndThrow(); + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .withReason("exceed max group size") + .build(); + + for (int i = 0; i < 10; i++) { + CompletableFuture<JoinGroupResponseData> responseFuture; + if (i == 0) { + responseFuture = context.sendGenericGroupJoin( + request, + false, + 
false, + new ExpectedGenericGroupResult(Errors.NONE, true) + ); + } else { + responseFuture = context.sendGenericGroupJoin(request); + } + assertFalse(responseFuture.isDone()); } + CompletableFuture<JoinGroupResponseData> responseFuture = context.sendGenericGroupJoin(request); + assertTrue(responseFuture.isDone()); + assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } - private boolean responseEquals( - ConsumerGroupHeartbeatResponseData expected, - ConsumerGroupHeartbeatResponseData actual - ) { - if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; - if (expected.errorCode() != actual.errorCode()) return false; - if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; - if (!Objects.equals(expected.memberId(), actual.memberId())) return false; - if (expected.memberEpoch() != actual.memberEpoch()) return false; - if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; - if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; - // Unordered comparison of the assignments. 
- return responseAssignmentEquals(expected.assignment(), actual.assignment()); - } + @Test + public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { + boolean requiredKnownMemberId = true; + int groupMaxSize = 10; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withGenericGroupMaxSize(groupMaxSize) + .withGenericGroupInitialRebalanceDelayMs(50) + .build(); - private boolean responseAssignmentEquals( - ConsumerGroupHeartbeatResponseData.Assignment expected, - ConsumerGroupHeartbeatResponseData.Assignment actual - ) { - if (expected == actual) return true; - if (expected == null) return false; - if (actual == null) return false; + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); - if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), fromAssignment(actual.pendingTopicPartitions()))) - return false; + // First round of join requests. Generate member ids. All requests will be accepted + // as the group is still Empty. 
+ List<CompletableFuture<JoinGroupResponseData>> responseFutures = new ArrayList<>(); + for (int i = 0; i < groupMaxSize + 1; i++) { + if (i == 0) { + responseFutures.add(context.sendGenericGroupJoin( + request, + requiredKnownMemberId, + false, + new ExpectedGenericGroupResult(Errors.NONE, true) + )); + } else { + responseFutures.add(context.sendGenericGroupJoin(request, requiredKnownMemberId)); + } + } - return Objects.equals(fromAssignment(expected.assignedTopicPartitions()), fromAssignment(actual.assignedTopicPartitions())); + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "group-id", + false + ); + + List<String> memberIds = verifyGenericGroupJoinResponses(responseFutures, 0, Errors.MEMBER_ID_REQUIRED); + assertEquals(groupMaxSize + 1, memberIds.size()); + assertEquals(0, group.size()); + assertTrue(group.isInState(EMPTY)); + assertEquals(groupMaxSize + 1, group.numPendingJoinMembers()); + + // Second round of join requests with the generated member ids. + // One of them will fail, reaching group max size. + responseFutures = new ArrayList<>(); + for (String memberId : memberIds) { + request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(memberId) + .withDefaultProtocolTypeAndProtocols() + .build(); + + responseFutures.add(context.sendGenericGroupJoin(request, requiredKnownMemberId)); + } + + // Advance clock by group initial rebalance delay to complete first inital delayed join. + // This will extend the initial rebalance as new members have joined. + context.timer.advanceClock(50); + // Advance clock by group initial rebalance delay to complete second inital delayed join. + // Since there are no new members that joined since the previous delayed join, + // the join group phase will complete. 
+ context.timer.advanceClock(50); + + verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, Errors.GROUP_MAX_SIZE_REACHED); + assertEquals(groupMaxSize, group.size()); + assertEquals(0, group.numPendingJoinMembers()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + + + // Members that were accepted can rejoin while others are rejected in CompletingRebalance state. + responseFutures = new ArrayList<>(); + for (String memberId : memberIds) { + request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(memberId) + .withDefaultProtocolTypeAndProtocols() + .build(); + + responseFutures.add(context.sendGenericGroupJoin(request, requiredKnownMemberId)); + } + + verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, Errors.GROUP_MAX_SIZE_REACHED); } - private Map<Uuid, Set<Integer>> fromAssignment( - List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assignment - ) { - if (assignment == null) return null; + @Test + public void testDynamicMembersJoinGroupWithMaxSizeAndNotRequiredKnownMember() { + boolean requiredKnownMemberId = false; + int groupMaxSize = 10; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withGenericGroupMaxSize(groupMaxSize) + .withGenericGroupInitialRebalanceDelayMs(50) + .build(); - Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>(); - assignment.forEach(topicPartitions -> { - assignmentMap.put(topicPartitions.topicId(), new HashSet<>(topicPartitions.partitions())); - }); - return assignmentMap; + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + // First round of join requests. This will trigger a rebalance. 
+ List<CompletableFuture<JoinGroupResponseData>> responseFutures = new ArrayList<>(); + responseFutures.add(context.sendGenericGroupJoin( + request, + requiredKnownMemberId, + true, + new ExpectedGenericGroupResult(Errors.NONE, true) + )); + for (int i = 0; i < groupMaxSize; i++) { + responseFutures.add(context.sendGenericGroupJoin(request, requiredKnownMemberId)); + } + + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "group-id", + + false + ); + + assertEquals(groupMaxSize, group.size()); + assertEquals(groupMaxSize, group.numAwaitingJoinResponse()); + assertTrue(group.isInState(PREPARING_REBALANCE)); + + // Advance clock by group initial rebalance delay to complete first inital delayed join. + // This will extend the initial rebalance as new members have joined. + context.timer.advanceClock(50); + // Advance clock by group initial rebalance delay to complete second inital delayed join. + // Since there are no new members that joined since the previous delayed join, + // we will complete the rebalance. + context.timer.advanceClock(50); + + List<String> memberIds = verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, Errors.GROUP_MAX_SIZE_REACHED); + + // Members that were accepted can rejoin while others are rejected in CompletingRebalance state. 
+ responseFutures = new ArrayList<>(); + for (String memberId : memberIds) { + request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(memberId) + .withDefaultProtocolTypeAndProtocols() + .build(); + + responseFutures.add(context.sendGenericGroupJoin(request, requiredKnownMemberId)); + } + + verifyGenericGroupJoinResponses(responseFutures, 10, Errors.GROUP_MAX_SIZE_REACHED); + assertEquals(groupMaxSize, group.size()); + assertEquals(0, group.numAwaitingJoinResponse()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); } - private void assertRecordsEquals( - List<Record> expectedRecords, - List<Record> actualRecords - ) { - try { - assertEquals(expectedRecords.size(), actualRecords.size()); + @Test + public void testStaticMembersJoinGroupWithMaxSize() { + int groupMaxSize = 10; - for (int i = 0; i < expectedRecords.size(); i++) { - Record expectedRecord = expectedRecords.get(i); - Record actualRecord = actualRecords.get(i); - assertRecordEquals(expectedRecord, actualRecord); + List<String> groupInstanceIds = IntStream.range(0, groupMaxSize + 1) + .mapToObj(i -> "instance-id-" + i) + .collect(Collectors.toList()); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withGenericGroupMaxSize(groupMaxSize) + .withGenericGroupInitialRebalanceDelayMs(50) + .build(); + + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + // First round of join requests. The group metadata manager will prepare a rebalance. 
+ List<CompletableFuture<JoinGroupResponseData>> responseFutures = new ArrayList<>(); + for (int i = 0; i < groupMaxSize + 1; i++) { + String instanceId = groupInstanceIds.get(i); + request = request.setGroupInstanceId(instanceId); + if (i == 0) { + responseFutures.add(context.sendGenericGroupJoin( + request, + false, + true, + new ExpectedGenericGroupResult(Errors.NONE, true) + )); + } else { + responseFutures.add(context.sendGenericGroupJoin(request)); } - } catch (AssertionFailedError e) { - assertionFailure() - .expected(expectedRecords) - .actual(actualRecords) - .buildAndThrow(); } - } - private void assertRecordEquals( - Record expected, - Record actual - ) { - try { - assertApiMessageAndVersionEquals(expected.key(), actual.key()); - assertApiMessageAndVersionEquals(expected.value(), actual.value()); - } catch (AssertionFailedError e) { - assertionFailure() - .expected(expected) - .actual(actual) - .buildAndThrow(); + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "group-id", + false + ); Review Comment: addressed in above comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org