adixitconfluent commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2048256117


##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -1155,6 +1170,537 @@ public void testOnCompleteExecutionOnTimeout() {
         assertEquals(1, delayedShareFetch.expiredRequestMeter().count());
     }
 
+    @Test
+    public void testRemoteStorageFetchTryCompleteReturnsFalse() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), List.of(tp0, tp1, tp2), BATCH_SIZE, 
MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset matches with the cached entry for sp0 but not for sp1 
and sp2. Hence, a replica manager fetch will happen for sp1 and sp2.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(10, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking local log read result for tp1 and remote storage read 
result for tp2.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), 
Set.of(tp2))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete 
within tryComplete in this mock.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), 
any())).thenReturn(mock(Future.class));
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertFalse(delayedShareFetch.tryComplete());
+        assertFalse(delayedShareFetch.isCompleted());
+        // Remote fetch object gets created for delayed share fetch object.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for local log read topic partitions 
tp0 and tp1.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp1));
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchTryCompleteThrowsException() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+
+        // Fetch offset does not match with the cached entry for sp0 and sp1. 
Hence, a replica manager fetch will happen for sp0 and sp1.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking local log read result for tp0 and remote storage read 
result for tp1.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0), 
Set.of(tp1))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Exception will be thrown during the 
creation of remoteFetch object.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), any())).thenThrow(new 
RejectedExecutionException("Exception thrown"));
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withExceptionHandler(exceptionHandler)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        // tryComplete returns true and goes to forceComplete once the 
exception occurs.
+        assertTrue(delayedShareFetch.tryComplete());
+        assertTrue(delayedShareFetch.isCompleted());
+        // The future of shareFetch completes.
+        assertTrue(shareFetch.isCompleted());
+        assertFalse(future.isCompletedExceptionally());
+        assertEquals(Set.of(tp1), future.join().keySet());
+        // Exception occurred and was handled.
+        Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
+        // Verify the locks are released for both local and remote read topic 
partitions tp0 and tp1 because of exception occurrence.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp1));
+        Mockito.verify(delayedShareFetch, times(1)).onComplete();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchTryCompletionDueToBrokerBecomingOffline() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset matches with the cached entry for sp0 but not for sp1 
and sp2. Hence, a replica manager fetch will happen for sp1 and sp2 during 
tryComplete.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(10, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        // Mocking local log read result for tp1 and remote storage read 
result for tp2 on first replicaManager readFromLog call(from tryComplete).
+        // Mocking local log read result for tp0 and tp1 on second 
replicaManager readFromLog call(from onComplete).
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), 
Set.of(tp2))
+        ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, 
tp1), Set.of())
+        ).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete 
within tryComplete in this mock but the broker becomes unavailable.
+        Future<Void> remoteFetchTask = mock(Future.class);
+        doAnswer(invocation -> {
+            when(remoteFetchTask.isCancelled()).thenReturn(true);
+            return false;
+        }).when(remoteFetchTask).cancel(false);
+
+        when(remoteFetchTask.cancel(false)).thenReturn(true);
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), 
any())).thenReturn(remoteFetchTask);
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+        
when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(mock(KafkaStorageException.class));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        
assertTrue(delayedShareFetch.remoteFetch().remoteFetchTask().isCancelled());
+        // Partition locks should be released for all 3 topic partitions
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2));
+        assertTrue(shareFetch.isCompleted());
+        // Share fetch response contained tp0 and tp1 (local fetch) but not 
tp2, since it errored out.
+        assertEquals(Set.of(tp0, tp1), future.join().keySet());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 is acquirable, sp1 is not acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0. Hence, a 
replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), 
Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within 
tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.empty(),
+            Optional.of(new TimeoutException("Error occurred while creating 
remote fetch result")) // Remote fetch result is returned with an error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate 
remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = 
invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for tp0.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0), future.join().keySet());
+        assertEquals(Errors.REQUEST_TIMED_OUT.code(), 
future.join().get(tp0).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 is acquirable, sp1 is not acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0. Hence, a 
replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), 
Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());

Review Comment:
   Makes sense. As per the mocked data, the fetch lock for tp1 can never be 
acquired, so the usage of tp1 in this test is effectively redundant. I have removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to