lucasbru commented on code in PR #13023:
URL: https://github.com/apache/kafka/pull/13023#discussion_r1053392985


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -4762,6 +4763,39 @@ public void testFetchDisconnectedShouldClearPreferredReadReplica() {
         assertEquals(-1, selected.id());
     }
 
+    @Test
+    public void testFetchDisconnectedShouldNotClearPreferredReadReplicaIfUnassigned() {
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(),
+            Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis());
+
+        subscriptions.assignFromUser(singleton(tp0));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4), tp -> validLeaderEpoch, topicIds, false));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, fetcher.sendFetches());
+
+        // Set preferred read replica to node=1
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Verify
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(1, selected.id());
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Disconnect and remove tp0 from assignment
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0), true);
+        subscriptions.assignFromUser(emptySet());
+
+        // Preferred read replica should not be cleared
+        consumerClient.poll(time.timer(0));
+        assertFalse(fetcher.hasCompletedFetches());
+        fetchedRecords();

Review Comment:
   Done



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -346,7 +346,11 @@ public void onFailure(RuntimeException e) {
                             FetchSessionHandler handler = sessionHandler(fetchTarget.id());
                             if (handler != null) {
                                 handler.handleError(e);
-                                handler.sessionTopicPartitions().forEach(subscriptions::clearPreferredReadReplica);
+                                // Make sure to filter out topic partitions that are not part of the assignment
+                                // anymore when the request fails.
+                                handler.sessionTopicPartitions().stream()
+                                    .filter(subscriptions::isAssigned)
+                                    .forEach(subscriptions::clearPreferredReadReplica);

Review Comment:
   Good point! I was thinking the `Fetcher` object monitor protects us, but you
   are right, it doesn't.
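
   For reference, a minimal standalone sketch of the filtering pattern in the
   hunk above. It does not use the real Kafka classes: `PreferredReplicaSketch`,
   its `String` partition keys, and `clearOnFailure` are hypothetical stand-ins,
   and the assumption that clearing an unassigned partition throws is part of
   the sketch rather than something this thread confirms.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the subscription state; not Kafka's real API.
public class PreferredReplicaSketch {
    private final Set<String> assigned = new HashSet<>();
    private final Map<String, Integer> preferredReplica = new HashMap<>();

    // Assign a partition and record a preferred read replica for it.
    public void assign(String partition, int replicaId) {
        assigned.add(partition);
        preferredReplica.put(partition, replicaId);
    }

    // Remove a partition from the assignment along with its per-partition state.
    public void unassign(String partition) {
        assigned.remove(partition);
        preferredReplica.remove(partition);
    }

    public boolean isAssigned(String partition) {
        return assigned.contains(partition);
    }

    public Integer preferredReplicaOf(String partition) {
        return preferredReplica.get(partition);
    }

    // Assumption for this sketch: clearing state of an unassigned partition is an error.
    public void clearPreferredReadReplica(String partition) {
        if (!isAssigned(partition)) {
            throw new IllegalStateException("No current assignment for partition " + partition);
        }
        preferredReplica.remove(partition);
    }

    // Mirrors the patched failure path: skip partitions that are no longer
    // assigned, then clear the preferred replica for the rest.
    public void clearOnFailure(List<String> sessionPartitions) {
        sessionPartitions.stream()
            .filter(this::isAssigned)
            .forEach(this::clearPreferredReadReplica);
    }
}
```

   In this sketch, calling `clearOnFailure` with a session list that still
   contains a since-unassigned partition clears only the assigned ones, whereas
   an unfiltered `forEach` over the same list would hit the exception.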



