hachikuji commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r434208948



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##########
@@ -611,7 +611,7 @@ public void 
testHeartbeatIllegalGenerationResponseWithOldGeneration() throws Int
 
         final AbstractCoordinator.Generation currGen = 
coordinator.generation();
 
-        // let the heartbeat request to send out a request
+        // let the heartbeat thread to send out a request

Review comment:
       nit: while we're at it, drop the "to"?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
##########
@@ -86,4 +84,9 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(error, leaderEpoch, endOffset);
     }
+
+    public boolean hasUndefinedEpochOrOffset() {
+        return this.endOffset == UNDEFINED_EPOCH_OFFSET ||

Review comment:
       Older versions did not return the epoch, so it was possible to see an 
offset defined without an epoch. However, the version that the consumer relies 
on should always have both or neither. Anyway, I think it is reasonable to be a 
little stricter here.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/EpochEndOffsetTest.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+import static org.apache.kafka.common.requests.EpochEndOffset.UNDEFINED_EPOCH;
+import static 
org.apache.kafka.common.requests.EpochEndOffset.UNDEFINED_EPOCH_OFFSET;
+import static org.junit.Assert.assertEquals;
+
+public class EpochEndOffsetTest {
+
+    @Test
+    public void testConstructor() {
+        int leaderEpoch = 5;
+        long endOffset = 10L;
+        EpochEndOffset epochEndOffset = new 
EpochEndOffset(Errors.FENCED_LEADER_EPOCH, leaderEpoch, endOffset);
+
+        verify(leaderEpoch, endOffset, true, Errors.FENCED_LEADER_EPOCH, 
false, epochEndOffset);

Review comment:
       nit: a matter of taste I guess, but I find the tests are easier to 
follow when the assertions are inline.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -812,13 +813,25 @@ public void 
onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsRe
                     // For each OffsetsForLeader response, check if the 
end-offset is lower than our current offset
                     // for the partition. If so, it means we have experienced 
log truncation and need to reposition
                     // that partition's offset.
+                    //
+                    // In addition, check whether the returned offset and 
epoch are valid. If not, then we should reset
+                    // its offset if reset policy is configured, or throw out 
of range exception.
                     offsetsResult.endOffsets().forEach((respTopicPartition, 
respEndOffset) -> {
-                        SubscriptionState.FetchPosition requestPosition = 
fetchPostitions.get(respTopicPartition);
-                        Optional<OffsetAndMetadata> divergentOffsetOpt = 
subscriptions.maybeCompleteValidation(
+                        SubscriptionState.FetchPosition requestPosition = 
fetchPositions.get(respTopicPartition);
+
+                        if (respEndOffset.hasUndefinedEpochOrOffset()) {
+                            if (subscriptions.hasDefaultOffsetResetPolicy()) {

Review comment:
       I think this is basically the same code we have handling out of range 
errors from fetch responses. Does it makes sense to add a helper?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -812,13 +813,25 @@ public void 
onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsRe
                     // For each OffsetsForLeader response, check if the 
end-offset is lower than our current offset
                     // for the partition. If so, it means we have experienced 
log truncation and need to reposition
                     // that partition's offset.
+                    //
+                    // In addition, check whether the returned offset and 
epoch are valid. If not, then we should reset
+                    // its offset if reset policy is configured, or throw out 
of range exception.
                     offsetsResult.endOffsets().forEach((respTopicPartition, 
respEndOffset) -> {
-                        SubscriptionState.FetchPosition requestPosition = 
fetchPostitions.get(respTopicPartition);
-                        Optional<OffsetAndMetadata> divergentOffsetOpt = 
subscriptions.maybeCompleteValidation(
+                        SubscriptionState.FetchPosition requestPosition = 
fetchPositions.get(respTopicPartition);
+
+                        if (respEndOffset.hasUndefinedEpochOrOffset()) {
+                            if (subscriptions.hasDefaultOffsetResetPolicy()) {
+                                log.info("Fetch offset {} is out of range for 
partition {}, resetting offset", requestPosition, respTopicPartition);

Review comment:
       Could we add some detail to this message? Maybe something like "Leader 
reported no end offset larger than current fetch epoch" or something like that.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -812,13 +813,25 @@ public void 
onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsRe
                     // For each OffsetsForLeader response, check if the 
end-offset is lower than our current offset
                     // for the partition. If so, it means we have experienced 
log truncation and need to reposition
                     // that partition's offset.
+                    //
+                    // In addition, check whether the returned offset and 
epoch are valid. If not, then we should reset
+                    // its offset if reset policy is configured, or throw out 
of range exception.
                     offsetsResult.endOffsets().forEach((respTopicPartition, 
respEndOffset) -> {
-                        SubscriptionState.FetchPosition requestPosition = 
fetchPostitions.get(respTopicPartition);
-                        Optional<OffsetAndMetadata> divergentOffsetOpt = 
subscriptions.maybeCompleteValidation(
+                        SubscriptionState.FetchPosition requestPosition = 
fetchPositions.get(respTopicPartition);
+
+                        if (respEndOffset.hasUndefinedEpochOrOffset()) {
+                            if (subscriptions.hasDefaultOffsetResetPolicy()) {
+                                log.info("Fetch offset {} is out of range for 
partition {}, resetting offset", requestPosition, respTopicPartition);
+                                
subscriptions.requestOffsetReset(respTopicPartition);
+                            } else {
+                                throw new 
OffsetOutOfRangeException(Collections.singletonMap(respTopicPartition, 
requestPosition.offset));

Review comment:
       Also would be useful if we could include the epoch in the exception 
message as well as similar change to emphasize that this was raised during 
epoch validation.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -3751,6 +3755,85 @@ public void testOffsetValidationSkippedForOldBroker() {
         }
     }
 
+    @Test
+    public void testOffsetValidationSkippedForOldResponse() {
+        // Old responses may provide unreliable leader epoch,
+        // so we should skip offset validation and not send the request.
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(tp0.topic(), 4);
+
+        final int epochOne = 1;
+
+        
metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 
1,
+            Collections.emptyMap(), partitionCounts, tp -> epochOne), false, 
0L);
+
+        Node node = metadata.fetch().nodes().get(0);
+        assertFalse(client.isConnected(node.idString()));
+
+        // Seek with a position and leader+epoch
+        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
+            metadata.currentLeader(tp0).leader, Optional.of(epochOne));
+        subscriptions.seekUnvalidated(tp0, new 
SubscriptionState.FetchPosition(20L, Optional.of(epochOne), leaderAndEpoch));
+        assertFalse(client.isConnected(node.idString()));
+        assertTrue(subscriptions.awaitingValidation(tp0));
+
+        // Inject an older version of the metadata response
+        final short responseVersion = 8;
+        
metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 
1,
+            Collections.emptyMap(), partitionCounts, responseVersion), false, 
0L);
+        fetcher.validateOffsetsIfNeeded();
+        // Offset validation is skipped
+        assertFalse(subscriptions.awaitingValidation(tp0));
+    }
+
+    @Test
+    public void testOffsetValidationResetOffsetForUndefinedEpoch() {
+        testOffsetValidationWithGivenEpochOffset(new 
EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH, 0L));
+
+    }
+    @Test
+    public void testOffsetValidationResetOffsetForUndefinedOffset() {
+        testOffsetValidationWithGivenEpochOffset(new EpochEndOffset(2, 
EpochEndOffset.UNDEFINED_EPOCH_OFFSET));
+    }
+
+    private void testOffsetValidationWithGivenEpochOffset(final EpochEndOffset 
epochEndOffset) {

Review comment:
       Is it useful also to check the case when no reset policy is defined?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -149,7 +148,11 @@
     private TopicPartition tp1 = new TopicPartition(topicName, 1);
     private TopicPartition tp2 = new TopicPartition(topicName, 2);
     private TopicPartition tp3 = new TopicPartition(topicName, 3);
-    private MetadataResponse initialUpdateResponse = 
TestUtils.metadataUpdateWith(1, singletonMap(topicName, 4));
+    private int validLeaderEpoch = 0;
+    private MetadataResponse initialUpdateResponse =
+        TestUtils.metadataUpdateWith(1, singletonMap(topicName, 4));
+    private MetadataResponse initialUpdateResponseWithLeaderEpoch =

Review comment:
       nit: if one of these is an uncommon case, maybe we can make it local to 
the test cases that need it.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -337,7 +345,7 @@ public void testFetchSkipsBlackedOutNodes() {
 
         assignFromUser(singleton(tp0));
         subscriptions.seek(tp0, 0);
-        client.updateMetadata(initialUpdateResponse);
+        client.updateMetadata(initialUpdateResponseWithLeaderEpoch);

Review comment:
       Do we need to change some of these other tests? We're still using 
`initialUpdateResponse` above in `assignFromUser` and below.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to