dajac commented on code in PR #13023:
URL: https://github.com/apache/kafka/pull/13023#discussion_r1053276546

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -4762,6 +4763,39 @@ public void testFetchDisconnectedShouldClearPreferredReadReplica() {
         assertEquals(-1, selected.id());
     }
 
+    @Test
+    public void testFetchDisconnectedShouldNotClearPreferredReadReplicaIfUnassigned() {
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(),
+            Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis());
+
+        subscriptions.assignFromUser(singleton(tp0));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4), tp -> validLeaderEpoch, topicIds, false));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, fetcher.sendFetches());
+
+        // Set preferred read replica to node=1
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Verify
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(1, selected.id());
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Disconnect and remove tp0 from assignment
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0), true);
+        subscriptions.assignFromUser(emptySet());
+
+        // Preferred read replica should not be cleared
+        consumerClient.poll(time.timer(0));
+        assertFalse(fetcher.hasCompletedFetches());
+        fetchedRecords();

Review Comment:
   Should we add the following assertion to be consistent with other tests?
   ```
           selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
           assertEquals(-1, selected.id());
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -346,7 +346,11 @@ public void onFailure(RuntimeException e) {
                             FetchSessionHandler handler = sessionHandler(fetchTarget.id());
                             if (handler != null) {
                                 handler.handleError(e);
-                                handler.sessionTopicPartitions().forEach(subscriptions::clearPreferredReadReplica);
+                                // Make sure to filter out topic partitions that are not part of the assignment
+                                // anymore when the request fails.
+                                handler.sessionTopicPartitions().stream()
+                                    .filter(subscriptions::isAssigned)
+                                    .forEach(subscriptions::clearPreferredReadReplica);

Review Comment:
   What would happen if the partition is removed from the subscriptions between
the `isAssigned` filter and the call to `clearPreferredReadReplica`? It seems to
me that we could face the same issue.
   
   It may be better to push this check into `clearPreferredReadReplica`, so that
the lookup and the clear happen under the same lock. Something like this:
   
   ```
       public synchronized Optional<Integer> clearPreferredReadReplica(TopicPartition tp) {
           final TopicPartitionState topicPartitionState = assignedStateOrNull(tp);
           if (topicPartitionState == null) {
               return Optional.empty();
           } else {
               return topicPartitionState.clearPreferredReadReplica();
           }
       }
   ```
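   
   To illustrate the idea outside of Kafka, here is a minimal, self-contained
sketch. `SubscriptionSketch` and its methods are hypothetical stand-ins for
`SubscriptionState`, partitions are plain strings, and the real class may differ
in its details. The point is only that the "is it still assigned?" check and the
clear run inside one synchronized method, so no unassignment can slip in between
them:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Optional;

   // Hypothetical stand-in for SubscriptionState; not the real Kafka class.
   class SubscriptionSketch {
       // Assigned partition -> preferred read replica id (empty if none is set).
       private final Map<String, Optional<Integer>> assigned = new HashMap<>();

       synchronized void assign(String tp) {
           assigned.putIfAbsent(tp, Optional.empty());
       }

       synchronized void unassign(String tp) {
           assigned.remove(tp);
       }

       synchronized void updatePreferredReadReplica(String tp, int nodeId) {
           // Ignore updates for partitions that are not assigned.
           if (assigned.containsKey(tp)) {
               assigned.put(tp, Optional.of(nodeId));
           }
       }

       // The assignment check and the clear happen under the same lock,
       // so callers do not need a separate isAssigned() filter.
       synchronized Optional<Integer> clearPreferredReadReplica(String tp) {
           Optional<Integer> previous = assigned.get(tp);
           if (previous == null) {
               return Optional.empty(); // tp is no longer assigned: nothing to clear
           }
           assigned.put(tp, Optional.empty());
           return previous;
       }
   }
   ```
   
   With this shape, the caller in `onFailure` could go back to a plain
`forEach(subscriptions::clearPreferredReadReplica)`, assuming nothing else in
that path requires the partition to be assigned.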



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.