lianetm commented on code in PR #14557: URL: https://github.com/apache/kafka/pull/14557#discussion_r1431498965
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ########## @@ -262,6 +328,7 @@ public void testFencingWhenStateIsReconciling() { * scenarios), and when the PREPARE_LEAVING completes it remains UNSUBSCRIBED (no last * heartbeat sent). */ + @Disabled("Temporarily disabled waiting for callbacks for blocking the transition") Review Comment: Definitely, all re-enabled. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ########## @@ -228,28 +242,130 @@ public void testAutocommit_ResendAutocommitAfterException() { Errors.NONE)); } + // This is the case of the sync auto commit sent when the consumer is being closed (sync commit + // that should be retried until it succeeds, fails, or timer expires). + @ParameterizedTest + @MethodSource("offsetCommitExceptionSupplier") + public void testSyncAutocommit_RetriedAfterRetriableException(Errors error) { + long commitInterval = retryBackoffMs * 2; + CommitRequestManager commitRequestManger = create(true, commitInterval); + TopicPartition tp = new TopicPartition("topic", 1); + subscriptionState.assignFromUser(Collections.singleton(tp)); + subscriptionState.seek(tp, 100); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); + time.sleep(commitInterval); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); + + // Auto-commit all consume sync (ex. triggered when the consumer is closed). + long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; + CompletableFuture<Void> commitResult = + commitRequestManger.maybeAutoCommitAllConsumedNow(Optional.of(expirationTimeMs), false); + sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(commitRequestManger, error, commitResult); + + // We expect that request should have been retried on this sync commit. + assertExceptionHandling(commitRequestManger, error, true); + assertCoordinatorDisconnect(error); + } + + @Test + public void testOffsetCommitFailsWithCommitFailedExceptionIfUnknownMemberId() { + long commitInterval = retryBackoffMs * 2; + CommitRequestManager commitRequestManger = create(true, commitInterval); + TopicPartition tp = new TopicPartition("topic", 1); + subscriptionState.assignFromUser(Collections.singleton(tp)); + subscriptionState.seek(tp, 100); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); + time.sleep(commitInterval); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); + + // Auto-commit all consume sync (ex. triggered when the consumer is closed). + long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; + CompletableFuture<Void> commitResult = + commitRequestManger.maybeAutoCommitAllConsumedNow(Optional.of(expirationTimeMs), false); + + completeOffsetCommitRequestWithError(commitRequestManger, Errors.UNKNOWN_MEMBER_ID); + NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); + assertEquals(0, res.unsentRequests.size()); + // Commit should fail with CommitFailedException + assertTrue(commitResult.isDone()); + Throwable t = assertThrows(ExecutionException.class, () -> commitResult.get()); + assertEquals(CommitFailedException.class, t.getCause().getClass()); Review Comment: Of course! Updated here and in all other places that used the same. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org