showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r923994624


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
+
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        // and there is no in-flight offset commit request
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer expired.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+            timer.update();
+            joinPrepareTimer.update();
+        }
+
+        // keep retrying the offset commit when:
+        // 1. offset commit haven't done (and joinPrepareTime not expired)
+        // 2. failed with retryable exception (and joinPrepareTime not expired)

Review Comment:
   nit: joinPrepareTime -> joinPrepareTime[r]



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
+
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        // and there is no in-flight offset commit request
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer expired.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+            timer.update();
+            joinPrepareTimer.update();
+        }
+
+        // keep retrying the offset commit when:

Review Comment:
   keep retrying the offset commit when: -> [K]eep retrying[/waiting] the offset commit when:
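For illustration, a minimal sketch of the "keep retrying/waiting" rule from the diff comment `keep retrying the offset commit when:`, assuming the state of the in-flight auto-commit future can be summarized as a simple enum. `CommitState` and `canCompleteJoinPrepare` are hypothetical names, not the PR's code; the rule follows the diff: keep waiting while the commit is still pending or has failed with a retriable error, unless the join-prepare timer has expired, and otherwise let `onJoinPrepare` return true.

```java
public class JoinPrepareDecision {
    // Hypothetical summary of the in-flight auto-commit future's state.
    enum CommitState { NOT_SENT, PENDING, SUCCEEDED, FAILED_RETRIABLE, FAILED_NON_RETRIABLE }

    // Returns true when onJoinPrepare may finish so the member can go on to (re)join the group.
    // Keep retrying/waiting (return false) only while the commit is pending or failed with a
    // retriable error AND the join-prepare timer has not expired yet.
    static boolean canCompleteJoinPrepare(CommitState state, boolean joinPrepareTimerExpired) {
        switch (state) {
            case PENDING:
            case FAILED_RETRIABLE:
                return joinPrepareTimerExpired;
            default:
                // NOT_SENT, SUCCEEDED or FAILED_NON_RETRIABLE: nothing left to wait for.
                return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(canCompleteJoinPrepare(CommitState.PENDING, false));              // false
        System.out.println(canCompleteJoinPrepare(CommitState.FAILED_RETRIABLE, true));      // true
        System.out.println(canCompleteJoinPrepare(CommitState.FAILED_NON_RETRIABLE, false)); // true
    }
}
```

A pending commit blocks completion until the join-prepare timer expires, while a non-retriable failure lets the member proceed immediately, matching the old "return true when" cases.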



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);

Review Comment:
   I think we should add a comment above L752, e.g.:
   `// We should complete onJoinPrepare before rebalanceTimeout, and continue to join the group to avoid the member getting kicked out of the group`
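A minimal sketch of where the suggested comment would sit, assuming simplified stand-ins for Kafka's `Time` and `Timer` (`ManualClock`, `SimpleTimer`, and `pollTimerFor` are hypothetical). It mirrors the diff: the join-prepare timer is created lazily with the rebalance timeout on the first call and refreshed on later calls, and the auto-commit poll blocks on whichever of the two timers has less time left.

```java
public class JoinPrepareTimerSketch {
    // Hypothetical manually advanced clock (stand-in for Kafka's Time / MockTime).
    static final class ManualClock {
        private long nowMs = 0;
        long milliseconds() { return nowMs; }
        void sleep(long ms) { nowMs += ms; }
    }

    // Hypothetical minimal timer, loosely modeled on org.apache.kafka.common.utils.Timer:
    // it caches "now" and only observes elapsed time after update().
    static final class SimpleTimer {
        private final ManualClock clock;
        private final long deadlineMs;
        private long currentMs;

        SimpleTimer(ManualClock clock, long timeoutMs) {
            this.clock = clock;
            this.currentMs = clock.milliseconds();
            this.deadlineMs = currentMs + timeoutMs;
        }

        void update() { currentMs = clock.milliseconds(); }
        long remainingMs() { return Math.max(0, deadlineMs - currentMs); }
    }

    private final ManualClock clock;
    private final long rebalanceTimeoutMs;
    private SimpleTimer joinPrepareTimer; // created on the first onJoinPrepare call of a rebalance

    JoinPrepareTimerSketch(ManualClock clock, long rebalanceTimeoutMs) {
        this.clock = clock;
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
    }

    // Picks the timer to block on while waiting for the auto-commit response.
    SimpleTimer pollTimerFor(SimpleTimer callerTimer) {
        // We should complete onJoinPrepare before rebalanceTimeout, and continue to join the group
        // to avoid the member getting kicked out of the group.
        if (joinPrepareTimer == null) {
            joinPrepareTimer = new SimpleTimer(clock, rebalanceTimeoutMs);
        } else {
            joinPrepareTimer.update();
        }
        // Block for at most the shorter of the caller's timer and the join-prepare timer.
        return callerTimer.remainingMs() < joinPrepareTimer.remainingMs() ? callerTimer : joinPrepareTimer;
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        JoinPrepareTimerSketch coordinator = new JoinPrepareTimerSketch(clock, 1000);
        // First call: the caller's 100 ms timer is shorter than the 1000 ms rebalance timeout.
        System.out.println(coordinator.pollTimerFor(new SimpleTimer(clock, 100)).remainingMs()); // 100
        clock.sleep(950);
        // Later call: only 50 ms of the rebalance timeout remain, so that timer bounds the poll.
        System.out.println(coordinator.pollTimerFor(new SimpleTimer(clock, 100)).remainingMs()); // 50
    }
}
```

Because the join-prepare timer survives across calls, repeated `onJoinPrepare` invocations share one rebalance-timeout budget instead of each getting a fresh one.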



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -809,11 +850,13 @@ else if (future.failed() && !future.isRetriable()) {
 
         isLeader = false;
         subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;

Review Comment:
   We should update the timer before returning:
   `timer.update();`
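One way to make sure the caller's timer is updated on every return path is a `try`/`finally`; this is an illustrative choice, not necessarily what the PR should do. `Timer` here is a hypothetical one-method stand-in for Kafka's `Timer`, the `commitDone` hook is hypothetical, and the two fields are named after the ones reset in this hunk but typed as placeholders.

```java
import java.util.function.BooleanSupplier;

public class JoinPrepareCleanupSketch {
    // Hypothetical stand-in for org.apache.kafka.common.utils.Timer; only update() matters here.
    interface Timer {
        void update();
    }

    // Per-rebalance state, named after the fields in the PR; Object is a placeholder type.
    private Object joinPrepareTimer;
    private Object autoCommitOffsetRequestFuture;

    // commitDone is a hypothetical hook reporting whether the auto-commit no longer needs waiting for.
    boolean onJoinPrepare(Timer timer, BooleanSupplier commitDone) {
        try {
            if (joinPrepareTimer == null) {
                joinPrepareTimer = new Object();
            }
            if (autoCommitOffsetRequestFuture == null) {
                autoCommitOffsetRequestFuture = new Object();
            }
            if (!commitDone.getAsBoolean()) {
                return false; // keep the state so the next call resumes waiting on the same request
            }
            // ... revoke partitions, reset isLeader and the group subscription ...
            // Clear the per-rebalance state so the next rebalance starts fresh.
            joinPrepareTimer = null;
            autoCommitOffsetRequestFuture = null;
            return true;
        } finally {
            // Update the caller's timer exactly once, on every return path.
            timer.update();
        }
    }

    boolean hasPerRebalanceState() {
        return joinPrepareTimer != null || autoCommitOffsetRequestFuture != null;
    }

    public static void main(String[] args) {
        int[] updates = {0};
        Timer timer = () -> updates[0]++;
        JoinPrepareCleanupSketch coordinator = new JoinPrepareCleanupSketch();
        System.out.println(coordinator.onJoinPrepare(timer, () -> false)); // false
        System.out.println(coordinator.onJoinPrepare(timer, () -> true));  // true
        System.out.println(updates[0] + " " + coordinator.hasPerRebalanceState()); // 2 false
    }
}
```

The first call keeps the per-rebalance state so the next call resumes waiting, the second clears it, and the timer is updated once per call either way.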



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterNonRetryableException() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);

Review Comment:
   This retry behavior is already tested in `testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry`, right? We can remove it here and test the non-retriable exception directly.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);

Review Comment:
   1. Why do we need `time.sleep(150)` each time?
   2. There is some unnecessary code. We can simplify it to the code below:
   
   ```java
           Timer pollTimer = time.timer(100L);
           client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
           boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
           assertFalse(res);
   
           pollTimer = time.timer(100L);
           client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
           res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
           assertTrue(res);
   ```
   
   The same comments apply to the newly added tests below.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterNonRetryableException() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithInflightCommitOffestShouldKeepJoinAfterRebalanceTimeout() {

Review Comment:
   Now I know why we should assert `coordinatorUnknown` at the end; sorry, I didn't know that earlier. I think we should remove this test, because if the coordinator is unknown, we'll never complete the rebalance, which is not what we want to test. Please remove it. Sorry, and thanks.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterNonRetryableException() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithInflightCommitOffestShouldKeepJoinAfterRebalanceTimeout() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(100);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(rebalanceTimeoutMs);
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        // commit offset should timeout and mark coordinator unknown
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterRebalanceTimeout() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);

Review Comment:
   duplicated test.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1251,6 +1252,83 @@ public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
         assertFalse(coordinator.rejoinNeededOrPending());
     }
 
+    @Test
+    public void testPatternSubscribeAndRejoinGroupAfterTopicDelete() {

Review Comment:
   What do we want to test in this test? Is it related to our change?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,89 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array(
+    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
+    "org.apache.kafka.clients.consumer.RangeAssignor"))
+  def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+    // create 2 consumers
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-and-rejoin-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy)
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock = new ReentrantLock()
+    var generationId1 = -1
+    var memberId1 = ""
+    val customRebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
+      }
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
+        if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+          fail(s"Time out while awaiting for lock.")
+        }
+        try {
+          generationId1 = consumer1.groupMetadata().generationId()
+          memberId1 = consumer1.groupMetadata().memberId()
+        } finally {
+          lock.unlock()
+        }
+      }
+    }
+    val consumerPoller1 = new ConsumerAssignmentPoller(consumer1, List(topic), Set.empty, customRebalanceListener)
+    consumerPoller1.start()
+    TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment() == expectedAssignment,
+      s"Timed out while awaiting expected assignment change to $expectedAssignment.")
+
+    // Since the consumer1 already completed the rebalance,
+    // the `onPartitionsAssigned` rebalance listener will be invoked to set the generationId and memberId
+    var stableGeneration = -1
+    var stableMemberId1 = ""
+    if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+      fail(s"Time out while awaiting for lock.")
+    }
+    try {
+      stableGeneration = generationId1
+      stableMemberId1 = memberId1
+    } finally {
+      lock.unlock()
+    }
+
+    val consumerPoller2 = subscribeConsumerAndStartPolling(consumer2, List(topic))
+    TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment().size == 1,
+      s"Timed out while awaiting expected assignment change to 1.")

Review Comment:
   nit: "Timed out while awaiting expected assignment [size] change to 1."
   The same comment applies to the next line.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));

Review Comment:
   Why do we need `groupInstanceId`? Could we use the class variable `rebalanceConfig` directly here?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -191,7 +191,7 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
      * @param memberId The identifier of this member in the previous group or "" if there was none
      * @return true If onJoinPrepare async commit succeeded, false otherwise
      */
-    protected abstract boolean onJoinPrepare(int generation, String memberId);
+    protected abstract boolean onJoinPrepare(Timer timer, int generation, String memberId);

Review Comment:
   We should also add the `timer` param to the javadoc above.
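A sketch of what the updated javadoc could look like. Only the `@param timer` line is the point here; the class name, the `Timer` interface, the method description, and the `@param generation` wording are placeholders, since the full javadoc is not shown in this hunk.

```java
public abstract class CoordinatorJavadocSketch {
    // Hypothetical stand-in for org.apache.kafka.common.utils.Timer.
    public interface Timer {
        long remainingMs();
    }

    /**
     * Invoked before (re)joining the group. (Placeholder description for this sketch.)
     *
     * @param timer Timer bounding how long this call may block, e.g. while waiting for the
     *              response to the asynchronous auto-commit of offsets
     * @param generation The generation of the previous group (placeholder wording)
     * @param memberId The identifier of this member in the previous group or "" if there was none
     * @return true If onJoinPrepare async commit succeeded, false otherwise
     */
    protected abstract boolean onJoinPrepare(Timer timer, int generation, String memberId);
}
```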



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
+
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        // and there is no in-flight offset commit request
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer expired.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+            timer.update();

Review Comment:
   Sorry, I think we don't need to keep updating `timer`, since we don't check timer expiration within `onJoinPrepare`. We can update it once at the end of `onJoinPrepare`.
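
To make the "update once at the end" argument concrete: assuming, as the comment does, that nothing reads the timer in between, a timer that caches the current time and only re-reads the clock on `update()` reports the same remaining time whether it is updated after every step or once at the end. The sketch below uses a hypothetical `ManualClock` and `SketchTimer`, not Kafka classes; the step durations are made up.

```java
public class TimerUpdateSketch {
    // Hypothetical manual clock so the example is deterministic.
    static final class ManualClock {
        private long nowMs;

        ManualClock(long startMs) { this.nowMs = startMs; }

        long milliseconds() { return nowMs; }

        void advance(long ms) { nowMs += ms; }
    }

    // Hypothetical timer: caches the current time and only re-reads the clock on update().
    static final class SketchTimer {
        private final ManualClock clock;
        private final long deadlineMs;
        private long currentTimeMs;

        SketchTimer(ManualClock clock, long timeoutMs) {
            this.clock = clock;
            this.currentTimeMs = clock.milliseconds();
            this.deadlineMs = currentTimeMs + timeoutMs;
        }

        void update() { currentTimeMs = clock.milliseconds(); }

        long remainingMs() { return Math.max(0, deadlineMs - currentTimeMs); }
    }

    // Updates the timer after every step, like calling timer.update() after each poll.
    static long updateEveryStep(long[] stepDurationsMs, long timeoutMs) {
        ManualClock clock = new ManualClock(0);
        SketchTimer timer = new SketchTimer(clock, timeoutMs);
        for (long step : stepDurationsMs) {
            clock.advance(step);
            timer.update();
        }
        return timer.remainingMs();
    }

    // Updates the timer once, at the end, as suggested in the review.
    static long updateOnceAtEnd(long[] stepDurationsMs, long timeoutMs) {
        ManualClock clock = new ManualClock(0);
        SketchTimer timer = new SketchTimer(clock, timeoutMs);
        for (long step : stepDurationsMs) {
            clock.advance(step);
        }
        timer.update();
        return timer.remainingMs();
    }

    public static void main(String[] args) {
        long[] steps = {30, 50, 20};
        System.out.println(updateEveryStep(steps, 1000) + " " + updateOnceAtEnd(steps, 1000));
    }
}
```

The equivalence only holds while no code in between calls `remainingMs()` or `isExpired()` on that timer; once something does, it needs an up-to-date value.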



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,89 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array(
+    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
+    "org.apache.kafka.clients.consumer.RangeAssignor"))
+  def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+    // create 2 consumers
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-and-rejoin-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy)
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock = new ReentrantLock()
+    var generationId1 = -1
+    var memberId1 = ""
+    val customRebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
+      }
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
+        if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+          fail(s"Time out while awaiting for lock.")
+        }
+        try {
+          generationId1 = consumer1.groupMetadata().generationId()
+          memberId1 = consumer1.groupMetadata().memberId()
+        } finally {
+          lock.unlock()
+        }
+      }
+    }
+    val consumerPoller1 = new ConsumerAssignmentPoller(consumer1, List(topic), Set.empty, customRebalanceListener)
+    consumerPoller1.start()
+    TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment() == expectedAssignment,
+      s"Timed out while awaiting expected assignment change to $expectedAssignment.")
+
+    // Since the consumer1 already completed the rebalance,
+    // the `onPartitionsAssigned` rebalance listener will be invoked to set the generationId and memberId
+    var stableGeneration = -1
+    var stableMemberId1 = ""
+    if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+      fail(s"Time out while awaiting for lock.")
+    }
+    try {
+      stableGeneration = generationId1
+      stableMemberId1 = memberId1
+    } finally {
+      lock.unlock()
+    }
+
+    val consumerPoller2 = subscribeConsumerAndStartPolling(consumer2, List(topic))
+    TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment().size == 1,
+      s"Timed out while awaiting expected assignment change to 1.")
+    TestUtils.waitUntilTrue(() => consumerPoller2.consumerAssignment().size == 1,
+      s"Timed out while awaiting expected assignment change to 1.")
+
+    if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+      fail(s"Time out while awaiting for lock.")
+    }
+    try {
+      if (assignmentStrategy.equals(classOf[CooperativeStickyAssignor].getName)) {
+        // cooperative rebalance should rebalance twice before finally stable
+        assertEquals(stableGeneration + 2, generationId1)
+      } else {
+        // eager rebalance should rebalance once once before finally stable

Review Comment:
   nit: additional `once`: eager rebalance should rebalance once [once] before finally stable



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
+
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        // and there is no in-flight offset commit request
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer expired.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+            timer.update();
+            joinPrepareTimer.update();
+        }
+
+        // keep retrying the offset commit when:
+        // 1. offset commit haven't done (and joinPrepareTime not expired)
+        // 2. failed with retryable exception (and joinPrepareTime not expired)
+        // Otherwise, continue to revoke partitions, ex:
+        // 1. if joinPrepareTime has expired
+        // 2. if offset commit failed with no-retryable exception
+        // 3. if offset commit success
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (joinPrepareTimer.isExpired()) {
+                log.error("Asynchronous auto-commit of offsets failed: joinPrepare timeout. Will continue to join group");
+            } else if (!autoCommitOffsetRequestFuture.isDone()) {
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                log.debug("Asynchronous auto-commit of offsets failed with retryable error: {}. Will retry it.",
+                          autoCommitOffsetRequestFuture.exception().getMessage());
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && !autoCommitOffsetRequestFuture.isRetriable()) {
+                log.error("Asynchronous auto-commit of offsets failed: {}. Will continue to join group.",
+                          autoCommitOffsetRequestFuture.exception().getMessage());
+            }
+            if (autoCommitOffsetRequestFuture.isDone()) {
+                autoCommitOffsetRequestFuture = null;
+            }
+        }
+        if (!onJoinPrepareAsyncCommitCompleted) {
+            timer.sleep(Math.min(timer.remainingMs(), rebalanceConfig.retryBackoffMs));

Review Comment:
   I think we should sleep with `pollTimer` here. Otherwise, if `joinPrepareTimer.remainingMs()` is less than both `timer.remainingMs()` and `rebalanceConfig.retryBackoffMs`, we'll sleep past the `joinPrepareTimer` deadline, right? That is:
   ```java
   pollTimer.sleep(Math.min(pollTimer.remainingMs(), rebalanceConfig.retryBackoffMs));
   timer.update();
   return false;
   ```
   Also, since we won't use `joinPrepareTimer` again until the next call to `onJoinPrepare`, and we already update `joinPrepareTimer` when entering `onJoinPrepare`, we don't need to update it here. Does that make sense?
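
A minimal sketch of why the backoff should be bounded by `pollTimer`: with a hypothetical fake clock that only advances on `sleep`, choosing the timer with less time left and capping the sleep at its remaining time stops exactly at the `joinPrepareTimer` deadline, whereas capping by the caller's `timer` sleeps the full backoff. `FakeClock`, `SketchTimer`, `pickPollTimer`, and `backoffSleep` are illustrative names, not Kafka APIs.

```java
public class PollTimerBackoffSketch {
    // Hypothetical clock that only moves when a timer "sleeps".
    static final class FakeClock {
        long nowMs = 0;
    }

    // Hypothetical deadline timer sharing the fake clock.
    static final class SketchTimer {
        private final FakeClock clock;
        private final long deadlineMs;

        SketchTimer(FakeClock clock, long timeoutMs) {
            this.clock = clock;
            this.deadlineMs = clock.nowMs + timeoutMs;
        }

        long remainingMs() { return Math.max(0, deadlineMs - clock.nowMs); }

        boolean isExpired() { return clock.nowMs >= deadlineMs; }

        void sleep(long durationMs) { clock.nowMs += durationMs; }
    }

    // Chooses the timer with less remaining time, mirroring the diff's pollTimer selection.
    static SketchTimer pickPollTimer(SketchTimer timer, SketchTimer joinPrepareTimer) {
        return timer.remainingMs() < joinPrepareTimer.remainingMs() ? timer : joinPrepareTimer;
    }

    // Sleeps for the retry backoff, but never beyond the given timer's deadline.
    static void backoffSleep(SketchTimer sleepTimer, long retryBackoffMs) {
        sleepTimer.sleep(Math.min(sleepTimer.remainingMs(), retryBackoffMs));
    }

    public static void main(String[] args) {
        FakeClock clock = new FakeClock();
        SketchTimer timer = new SketchTimer(clock, 10_000);        // caller's timer, plenty left
        SketchTimer joinPrepareTimer = new SketchTimer(clock, 40); // almost used up
        backoffSleep(pickPollTimer(timer, joinPrepareTimer), 100);
        System.out.println(clock.nowMs + " " + joinPrepareTimer.isExpired());
    }
}
```

In the coordinator itself, the caller's `timer` would then still need a `timer.update()` before returning `false`, as in the snippet above.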



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));

Review Comment:
   Why can't we call `prepareCoordinatorForCloseTest` here? I think the only difference is that we don't call `coordinator.poll`, right? If so, we can add a parameter to `prepareCoordinatorForCloseTest` that decides whether to poll; otherwise there is a lot of duplicated code here. Ex:
   ```java
   private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement,
                                                              final boolean autoCommit,
                                                              final Optional<String> groupInstanceId,
                                                              final boolean shouldPoll) {
       ...
       if (shouldPoll) {
           coordinator.poll(time.timer(Long.MAX_VALUE));
       }
       return coordinator;
   }
   ```
   
   WDYT?
   
   The same comment applies to the newly added tests below.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));

Review Comment:
   Also, since we create a new coordinator here, we should close it at the end of the test. If you agree with my suggestion above, we can close it like this:
   ```java
   try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(...)) {
   
   
   }
   ```
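
The benefit of try-with-resources here is that `close()` runs even when an assertion in the test body throws. A self-contained sketch, assuming a hypothetical `FakeCoordinator` that implements `AutoCloseable` and a hypothetical `prepare(boolean shouldPoll)` helper in the spirit of the suggested `shouldPoll` parameter; neither is the real Kafka test code.

```java
public class CloseCoordinatorSketch {
    // Hypothetical stand-in for the coordinator under test.
    static final class FakeCoordinator implements AutoCloseable {
        boolean polled = false;
        boolean closed = false;

        void poll() { polled = true; }

        @Override
        public void close() { closed = true; }
    }

    // Hypothetical helper: shared setup, with polling made optional via a flag.
    static FakeCoordinator prepare(boolean shouldPoll) {
        FakeCoordinator coordinator = new FakeCoordinator();
        // Shared setup (find coordinator, join group, sync group) would go here.
        if (shouldPoll) {
            coordinator.poll();
        }
        return coordinator;
    }

    // Runs a failing "test body" and reports whether the coordinator was still closed.
    static boolean closedEvenIfBodyFails() {
        FakeCoordinator ref = null;
        try (FakeCoordinator coordinator = prepare(false)) {
            ref = coordinator;
            throw new AssertionError("simulated test failure");
        } catch (AssertionError expected) {
            // The resource is closed before control reaches this catch block.
        }
        return ref != null && ref.closed;
    }

    public static void main(String[] args) {
        System.out.println(closedEvenIfBodyFails());
    }
}
```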



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,45 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        }
+
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
         }
 
+        // return false when:
+        //   1. offset commit haven't done
+        //   2. offset commit failed with retriable exception and joinPrepare haven't expired
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (!autoCommitOffsetRequestFuture.isDone()) {
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                onJoinPrepareAsyncCommitCompleted = joinPrepareTimer.isExpired();
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                log.error("Asynchronous auto-commit of offsets failed: {}", autoCommitOffsetRequestFuture.exception().getMessage());
+            } else if (joinPrepareTimer != null && joinPrepareTimer.isExpired()) {
+                log.error("Asynchronous auto-commit of offsets failed: joinPrepare timeout");
+            }
+            if (autoCommitOffsetRequestFuture.isDone()) {
+                autoCommitOffsetRequestFuture = null;
+            }
+        }
+        if (!onJoinPrepareAsyncCommitCompleted) {
+            timer.sleep(rebalanceConfig.retryBackoffMs);

Review Comment:
   > Since we cannot ensure that UnknownTopicOrPartitionException is caused by topic deletion (as said in [KAFKA-13310](https://issues.apache.org/jira/browse/KAFKA-13310)), do you think waiting up to rebalanceTimeout when the offset commit fails is acceptable here?
   
   Since we can't be sure what causes `UnknownTopicOrPartitionException` or other exceptions here, I think we should just retry until the timeout.
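
The "retry until timeout" policy can be sketched as a loop that handles every retriable failure the same way, whatever its cause, and stops only on success, on a non-retriable error, or once the deadline has passed. This is a simplified sketch, not the PR's code: the `Outcome` enum, the `Result` type, the `attempt` supplier, and the fake clock advanced by the backoff are all hypothetical.

```java
import java.util.function.Supplier;

public class RetryUntilTimeoutSketch {
    // Hypothetical outcome of one asynchronous offset-commit attempt.
    enum Outcome { SUCCESS, RETRIABLE_FAILURE, FATAL_FAILURE }

    // Hypothetical result: whether the loop stopped on the deadline, and how many attempts it made.
    static final class Result {
        final boolean timedOut;
        final int attempts;

        Result(boolean timedOut, int attempts) {
            this.timedOut = timedOut;
            this.attempts = attempts;
        }
    }

    // Retries every retriable failure, regardless of its cause, until the deadline passes.
    static Result commitWithRetry(Supplier<Outcome> attempt, long timeoutMs, long retryBackoffMs) {
        long nowMs = 0;                      // fake clock, advanced only by the backoff below
        long deadlineMs = nowMs + timeoutMs;
        int attempts = 0;
        while (true) {
            Outcome outcome = attempt.get();
            attempts++;
            if (outcome != Outcome.RETRIABLE_FAILURE) {
                // Success or non-retriable error: stop retrying and continue to join the group.
                return new Result(false, attempts);
            }
            if (nowMs >= deadlineMs) {
                // Deadline reached: give up on the commit and continue to join the group.
                return new Result(true, attempts);
            }
            // Back off before the next attempt, but never past the deadline.
            nowMs += Math.min(deadlineMs - nowMs, retryBackoffMs);
        }
    }

    public static void main(String[] args) {
        Result r = commitWithRetry(() -> Outcome.RETRIABLE_FAILURE, 250, 100);
        System.out.println(r.timedOut + " " + r.attempts);
    }
}
```

Because the loop never inspects why a retriable failure happened, an `UnknownTopicOrPartitionException` caused by topic deletion and one caused by stale metadata are treated identically; the deadline is the only thing that bounds the wait.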



-- 
This is an automated message from the Apache Git Service.