adixitconfluent commented on code in PR #19437: URL: https://github.com/apache/kafka/pull/19437#discussion_r2048256117
########## core/src/test/java/kafka/server/share/DelayedShareFetchTest.java: ########## @@ -1155,6 +1170,537 @@ public void testOnCompleteExecutionOnTimeout() { assertEquals(1, delayedShareFetch.expiredRequestMeter().count()); } + @Test + public void testRemoteStorageFetchTryCompleteReturnsFalse() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + + // All the topic partitions are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + new CompletableFuture<>(), List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + when(sp2.nextFetchOffset()).thenReturn(30L); + + // Fetch offset matches with the cached entry for sp0 but not for sp1 and sp2. Hence, a replica manager fetch will happen for sp1 and sp2. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(10, 1, 0))); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking local log read result for tp1 and remote storage read result for tp2. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock. + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(remoteLogManager.asyncRead(any(), any())).thenReturn(mock(Future.class)); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + assertFalse(delayedShareFetch.tryComplete()); + assertFalse(delayedShareFetch.isCompleted()); + // Remote fetch object gets created for delayed share fetch object. + assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released for local log read topic partitions tp0 and tp1. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1)); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchTryCompleteThrowsException() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + // All the topic partitions are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + + // Fetch offset does not match with the cached entry for sp0 and sp1. Hence, a replica manager fetch will happen for sp0 and sp1. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking local log read result for tp0 and remote storage read result for tp1. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0), Set.of(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Exception will be thrown during the creation of remoteFetch object. + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(remoteLogManager.asyncRead(any(), any())).thenThrow(new RejectedExecutionException("Exception thrown")); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mockExceptionHandler(); + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withExceptionHandler(exceptionHandler) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + // tryComplete returns true and goes to forceComplete once the exception occurs. + assertTrue(delayedShareFetch.tryComplete()); + assertTrue(delayedShareFetch.isCompleted()); + // The future of shareFetch completes. + assertTrue(shareFetch.isCompleted()); + assertFalse(future.isCompletedExceptionally()); + assertEquals(Set.of(tp1), future.join().keySet()); + // Exception occurred and was handled. + Mockito.verify(exceptionHandler, times(1)).accept(any(), any()); + // Verify the locks are released for both local and remote read topic partitions tp0 and tp1 because of exception occurrence. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1)); + Mockito.verify(delayedShareFetch, times(1)).onComplete(); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchTryCompletionDueToBrokerBecomingOffline() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + + // All the topic partitions are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + when(sp2.nextFetchOffset()).thenReturn(30L); + + // Fetch offset matches with the cached entry for sp0 but not for sp1 and sp2. Hence, a replica manager fetch will happen for sp1 and sp2 during tryComplete. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(10, 1, 0))); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + // Mocking local log read result for tp1 and remote storage read result for tp2 on first replicaManager readFromLog call(from tryComplete). + // Mocking local log read result for tp0 and tp1 on second replicaManager readFromLog call(from onComplete). + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2)) + ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of()) + ).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock but the broker becomes unavailable. + Future<Void> remoteFetchTask = mock(Future.class); + doAnswer(invocation -> { + when(remoteFetchTask.isCancelled()).thenReturn(true); + return false; + }).when(remoteFetchTask).cancel(false); + + when(remoteFetchTask.cancel(false)).thenReturn(true); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(remoteLogManager.asyncRead(any(), any())).thenReturn(remoteFetchTask); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(mock(KafkaStorageException.class)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + assertTrue(delayedShareFetch.remoteFetch().remoteFetchTask().isCancelled()); + // Partition locks should be released for all 3 topic partitions + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2)); + assertTrue(shareFetch.isCompleted()); + // Share fetch response contained tp0 and tp1 (local fetch) but not tp2, since it errored out. + assertEquals(Set.of(tp0, tp1), future.join().keySet()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + // sp0 is acquirable, sp1 is not acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(false); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(false); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + // Fetch offset does not match with the cached entry for sp0. Hence, a replica manager fetch will happen for sp0. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking remote storage read result for tp0. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete. + RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult( + Optional.empty(), + Optional.of(new TimeoutException("Error occurred while creating remote fetch result")) // Remote fetch result is returned with an error. + ); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + doAnswer(invocationOnMock -> { + // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour. + Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1); + callback.accept(remoteFetchResult); + return CompletableFuture.completedFuture(remoteFetchResult); + }).when(remoteLogManager).asyncRead(any(), any()); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1))) + .build()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released for tp0. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + assertTrue(shareFetch.isCompleted()); + assertEquals(Set.of(tp0), future.join().keySet()); + assertEquals(Errors.REQUEST_TIMED_OUT.code(), future.join().get(tp0).errorCode()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + // sp0 is acquirable, sp1 is not acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(false); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(false); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + // Fetch offset does not match with the cached entry for sp0. Hence, a replica manager fetch will happen for sp0. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking remote storage read result for tp0. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); Review Comment: As per mocked data, since we are not able to acquire the fetch lock for tp1, usage of tp1 is pretty much redundant. Makes sense, I have removed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org