lianetm commented on code in PR #14557: URL: https://github.com/apache/kafka/pull/14557#discussion_r1431496640
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -190,63 +183,95 @@ private static long findMinTime(final Collection<? extends RequestState> request * has elapsed. * * @param offsets Offsets to commit + * @param expirationTimeMs Time until which the request will continue to be retried if it fails + * with a retriable error. If not present, the request will be sent + * but not retried. * @return Future that will complete when a response is received for the request, or a * completed future if no request is generated. */ - public CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) { - if (!canAutoCommit()) { + private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets, + final Optional<Long> expirationTimeMs, + boolean checkInterval, + boolean retryOnStaleEpoch) { + if (!autoCommitEnabled()) { + log.debug("Skipping auto-commit because auto-commit config is not enabled."); return CompletableFuture.completedFuture(null); } AutoCommitState autocommit = autoCommitState.get(); - if (!autocommit.shouldAutoCommit()) { + if (checkInterval && !autocommit.shouldAutoCommit()) { return CompletableFuture.completedFuture(null); } - CompletableFuture<Void> result = sendAutoCommit(offsets); + CompletableFuture<Void> result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch) + .whenComplete(autoCommitCallback(offsets)); autocommit.resetTimer(); autocommit.setInflightCommitStatus(true); return result; } /** * If auto-commit is enabled, this will generate a commit offsets request for all assigned - * partitions and their current positions. + * partitions and their current positions. Note on auto-commit timers: this will reset the + * auto-commit timer to the interval before issuing the async commit, and when the async commit + * completes, it will reset the auto-commit timer with the exponential backoff if the request + * failed with a retriable error. * * @return Future that will complete when a response is received for the request, or a * completed future if no request is generated. */ - public CompletableFuture<Void> maybeAutoCommitAllConsumed() { - return maybeAutoCommit(subscriptions.allConsumed()); - } + public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() { + if (!autoCommitEnabled()) { + // Early return to ensure that no action/logging is performed. + return CompletableFuture.completedFuture(null); + } + Map<TopicPartition, OffsetAndMetadata> offsets = subscriptions.allConsumed(); + CompletableFuture<Void> result = maybeAutoCommit(offsets, Optional.empty(), true, true); + result.whenComplete((__, error) -> { + if (error != null) { + if (error instanceof RetriableCommitFailedException) { + log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error); + resetAutoCommitTimer(retryBackoffMs); + } else { + log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); + } + } else { + log.debug("Completed asynchronous auto-commit of offsets {}", offsets); + } + }); - boolean canAutoCommit() { - return autoCommitState.isPresent() && !subscriptions.allConsumed().isEmpty(); + return result; } /** - * Updates the member ID and epoch upon receiving ConsumerGroupHeartbeatResponse. + * Commit consumed offsets if auto-commit is enabled. Retry while the timer is not expired, + * until the request succeeds or fails with a fatal error. */ - void updateMemberInformation(String memberId, int memberEpoch) { - groupState.generation = new GroupState.Generation(memberEpoch, memberId, null); + public CompletableFuture<Void> maybeAutoCommitAllConsumedNow( + final Optional<Long> expirationTimeMs, + final boolean retryOnStaleEpoch) { + return maybeAutoCommit(subscriptions.allConsumed(), expirationTimeMs, false, retryOnStaleEpoch); + } Review Comment: You got it right, the `checkInterval` false param is the one allowing the exception for the auto-commit on rebalance, that will be always sent. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org