hachikuji commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r434725093



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -822,19 +822,25 @@ public void onSuccess(OffsetForEpochResult offsetsResult) {
                         FetchPosition requestPosition = fetchPositions.get(respTopicPartition);
 
                         if (respEndOffset.hasUndefinedEpochOrOffset()) {
-                            handleOffsetOutOfRange(requestPosition, respTopicPartition,
-                                "Failed leader offset epoch validation for " + respEndOffset
-                                + " since no end offset larger than current fetch epoch was reported");
+                            try {
+                                handleOffsetOutOfRange(requestPosition, respTopicPartition,
+                                    "Failed leader offset epoch validation for " + requestPosition
+                                        + " since no end offset larger than current fetch epoch was reported");
+                            } catch (OffsetOutOfRangeException e) {
+                                // Swallow the OffsetOutOfRangeException to finish all partitions validation.

Review comment:
       I don't feel great about having this in the code, even if it's supposed 
to be temporary. I think we should just fix the exception propagation bug in 
this patch. It seems like it would be straightforward to do something similar 
to what is done in the `onFailure` path.
   ```java
                       if (!(e instanceof RetriableException) && !cachedOffsetForLeaderException.compareAndSet(null, e)) {
                           log.error("Discarding error in OffsetsForLeaderEpoch because another error is pending", e);
                       }
   ```
   The above may not be ideal, but at least it provides a way to propagate 
individual errors.
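
To make the suggestion concrete, here is a minimal, self-contained sketch of the "cache the first error, keep validating the rest" pattern. It is not Kafka code: `EpochValidationSketch`, `validate`, `maybeThrowCachedError`, the string partition names, and the negative-offset failure condition are all hypothetical stand-ins, and the `RetriableException` check from the `onFailure` snippet is left out for brevity. Only the `compareAndSet(null, e)` idea is taken from the snippet above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the validation loop in onSuccess; not the real Fetcher.
class EpochValidationSketch {
    // Plays the role of cachedOffsetForLeaderException: holds at most one pending error.
    final AtomicReference<RuntimeException> cachedError = new AtomicReference<>();
    final List<String> validated = new ArrayList<>();
    final List<String> discarded = new ArrayList<>();

    // Assumption for illustration: a negative end offset means validation failed.
    void validate(String partition, long endOffset) {
        if (endOffset < 0) {
            throw new IllegalStateException("Failed leader offset epoch validation for " + partition);
        }
        validated.add(partition);
    }

    // Validates every partition. The first failure is cached for later propagation;
    // any further failures are recorded as discarded instead of aborting the loop.
    void onSuccess(Map<String, Long> endOffsets) {
        for (Map.Entry<String, Long> entry : endOffsets.entrySet()) {
            try {
                validate(entry.getKey(), entry.getValue());
            } catch (RuntimeException e) {
                if (!cachedError.compareAndSet(null, e)) {
                    discarded.add(e.getMessage());
                }
            }
        }
    }

    // Surfaces the cached error (if any) to the caller and clears it.
    void maybeThrowCachedError() {
        RuntimeException e = cachedError.getAndSet(null);
        if (e != null) {
            throw e;
        }
    }

    public static void main(String[] args) {
        EpochValidationSketch sketch = new EpochValidationSketch();
        Map<String, Long> endOffsets = new LinkedHashMap<>();
        endOffsets.put("topic-0", -1L);
        endOffsets.put("topic-1", 42L);
        sketch.onSuccess(endOffsets);
        try {
            sketch.maybeThrowCachedError();
        } catch (IllegalStateException e) {
            System.out.println("Propagated: " + e.getMessage());
        }
    }
}
```

With this shape, a failure on one partition no longer prevents the remaining partitions from being validated, and the error still reaches the caller rather than being swallowed.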

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -822,19 +822,25 @@ public void onSuccess(OffsetForEpochResult offsetsResult) {
                         FetchPosition requestPosition = fetchPositions.get(respTopicPartition);
 
                         if (respEndOffset.hasUndefinedEpochOrOffset()) {
-                            handleOffsetOutOfRange(requestPosition, respTopicPartition,
-                                "Failed leader offset epoch validation for " + respEndOffset
-                                + " since no end offset larger than current fetch epoch was reported");
+                            try {
+                                handleOffsetOutOfRange(requestPosition, respTopicPartition,
+                                    "Failed leader offset epoch validation for " + requestPosition
+                                        + " since no end offset larger than current fetch epoch was reported");
+                            } catch (OffsetOutOfRangeException e) {
+                                // Swallow the OffsetOutOfRangeException to finish all partitions validation.
+                            }
                         } else {
                             Optional<OffsetAndMetadata> divergentOffsetOpt = subscriptions.maybeCompleteValidation(
                                 respTopicPartition, requestPosition, respEndOffset);
                             divergentOffsetOpt.ifPresent(
-                                divergentOffset -> truncationWithoutResetPolicy.put(respTopicPartition, divergentOffset));
+                                divergentOffset -> {
+                                    log.info("Detected log truncation with diverging offset: {}", divergentOffset);

Review comment:
       Can you add the topic partition to this message?



