junrao commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2047511436
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -605,27 +606,26 @@ private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
             // a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
             // we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
             // fetch for multiple remote fetch topic partition in a single share fetch request
-                remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()));
+                topicPartitionRemoteFetchInfoOpt = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult));
                 partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
                 break;
             }
         }
-        return remoteStorageFetchMetadataMap;
+        return topicPartitionRemoteFetchInfoOpt;
     }
 
     private boolean maybeProcessRemoteFetch(
         LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
-        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
-        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
     ) throws Exception {
         topicPartitionData.keySet().forEach(topicIdPartition -> {
             // topic partitions for which fetch would not be happening in this share fetch request.
             if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
                 // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
-                releasePartitionLocks(Set.of(topicIdPartition));
+                releasePartitionLocksAndAddToActionQueue(Set.of(topicIdPartition));

Review Comment:
   It's better to release all the locks first and then add to the action queue.
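For illustration, the reordering being suggested might look like the following: collect the non-remote partitions, release every lock in one pass, and only then enqueue a single follow-up action. This is a sketch, not the PR's code; `releasePartitionLocks`, `replicaManager` and `shareFetch` are the class members visible in the hunks, and `releaseNonRemotePartitions` is a hypothetical helper name.

```java
// Sketch only: all locks are released before anything touches the action queue.
private void releaseNonRemotePartitions(
    LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
    TopicIdPartition remoteFetchPartition
) {
    Set<TopicIdPartition> toRelease = topicPartitionData.keySet().stream()
        .filter(tp -> !remoteFetchPartition.equals(tp))
        .collect(java.util.stream.Collectors.toSet());
    // Step 1: release every acquired-but-not-remote lock first.
    releasePartitionLocks(toRelease);
    // Step 2: a single deferred action then re-checks the pending share fetches.
    replicaManager.addToActionQueue(() -> toRelease.forEach(tp ->
        replicaManager.completeDelayedShareFetchRequest(
            new DelayedShareFetchGroupKey(shareFetch.groupId(), tp.topicId(), tp.partition()))));
}
```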
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +583,304 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
+                // a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
+                // we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
+                // fetch for multiple remote fetch topic partition in a single share fetch request
+                topicPartitionRemoteFetchInfoOpt = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult));
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return topicPartitionRemoteFetchInfoOpt;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+    ) throws Exception {
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // topic partitions for which fetch would not be happening in this share fetch request.
+            if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
+                releasePartitionLocksAndAddToActionQueue(Set.of(topicIdPartition));
+            }
+        });
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
+     * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+    ) {
+        TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
+        RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.logReadResult().info().delayedRemoteStorageFetch.get();
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error if any in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, topicPartitionRemoteFetchInfo.logReadResult(), remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));
+        return Optional.empty();
+    }
+
+    /**
+     * This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent().
+     * The operation can be completed if:
+     * Case a: The partition is in an offline log directory on this broker
+     * Case b: This broker does not know the partition it tries to fetch
+     * Case c: This broker is no longer the leader of the partition it tries to fetch
+     * Case d: The remote storage read request completed (succeeded or failed)
+     * @return boolean representing whether the remote fetch is completed or not.
+     */
+    private boolean maybeCompletePendingRemoteFetch() {
+        boolean canComplete = false;
+
+        TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition();
+        try {
+            replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+        } catch (KafkaStorageException e) { // Case a
+            log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        } catch (UnknownTopicOrPartitionException e) { // Case b
+            log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        } catch (NotLeaderOrFollowerException e) { // Case c
+            log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        }
+
+        if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d
+            boolean completedByMe = forceComplete();
+            // If invocation of forceComplete is not successful, then that means the request is already completed
+            // hence release the acquired locks.
+            if (!completedByMe) {
+                releasePartitionLocks(partitionsAcquired.keySet());
+            }
+            return completedByMe;
+        } else
+            return false;
+    }
+
+    /**
+     * This function completes a share fetch request for which we have identified erroneous remote storage fetch in tryComplete()
+     * It should only be called when we know that there is remote fetch in-flight/completed.
+     */
+    private void completeErroneousRemoteShareFetchRequest() {
+        try {
+            handleFetchException(shareFetch, partitionsAcquired.keySet(), remoteStorageFetchException.get());
+        } finally {
+            releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet());
+        }
+
+    }
+
+    private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topicIdPartitions) {
+        // Releasing the lock to move ahead with the next request in queue.
+        releasePartitionLocks(topicIdPartitions);
+        // If we have a fetch request completed for a topic-partition, we release the locks for that partition,
+        // then we should check if there is a pending share fetch request for the topic-partition and complete it.
+        // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
+        // we directly call delayedShareFetchPurgatory.checkAndComplete
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+            replicaManager.completeDelayedShareFetchRequest(
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+    }
+
+    /**
+     * This function completes a share fetch request for which we have identified remoteFetch during tryComplete()
+     * Note - This function should only be called when we know that there is remote fetch in-flight/completed/expired.

Review Comment:
   remote fetch in-flight/completed/expired => remote fetch ? Ditto below.
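As background on the action-queue comment inside `releasePartitionLocksAndAddToActionQueue` above: completing one delayed operation synchronously can trigger the completion check of another, which can trigger another, and so on. A toy model of the pattern (hypothetical types, not Kafka's actual purgatory API) shows why deferring to a queue turns that recursion into iteration:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model: completion callbacks enqueue follow-up checks instead of running
// them inline, so a chain of completions is drained iteratively by one loop
// rather than growing the call stack.
final class ActionQueue {
    private final Queue<Runnable> actions = new ArrayDeque<>();

    void add(Runnable action) {
        actions.add(action);   // called from inside a completion; O(1), no re-entrancy
    }

    void tryCompleteActions() {
        Runnable action;
        while ((action = actions.poll()) != null) {
            action.run();      // may add() more actions; the loop picks them up
        }
    }
}
```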
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -277,9 +323,15 @@ public boolean tryComplete() {
             return false;
         } catch (Exception e) {
             log.error("Error processing delayed share fetch request", e);
-            releasePartitionLocks(topicPartitionData.keySet());
-            partitionsAcquired.clear();
-            partitionsAlreadyFetched.clear();
+            // In case we have a remote fetch exception, we have already released locks for partitions which have potential
+            // local log read. We do not release locks for partitions which have a remote storage read because we need to

Review Comment:
   Hmm, this seems not quite right. It's possible that the operation is already completed when we get here. In this case, `forceComplete()` won't release the lock for the remote fetch.
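A sketch of the shape this comment points at, reusing the `completedByMe` guard that `maybeCompletePendingRemoteFetch` already applies (hypothetical; not code from the PR): if `forceComplete()` reports that another thread already completed the operation, this path must still release the locks it holds, including the remote-fetch partition's.

```java
// Sketch: exception path in tryComplete(), made safe against the race where
// the operation was already force-completed (e.g. by the expiration reaper).
if (!forceComplete()) {
    // onComplete() already ran on the other completer's thread; it cannot see
    // the locks this thread still holds, so release them here explicitly.
    releasePartitionLocks(partitionsAcquired.keySet());
}
```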
##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -1155,6 +1170,537 @@ public void testOnCompleteExecutionOnTimeout() {
         assertEquals(1, delayedShareFetch.expiredRequestMeter().count());
     }
 
+    @Test
+    public void testRemoteStorageFetchTryCompleteReturnsFalse() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset matches with the cached entry for sp0 but not for sp1 and sp2. Hence, a replica manager fetch will happen for sp1 and sp2.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(10, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking local log read result for tp1 and remote storage read result for tp2.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), any())).thenReturn(mock(Future.class));
+        when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertFalse(delayedShareFetch.tryComplete());
+        assertFalse(delayedShareFetch.isCompleted());
+        // Remote fetch object gets created for delayed share fetch object.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for local log read topic partitions tp0 and tp1.
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1));
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchTryCompleteThrowsException() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+
+        // Fetch offset does not match with the cached entry for sp0 and sp1. Hence, a replica manager fetch will happen for sp0 and sp1.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking local log read result for tp0 and remote storage read result for tp1.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0), Set.of(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Exception will be thrown during the creation of remoteFetch object.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), any())).thenThrow(new RejectedExecutionException("Exception thrown"));
+        when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mockExceptionHandler();
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withExceptionHandler(exceptionHandler)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        // tryComplete returns true and goes to forceComplete once the exception occurs.
+        assertTrue(delayedShareFetch.tryComplete());
+        assertTrue(delayedShareFetch.isCompleted());
+        // The future of shareFetch completes.
+        assertTrue(shareFetch.isCompleted());
+        assertFalse(future.isCompletedExceptionally());
+        assertEquals(Set.of(tp1), future.join().keySet());
+        // Exception occurred and was handled.
+        Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
+        // Verify the locks are released for both local and remote read topic partitions tp0 and tp1 because of exception occurrence.
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1));
+        Mockito.verify(delayedShareFetch, times(1)).onComplete();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchTryCompletionDueToBrokerBecomingOffline() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset matches with the cached entry for sp0 but not for sp1 and sp2. Hence, a replica manager fetch will happen for sp1 and sp2 during tryComplete.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(10, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+
+        // Mocking local log read result for tp1 and remote storage read result for tp2 on first replicaManager readFromLog call(from tryComplete).
+        // Mocking local log read result for tp0 and tp1 on second replicaManager readFromLog call(from onComplete).
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2))
+        ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of())
+        ).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock but the broker becomes unavailable.
+        Future<Void> remoteFetchTask = mock(Future.class);
+        doAnswer(invocation -> {
+            when(remoteFetchTask.isCancelled()).thenReturn(true);
+            return false;
+        }).when(remoteFetchTask).cancel(false);
+
+        when(remoteFetchTask.cancel(false)).thenReturn(true);
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), any())).thenReturn(remoteFetchTask);
+        when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+        when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(mock(KafkaStorageException.class));
+
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        assertTrue(delayedShareFetch.remoteFetch().remoteFetchTask().isCancelled());
+        // Partition locks should be released for all 3 topic partitions
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2));
+        assertTrue(shareFetch.isCompleted());
+        // Share fetch response contained tp0 and tp1 (local fetch) but not tp2, since it errored out.
+        assertEquals(Set.of(tp0, tp1), future.join().keySet());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 is acquirable, sp1 is not acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0. Hence, a replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.empty(),
+            Optional.of(new TimeoutException("Error occurred while creating remote fetch result")) // Remote fetch result is returned with an error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for tp0.
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0), future.join().keySet());
+        assertEquals(Errors.REQUEST_TIMED_OUT.code(), future.join().get(tp0).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 is acquirable, sp1 is not acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0. Hence, a replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

Review Comment:
   Do we need tp1 in this test? We are not returning its result anyway.
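If tp1 is indeed unused, the arrange step could shrink to a single partition along these lines (a hypothetical trimmed version, reusing the helpers this test file already has):

```java
// Hypothetical single-partition setup: only tp0 participates in the remote fetch.
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
SharePartition sp0 = mock(SharePartition.class);
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp0.nextFetchOffset()).thenReturn(10L);
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());

LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
sharePartitions.put(tp0, sp0);
// Remote storage read result mocked for tp0 alone; no second partition needed.
doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0)))
    .when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
```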
+        // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.of(REMOTE_FETCH_INFO),
+            Optional.empty() // Remote fetch result is returned successfully without error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for tp0.
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0), future.join().keySet());
+        assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchRequestCompletionAlongWithLocalLogRead() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
+        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 3));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+        SharePartition sp3 = mock(SharePartition.class);
+
+        // Except tp3, all the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp3.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+        sharePartitions.put(tp3, sp3);
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1, tp2, tp3), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset does not match with the cached entry for sp0, sp1 and sp2. Hence, a replica manager fetch will happen for all of them in tryComplete.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+        when(sp2.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+
+        // Mocking local log read result for tp0, tp1 and remote storage read result for tp2 on first replicaManager readFromLog call(from tryComplete).
+        // Mocking local log read result for tp0 and tp1 on second replicaManager readFromLog call(from onComplete).
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of(tp2))
+        ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of())
+        ).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.of(REMOTE_FETCH_INFO),
+            Optional.empty() // Remote fetch result is returned successfully without error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withReplicaManager(replicaManager)
+            .withSharePartitions(sharePartitions)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // the future of shareFetch completes.
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0, tp1, tp2), future.join().keySet());
+        // Verify the locks are released for both local log and remote storage read topic partitions tp0, tp1 and tp2.
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2));
+        assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode());
+        assertEquals(Errors.NONE.code(), future.join().get(tp1).errorCode());
+        assertEquals(Errors.NONE.code(), future.join().get(tp2).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchOnlyHappensForFirstTopicPartition() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 and sp1 are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0 and sp1. Hence, a replica manager fetch will happen for both.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        LinkedHashSet<TopicIdPartition> remoteStorageFetchPartitions = new LinkedHashSet<>();
+        remoteStorageFetchPartitions.add(tp0);
+        remoteStorageFetchPartitions.add(tp1);
+
+        // Mocking remote storage read result for tp0 and tp1.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), remoteStorageFetchPartitions)).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.of(REMOTE_FETCH_INFO),
+            Optional.empty() // Remote fetch result is returned successfully without error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released separately for tp1 (from tryComplete).
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1));
+        // From onComplete, the locks will be released for both tp0 and tp1. tp0 because it was acquired from
+        // tryComplete and has remote fetch processed. tp1 will be reacquired in onComplete when we check for local log read.
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1));
+        assertTrue(shareFetch.isCompleted());
+        // Share fetch response only contains the first remote storage fetch topic partition - tp0.

Review Comment:
   Is that accurate? It seems that we mock `sp1.acquire` too.
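One way to settle the question would be explicit verifications in the test (hypothetical additions, not in the PR; whether the second verification actually passes is exactly what the comment asks):

```java
// Hypothetical assertions making the in-code claim checkable: only tp0 may
// appear in the response, and sp1's mocked acquire stays un-invoked.
assertEquals(Set.of(tp0), future.join().keySet());
Mockito.verify(sp1, Mockito.never()).acquire(any(), anyInt(), anyInt(), anyLong(), any(), any());
```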
########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ########## @@ -529,8 +582,300 @@ Lock lock() { return lock; } + // Visible for testing. + RemoteFetch remoteFetch() { + return remoteFetchOpt.orElse(null); + } + // Visible for testing. + Meter expiredRequestMeter() { return expiredRequestMeter; } + + private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo( + LinkedHashMap<TopicIdPartition, Long> topicPartitionData, + LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse + ) { + Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty(); + for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) { + TopicIdPartition topicIdPartition = entry.getKey(); + LogReadResult logReadResult = entry.getValue(); + if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) { + // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for + // a single topic partition in a fetch request. Since the logic of fetch is largely based on how consumer groups work, + // we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform + // fetch for multiple remote fetch topic partitions in a single share fetch request. + remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get())); + partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition)); + break; + } + } + return remoteStorageFetchMetadataMap; + } + + private boolean maybeProcessRemoteFetch( + LinkedHashMap<TopicIdPartition, Long> topicPartitionData, + TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo, + LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse + ) throws Exception { + topicPartitionData.keySet().forEach(topicIdPartition -> { + // Topic partitions for which fetch would not be happening in this share fetch request. + if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) { + // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch. + releasePartitionLocks(Set.of(topicIdPartition)); + } + }); + Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo, replicaManagerReadResponse); + if (exceptionOpt.isPresent()) { + remoteStorageFetchException = exceptionOpt; + throw exceptionOpt.get(); + } + // Check if remote fetch can be completed. + return maybeCompletePendingRemoteFetch(); + } + + /** + * Returns an optional containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully, else returns an empty optional. + * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
+ * @param replicaManagerReadResponse - The replica manager read response containing log read results for acquired topic partitions. + */ + private Optional<Exception> processRemoteFetchOrException( + TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo, + LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse + ) { + TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition(); + RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.remoteStorageFetchInfo(); + LogReadResult logReadResult = replicaManagerReadResponse.get(remoteFetchTopicIdPartition); + + Future<Void> remoteFetchTask; + CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>(); + try { + remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead( + remoteStorageFetchInfo, + result -> { + remoteFetchResult.complete(result); + replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition())); + } + ); + } catch (RejectedExecutionException e) { + // Return the error, if any, in scheduling the remote fetch task. + log.warn("Unable to fetch data from remote storage", e); + return Optional.of(e); + } catch (Exception e) { + return Optional.of(e); + } + remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo)); Review Comment: Could we at least set both instance variables in the same method? Otherwise, it's a bit hard to track where the internal states are changed.
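One shape that would satisfy this (a sketch only, not the PR's actual code): keep the helper free of field writes and let the caller assign both remoteFetchOpt and remoteStorageFetchException, so every internal state change lives in maybeProcessRemoteFetch.

// Hypothetical restructuring: scheduleRemoteFetch is a renamed processRemoteFetchOrException
// that builds and returns the RemoteFetch instead of assigning remoteFetchOpt itself.
private boolean maybeProcessRemoteFetch(
        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
) throws Exception {
    // ... release locks for the topic partitions that are not part of the remote fetch, as before ...
    try {
        // Field assignment #1, previously hidden inside the helper.
        remoteFetchOpt = Optional.of(scheduleRemoteFetch(topicPartitionRemoteFetchInfo, replicaManagerReadResponse));
    } catch (Exception e) {
        // Field assignment #2, now visibly in the same method as assignment #1.
        remoteStorageFetchException = Optional.of(e);
        throw e;
    }
    return maybeCompletePendingRemoteFetch();
}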
########## core/src/test/java/kafka/server/share/DelayedShareFetchTest.java: ########## @@ -1155,6 +1170,537 @@ public void testOnCompleteExecutionOnTimeout() { assertEquals(1, delayedShareFetch.expiredRequestMeter().count()); } + @Test + public void testRemoteStorageFetchTryCompleteReturnsFalse() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + + // All the topic partitions are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + new CompletableFuture<>(), List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + when(sp2.nextFetchOffset()).thenReturn(30L); + + // Fetch offset matches the cached entry for sp0 but not for sp1 and sp2. Hence, a replica manager fetch will happen for sp1 and sp2. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(10, 1, 0))); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking local log read result for tp1 and remote storage read result for tp2. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock. + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(remoteLogManager.asyncRead(any(), any())).thenReturn(mock(Future.class)); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + assertFalse(delayedShareFetch.tryComplete()); + assertFalse(delayedShareFetch.isCompleted()); + // Remote fetch object gets created for the delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released for local log read topic partitions tp0 and tp1. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1)); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchTryCompleteThrowsException() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + // All the topic partitions are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + + // Fetch offset does not match the cached entry for sp0 and sp1. Hence, a replica manager fetch will happen for sp0 and sp1. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking local log read result for tp0 and remote storage read result for tp1. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0), Set.of(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Exception will be thrown during the creation of the remoteFetch object.
+ RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(remoteLogManager.asyncRead(any(), any())).thenThrow(new RejectedExecutionException("Exception thrown")); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mockExceptionHandler(); + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withExceptionHandler(exceptionHandler) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + // tryComplete returns true and goes to forceComplete once the exception occurs. + assertTrue(delayedShareFetch.tryComplete()); + assertTrue(delayedShareFetch.isCompleted()); + // The future of shareFetch completes. + assertTrue(shareFetch.isCompleted()); + assertFalse(future.isCompletedExceptionally()); + assertEquals(Set.of(tp1), future.join().keySet()); + // Exception occurred and was handled. + Mockito.verify(exceptionHandler, times(1)).accept(any(), any()); + // Verify the locks are released for both local and remote read topic partitions tp0 and tp1 because of the exception. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1)); + Mockito.verify(delayedShareFetch, times(1)).onComplete(); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchTryCompletionDueToBrokerBecomingOffline() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + + // All the topic partitions are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + when(sp2.nextFetchOffset()).thenReturn(30L); + + // Fetch offset matches the cached entry for sp0 but not for sp1 and sp2. Hence, a replica manager fetch will happen for sp1 and sp2 during tryComplete.
+ when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(10, 1, 0))); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + // Mocking local log read result for tp1 and remote storage read result for tp2 on first replicaManager readFromLog call (from tryComplete). + // Mocking local log read result for tp0 and tp1 on second replicaManager readFromLog call (from onComplete). + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2)) + ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of()) + ).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock but the broker becomes unavailable. + Future<Void> remoteFetchTask = mock(Future.class); + doAnswer(invocation -> { + when(remoteFetchTask.isCancelled()).thenReturn(true); + return false; + }).when(remoteFetchTask).cancel(false); + + when(remoteFetchTask.cancel(false)).thenReturn(true); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(remoteLogManager.asyncRead(any(), any())).thenReturn(remoteFetchTask); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(mock(KafkaStorageException.class)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + assertTrue(delayedShareFetch.remoteFetch().remoteFetchTask().isCancelled()); + // Partition locks should be released for all 3 topic partitions. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2)); + assertTrue(shareFetch.isCompleted()); + // Share fetch response contained tp0 and tp1 (local fetch) but not tp2, since it errored out. + assertEquals(Set.of(tp0, tp1), future.join().keySet()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + // sp0 is acquirable, sp1 is not acquirable.
+ when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(false); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(false); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + // Fetch offset does not match the cached entry for sp0. Hence, a replica manager fetch will happen for sp0. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking remote storage read result for tp0. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence the request will move on to forceComplete. + RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult( + Optional.empty(), + Optional.of(new TimeoutException("Error occurred while creating remote fetch result")) // Remote fetch result is returned with an error. + ); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + doAnswer(invocationOnMock -> { + // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour. + Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1); + callback.accept(remoteFetchResult); + return CompletableFuture.completedFuture(remoteFetchResult); + }).when(remoteLogManager).asyncRead(any(), any()); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1))) + .build()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released for tp0.
+ Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + assertTrue(shareFetch.isCompleted()); + assertEquals(Set.of(tp0), future.join().keySet()); + assertEquals(Errors.REQUEST_TIMED_OUT.code(), future.join().get(tp0).errorCode()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + // sp0 is acquirable, sp1 is not acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(false); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(false); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + // Fetch offset does not match the cached entry for sp0. Hence, a replica manager fetch will happen for sp0. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking remote storage read result for tp0. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence the request will move on to forceComplete. + RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult( + Optional.of(REMOTE_FETCH_INFO), + Optional.empty() // Remote fetch result is returned successfully without error. + ); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + doAnswer(invocationOnMock -> { + // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour. + Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1); + callback.accept(remoteFetchResult); + return CompletableFuture.completedFuture(remoteFetchResult); + }).when(remoteLogManager).asyncRead(any(), any()); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1))) + .build()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch.
+ assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released for tp0. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + assertTrue(shareFetch.isCompleted()); + assertEquals(Set.of(tp0), future.join().keySet()); + assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchRequestCompletionAlongWithLocalLogRead() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 3)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + SharePartition sp3 = mock(SharePartition.class); + + // Except tp3, all the topic partitions are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp3.maybeAcquireFetchLock()).thenReturn(false); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + sharePartitions.put(tp3, sp3); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1, tp2, tp3), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + when(sp2.nextFetchOffset()).thenReturn(30L); + + // Fetch offset does not match the cached entry for sp0, sp1 and sp2. Hence, a replica manager fetch will happen for all of them in tryComplete. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp2.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + // Mocking local log read result for tp0, tp1 and remote storage read result for tp2 on first replicaManager readFromLog call (from tryComplete).
+ // Mocking local log read result for tp0 and tp1 on second replicaManager readFromLog call (from onComplete). + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of(tp2)) + ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of()) Review Comment: Do we need tp3 in this test?
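If tp3 is only there to exercise the non-acquirable path, which testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure and testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully already cover via sp1, the fixture could plausibly shrink. A sketch of the reduced setup (hypothetical, assuming no assertion in this test depends on tp3):

// Hypothetical slimmed-down fixture without tp3: only the three acquirable partitions remain.
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
sharePartitions.put(tp0, sp0);
sharePartitions.put(tp1, sp1);
sharePartitions.put(tp2, sp2);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
    future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS, BROKER_TOPIC_STATS);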