showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r922938170


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1413,10 +1458,55 @@ public void testOnJoinPrepareWithOffsetCommit() throws InterruptedException {
         time.sleep(150);
         res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
         assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommit_KeepJoinAfterRebalanceTimeout() {

Review Comment:
   nit: In Kafka, we usually don't use underscores (`_`) in test names. Could 
we replace it with `Should`, e.g. 
`testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterRebalanceTimeout()`? The 
same applies to the other tests. Thanks.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1413,10 +1458,55 @@ public void testOnJoinPrepareWithOffsetCommit() throws InterruptedException {
         time.sleep(150);
         res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
         assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommit_KeepJoinAfterRebalanceTimeout() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
 
         pollTimer = time.timer(100L);
         time.sleep(150);
-        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(rebalanceTimeoutMs);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));

Review Comment:
   Could we add a separate test that only tests 
`KeepJoinAfterRebalanceTimeout`? That is:
   ```
           pollTimer = time.timer(100L);
           time.sleep(rebalanceTimeoutMs);
           // no offset commit response
           res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
           // should still return true
           assertTrue(res);
   ```


