lianetm commented on code in PR #14557:
URL: https://github.com/apache/kafka/pull/14557#discussion_r1431498965


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -262,6 +328,7 @@ public void testFencingWhenStateIsReconciling() {
      * scenarios), and when the PREPARE_LEAVING completes it remains 
UNSUBSCRIBED (no last
      * heartbeat sent).
      */
+    @Disabled("Temporarily disabled waiting for callbacks for blocking the 
transition")

Review Comment:
   Definitely, all re-enabled.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -228,28 +242,130 @@ public void 
testAutocommit_ResendAutocommitAfterException() {
                 Errors.NONE));
     }
 
+    // This is the case of the sync auto commit sent when the consumer is 
being closed (sync commit
+    // that should be retried until it succeeds, fails, or timer expires).
+    @ParameterizedTest
+    @MethodSource("offsetCommitExceptionSupplier")
+    public void testSyncAutocommit_RetriedAfterRetriableException(Errors 
error) {
+        long commitInterval = retryBackoffMs * 2;
+        CommitRequestManager commitRequestManger = create(true, 
commitInterval);
+        TopicPartition tp = new TopicPartition("topic", 1);
+        subscriptionState.assignFromUser(Collections.singleton(tp));
+        subscriptionState.seek(tp, 100);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+        time.sleep(commitInterval);
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
+
+        // Auto-commit all consume sync (ex. triggered when the consumer is 
closed).
+        long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
+        CompletableFuture<Void> commitResult =
+            
commitRequestManger.maybeAutoCommitAllConsumedNow(Optional.of(expirationTimeMs),
 false);
+        
sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(commitRequestManger, 
error, commitResult);
+
+        // We expect that request should have been retried on this sync commit.
+        assertExceptionHandling(commitRequestManger, error, true);
+        assertCoordinatorDisconnect(error);
+    }
+
+    @Test
+    public void 
testOffsetCommitFailsWithCommitFailedExceptionIfUnknownMemberId() {
+        long commitInterval = retryBackoffMs * 2;
+        CommitRequestManager commitRequestManger = create(true, 
commitInterval);
+        TopicPartition tp = new TopicPartition("topic", 1);
+        subscriptionState.assignFromUser(Collections.singleton(tp));
+        subscriptionState.seek(tp, 100);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+        time.sleep(commitInterval);
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
+
+        // Auto-commit all consume sync (ex. triggered when the consumer is 
closed).
+        long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
+        CompletableFuture<Void> commitResult =
+            
commitRequestManger.maybeAutoCommitAllConsumedNow(Optional.of(expirationTimeMs),
 false);
+
+        completeOffsetCommitRequestWithError(commitRequestManger, 
Errors.UNKNOWN_MEMBER_ID);
+        NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
+        assertEquals(0, res.unsentRequests.size());
+        // Commit should fail with CommitFailedException
+        assertTrue(commitResult.isDone());
+        Throwable t = assertThrows(ExecutionException.class, () -> 
commitResult.get());
+        assertEquals(CommitFailedException.class, t.getCause().getClass());

Review Comment:
   Of course! Updated here and in all other places that used the same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to