jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1260500922


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -2402,143 +2610,2090 @@ public void testOnNewMetadataImage() {
         assertEquals(image, context.groupMetadataManager.image());
     }
 
-    private <T> void assertUnorderedListEquals(
-        List<T> expected,
-        List<T> actual
-    ) {
-        assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-    }
+    @Test
+    public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMaxSize(10)
+            .build();
 
-    private void assertResponseEquals(
-        ConsumerGroupHeartbeatResponseData expected,
-        ConsumerGroupHeartbeatResponseData actual
-    ) {
-        if (!responseEquals(expected, actual)) {
-            assertionFailure()
-                .expected(expected)
-                .actual(actual)
-                .buildAndThrow();
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withReason("exceed max group size")
+            .build();
+
+        for (int i = 0; i < 10; i++) {
+            CompletableFuture<JoinGroupResponseData> responseFuture;
+            if (i == 0) {
+                responseFuture = context.sendGenericGroupJoin(
+                    request,
+                    false,
+                    false,
+                    new ExpectedGenericGroupResult(Errors.NONE, true)
+                );
+            } else {
+                responseFuture = context.sendGenericGroupJoin(request);
+            }
+            assertFalse(responseFuture.isDone());
         }
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(request);
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
     }
 
-    private boolean responseEquals(
-        ConsumerGroupHeartbeatResponseData expected,
-        ConsumerGroupHeartbeatResponseData actual
-    ) {
-        if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-        if (expected.errorCode() != actual.errorCode()) return false;
-        if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-        if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-        if (expected.memberEpoch() != actual.memberEpoch()) return false;
-        if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
-        if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-        // Unordered comparison of the assignments.
-        return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-    }
+    @Test
+    public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() 
{
+        boolean requiredKnownMemberId = true;
+        int groupMaxSize = 10;
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMaxSize(groupMaxSize)
+            .withGenericGroupInitialRebalanceDelayMs(50)
+            .build();
 
-    private boolean responseAssignmentEquals(
-        ConsumerGroupHeartbeatResponseData.Assignment expected,
-        ConsumerGroupHeartbeatResponseData.Assignment actual
-    ) {
-        if (expected == actual) return true;
-        if (expected == null) return false;
-        if (actual == null) return false;
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
 
-        if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), 
fromAssignment(actual.pendingTopicPartitions())))
-            return false;
+        // First round of join requests. Generate member ids. All requests 
will be accepted
+        // as the group is still Empty.
+        List<CompletableFuture<JoinGroupResponseData>> responseFutures = new 
ArrayList<>();
+        for (int i = 0; i < groupMaxSize + 1; i++) {
+            if (i == 0) {
+                responseFutures.add(context.sendGenericGroupJoin(
+                    request,
+                    requiredKnownMemberId,
+                    false,
+                    new ExpectedGenericGroupResult(Errors.NONE, true)
+                ));
+            } else {
+                responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
+            }
+        }
 
-        return 
Objects.equals(fromAssignment(expected.assignedTopicPartitions()), 
fromAssignment(actual.assignedTopicPartitions()));
+        GenericGroup group = genericGroup(context, "group-id");
+
+        List<String> memberIds = 
verifyGenericGroupJoinResponses(responseFutures, 0, Errors.MEMBER_ID_REQUIRED);
+        assertEquals(groupMaxSize + 1, memberIds.size());
+        assertEquals(0, group.size());
+        assertTrue(group.isInState(EMPTY));
+        assertEquals(groupMaxSize + 1, group.numPendingJoinMembers());
+
+        // Second round of join requests with the generated member ids.
+        // One of them will fail, reaching group max size.
+        responseFutures = new ArrayList<>();
+        for (String memberId : memberIds) {
+            request = new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(memberId)
+                .withDefaultProtocolTypeAndProtocols()
+                .build();
+
+            responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
+        }
+
+        // Advance clock by group initial rebalance delay to complete first 
inital delayed join.
+        // This will extend the initial rebalance as new members have joined.
+        context.timer.advanceClock(50);
+        // Advance clock by group initial rebalance delay to complete second 
inital delayed join.
+        // Since there are no new members that joined since the previous 
delayed join,
+        // the join group phase will complete.
+        context.timer.advanceClock(50);
+
+        verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, 
Errors.GROUP_MAX_SIZE_REACHED);
+        assertEquals(groupMaxSize, group.size());
+        assertEquals(0, group.numPendingJoinMembers());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        // Members that were accepted can rejoin while others are rejected in 
CompletingRebalance state.
+        responseFutures = new ArrayList<>();
+        for (String memberId : memberIds) {
+            request = new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(memberId)
+                .withDefaultProtocolTypeAndProtocols()
+                .build();
+
+            responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
+        }
+
+        verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, 
Errors.GROUP_MAX_SIZE_REACHED);
     }
 
-    private Map<Uuid, Set<Integer>> fromAssignment(
-        List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assignment
-    ) {
-        if (assignment == null) return null;
+    @Test
+    public void 
testDynamicMembersJoinGroupWithMaxSizeAndNotRequiredKnownMember() {
+        boolean requiredKnownMemberId = false;
+        int groupMaxSize = 10;
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMaxSize(groupMaxSize)
+            .withGenericGroupInitialRebalanceDelayMs(50)
+            .build();
 
-        Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
-        assignment.forEach(topicPartitions -> {
-            assignmentMap.put(topicPartitions.topicId(), new 
HashSet<>(topicPartitions.partitions()));
-        });
-        return assignmentMap;
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        // First round of join requests. This will trigger a rebalance.
+        List<CompletableFuture<JoinGroupResponseData>> responseFutures = new 
ArrayList<>();
+        responseFutures.add(context.sendGenericGroupJoin(
+            request,
+            requiredKnownMemberId,
+            true,
+            new ExpectedGenericGroupResult(Errors.NONE, true)
+        ));
+        for (int i = 0; i < groupMaxSize; i++) {
+            responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
+        }
+
+        GenericGroup group = genericGroup(context, "group-id");
+
+        assertEquals(groupMaxSize, group.size());
+        assertEquals(groupMaxSize, group.numAwaitingJoinResponse());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+
+        // Advance clock by group initial rebalance delay to complete first 
inital delayed join.
+        // This will extend the initial rebalance as new members have joined.
+        context.timer.advanceClock(50);
+        // Advance clock by group initial rebalance delay to complete second 
inital delayed join.
+        // Since there are no new members that joined since the previous 
delayed join,
+        // we will complete the rebalance.
+        context.timer.advanceClock(50);
+
+        List<String> memberIds = 
verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, 
Errors.GROUP_MAX_SIZE_REACHED);
+
+        // Members that were accepted can rejoin while others are rejected in 
CompletingRebalance state.
+        responseFutures = new ArrayList<>();
+        for (String memberId : memberIds) {
+            request = new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(memberId)
+                .withDefaultProtocolTypeAndProtocols()
+                .build();
+
+            responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
+        }
+
+        verifyGenericGroupJoinResponses(responseFutures, 10, 
Errors.GROUP_MAX_SIZE_REACHED);
+        assertEquals(groupMaxSize, group.size());
+        assertEquals(0, group.numAwaitingJoinResponse());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
     }
 
-    private void assertRecordsEquals(
-        List<Record> expectedRecords,
-        List<Record> actualRecords
-    ) {
-        try {
-            assertEquals(expectedRecords.size(), actualRecords.size());
+    @Test
+    public void testStaticMembersJoinGroupWithMaxSize() {
+        int groupMaxSize = 10;
 
-            for (int i = 0; i < expectedRecords.size(); i++) {
-                Record expectedRecord = expectedRecords.get(i);
-                Record actualRecord = actualRecords.get(i);
-                assertRecordEquals(expectedRecord, actualRecord);
+        List<String> groupInstanceIds = IntStream.range(0, groupMaxSize + 1)
+            .mapToObj(i -> "instance-id-" + i)
+            .collect(Collectors.toList());
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMaxSize(groupMaxSize)
+            .withGenericGroupInitialRebalanceDelayMs(50)
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        // First round of join requests. The group metadata manager will 
prepare a rebalance.
+        List<CompletableFuture<JoinGroupResponseData>> responseFutures = new 
ArrayList<>();
+        for (int i = 0; i < groupMaxSize + 1; i++) {
+            String instanceId = groupInstanceIds.get(i);
+            request = request.setGroupInstanceId(instanceId);
+            if (i == 0) {
+                responseFutures.add(context.sendGenericGroupJoin(
+                    request,
+                    false,
+                    true,
+                    new ExpectedGenericGroupResult(Errors.NONE, true)
+                ));
+            } else {
+                responseFutures.add(context.sendGenericGroupJoin(request));
             }
-        } catch (AssertionFailedError e) {
-            assertionFailure()
-                .expected(expectedRecords)
-                .actual(actualRecords)
-                .buildAndThrow();
         }
-    }
 
-    private void assertRecordEquals(
-        Record expected,
-        Record actual
-    ) {
-        try {
-            assertApiMessageAndVersionEquals(expected.key(), actual.key());
-            assertApiMessageAndVersionEquals(expected.value(), actual.value());
-        } catch (AssertionFailedError e) {
-            assertionFailure()
-                .expected(expected)
-                .actual(actual)
-                .buildAndThrow();
+        GenericGroup group = genericGroup(context, "group-id");
+
+        assertEquals(groupMaxSize, group.size());
+        assertEquals(groupMaxSize, group.numAwaitingJoinResponse());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+
+        // Advance clock by group initial rebalance delay to complete first 
inital delayed join.
+        // This will extend the initial rebalance as new members have joined.
+        context.timer.advanceClock(50);
+        // Advance clock by group initial rebalance delay to complete second 
inital delayed join.
+        // Since there are no new members that joined since the previous 
delayed join,
+        // we will complete the rebalance.
+        context.timer.advanceClock(50);
+
+        List<String> memberIds = 
verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, 
Errors.GROUP_MAX_SIZE_REACHED);
+
+        // Members which were accepted can rejoin, others are rejected, while
+        // completing rebalance
+        responseFutures = new ArrayList<>();
+        for (int i = 0; i < groupMaxSize; i++) {
+            String memberId = memberIds.get(i);
+            String instanceId = groupInstanceIds.get(i);
+
+            request = new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(memberId)
+                .withGroupInstanceId(instanceId)
+                .withDefaultProtocolTypeAndProtocols()
+                .build();
+
+            responseFutures.add(context.sendGenericGroupJoin(request));
         }
+
+        verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, 
Errors.GROUP_MAX_SIZE_REACHED);
+        assertEquals(groupMaxSize, group.size());
+        assertEquals(0, group.numAwaitingJoinResponse());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
     }
 
-    private void assertApiMessageAndVersionEquals(
-        ApiMessageAndVersion expected,
-        ApiMessageAndVersion actual
-    ) {
-        if (expected == actual) return;
+    @Test
+    public void testDynamicMembersCanRejoinGroupWithMaxSizeWhileRebalancing() {
+        boolean requiredKnownMemberId = true;
+        int groupMaxSize = 10;
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMaxSize(groupMaxSize)
+            .withGenericGroupInitialRebalanceDelayMs(50)
+            .build();
 
-        assertEquals(expected.version(), actual.version());
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
 
-        if (actual.message() instanceof 
ConsumerGroupCurrentMemberAssignmentValue) {
-            // The order of the topics stored in 
ConsumerGroupCurrentMemberAssignmentValue is not
-            // always guaranteed. Therefore, we need a special comparator.
-            ConsumerGroupCurrentMemberAssignmentValue expectedValue =
-                (ConsumerGroupCurrentMemberAssignmentValue) expected.message();
-            ConsumerGroupCurrentMemberAssignmentValue actualValue =
-                (ConsumerGroupCurrentMemberAssignmentValue) actual.message();
+        List<CompletableFuture<JoinGroupResponseData>> responseFutures = new 
ArrayList<>();
+        // First round of join requests. Generate member ids.
+        for (int i = 0; i < groupMaxSize + 1; i++) {
+            if (i == 0) {
+                responseFutures.add(context.sendGenericGroupJoin(
+                    request,
+                    requiredKnownMemberId,
+                    false,
+                    new ExpectedGenericGroupResult(Errors.NONE, true)
+                ));
+            } else {
+                responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
+            }
+        }
 
-            assertEquals(expectedValue.memberEpoch(), 
actualValue.memberEpoch());
-            assertEquals(expectedValue.previousMemberEpoch(), 
actualValue.previousMemberEpoch());
-            assertEquals(expectedValue.targetMemberEpoch(), 
actualValue.targetMemberEpoch());
-            assertEquals(expectedValue.error(), actualValue.error());
-            assertEquals(expectedValue.metadataVersion(), 
actualValue.metadataVersion());
-            assertEquals(expectedValue.metadataBytes(), 
actualValue.metadataBytes());
+        GenericGroup group = genericGroup(context, "group-id");
 
-            // We transform those to Maps before comparing them.
-            
assertEquals(fromTopicPartitions(expectedValue.assignedPartitions()),
-                fromTopicPartitions(actualValue.assignedPartitions()));
-            
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingRevocation()),
-                
fromTopicPartitions(actualValue.partitionsPendingRevocation()));
-            
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()),
-                
fromTopicPartitions(actualValue.partitionsPendingAssignment()));
-        } else {
-            assertEquals(expected.message(), actual.message());
+        assertEquals(0, group.size());
+        assertEquals(groupMaxSize + 1, group.numPendingJoinMembers());
+        assertTrue(group.isInState(EMPTY));
+
+        List<String> memberIds = 
verifyGenericGroupJoinResponses(responseFutures, 0, Errors.MEMBER_ID_REQUIRED);
+        assertEquals(groupMaxSize + 1, memberIds.size());
+
+        // Second round of join requests with the generated member ids.
+        // One of them will fail, reaching group max size.
+        responseFutures = new ArrayList<>();
+        for (String memberId : memberIds) {
+            
responseFutures.add(context.sendGenericGroupJoin(request.setMemberId(memberId), 
requiredKnownMemberId));
+        }
+
+        assertEquals(groupMaxSize, group.size());
+        assertEquals(groupMaxSize, group.numAwaitingJoinResponse());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+
+        // Members can rejoin while rebalancing
+        responseFutures = new ArrayList<>();
+        for (String memberId : memberIds) {
+            request = new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(memberId)
+                .withDefaultProtocolTypeAndProtocols()
+                .build();
+
+            responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
         }
+
+        // Advance clock by group initial rebalance delay to complete first 
inital delayed join.
+        // This will extend the initial rebalance as new members have joined.
+        context.timer.advanceClock(50);
+        // Advance clock by group initial rebalance delay to complete second 
inital delayed join.
+        // Since there are no new members that joined since the previous 
delayed join,
+        // we will complete the rebalance.
+        context.timer.advanceClock(50);
+
+        verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, 
Errors.GROUP_MAX_SIZE_REACHED);
+        assertEquals(groupMaxSize, group.size());
+        assertEquals(0, group.numAwaitingJoinResponse());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
     }
 
-    private Map<Uuid, Set<Integer>> fromTopicPartitions(
-        List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> 
assignment
-    ) {
-        Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
-        assignment.forEach(topicPartitions -> {
-            assignmentMap.put(topicPartitions.topicId(), new 
HashSet<>(topicPartitions.partitions()));
+    @Test
+    public void 
testLastJoiningMembersAreKickedOutWhenRejoiningGroupWithMaxSize() {
+        int groupMaxSize = 10;
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMaxSize(groupMaxSize)
+            .withGenericGroupInitialRebalanceDelayMs(50)
+            .build();
+
+        // Create a group and add members that exceed the group max size.
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "group-id",
+            true
+        );
+
+        List<String> memberIds = IntStream.range(0, groupMaxSize + 2)
+            .mapToObj(i -> group.generateMemberId("client-id", 
Optional.empty()))
+            .collect(Collectors.toList());
+
+        memberIds.forEach(memberId -> {
+            JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection();
+            protocols.add(new JoinGroupRequestProtocol()
+                .setName("range")
+                .setMetadata(new byte[0]));
+
+            group.add(
+                new GenericGroupMember(
+                    memberId,
+                    Optional.empty(),
+                    "client-id",
+                    "client-host",
+                    10000,
+                    5000,
+                    "consumer",
+                    protocols
+                )
+            );
+        });
+
+        context.groupMetadataManager.prepareRebalance(group, "test");
+
+        List<CompletableFuture<JoinGroupResponseData>> responseFutures = new 
ArrayList<>();
+        for (String memberId : memberIds) {
+            JoinGroupRequestData request = new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(memberId)
+                .withDefaultProtocolTypeAndProtocols()
+                .withRebalanceTimeoutMs(10000)
+                .build();
+
+            responseFutures.add(context.sendGenericGroupJoin(request));
+        }
+
+        assertEquals(groupMaxSize, group.size());
+        assertEquals(groupMaxSize, group.numAwaitingJoinResponse());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+
+        // Advance clock by rebalance timeout to complete join phase.
+        context.timer.advanceClock(10000);
+
+        verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, 
Errors.GROUP_MAX_SIZE_REACHED);
+
+        assertEquals(groupMaxSize, group.size());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        memberIds.subList(groupMaxSize, groupMaxSize + 2)
+            .forEach(memberId -> assertFalse(group.hasMemberId(memberId)));
+
+        memberIds.subList(0, groupMaxSize)
+            .forEach(memberId -> assertTrue(group.hasMemberId(memberId)));
+    }
+
+    @Test
+    public void testJoinGroupSessionTimeoutTooSmall() throws Exception {
+        int minSessionTimeout = 50;
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMinSessionTimeoutMs(minSessionTimeout)
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withSessionTimeoutMs(minSessionTimeout - 1)
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(request);
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.INVALID_SESSION_TIMEOUT.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
+    }
+
+    @Test
+    public void testJoinGroupSessionTimeoutTooLarge() throws Exception {
+        int maxSessionTimeout = 50;
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMaxSessionTimeoutMs(maxSessionTimeout)
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withSessionTimeoutMs(maxSessionTimeout + 1)
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(request);
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.INVALID_SESSION_TIMEOUT.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
+    }
+
+    @Test
+    public void testJoinGroupUnknownConsumerNewGroup() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId("member-id")
+            .build();
+
+        JoinGroupResponseData response = 
context.joinGenericGroupAsDynamicMember(request);
+
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), response.errorCode());
+
+        // Static member
+        request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId("member-id")
+            .withGroupInstanceId("group-instance-id")
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(request);
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), responseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+    }
+
+    @Test
+    public void testGenericGroupJoinInconsistentProtocolType() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection(0);
+        protocols.add(new JoinGroupRequestProtocol().setName("range"));
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        JoinGroupResponseData response = 
context.joinGenericGroupAsDynamicMember(request);
+        assertEquals(Errors.NONE.code(), response.errorCode());
+
+        request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withProtocolType("connect")
+            .withProtocols(protocols)
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(request);
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
+    }
+
+    @Test
+    public void testJoinGroupWithEmptyProtocolType() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection(0);
+        protocols.add(new JoinGroupRequestProtocol().setName("range"));
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withProtocolType("")
+            .withProtocols(protocols)
+            .build();
+
+        JoinGroupResponseData response = 
context.joinGenericGroupAsDynamicMember(request);
+        assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(), 
response.errorCode());
+
+        // Send as static member join.
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(
+            request.setGroupInstanceId("group-instance-id"), true, true, null);
+
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(), 
responseFuture.get().errorCode());
+    }
+
+    @Test
+    public void testJoinGroupWithEmptyGroupProtocol() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection(0);
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withProtocolType("consumer")
+            .withProtocols(protocols)
+            .build();
+
+        JoinGroupResponseData response = 
context.joinGenericGroupAsDynamicMember(request);
+        assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(), 
response.errorCode());
+    }
+
+    @Test
+    public void testNewMemberJoinExpiration() throws Exception {
+        // This tests new member expiration during a protracted rebalance. We 
first create a
+        // group with one member which uses a large value for session timeout 
and rebalance timeout.
+        // We then join with one new member and let the rebalance hang while 
we await the first member.
+        // The new member join timeout expires and its JoinGroup request is 
failed.
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withSessionTimeoutMs(5000 + 
context.genericGroupNewMemberJoinTimeoutMs)
+            .withRebalanceTimeoutMs(2 * 
context.genericGroupNewMemberJoinTimeoutMs)
+            .build();
+
+        JoinGroupResponseData firstResponse = 
context.joinGenericGroupAsDynamicMember(request);
+        String firstMemberId = firstResponse.memberId();
+        assertEquals(firstResponse.leader(), firstMemberId);
+        assertEquals(Errors.NONE.code(), firstResponse.errorCode());
+
+        GenericGroup group = genericGroup(context, "group-id");
+
+        assertNotNull(group);
+        assertEquals(0, 
group.allMembers().stream().filter(GenericGroupMember::isNew).count());
+
+        // Send second join group request for a new dynamic member.
+        CompletableFuture<JoinGroupResponseData> secondResponseFuture = 
context.sendGenericGroupJoin(request
+            .setSessionTimeoutMs(5000)
+            .setRebalanceTimeoutMs(5000));
+
+        assertFalse(secondResponseFuture.isDone());
+
+        assertEquals(2, group.allMembers().size());
+        assertEquals(1, 
group.allMembers().stream().filter(GenericGroupMember::isNew).count());
+
+        GenericGroupMember newMember = 
group.allMembers().stream().filter(GenericGroupMember::isNew).findFirst().get();
+        assertNotEquals(firstMemberId, newMember.memberId());
+
+        // Advance clock by new member join timeout to expire the second 
member.
+        context.timer.advanceClock(context.genericGroupNewMemberJoinTimeoutMs);
+
+        assertTrue(secondResponseFuture.isDone());
+
+        JoinGroupResponseData secondResponse = secondResponseFuture.get(5, 
TimeUnit.SECONDS);
+
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
secondResponse.errorCode());
+        assertEquals(1, group.allMembers().size());
+        assertEquals(0, 
group.allMembers().stream().filter(GenericGroupMember::isNew).count());
+        assertEquals(firstMemberId, 
group.allMembers().iterator().next().memberId());
+    }
+
+    @Test
+    public void testJoinGroupInconsistentGroupProtocol() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(
+            request,
+            false,
+            false,
+            new ExpectedGenericGroupResult(Errors.NONE, true)
+        );
+        assertFalse(responseFuture.isDone());
+
+        JoinGroupRequestProtocolCollection otherProtocols = new 
JoinGroupRequestProtocolCollection(0);
+        otherProtocols.add(new 
JoinGroupRequestProtocol().setName("roundrobin"));
+        CompletableFuture<JoinGroupResponseData> otherResponseFuture = 
context.sendGenericGroupJoin(
+            request.setProtocols(otherProtocols)
+        );
+
+        
context.timer.advanceClock(context.genericGroupInitialRebalanceDelayMs);
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.NONE.code(), responseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(), 
otherResponseFuture.get(5, TimeUnit.SECONDS).errorCode());
+    }
+
+    @Test
+    public void testJoinGroupSecondJoinInconsistentProtocol() throws Exception 
{
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection(0);
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("foo"))).array())
+        );
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(
+            request,
+            true,
+            true,
+            new ExpectedGenericGroupResult(Errors.NONE, true)
+        );
+
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.MEMBER_ID_REQUIRED.code(), responseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+
+        // Sending an inconsistent protocol should be refused
+        String memberId = responseFuture.get(5, TimeUnit.SECONDS).memberId();
+        JoinGroupRequestProtocolCollection emptyProtocols = new 
JoinGroupRequestProtocolCollection(0);
+        request = request.setMemberId(memberId)
+            .setProtocols(emptyProtocols);
+
+        responseFuture = context.sendGenericGroupJoin(request, true);
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
+
+        // Sending consistent protocol should be accepted
+        responseFuture = 
context.sendGenericGroupJoin(request.setProtocols(protocols), true);
+        
context.timer.advanceClock(context.genericGroupInitialRebalanceDelayMs);
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.NONE.code(), responseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+    }
+
+    @Test
+    public void testStaticMemberJoinAsFirstMember() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withGroupInstanceId("group-instance-id")
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        JoinGroupResponseData response = context.joinGenericGroup(request, 
false, true);
+        assertEquals(Errors.NONE.code(), response.errorCode());
+    }
+
+    @Test
+    public void testStaticMemberRejoinWithExplicitUnknownMemberId() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withGroupInstanceId("group-instance-id")
+            .withDefaultProtocolTypeAndProtocols()
+            .withSessionTimeoutMs(5000)
+            .withRebalanceTimeoutMs(5000)
+            .build();
+
+        JoinGroupResponseData response = context.joinGenericGroup(request, 
false, true);
+        assertEquals(Errors.NONE.code(), response.errorCode());
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(request.setMemberId("unknown-member-id"));
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.FENCED_INSTANCE_ID.code(), responseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+    }
+
+    @Test
+    public void testJoinGroupUnknownConsumerExistingGroup() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withSessionTimeoutMs(5000)
+            .withRebalanceTimeoutMs(5000)
+            .build();
+
+        JoinGroupResponseData response = 
context.joinGenericGroupAsDynamicMember(request);
+        assertEquals(Errors.NONE.code(), response.errorCode());
+
+        JoinGroupResponseData otherResponse = 
context.joinGenericGroupAsDynamicMember(request
+            .setMemberId("other-member-id"));
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
otherResponse.errorCode());
+    }
+
+    @Test
+    public void testJoinGroupUnknownConsumerNewDeadGroup() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "group-id",
+            true
+        );
+        group.transitionTo(DEAD);
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(request);
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
+    }
+
+    @Test
+    public void testJoinGroupProtocolTypeIsNotProvidedWhenAnErrorOccurs() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId("member-id")
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        JoinGroupResponseData response = 
context.joinGenericGroupAsDynamicMember(request);
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), response.errorCode());
+        assertNull(response.protocolType());
+    }
+
+    @Test
+    public void testJoinGroupReturnsTheProtocolType() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // Leader joins
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> leaderResponseFuture = 
context.sendGenericGroupJoin(
+            request,
+            false,
+            true,
+            new ExpectedGenericGroupResult(Errors.NONE, true)
+        );
+        assertFalse(leaderResponseFuture.isDone());
+
+        // Member joins
+        CompletableFuture<JoinGroupResponseData> memberResponseFuture = 
context.sendGenericGroupJoin(request);
+        assertFalse(memberResponseFuture.isDone());
+
+        // Complete join group phase
+        
context.timer.advanceClock(context.genericGroupInitialRebalanceDelayMs);
+        assertTrue(leaderResponseFuture.isDone());
+        assertTrue(memberResponseFuture.isDone());
+
+        assertEquals(Errors.NONE.code(), leaderResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals("consumer", leaderResponseFuture.get(5, 
TimeUnit.SECONDS).protocolType());
+        assertEquals(Errors.NONE.code(), memberResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals("consumer", memberResponseFuture.get(5, 
TimeUnit.SECONDS).protocolType());
+    }
+
+    @Test
+    public void 
shouldDelayInitialRebalanceByGroupInitialRebalanceDelayOnEmptyGroup() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(
+            request,
+            false,
+            false,
+            new ExpectedGenericGroupResult(Errors.NONE, true)
+        );
+
+        context.timer.advanceClock(context.genericGroupInitialRebalanceDelayMs 
/ 2);
+        assertFalse(responseFuture.isDone());
+
+        context.timer.advanceClock(context.genericGroupInitialRebalanceDelayMs 
/ 2 + 1);
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.NONE.code(), responseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+    }
+
+    @Test
+    public void 
shouldResetRebalanceDelayWhenNewMemberJoinsGroupDuringInitialRebalance() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            
.withRebalanceTimeoutMs(context.genericGroupInitialRebalanceDelayMs * 3)
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> firstMemberResponseFuture = 
context.sendGenericGroupJoin(
+            request,
+            false,
+            true,
+            new ExpectedGenericGroupResult(Errors.NONE, true)
+        );
+
+        context.timer.advanceClock(context.genericGroupInitialRebalanceDelayMs 
- 1);
+        CompletableFuture<JoinGroupResponseData> secondMemberResponseFuture = 
context.sendGenericGroupJoin(request);
+        context.timer.advanceClock(2);
+
+        // Advance clock past initial rebalance delay and verify futures are 
not completed.
+        context.timer.advanceClock(context.genericGroupInitialRebalanceDelayMs 
/ 2 + 1);
+        assertFalse(firstMemberResponseFuture.isDone());
+        assertFalse(secondMemberResponseFuture.isDone());
+
+        // Advance clock beyond recomputed delay and make sure the futures 
have completed.
+        context.timer.advanceClock(context.genericGroupInitialRebalanceDelayMs 
/ 2);
+        assertTrue(firstMemberResponseFuture.isDone());
+        assertTrue(secondMemberResponseFuture.isDone());
+        assertEquals(Errors.NONE.code(), firstMemberResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals(Errors.NONE.code(), secondMemberResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+    }
+
+    @Test
+    public void shouldDelayRebalanceUptoRebalanceTimeout() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            
.withRebalanceTimeoutMs(context.genericGroupInitialRebalanceDelayMs * 2)
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> firstMemberResponseFuture = 
context.sendGenericGroupJoin(
+            request,
+            false,
+            false,
+            new ExpectedGenericGroupResult(Errors.NONE, true)
+        );
+        CompletableFuture<JoinGroupResponseData> secondMemberResponseFuture = 
context.sendGenericGroupJoin(request);
+
+        context.timer.advanceClock(context.genericGroupInitialRebalanceDelayMs 
+ 1);
+
+        CompletableFuture<JoinGroupResponseData> thirdMemberResponseFuture = 
context.sendGenericGroupJoin(request);
+
+        // Advance clock right before rebalance timeout.
+        context.timer.advanceClock(context.genericGroupInitialRebalanceDelayMs 
- 1);
+        assertFalse(firstMemberResponseFuture.isDone());
+        assertFalse(secondMemberResponseFuture.isDone());
+        assertFalse(thirdMemberResponseFuture.isDone());
+
+        // Advance clock beyond rebalance timeout.
+        context.timer.advanceClock(1);
+        assertTrue(firstMemberResponseFuture.isDone());
+        assertTrue(secondMemberResponseFuture.isDone());
+        assertTrue(thirdMemberResponseFuture.isDone());
+
+        assertEquals(Errors.NONE.code(), firstMemberResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals(Errors.NONE.code(), secondMemberResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals(Errors.NONE.code(), thirdMemberResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+    }
+
+    @Test
+    public void testJoinGroupReplaceStaticMember() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withGroupInstanceId("group-instance-id")
+            .withDefaultProtocolTypeAndProtocols()
+            .withSessionTimeoutMs(5000)
+            .build();
+
+
+        // Send join group as static member.
+        CompletableFuture<JoinGroupResponseData> oldMemberResponseFuture = 
context.sendGenericGroupJoin(
+            request,
+            true,
+            true,
+            new ExpectedGenericGroupResult(Errors.NONE, true)
+        );
+        assertFalse(oldMemberResponseFuture.isDone());
+
+        GenericGroup group = genericGroup(context, "group-id");
+
+        assertEquals(1, group.numAwaitingJoinResponse());
+        assertEquals(1, group.size());
+
+        // Replace static member with new member id. Old member id should be 
fenced.
+        CompletableFuture<JoinGroupResponseData> newMemberResponseFuture = 
context.sendGenericGroupJoin(request);
+
+        assertFalse(newMemberResponseFuture.isDone());
+        assertTrue(oldMemberResponseFuture.isDone());
+        assertEquals(Errors.FENCED_INSTANCE_ID.code(), 
oldMemberResponseFuture.get(5, TimeUnit.SECONDS).errorCode());
+        assertEquals(1, group.numAwaitingJoinResponse());
+        assertEquals(1, group.size());
+
+        // Complete join for new member.
+        
context.timer.advanceClock(context.genericGroupInitialRebalanceDelayMs);
+        assertTrue(newMemberResponseFuture.isDone());
+        assertEquals(Errors.NONE.code(), newMemberResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals(0, group.numAwaitingJoinResponse());
+        assertEquals(1, group.size());
+    }
+
+    @Test
+    public void testHeartbeatExpirationShouldRemovePendingMember() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withSessionTimeoutMs(1000)
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(
+            request,
+            true,
+            true,
+            new ExpectedGenericGroupResult(Errors.NONE, true)
+        );
+
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.MEMBER_ID_REQUIRED.code(), responseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
+        assertEquals(0, group.size());
+        assertEquals(1, group.numPendingJoinMembers());
+
+        // Advance clock by session timeout. Pending member should be removed 
from group as heartbeat expires.
+        context.timer.advanceClock(1000);
+        assertEquals(0, group.numPendingJoinMembers());
+    }
+
+    @Test
+    public void testHeartbeatExpirationShouldRemoveMember() throws Exception {
+        // Set initial rebalance delay to simulate a long running rebalance.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupInitialRebalanceDelayMs(10 * 60 * 1000)
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(
+            request,
+            false,
+            true,
+            new ExpectedGenericGroupResult(Errors.NONE, true)
+        );
+        assertFalse(responseFuture.isDone());
+
+        GenericGroup group = genericGroup(context, "group-id");
+
+        String memberId = group.leaderOrNull();
+        assertEquals(1, group.size());
+
+        // Advance clock by new member join timeout. Member should be removed 
from group as heartbeat expires.
+        // A group that transitions to Empty after completing join phase will 
generate records.
+        context.timer.expectResult(heartbeatKey("group-id", memberId), new 
ExpectedGenericGroupResult(
+            Collections.singletonList(newGroupMetadataRecord("group-id",
+                new GroupMetadataValue()
+                    .setMembers(Collections.emptyList())
+                    .setGeneration(1)
+                    .setLeader(null)
+                    .setProtocolType("consumer")
+                    .setProtocol(null)
+                    .setCurrentStateTimestamp(context.time.milliseconds()),
+                MetadataVersion.latest())),
+            Errors.NONE,
+            false
+        ));
+
+        context.timer.advanceClock(context.genericGroupNewMemberJoinTimeoutMs);
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), responseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals(0, group.size());
+    }
+
+    @Test
+    public void testExistingMemberJoinDeadGroup() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        JoinGroupResponseData response = 
context.joinGenericGroupAsDynamicMember(request);
+        assertEquals(Errors.NONE.code(), response.errorCode());
+        String memberId = response.memberId();
+
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
+        assertTrue(group.hasMemberId(memberId));
+
+        group.transitionTo(DEAD);
+
+        response = 
context.joinGenericGroupAsDynamicMember(request.setMemberId(memberId));
+        assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), 
response.errorCode());
+    }
+
+    @Test
+    public void 
testJoinGroupExistingPendingMemberWithGroupInstanceIdThrowsException() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(
+            request,
+            true,
+            true,
+            new ExpectedGenericGroupResult(Errors.NONE, true)
+        );
+        assertTrue(responseFuture.isDone());
+        String memberId = responseFuture.get(5, TimeUnit.SECONDS).memberId();
+
+        assertThrows(IllegalStateException.class,
+            () -> 
context.sendGenericGroupJoin(request.setMemberId(memberId).setGroupInstanceId("group-instance-id"))
+        );
+    }
+
+    @Test
+    public void testJoinGroupExistingMemberUpdatedMetadataTriggersRebalance() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection(0);
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("foo"))).array()));
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withProtocolType("consumer")
+            .withProtocols(protocols)
+            .build();
+
+        JoinGroupResponseData response = 
context.joinGenericGroupAsDynamicMember(request);
+
+        assertEquals(Errors.NONE.code(), response.errorCode());
+        String memberId = response.memberId();
+
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
+        GenericGroupMember member = group.member(memberId);
+
+        assertEquals(protocols, member.supportedProtocols());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+        assertEquals(1, group.generationId());
+
+        protocols = new JoinGroupRequestProtocolCollection(0);
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("foo"))).array()));
+
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("roundrobin")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("bar"))).array()));
+
+        // Send updated member metadata. This should trigger a rebalance and 
complete the join phase.
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(request
+            .setMemberId(memberId)
+            .setProtocols(protocols)
+        );
+
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.NONE.code(), response.errorCode());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+        assertEquals(2, group.generationId());
+        assertEquals(protocols, member.supportedProtocols());
+    }
+
+    @Test
+    public void testJoinGroupAsExistingLeaderTriggersRebalanceInStableState() 
throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        JoinGroupResponseData response = 
context.joinGenericGroupAsDynamicMember(request);
+
+        assertEquals(Errors.NONE.code(), response.errorCode());
+        String memberId = response.memberId();
+
+        GenericGroup group = genericGroup(context, "group-id");
+
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+        assertTrue(group.isLeader(memberId));
+        assertEquals(1, group.generationId());
+
+        group.transitionTo(STABLE);
+        // Sending join group as leader should trigger a rebalance.
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(
+            request.setMemberId(memberId));
+
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.NONE.code(), responseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+        assertEquals(2, group.generationId());
+    }
+
+    @Test
+    public void 
testJoinGroupAsExistingMemberWithUpdatedMetadataTriggersRebalanceInStableState()
 throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection(0);
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("foo"))).array()));
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withProtocolType("consumer")
+            .withProtocols(protocols)
+            .build();
+
+        JoinGroupResponseData leaderResponse = 
context.joinGenericGroupAsDynamicMember(request);
+        assertEquals(Errors.NONE.code(), leaderResponse.errorCode());
+        String leaderId = leaderResponse.leader();
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
+        assertEquals(1, group.generationId());
+
+        // Member joins.
+        CompletableFuture<JoinGroupResponseData> memberResponseFuture = 
context.sendGenericGroupJoin(request);
+        // Leader also rejoins. Completes join group phase.
+        CompletableFuture<JoinGroupResponseData> leaderResponseFuture = 
context.sendGenericGroupJoin(request
+            .setMemberId(leaderId));
+
+        assertTrue(leaderResponseFuture.isDone());
+        assertTrue(memberResponseFuture.isDone());
+        assertEquals(Errors.NONE.code(), leaderResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals(Errors.NONE.code(), memberResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+        assertEquals(2, group.size());
+        assertEquals(2, group.generationId());
+
+        group.transitionTo(STABLE);
+
+        // Member rejoins with updated metadata. This should trigger a 
rebalance.
+        String memberId = memberResponseFuture.get(5, 
TimeUnit.SECONDS).memberId();
+
+        protocols = new JoinGroupRequestProtocolCollection(0);
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("foo"))).array()));
+
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("roundrobin")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("bar"))).array()));
+
+        JoinGroupRequestData memberRequest = 
request.setMemberId(memberId).setProtocols(protocols);
+        memberResponseFuture = context.sendGenericGroupJoin(memberRequest);
+
+        assertFalse(memberResponseFuture.isDone());
+
+        // Leader rejoins. This completes the join group phase.
+        leaderResponseFuture = 
context.sendGenericGroupJoin(request.setMemberId(leaderId));
+
+        assertTrue(leaderResponseFuture.isDone());
+        assertTrue(memberResponseFuture.isDone());
+        assertEquals(Errors.NONE.code(), memberResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+        assertEquals(3, group.generationId());
+        assertEquals(2, group.size());
+    }
+
+    @Test
+    public void 
testJoinGroupExistingMemberDoesNotTriggerRebalanceInStableState() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        JoinGroupResponseData leaderResponse = 
context.joinGenericGroupAsDynamicMember(request);
+        assertEquals(Errors.NONE.code(), leaderResponse.errorCode());
+        String leaderId = leaderResponse.leader();
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
+        assertEquals(1, group.generationId());
+
+        // Member joins.
+        CompletableFuture<JoinGroupResponseData> memberResponseFuture = 
context.sendGenericGroupJoin(request);
+        // Leader also rejoins. Completes join group phase.
+        CompletableFuture<JoinGroupResponseData> leaderResponseFuture = 
context.sendGenericGroupJoin(request
+            .setMemberId(leaderId));
+
+        assertTrue(leaderResponseFuture.isDone());
+        assertTrue(memberResponseFuture.isDone());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+        assertEquals(2, group.size());
+        assertEquals(2, group.generationId());
+
+        String memberId = memberResponseFuture.get(5, 
TimeUnit.SECONDS).memberId();
+
+        group.transitionTo(STABLE);
+
+        // Member rejoins with no metadata changes. This does not trigger a 
rebalance.
+        memberResponseFuture = 
context.sendGenericGroupJoin(request.setMemberId(memberId));
+
+        assertTrue(memberResponseFuture.isDone());
+        assertEquals(Errors.NONE.code(), memberResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals(2, memberResponseFuture.get(5, 
TimeUnit.SECONDS).generationId());
+        assertTrue(group.isInState(STABLE));
+    }
+
+    @Test
+    public void testJoinGroupExistingMemberInEmptyState() throws Exception {
+        // Existing member joins a group that is in Empty/Dead state. Ask 
member to rejoin with generation id reset.
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        JoinGroupResponseData response = 
context.joinGenericGroupAsDynamicMember(request);
+
+        assertEquals(Errors.NONE.code(), response.errorCode());
+        String memberId = response.memberId();
+
+        GenericGroup group = genericGroup(context, "group-id");
+
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        group.transitionTo(PREPARING_REBALANCE);
+        group.transitionTo(EMPTY);
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(
+            request.setMemberId(memberId));
+
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), responseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals(-1, responseFuture.get(5, 
TimeUnit.SECONDS).generationId());
+    }
+
+    @Test
+    public void testCompleteJoinRemoveNotYetRejoinedDynamicMembers() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withSessionTimeoutMs(1000)
+            .withRebalanceTimeoutMs(1000)
+            .build();
+
+        JoinGroupResponseData leaderResponse = 
context.joinGenericGroupAsDynamicMember(request);
+        assertEquals(Errors.NONE.code(), leaderResponse.errorCode());
+        GenericGroup group = genericGroup(context, "group-id");
+
+        assertEquals(1, group.generationId());
+
+        // Add new member. This triggers a rebalance.
+        CompletableFuture<JoinGroupResponseData> memberResponseFuture = 
context.sendGenericGroupJoin(request);
+        assertFalse(memberResponseFuture.isDone());
+        assertEquals(2, group.size());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+
+        // Advance clock by rebalance timeout. This will expire the leader as 
it has not rejoined.
+        context.timer.advanceClock(1000);
+
+        assertTrue(memberResponseFuture.isDone());
+        assertEquals(Errors.NONE.code(), memberResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals(1, group.size());
+        assertTrue(group.hasMemberId(memberResponseFuture.get(5, 
TimeUnit.SECONDS).memberId()));
+        assertEquals(2, group.generationId());
+    }
+
+    @Test
+    public void testCompleteJoinPhaseInEmptyStateSkipsRebalance() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withSessionTimeoutMs(1000)
+            .withRebalanceTimeoutMs(1000)
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(
+            request,
+            false,
+            true,
+            new ExpectedGenericGroupResult(Errors.NONE, true)
+        );
+        assertFalse(responseFuture.isDone());
+
+        GenericGroup group = genericGroup(context, "group-id");
+        assertEquals(0, group.generationId());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+        group.transitionTo(DEAD);
+
+        // Advance clock by initial rebalance delay to complete join phase.
+        
context.timer.advanceClock(context.genericGroupInitialRebalanceDelayMs);
+        assertEquals(0, group.generationId());
+    }
+
+    @Test
+    public void testCompleteJoinPhaseNoMembersRejoinedExtendsJoinPhase() 
throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withGroupInstanceId("first-instance-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withSessionTimeoutMs(30000)
+            .withRebalanceTimeoutMs(10000)
+            .build();
+
+        // First member joins group and completes join phase.
+        JoinGroupResponseData firstMemberResponse = 
context.joinGenericGroup(request, true, true);
+        assertEquals(Errors.NONE.code(), firstMemberResponse.errorCode());
+        String firstMemberId = firstMemberResponse.memberId();
+
+        GenericGroup group = genericGroup(context, "group-id");
+
+        // Second member joins and group goes into rebalancing state.
+        CompletableFuture<JoinGroupResponseData> secondMemberResponseFuture = 
context.sendGenericGroupJoin(request
+            .setGroupInstanceId("second-instance-id"));
+
+        // First static member rejoins and completes join phase.
+        CompletableFuture<JoinGroupResponseData> firstMemberResponseFuture = 
context.sendGenericGroupJoin(request
+            .setMemberId(firstMemberId)
+            .setGroupInstanceId("first-instance-id"));
+
+        assertTrue(firstMemberResponseFuture.isDone());
+        assertTrue(secondMemberResponseFuture.isDone());
+        assertEquals(Errors.NONE.code(), firstMemberResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals(Errors.NONE.code(), secondMemberResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals(2, group.size());
+        assertEquals(2, group.generationId());
+        
+        String secondMemberId = secondMemberResponseFuture.get(5, 
TimeUnit.SECONDS).memberId();
+
+        // Trigger a rebalance. No members rejoined.
+        context.groupMetadataManager.prepareRebalance(group, "trigger 
rebalance");
+
+        assertEquals(2, group.size());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+        assertEquals(0, group.numAwaitingJoinResponse());
+
+        // Advance clock by rebalance timeout to complete join phase. As long 
as both members have not
+        // rejoined, we extend the join phase.
+        context.timer.advanceClock(10000);
+        assertEquals(10000, 
context.timer.operationsByKey.get("join-group-id").remainingMs);
+        context.timer.advanceClock(10000);
+        assertEquals(10000, 
context.timer.operationsByKey.get("join-group-id").remainingMs);
+
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+        assertEquals(2, group.size());
+        assertEquals(2, group.generationId());
+        
+        // Let first and second member rejoin. This should complete the join 
phase.
+        firstMemberResponseFuture = context.sendGenericGroupJoin(request
+            .setMemberId(firstMemberId)
+            .setGroupInstanceId("first-instance-id"));
+
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+        assertEquals(2, group.size());
+        assertEquals(2, group.generationId());
+
+        secondMemberResponseFuture = context.sendGenericGroupJoin(request
+            .setMemberId(secondMemberId)
+            .setGroupInstanceId("second-instance-id"));
+
+        assertTrue(firstMemberResponseFuture.isDone());
+        assertTrue(secondMemberResponseFuture.isDone());
+        assertEquals(Errors.NONE.code(), firstMemberResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals(Errors.NONE.code(), secondMemberResponseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+        assertEquals(2, group.size());
+        assertEquals(3, group.generationId());
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testReplaceStaticMemberInStableStateNoError(
+        boolean supportSkippingAssignment
+    ) throws Exception {
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection(0);
+
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("foo"))).array())
+        );
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withGroupInstanceId("group-instance-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withProtocolType("consumer")
+            .withProtocols(protocols)
+            .build();
+
+        JoinGroupResponseData response = context.joinGenericGroup(request, 
true, supportSkippingAssignment);
+        assertEquals(Errors.NONE.code(), response.errorCode());
+        String oldMemberId = response.memberId();
+
+        GenericGroup group = genericGroup(context, "group-id");
+
+        assertEquals(1, group.size());
+        assertEquals(1, group.generationId());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        // Simulate successful sync group phase
+        group.transitionTo(STABLE);
+
+        // Static member rejoins with UNKNOWN_MEMBER_ID. This should update 
the log with the generated member id.
+        protocols = new JoinGroupRequestProtocolCollection(0);
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("foo"))).array())
+        );
+
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("roundrobin")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("bar"))).array()));
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(
+            request
+                .setProtocols(protocols)
+                .setRebalanceTimeoutMs(7000)
+                .setSessionTimeoutMs(4500),
+            true,
+            supportSkippingAssignment,
+            new ExpectedGenericGroupResult(Errors.NONE, false)
+        );
+        assertTrue(responseFuture.isDone());
+
+        String newMemberId = group.staticMemberId("group-instance-id");
+
+        JoinGroupResponseData expectedResponse = new JoinGroupResponseData()
+            .setMembers(Collections.emptyList())
+            .setLeader(oldMemberId)
+            .setMemberId(newMemberId)
+            .setGenerationId(1)
+            .setProtocolType("consumer")
+            .setProtocolName("range")
+            .setSkipAssignment(supportSkippingAssignment)
+            .setErrorCode(Errors.NONE.code());
+
+        if (supportSkippingAssignment) {
+            expectedResponse
+                .setMembers(Collections.singletonList(
+                    new JoinGroupResponseData.JoinGroupResponseMember()
+                        .setMemberId(newMemberId)
+                        .setGroupInstanceId("group-instance-id")
+                        .setMetadata(protocols.find("range").metadata())
+                    ))
+                .setLeader(newMemberId);
+        }
+
+        GenericGroupMember updatedMember = 
group.member(group.staticMemberId("group-instance-id"));
+
+        assertEquals(expectedResponse, responseFuture.get(5, 
TimeUnit.SECONDS));
+
+        assertEquals(newMemberId, updatedMember.memberId());
+        assertEquals(Optional.of("group-instance-id"), 
updatedMember.groupInstanceId());
+        assertEquals(7000, updatedMember.rebalanceTimeoutMs());
+        assertEquals(4500, updatedMember.sessionTimeoutMs());
+        assertEquals(protocols, updatedMember.supportedProtocols());
+
+        assertEquals(1, group.size());
+        assertEquals(1, group.generationId());
+        assertTrue(group.isInState(STABLE));
+    }
+
+    @Test
+    public void 
testReplaceStaticMemberInStableStateWithUpdatedProtocolTriggersRebalance() 
throws Exception {
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection(0);
+
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("foo"))).array())
+        );
+
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("roundrobin")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("bar"))).array())
+        );
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withGroupInstanceId("group-instance-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withProtocolType("consumer")
+            .withProtocols(protocols)
+            .build();
+
+        JoinGroupResponseData response = context.joinGenericGroup(request, 
true, true);
+        assertEquals(Errors.NONE.code(), response.errorCode());
+
+        GenericGroup group = genericGroup(context, "group-id");
+
+        assertEquals(1, group.size());
+        assertEquals(1, group.generationId());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        // Simulate successful sync group phase
+        group.transitionTo(STABLE);
+
+        // Static member rejoins with UNKNOWN_MEMBER_ID. The selected protocol 
changes and triggers a rebalance.
+        protocols = new JoinGroupRequestProtocolCollection(0);
+
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("roundrobin")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("bar"))).array())
+        );
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(
+            request.setProtocols(protocols)
+        );
+
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.NONE.code(), responseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals(1, group.size());
+        assertEquals(2, group.generationId());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+    }
+
+    @Test
+    public void testReplaceStaticMemberInStableStateErrors() throws Exception {
+        // If the append future fails, we need to revert the soft state to the 
original member.
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection(0);
+
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("foo"))).array())
+        );
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withGroupInstanceId("group-instance-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withProtocolType("consumer")
+            .withProtocols(protocols)
+            .build();
+
+        JoinGroupResponseData response = context.joinGenericGroup(request, 
false, false);
+        assertEquals(Errors.NONE.code(), response.errorCode());
+
+        GenericGroup group = genericGroup(context, "group-id");
+
+        GenericGroupMember oldMember = group.member(response.memberId());
+
+        assertEquals(1, group.size());
+        assertEquals(1, group.generationId());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        // Simulate successful sync group phase
+        group.transitionTo(STABLE);
+
+        // Static member rejoins with UNKNOWN_MEMBER_ID but the append fails. 
This reverts the soft state of the group.
+        protocols.add(new JoinGroupRequestProtocol()
+                .setName("roundrobin")
+                .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                    Collections.singletonList("bar"))).array()));
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(
+            request.setSessionTimeoutMs(6000)
+                .setRebalanceTimeoutMs(7000)
+                .setProtocols(protocols),
+            false,
+            false,
+            new ExpectedGenericGroupResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
false)
+        );
+        assertTrue(responseFuture.isDone());
+
+        JoinGroupResponseData expectedResponse = new JoinGroupResponseData()
+            .setMembers(Collections.emptyList())
+            .setLeader(oldMember.memberId())
+            .setMemberId(UNKNOWN_MEMBER_ID)
+            .setGenerationId(1)
+            .setProtocolType("consumer")
+            .setProtocolName("range")
+            .setSkipAssignment(false)
+            .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
+
+        assertEquals(expectedResponse, responseFuture.get(5, 
TimeUnit.SECONDS));
+
+        GenericGroupMember revertedMember = 
group.member(group.staticMemberId("group-instance-id"));
+
+        assertEquals(oldMember.memberId(), revertedMember.memberId());
+        assertEquals(oldMember.groupInstanceId(), 
revertedMember.groupInstanceId());
+        assertEquals(oldMember.rebalanceTimeoutMs(), 
revertedMember.rebalanceTimeoutMs());
+        assertEquals(oldMember.sessionTimeoutMs(), 
revertedMember.sessionTimeoutMs());
+        assertEquals(oldMember.supportedProtocols(), 
revertedMember.supportedProtocols());
+
+        assertEquals(1, group.size());
+        assertEquals(1, group.generationId());
+        assertTrue(group.isInState(STABLE));
+    }
+
+    @Test
+    public void 
testReplaceStaticMemberInCompletingRebalanceStateTriggersRebalance() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withGroupInstanceId("group-instance-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        JoinGroupResponseData response = context.joinGenericGroup(request, 
true, true);
+        assertEquals(Errors.NONE.code(), response.errorCode());
+
+        GenericGroup group = genericGroup(context, "group-id");
+
+        assertEquals(1, group.size());
+        assertEquals(1, group.generationId());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        // Static member rejoins with UNKNOWN_MEMBER_ID and triggers a 
rebalance.
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(request);
+
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.NONE.code(), responseFuture.get(5, 
TimeUnit.SECONDS).errorCode());
+        assertEquals(1, group.size());
+        assertEquals(2, group.generationId());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+    }
+
+    private <T> void assertUnorderedListEquals(
+        List<T> expected,
+        List<T> actual
+    ) {
+        assertEquals(new HashSet<>(expected), new HashSet<>(actual));
+    }
+
+    private void assertResponseEquals(
+        ConsumerGroupHeartbeatResponseData expected,
+        ConsumerGroupHeartbeatResponseData actual
+    ) {
+        if (!responseEquals(expected, actual)) {
+            assertionFailure()
+                .expected(expected)
+                .actual(actual)
+                .buildAndThrow();
+        }
+    }
+
+    private boolean responseEquals(
+        ConsumerGroupHeartbeatResponseData expected,
+        ConsumerGroupHeartbeatResponseData actual
+    ) {
+        if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
+        if (expected.errorCode() != actual.errorCode()) return false;
+        if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
+        if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
+        if (expected.memberEpoch() != actual.memberEpoch()) return false;
+        if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
+        if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
+        // Unordered comparison of the assignments.
+        return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
+    }
+
+    private boolean responseAssignmentEquals(
+        ConsumerGroupHeartbeatResponseData.Assignment expected,
+        ConsumerGroupHeartbeatResponseData.Assignment actual
+    ) {
+        if (expected == actual) return true;
+        if (expected == null) return false;
+        if (actual == null) return false;
+
+        if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), 
fromAssignment(actual.pendingTopicPartitions())))
+            return false;
+
+        return 
Objects.equals(fromAssignment(expected.assignedTopicPartitions()), 
fromAssignment(actual.assignedTopicPartitions()));
+    }
+
+    private Map<Uuid, Set<Integer>> fromAssignment(
+        List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assignment
+    ) {
+        if (assignment == null) return null;
+
+        Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
+        assignment.forEach(topicPartitions -> {
+            assignmentMap.put(topicPartitions.topicId(), new 
HashSet<>(topicPartitions.partitions()));
+        });
+        return assignmentMap;
+    }
+
+    private void assertRecordsEquals(
+        List<Record> expectedRecords,
+        List<Record> actualRecords
+    ) {
+        try {
+            assertEquals(expectedRecords.size(), actualRecords.size());
+
+            for (int i = 0; i < expectedRecords.size(); i++) {
+                Record expectedRecord = expectedRecords.get(i);
+                Record actualRecord = actualRecords.get(i);
+                assertRecordEquals(expectedRecord, actualRecord);
+            }
+        } catch (AssertionFailedError e) {
+            assertionFailure()
+                .expected(expectedRecords)
+                .actual(actualRecords)
+                .buildAndThrow();
+        }
+    }
+
+    private void assertRecordEquals(
+        Record expected,
+        Record actual
+    ) {
+        try {
+            assertApiMessageAndVersionEquals(expected.key(), actual.key());
+            assertApiMessageAndVersionEquals(expected.value(), actual.value());
+        } catch (AssertionFailedError e) {
+            assertionFailure()
+                .expected(expected)
+                .actual(actual)
+                .buildAndThrow();
+        }
+    }
+
+    private void assertApiMessageAndVersionEquals(
+        ApiMessageAndVersion expected,
+        ApiMessageAndVersion actual
+    ) {
+        if (expected == actual) return;
+
+        assertEquals(expected.version(), actual.version());
+
+        if (actual.message() instanceof 
ConsumerGroupCurrentMemberAssignmentValue) {
+            // The order of the topics stored in 
ConsumerGroupCurrentMemberAssignmentValue is not
+            // always guaranteed. Therefore, we need a special comparator.
+            ConsumerGroupCurrentMemberAssignmentValue expectedValue =
+                (ConsumerGroupCurrentMemberAssignmentValue) expected.message();
+            ConsumerGroupCurrentMemberAssignmentValue actualValue =
+                (ConsumerGroupCurrentMemberAssignmentValue) actual.message();
+
+            assertEquals(expectedValue.memberEpoch(), 
actualValue.memberEpoch());
+            assertEquals(expectedValue.previousMemberEpoch(), 
actualValue.previousMemberEpoch());
+            assertEquals(expectedValue.targetMemberEpoch(), 
actualValue.targetMemberEpoch());
+            assertEquals(expectedValue.error(), actualValue.error());
+            assertEquals(expectedValue.metadataVersion(), 
actualValue.metadataVersion());
+            assertEquals(expectedValue.metadataBytes(), 
actualValue.metadataBytes());
+
+            // We transform those to Maps before comparing them.
+            
assertEquals(fromTopicPartitions(expectedValue.assignedPartitions()),
+                fromTopicPartitions(actualValue.assignedPartitions()));
+            
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingRevocation()),
+                
fromTopicPartitions(actualValue.partitionsPendingRevocation()));
+            
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()),
+                
fromTopicPartitions(actualValue.partitionsPendingAssignment()));
+        } else {
+            assertEquals(expected.message(), actual.message());
+        }
+    }
+
+    private Map<Uuid, Set<Integer>> fromTopicPartitions(
+        List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> 
assignment
+    ) {
+        Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
+        assignment.forEach(topicPartitions -> {
+            assignmentMap.put(topicPartitions.topicId(), new 
HashSet<>(topicPartitions.partitions()));
         });
         return assignmentMap;
     }
+
+    private static GenericGroup genericGroup(GroupMetadataManagerTestContext 
context, String groupId) {
+        return 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(groupId, false);
+    }
+
+    private List<String> verifyGenericGroupJoinResponses(
+        List<CompletableFuture<JoinGroupResponseData>> responseFutures,
+        int expectedSuccessCount,
+        Errors expectedFailure
+    ) {
+        int successCount = 0;
+        List<String> memberIds = new ArrayList<>();
+        for (CompletableFuture<JoinGroupResponseData> responseFuture : 
responseFutures) {
+            if (!responseFuture.isDone()) {
+                fail("All responseFutures should be completed.");
+            }
+            try {
+                if (responseFuture.get(5, TimeUnit.SECONDS).errorCode() == 
Errors.NONE.code()) {
+                    successCount++;
+                } else {
+                    assertEquals(
+                        expectedFailure.code(),
+                        responseFuture.get(5, TimeUnit.SECONDS).errorCode()
+                    );
+                }
+                memberIds.add(responseFuture.get(5, 
TimeUnit.SECONDS).memberId());
+            } catch (Exception e) {
+                fail("Unexpected exception: " + e.getMessage());
+            }
+        }
+
+        assertEquals(expectedSuccessCount, successCount);
+        return memberIds;
+    }
+
+    /**
+     * Verify the records that should be appended and complete the append 
future based on a
+     * configured error. Run any assertions to verify the result of the future 
completion.
+     *
+     * @param expectedResult  The expected result to compare against.
+     * @param result          The result from expiring a join/heartbeat/sync 
operation.
+     */
+    private static void verifyCoordinatorResult(
+        ExpectedGenericGroupResult expectedResult,
+        CoordinatorResult<Void, Record> result
+    ) {
+        if (expectedResult == null) {
+            assertEquals(EMPTY_RESULT, result);
+        } else {
+            assertEquals(expectedResult.records, result.records());
+            if (expectedResult.mockError == Errors.NONE) {
+                result.appendFuture().complete(null);
+            } else {
+                
result.appendFuture().completeExceptionally(expectedResult.mockError.exception());
+            }

Review Comment:
   removed this method, and now individual tests do the validation. 
   
   one exception is for timer operation expirations - as the majority of the 
cases will not result in any records, i have done the validation inside 
MockCoordinatorTimer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to