lucasbru commented on code in PR #14557: URL: https://github.com/apache/kafka/pull/14557#discussion_r1425393938
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -1104,8 +1137,21 @@ private void clearPendingAssignmentsAndLocalNamesCache() { } private void resetEpoch() { - this.memberEpoch = ConsumerGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH; - commitRequestManager.updateMemberInformation(this.memberId, this.memberEpoch); + updateMemberEpoch(ConsumerGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH); + } + + private void updateMemberEpoch(int newEpoch) { + boolean newEpochReceived = this.memberEpoch != newEpoch; + this.memberEpoch = newEpoch; + // Simply notify based on epoch change only, given that the member will never receive a + // new member ID without an epoch (member ID is only assigned when it joins the group). + if (newEpochReceived) { + if (memberEpoch > 0) { + notifyEpochChange(Optional.ofNullable(memberEpoch), Optional.ofNullable(memberId)); Review Comment: memberEpoch cannot be null ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -624,7 +626,15 @@ public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCo } // Visible for testing - CompletableFuture<Void> commit(final Map<TopicPartition, OffsetAndMetadata> offsets, final boolean isWakeupable) { + CompletableFuture<Void> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Review Comment: Let's not add another "visible for testing" method just to save one argument. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -741,61 +767,86 @@ boolean reconcile() { revokedPartitions ); - CompletableFuture<Void> revocationResult; - if (!revokedPartitions.isEmpty()) { - revocationResult = revokePartitions(revokedPartitions); + // Commit offsets if auto-commit enabled before reconciling a new assignment. Request will + // be retried until it succeeds, fails with non-retriable error, or timer expires. + CompletableFuture<Void> commitResult; + + if (commitRequestManager.autoCommitEnabled()) { Review Comment: `autoCommitAllConsumedNow` checks autoCommitEnabled internally, so we should be able to eliminate this if ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java: ########## @@ -18,17 +18,36 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Timer; import java.util.Collections; import java.util.Map; +import java.util.Optional; public class CommitApplicationEvent extends CompletableApplicationEvent<Void> { + /** + * Offsets to commit per partition. + */ private final Map<TopicPartition, OffsetAndMetadata> offsets; - public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets) { + /** + * Timer to wait for a response, retrying on retriable errors. If not present, the request is + * triggered without waiting for a response or being retried. + */ + private final Optional<Timer> timer; Review Comment: Hmm, we are sending a mutable object between threads that will then be accessed by both threads. Even if that works out in the current code, I'd still prefer to just send the deadline and instantiate a new timer in the background thread so that we don't run into hairy concurrency bugs later on. 
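A minimal sketch of that deadline-based alternative — the class and helper names below are illustrative assumptions, not the PR's actual API:

```java
import java.util.Optional;
import java.util.OptionalLong;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

final class DeadlineSketch {

    // Application thread: compute an absolute deadline (epoch ms) and attach only
    // that primitive to the event, instead of a mutable Timer instance.
    static OptionalLong deadlineFor(Time time, long apiTimeoutMs) {
        return OptionalLong.of(time.milliseconds() + apiTimeoutMs);
    }

    // Background thread: rebuild a local Timer from the received deadline, so the
    // mutable Timer never crosses the thread boundary.
    static Optional<Timer> timerFor(Time time, OptionalLong deadlineMs) {
        if (!deadlineMs.isPresent())
            return Optional.empty();
        long remainingMs = Math.max(0L, deadlineMs.getAsLong() - time.milliseconds());
        return Optional.of(time.timer(remainingMs));
    }
}
```

Because only a primitive epoch-millisecond value crosses the thread boundary, each thread owns its own `Timer` and no synchronization is needed.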
`Timer` isn't thread safe ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -190,63 +179,90 @@ private static long findMinTime(final Collection<? extends RequestState> request * has elapsed. * * @param offsets Offsets to commit + * @param timer Time to continue retrying the request if it fails with a retriable error. If + * not present, the request will be sent but not retried. * @return Future that will complete when a response is received for the request, or a * completed future if no request is generated. */ - public CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) { - if (!canAutoCommit()) { + private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets, + final Optional<Timer> timer, + boolean checkInterval) { + if (!autoCommitState.isPresent()) { + log.debug("Skipping auto-commit because auto-commit config is not enabled."); return CompletableFuture.completedFuture(null); } AutoCommitState autocommit = autoCommitState.get(); - if (!autocommit.shouldAutoCommit()) { + if (checkInterval && !autocommit.shouldAutoCommit()) { + log.debug("Skipping auto-commit, remaining time {}", autocommit.timer.remainingMs()); Review Comment: This is run in every iteration of `poll`, so we should make this a `trace` log message ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -780,6 +977,28 @@ List<NetworkClientDelegate.UnsentRequest> drain(final long currentTimeMs) { return Collections.unmodifiableList(unsentRequests); } + /** + * Find the unsent commit requests that have expired, remove them and complete their + * futures with a TimeoutException. + */ + private void failAndRemoveExpiredCommitRequests() { + List<OffsetCommitRequestState> expiredRequests = unsentOffsetCommits.stream() + .filter(req -> req.isExpired()).collect(Collectors.toList()); Review Comment: nit: `OffsetCommitRequestState::isExpired` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -780,6 +977,28 @@ List<NetworkClientDelegate.UnsentRequest> drain(final long currentTimeMs) { return Collections.unmodifiableList(unsentRequests); } + /** + * Find the unsent commit requests that have expired, remove them and complete their + * futures with a TimeoutException. + */ + private void failAndRemoveExpiredCommitRequests() { + List<OffsetCommitRequestState> expiredRequests = unsentOffsetCommits.stream() + .filter(req -> req.isExpired()).collect(Collectors.toList()); + unsentOffsetCommits.removeAll(expiredRequests); + expiredRequests.forEach(OffsetCommitRequestState::expire); + } + + /** + * Find the unsent fetch requests that have expired, remove them and complete their + * futures with a TimeoutException. + */ + private void failAndRemoveExpiredFetchRequests() { + List<OffsetFetchRequestState> expiredFetchRequests = unsentOffsetFetches.stream() + .filter(req -> req.isExpired()).collect(Collectors.toList()); Review Comment: nit: `OffsetFetchRequestState::isExpired` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -741,61 +767,86 @@ boolean reconcile() { revokedPartitions ); - CompletableFuture<Void> revocationResult; - if (!revokedPartitions.isEmpty()) { - revocationResult = revokePartitions(revokedPartitions); + // Commit offsets if auto-commit enabled before reconciling a new assignment.
Request will + // be retried until it succeeds, fails with non-retriable error, or timer expires. + CompletableFuture<Void> commitResult; + + if (commitRequestManager.autoCommitEnabled()) { + // TODO: review auto commit time boundary. This will be effectively bounded by the Review Comment: Do we need a ticket for this? Not sure what it means. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -1077,4 +1131,18 @@ public void onUpdate(ClusterResource clusterResource) { reconcile(); } } + + /** + * Register a new listener that will be invoked whenever the member state changes, or a new + * member ID or epoch is received. + * + * @param listener Listener to invoke. + */ + @Override + public void registerStateListener(MemberStateListener listener) { + if (listener == null) { + throw new IllegalArgumentException("State updates listener cannot be null"); + } + this.stateUpdatesListeners.add(listener); + } Review Comment: I'd say YAGNI ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -190,63 +179,90 @@ private static long findMinTime(final Collection<? extends RequestState> request * has elapsed. * * @param offsets Offsets to commit + * @param timer Time to continue retrying the request if it fails with a retriable error. If + * not present, the request will be sent but not retried. * @return Future that will complete when a response is received for the request, or a * completed future if no request is generated. */ - public CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) { - if (!canAutoCommit()) { + private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets, + final Optional<Timer> timer, + boolean checkInterval) { + if (!autoCommitState.isPresent()) { Review Comment: could use `autoCommitEnabled` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -190,63 +179,90 @@ private static long findMinTime(final Collection<? extends RequestState> request * has elapsed. * * @param offsets Offsets to commit + * @param timer Time to continue retrying the request if it fails with a retriable error. If + * not present, the request will be sent but not retried. * @return Future that will complete when a response is received for the request, or a * completed future if no request is generated.
*/ - public CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) { - if (!canAutoCommit()) { + private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets, + final Optional<Timer> timer, + boolean checkInterval) { + if (!autoCommitState.isPresent()) { + log.debug("Skipping auto-commit because auto-commit config is not enabled."); return CompletableFuture.completedFuture(null); } AutoCommitState autocommit = autoCommitState.get(); - if (!autocommit.shouldAutoCommit()) { + if (checkInterval && !autocommit.shouldAutoCommit()) { + log.debug("Skipping auto-commit, remaining time {}", autocommit.timer.remainingMs()); return CompletableFuture.completedFuture(null); } - CompletableFuture<Void> result = sendAutoCommit(offsets); + CompletableFuture<Void> result = addOffsetCommitRequest(offsets, timer).whenComplete(autoCommitCallback(offsets)); autocommit.resetTimer(); autocommit.setInflightCommitStatus(true); return result; } /** * If auto-commit is enabled, this will generate a commit offsets request for all assigned - * partitions and their current positions. + * partitions and their current positions. Note on auto-commit timers: this will reset the + * auto-commit timer to the interval before issuing the async commit, and when the async commit + * completes, it will reset the auto-commit timer with the exponential backoff if the request + * failed with a retriable error. * * @return Future that will complete when a response is received for the request, or a * completed future if no request is generated. */ - public CompletableFuture<Void> maybeAutoCommitAllConsumed() { - return maybeAutoCommit(subscriptions.allConsumed()); - } + public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() { + if (!autoCommitEnabled()) { + // Early return to ensure that no action/logging is performed. + return CompletableFuture.completedFuture(null); + } + Map<TopicPartition, OffsetAndMetadata> offsets = subscriptions.allConsumed(); + CompletableFuture<Void> result = maybeAutoCommit(offsets, Optional.empty(), true); + result.whenComplete((__, error) -> { + if (error != null) { + if (error instanceof RetriableCommitFailedException) { + log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error); + resetAutoCommitTimer(retryBackoffMs); + } else { + log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); + } + } else { + log.debug("Completed asynchronous auto-commit of offsets {}", offsets); + } + }); - boolean canAutoCommit() { - return autoCommitState.isPresent() && !subscriptions.allConsumed().isEmpty(); + return result; } /** - * Updates the member ID and epoch upon receiving ConsumerGroupHeartbeatResponse. + * Commit consumed offsets it auto-commit is enabled. Retry while the timer is not expired, + * until the request succeeds or fails with a fatal error. 
*/ - void updateMemberInformation(String memberId, int memberEpoch) { - groupState.generation = new GroupState.Generation(memberEpoch, memberId, null); + public CompletableFuture<Void> autoCommitAllConsumedNow(Optional<Timer> timer) { Review Comment: I suppose, to be consistent, this should be called `maybeAutoCommitAllConsumedNow` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -457,120 +535,224 @@ public void onResponse(final ClientResponse response) { } } - private void handleRetriableError(Errors error, ClientResponse response) { - if (error == COORDINATOR_NOT_AVAILABLE || - error == NOT_COORDINATOR || - error == REQUEST_TIMED_OUT) { - coordinatorRequestManager.markCoordinatorUnknown(error.message(), response.receivedTimeMs()); + /** + * Enqueue the request to be retried with exponential backoff. This will fail the request + * without retrying if the request timer expired. + */ + @Override + void retry(long currentTimeMs, Throwable throwable) { + if (!requestTimer.isPresent() || requestTimer.get().isExpired()) { Review Comment: Do you ever update the requestTimer with the currentTime? This would be a good opportunity to do it. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -265,6 +280,8 @@ public MembershipManagerImpl(String groupId, this.assignmentReadyToReconcile = new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR); this.currentAssignment = new HashMap<>(); this.log = logContext.logger(MembershipManagerImpl.class); + this.stateUpdatesListeners = new ArrayList<>(); + this.time = Time.SYSTEM; Review Comment: You probably want to unit-test this class with MockTime instead of system time, so maybe pass this from the outside. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -457,120 +535,224 @@ public void onResponse(final ClientResponse response) { } } - private void handleRetriableError(Errors error, ClientResponse response) { - if (error == COORDINATOR_NOT_AVAILABLE || - error == NOT_COORDINATOR || - error == REQUEST_TIMED_OUT) { - coordinatorRequestManager.markCoordinatorUnknown(error.message(), response.receivedTimeMs()); + /** + * Enqueue the request to be retried with exponential backoff. This will fail the request + * without retrying if the request timer expired. + */ + @Override + void retry(long currentTimeMs, Throwable throwable) { + if (!requestTimer.isPresent() || requestTimer.get().isExpired()) { + // Fail requests that had no timer (async requests), or for which the timer expired. + future.completeExceptionally(throwable); + return; } - } - private void retry(final long currentTimeMs) { + // Enqueue request to be retried with backoff. Note that this maintains the same + // timer of the initial request, so all the retries are time-bounded.
onFailedAttempt(currentTimeMs); pendingRequests.addOffsetCommitRequest(this); } - private void handleFatalError(final Errors error) { - switch (error) { - case GROUP_AUTHORIZATION_FAILED: - future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId)); - break; - case OFFSET_METADATA_TOO_LARGE: - case INVALID_COMMIT_OFFSET_SIZE: - future.completeExceptionally(error.exception()); - break; - case FENCED_INSTANCE_ID: - log.info("OffsetCommit failed due to group instance id {} fenced: {}", groupInstanceId, error.message()); - future.completeExceptionally(new CommitFailedException()); - break; - case UNKNOWN_MEMBER_ID: - log.info("OffsetCommit failed due to unknown member id memberId {}: {}", null, error.message()); - future.completeExceptionally(error.exception()); - break; - default: - future.completeExceptionally(new KafkaException("Unexpected error in commit: " + error.message())); - break; + boolean isExpired() { + return requestTimer.isPresent() && requestTimer.get().isExpired(); + } + + void expire() { Review Comment: Could be private ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -457,120 +535,224 @@ public void onResponse(final ClientResponse response) { } } - private void handleRetriableError(Errors error, ClientResponse response) { - if (error == COORDINATOR_NOT_AVAILABLE || - error == NOT_COORDINATOR || - error == REQUEST_TIMED_OUT) { - coordinatorRequestManager.markCoordinatorUnknown(error.message(), response.receivedTimeMs()); + /** + * Enqueue the request to be retried with exponential backoff. This will fail the request + * without retrying if the request timer expired. + */ + @Override + void retry(long currentTimeMs, Throwable throwable) { + if (!requestTimer.isPresent() || requestTimer.get().isExpired()) { + // Fail requests that had no timer (async requests), or for which the timer expired. + future.completeExceptionally(throwable); + return; } - } - private void retry(final long currentTimeMs) { + // Enqueue request to be retried with backoff. Note that this maintains the same + // timer of the initial request, so all the retries are time-bounded. 
onFailedAttempt(currentTimeMs); pendingRequests.addOffsetCommitRequest(this); } - private void handleFatalError(final Errors error) { - switch (error) { - case GROUP_AUTHORIZATION_FAILED: - future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId)); - break; - case OFFSET_METADATA_TOO_LARGE: - case INVALID_COMMIT_OFFSET_SIZE: - future.completeExceptionally(error.exception()); - break; - case FENCED_INSTANCE_ID: - log.info("OffsetCommit failed due to group instance id {} fenced: {}", groupInstanceId, error.message()); - future.completeExceptionally(new CommitFailedException()); - break; - case UNKNOWN_MEMBER_ID: - log.info("OffsetCommit failed due to unknown member id memberId {}: {}", null, error.message()); - future.completeExceptionally(error.exception()); - break; - default: - future.completeExceptionally(new KafkaException("Unexpected error in commit: " + error.message())); - break; + boolean isExpired() { Review Comment: could be private ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -741,61 +767,86 @@ boolean reconcile() { revokedPartitions ); - CompletableFuture<Void> revocationResult; - if (!revokedPartitions.isEmpty()) { - revocationResult = revokePartitions(revokedPartitions); + // Commit offsets if auto-commit enabled before reconciling a new assignment. Request will + // be retried until it succeeds, fails with non-retriable error, or timer expires. + CompletableFuture<Void> commitResult; + + if (commitRequestManager.autoCommitEnabled()) { + // TODO: review auto commit time boundary. This will be effectively bounded by the + // rebalance timeout. + commitResult = commitRequestManager.autoCommitAllConsumedNow(Optional.of(time.timer(Long.MAX_VALUE))); } else { - revocationResult = CompletableFuture.completedFuture(null); - // Reschedule the auto commit starting from now (new assignment received without any - // revocation). - commitRequestManager.resetAutoCommitTimer(); + commitResult = CompletableFuture.completedFuture(null); } - // Future that will complete when the full reconciliation process completes (revocation - // and assignment, executed sequentially). - CompletableFuture<Void> reconciliationResult = + // Execute commit -> onPartitionsRevoked -> onPartitionsAssigned. + commitResult.whenComplete((commitReqResult, commitReqError) -> { Review Comment: Nesting futures inside futures is something we want to avoid. Can you try to structure it something like this?

```
commitResult
    .handle((commitReqResult, commitReqError) -> {
        // log errors
    })
    .thenCompose(__ -> {
        // revoke partitions
    })
    .thenCompose(__ -> {
        // reconcile
    })
    .whenComplete((result, error) -> {
        markReconciliationCompleted();
        ...
```

This would also be a good opportunity to extract things into separate methods
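A slightly fuller, hedged sketch of that flat pipeline, with each step pulled into its own future-returning method; the helper names (`revokePartitions`, `assignAndAck`, `markReconciliationCompleted`) and the class wrapper are illustrative, not the PR's actual structure:

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ReconcileChainSketch {

    private static final Logger log = LoggerFactory.getLogger(ReconcileChainSketch.class);

    CompletableFuture<Void> reconcile(CompletableFuture<Void> commitResult,
                                      Set<TopicPartition> revokedPartitions) {
        return commitResult
                // A failed auto-commit is logged but does not block revocation.
                .handle((ok, commitError) -> {
                    if (commitError != null)
                        log.error("Auto-commit before reconciliation failed", commitError);
                    return null;
                })
                // Revocation, then assignment, each as its own future-returning step.
                .thenCompose(ignored -> revokePartitions(revokedPartitions))
                .thenCompose(ignored -> assignAndAck())
                // Single completion point, reached on success or failure.
                .whenComplete((result, error) -> markReconciliationCompleted());
    }

    private CompletableFuture<Void> revokePartitions(Set<TopicPartition> revoked) {
        return CompletableFuture.completedFuture(null); // placeholder for callback + state updates
    }

    private CompletableFuture<Void> assignAndAck() {
        return CompletableFuture.completedFuture(null); // placeholder for assignment reconciliation
    }

    private void markReconciliationCompleted() {
        // placeholder: clear the reconciliation-in-progress flag
    }
}
```

With `thenCompose` the steps stay sequential but flat, and `whenComplete` gives one place to clear the reconciliation-in-progress state regardless of outcome.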