d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468756144



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -814,10 +821,10 @@ private void validateOffsetsAsync(Map<TopicPartition, 
FetchPosition> partitionsT
                 public void onSuccess(OffsetForEpochResult offsetsResult) {
                     List<SubscriptionState.LogTruncation> truncations = new 
ArrayList<>();
                     if (!offsetsResult.partitionsToRetry().isEmpty()) {
-                        
subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), 
time.milliseconds() + retryBackoffMs);
+                        
subscriptions.requestFailed(offsetsResult.partitionsToRetry(), 
time.milliseconds());
                         metadata.requestUpdate();
                     }
-
+                    
subscriptions.requestSucceeded(offsetsResult.endOffsets().keySet(), 
time.milliseconds());

Review comment:
       Yes. It excludes partitions to retry.
   ```
   switch (error) {
                   case NONE:
                       logger().debug("Handling OffsetsForLeaderEpoch response 
for {}. Got offset {} for epoch {}",
                               topicPartition, epochEndOffset.endOffset(), 
epochEndOffset.leaderEpoch());
                       endOffsets.put(topicPartition, epochEndOffset);
                       break;
                   case UNKNOWN_LEADER_EPOCH:
                       logger().debug("Attempt to fetch offsets for partition 
{} failed due to {}, retrying.",
                               topicPartition, error);
                       partitionsToRetry.add(topicPartition);
                       break;
                   case UNKNOWN_TOPIC_OR_PARTITION:
                       logger().warn("Received unknown topic or partition error 
in OffsetsForLeaderEpoch request for partition {}",
                               topicPartition);
                       partitionsToRetry.add(topicPartition);
                       break;
                   case TOPIC_AUTHORIZATION_FAILED:
                       unauthorizedTopics.add(topicPartition.topic());
                       break;
                   default:
                       logger().warn("Attempt to fetch offsets for partition {} 
failed due to: {}, retrying.",
                               topicPartition, error.message());
                       partitionsToRetry.add(topicPartition);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to