hachikuji commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r434725093

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -822,19 +822,25 @@ public void onSuccess(OffsetForEpochResult offsetsResult) {
     FetchPosition requestPosition = fetchPositions.get(respTopicPartition);

     if (respEndOffset.hasUndefinedEpochOrOffset()) {
-        handleOffsetOutOfRange(requestPosition, respTopicPartition,
-            "Failed leader offset epoch validation for " + respEndOffset
-                + " since no end offset larger than current fetch epoch was reported");
+        try {
+            handleOffsetOutOfRange(requestPosition, respTopicPartition,
+                "Failed leader offset epoch validation for " + requestPosition
+                    + " since no end offset larger than current fetch epoch was reported");
+        } catch (OffsetOutOfRangeException e) {
+            // Swallow the OffsetOutOfRangeException to finish all partitions validation.

Review comment:
I don't feel great about having this in the code, even if it's supposed to be temporary. I think we should just fix the exception propagation bug in this patch. It seems like it would be straightforward to do something similar to what is done in the `onFailure` path.

```java
if (!(e instanceof RetriableException) && !cachedOffsetForLeaderException.compareAndSet(null, e)) {
    log.error("Discarding error in OffsetsForLeaderEpoch because another error is pending", e);
}
```

Above may not be ideal, but at least it provides a way to propagate individual errors.
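
For illustration, the "cache the first error, keep validating" pattern suggested above could look roughly like the sketch below. It is a standalone toy and not the actual `Fetcher` code: `FirstErrorCache`, `RetriableError`, `validateAll`, and `maybeThrowCachedError` are hypothetical names, and the step that rethrows the cached error on a later call is an assumption about how the caller would consume it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for the error handling discussed in the review.
final class FirstErrorCache {

    // Hypothetical stand-in for Kafka's RetriableException.
    static final class RetriableError extends RuntimeException {
        RetriableError(String message) {
            super(message);
        }
    }

    // Plays the role of cachedOffsetForLeaderException: holds at most one pending error.
    private final AtomicReference<RuntimeException> cachedError = new AtomicReference<>();
    private int discarded = 0;

    // Same condition as the snippet in the review comment: retriable errors are ignored,
    // the first non-retriable error is cached, and later ones are logged and dropped.
    void record(RuntimeException e) {
        if (!(e instanceof RetriableError) && !cachedError.compareAndSet(null, e)) {
            discarded++;
            System.err.println("Discarding error because another error is pending: " + e.getMessage());
        }
    }

    // Validates every partition; a failure on one partition does not stop the loop.
    // The boolean value marks a partition whose (simulated) validation fails.
    void validateAll(Map<String, Boolean> failsByPartition) {
        for (Map.Entry<String, Boolean> entry : failsByPartition.entrySet()) {
            try {
                if (entry.getValue()) {
                    throw new IllegalStateException("Offset out of range for " + entry.getKey());
                }
            } catch (RuntimeException e) {
                record(e);
            }
        }
    }

    // Assumed consumer side: rethrow the cached error once, then clear it.
    void maybeThrowCachedError() {
        RuntimeException e = cachedError.getAndSet(null);
        if (e != null) {
            throw e;
        }
    }

    int discardedCount() {
        return discarded;
    }

    public static void main(String[] args) {
        Map<String, Boolean> partitions = new LinkedHashMap<>();
        partitions.put("topic-0", true);
        partitions.put("topic-1", false);
        partitions.put("topic-2", true);

        FirstErrorCache cache = new FirstErrorCache();
        cache.validateAll(partitions);
        try {
            cache.maybeThrowCachedError();
        } catch (IllegalStateException e) {
            System.out.println("Propagated: " + e.getMessage());
        }
    }
}
```

In this sketch only the first non-retriable error survives; any later one is logged and dropped, which is the trade-off behind "may not be ideal" in the comment above.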

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -822,19 +822,25 @@ public void onSuccess(OffsetForEpochResult offsetsResult) {
     FetchPosition requestPosition = fetchPositions.get(respTopicPartition);

     if (respEndOffset.hasUndefinedEpochOrOffset()) {
-        handleOffsetOutOfRange(requestPosition, respTopicPartition,
-            "Failed leader offset epoch validation for " + respEndOffset
-                + " since no end offset larger than current fetch epoch was reported");
+        try {
+            handleOffsetOutOfRange(requestPosition, respTopicPartition,
+                "Failed leader offset epoch validation for " + requestPosition
+                    + " since no end offset larger than current fetch epoch was reported");
+        } catch (OffsetOutOfRangeException e) {
+            // Swallow the OffsetOutOfRangeException to finish all partitions validation.
+        }
     } else {
         Optional<OffsetAndMetadata> divergentOffsetOpt = subscriptions.maybeCompleteValidation(
             respTopicPartition, requestPosition, respEndOffset);
         divergentOffsetOpt.ifPresent(
-            divergentOffset -> truncationWithoutResetPolicy.put(respTopicPartition, divergentOffset));
+            divergentOffset -> {
+                log.info("Detected log truncation with diverging offset: {}", divergentOffset);

Review comment:
Can you add the topic partition to this message?