This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 08caff42b71 [improve][test] Add test for concurrent processing of
pending read Entries (#24519)
08caff42b71 is described below
commit 08caff42b71e619b66094df6b3bcaeb1eba1e97a
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Jul 16 22:33:18 2025 +0800
[improve][test] Add test for concurrent processing of pending read Entries
(#24519)
---
.../impl/cache/PendingReadsManagerTest.java | 50 ++++++++++++++++++++++
1 file changed, 50 insertions(+)
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
index ebf01cba269..9113c123915 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
@@ -32,10 +32,17 @@ import io.opentelemetry.api.OpenTelemetry;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
import java.util.stream.Collectors;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -47,6 +54,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.lang3.tuple.Pair;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.AfterClass;
@@ -60,6 +68,7 @@ public class PendingReadsManagerTest {
static final Object CTX = "foo";
static final Object CTX2 = "far";
static final long LEDGER_ID = 123414L;
+ private final Map<Pair<Long, Long>, AtomicInteger> entryRangeReadCount =
new ConcurrentHashMap<>();
ExecutorService orderedExecutor;
PendingReadsManagerTest() {
@@ -116,6 +125,7 @@ public class PendingReadsManagerTest {
ml = mock(ManagedLedgerImpl.class);
when(ml.getExecutor()).thenReturn(orderedExecutor);
when(rangeEntryCache.getManagedLedger()).thenReturn(ml);
+ entryRangeReadCount.clear();
}
@@ -192,6 +202,8 @@ public class PendingReadsManagerTest {
(invocationOnMock -> {
log.info("readFromStorage from {} to {} shouldCacheEntry
{}", firstEntry, endEntry,
shouldCacheEntry);
+ entryRangeReadCount.computeIfAbsent(Pair.of(firstEntry,
endEntry), __ -> new AtomicInteger(0))
+ .getAndIncrement();
return read;
})
);
@@ -477,4 +489,42 @@ public class PendingReadsManagerTest {
}
+ @Test
+ public void concurrentReadOnOverlappedEntryRanges() throws Exception {
+ final var readFutures = new ArrayList<CapturingReadEntriesCallback>();
+ final BiConsumer<Long, Long> readEntries = (firstEntry, lastEntry) -> {
+ final var callback = new CapturingReadEntriesCallback();
+ pendingReadsManager.readEntries(lh, firstEntry, lastEntry, false,
callback, CTX);
+ readFutures.add(callback);
+ };
+ final BiFunction<Long, Long, PreparedReadFromStorage>
mockReadFromStorage = (firstEntry, lastEntry) ->
+ prepareReadFromStorage(lh, rangeEntryCache, firstEntry,
lastEntry, false);
+
+ final var read0 = mockReadFromStorage.apply(10L, 70L);
+ readEntries.accept(10L, 70L);
+ final var read1 = mockReadFromStorage.apply(80L, 100L);
+ readEntries.accept(80L, 100L);
+ final var read2 = mockReadFromStorage.apply(71L, 79L);
+ readEntries.accept(10L, 100L);
+
+ read1.storageReadCompleted();
+ readFutures.get(1).get(1, TimeUnit.SECONDS);
+ assertEquals(readFutures.get(1).getEntries().size(), 21);
+
+ read0.storageReadCompleted();
+ readFutures.get(0).get(1, TimeUnit.SECONDS);
+ assertEquals(readFutures.get(0).getEntries().size(), 61);
+
+ read2.storageReadCompleted();
+ readFutures.get(2).get(1, TimeUnit.SECONDS);
+ assertEquals(readFutures.get(2).getEntries().size(), 91);
+
+ log.info("entryRangeReadCount: {}", entryRangeReadCount);
+ final var keys = Set.of(Pair.of(10L, 70L), Pair.of(71L, 79L),
+ Pair.of(80L, 100L));
+ assertEquals(entryRangeReadCount.keySet(), keys);
+ for (final var key : keys) {
+ assertEquals(entryRangeReadCount.get(key).get(), 1);
+ }
+ }
}