abbccdda commented on a change in pull request #10006:
URL: https://github.com/apache/kafka/pull/10006#discussion_r567159871



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -1745,51 +1742,77 @@ public void testSeekWithInFlightReset() {
         assertEquals(237L, subscriptions.position(tp0).offset);
     }
 
-    @Timeout(10)
+    private boolean listOffsetMatchesExpectedReset(
+        TopicPartition tp,
+        OffsetResetStrategy strategy,
+        AbstractRequest request
+    ) {
+        if (!(request instanceof ListOffsetsRequest)) {
+            return false;
+        }
+
+        ListOffsetsRequest req = (ListOffsetsRequest) request;
+        if (req.data().topics().size() != 1) {
+            return false;
+        }
+
+        ListOffsetsTopic listTopic = req.data().topics().get(0);
+        if (!listTopic.name().equals(tp.topic())) {
+            return false;
+        }
+
+        if (listTopic.partitions().size() != 1) {
+            return false;
+        }
+
+        ListOffsetsPartition listPartition = listTopic.partitions().get(0);
+        if (listPartition.partitionIndex() != tp.partition()) {
+            return false;
+        }
+
+        long timestamp = listPartition.timestamp();
+        if (strategy == OffsetResetStrategy.EARLIEST && timestamp != 
ListOffsetsRequest.EARLIEST_TIMESTAMP) {
+            return false;
+        }
+
+        if (strategy == OffsetResetStrategy.LATEST && timestamp != 
ListOffsetsRequest.LATEST_TIMESTAMP) {
+            return false;
+        }
+
+        return true;
+    }
+
     @Test
-    public void testEarlierOffsetResetArrivesLate() throws 
InterruptedException {
-        LogContext lc = new LogContext();
-        buildFetcher(spy(new SubscriptionState(lc, 
OffsetResetStrategy.EARLIEST)), lc);
+    public void testEarlierOffsetResetArrivesLate() {
+        buildFetcher();

Review comment:
       Why we don't need to set reset strategy here?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -1745,51 +1742,77 @@ public void testSeekWithInFlightReset() {
         assertEquals(237L, subscriptions.position(tp0).offset);
     }
 
-    @Timeout(10)
+    private boolean listOffsetMatchesExpectedReset(
+        TopicPartition tp,
+        OffsetResetStrategy strategy,
+        AbstractRequest request
+    ) {
+        if (!(request instanceof ListOffsetsRequest)) {
+            return false;
+        }
+
+        ListOffsetsRequest req = (ListOffsetsRequest) request;
+        if (req.data().topics().size() != 1) {
+            return false;
+        }
+
+        ListOffsetsTopic listTopic = req.data().topics().get(0);
+        if (!listTopic.name().equals(tp.topic())) {
+            return false;
+        }
+
+        if (listTopic.partitions().size() != 1) {
+            return false;
+        }
+
+        ListOffsetsPartition listPartition = listTopic.partitions().get(0);
+        if (listPartition.partitionIndex() != tp.partition()) {
+            return false;
+        }
+
+        long timestamp = listPartition.timestamp();
+        if (strategy == OffsetResetStrategy.EARLIEST && timestamp != 
ListOffsetsRequest.EARLIEST_TIMESTAMP) {
+            return false;
+        }
+
+        if (strategy == OffsetResetStrategy.LATEST && timestamp != 
ListOffsetsRequest.LATEST_TIMESTAMP) {
+            return false;
+        }
+
+        return true;
+    }
+
     @Test
-    public void testEarlierOffsetResetArrivesLate() throws 
InterruptedException {
-        LogContext lc = new LogContext();
-        buildFetcher(spy(new SubscriptionState(lc, 
OffsetResetStrategy.EARLIEST)), lc);
+    public void testEarlierOffsetResetArrivesLate() {
+        buildFetcher();
         assignFromUser(singleton(tp0));
 
-        ExecutorService es = Executors.newSingleThreadExecutor();
-        CountDownLatch latchLatestStart = new CountDownLatch(1);
-        CountDownLatch latchEarliestStart = new CountDownLatch(1);
-        CountDownLatch latchEarliestDone = new CountDownLatch(1);
-        CountDownLatch latchEarliestFinish = new CountDownLatch(1);
-        try {
-            doAnswer(invocation -> {
-                latchLatestStart.countDown();
-                latchEarliestStart.await();
-                Object result = invocation.callRealMethod();
-                latchEarliestDone.countDown();
-                return result;
-            }).when(subscriptions).maybeSeekUnvalidated(tp0, new 
SubscriptionState.FetchPosition(0L,
-                Optional.empty(), metadata.currentLeader(tp0)), 
OffsetResetStrategy.EARLIEST);
-
-            es.submit(() -> {
-                subscriptions.requestOffsetReset(tp0, 
OffsetResetStrategy.EARLIEST);
-                fetcher.resetOffsetsIfNeeded();
-                consumerClient.pollNoWakeup();
-                client.respond(listOffsetResponse(Errors.NONE, 1L, 0L));
-                consumerClient.pollNoWakeup();
-                latchEarliestFinish.countDown();
-            }, Void.class);
-
-            latchLatestStart.await();
-            subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
-            fetcher.resetOffsetsIfNeeded();
-            consumerClient.pollNoWakeup();
-            client.respond(listOffsetResponse(Errors.NONE, 1L, 10L));
-            latchEarliestStart.countDown();
-            latchEarliestDone.await();
-            consumerClient.pollNoWakeup();
-            latchEarliestFinish.await();
-            assertEquals(10, subscriptions.position(tp0).offset);
-        } finally {
-            es.shutdown();
-            es.awaitTermination(10000, TimeUnit.MILLISECONDS);
-        }
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
+        fetcher.resetOffsetsIfNeeded();
+
+        client.prepareResponse(req -> {
+            if (listOffsetMatchesExpectedReset(tp0, 
OffsetResetStrategy.EARLIEST, req)) {
+                // Before the response is handled, we get a request to reset 
to the latest offset
+                subscriptions.requestOffsetReset(tp0, 
OffsetResetStrategy.LATEST);
+                return true;
+            } else {
+                return false;
+            }
+        }, listOffsetResponse(Errors.NONE, 1L, 0L));
+        consumerClient.pollNoWakeup();
+
+        // The list offset result should be ignored
+        assertTrue(subscriptions.isOffsetResetNeeded(tp0));
+        assertEquals(OffsetResetStrategy.LATEST, 
subscriptions.resetStrategy(tp0));
+
+        fetcher.resetOffsetsIfNeeded();
+        client.prepareResponse(req -> {
+            return listOffsetMatchesExpectedReset(tp0, 
OffsetResetStrategy.LATEST, req);

Review comment:
       `return` is not necessary




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to