d8tltanc commented on a change in pull request #8846: URL: https://github.com/apache/kafka/pull/8846#discussion_r468756144
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -814,10 +821,10 @@ private void validateOffsetsAsync(Map<TopicPartition, FetchPosition> partitionsT
             public void onSuccess(OffsetForEpochResult offsetsResult) {
                 List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
                 if (!offsetsResult.partitionsToRetry().isEmpty()) {
-                    subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs);
+                    subscriptions.requestFailed(offsetsResult.partitionsToRetry(), time.milliseconds());
                     metadata.requestUpdate();
                 }
-
+                subscriptions.requestSucceeded(offsetsResult.endOffsets().keySet(), time.milliseconds());

Review comment:
Yes. It excludes partitions to retry.

```
switch (error) {
    case UNKNOWN_LEADER_EPOCH:
        logger().debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.", topicPartition, error);
        partitionsToRetry.add(topicPartition);
        break;
    case UNKNOWN_TOPIC_OR_PARTITION:
        logger().warn("Received unknown topic or partition error in OffsetsForLeaderEpoch request for partition {}", topicPartition);
        partitionsToRetry.add(topicPartition);
        break;
    case TOPIC_AUTHORIZATION_FAILED:
        unauthorizedTopics.add(topicPartition.topic());
        break;
    default:
        logger().warn("Attempt to fetch offsets for partition {} failed due to: {}, retrying.", topicPartition, error.message());
        partitionsToRetry.add(topicPartition);
```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org