This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ddfd46a0b15 [fix][ml]Fix EOFException after enabled topics offloading
(#24753)
ddfd46a0b15 is described below
commit ddfd46a0b15ded1f2aac02d33100086e2a77551e
Author: fengyubiao <[email protected]>
AuthorDate: Mon Sep 22 23:48:42 2025 +0800
[fix][ml]Fix EOFException after enabled topics offloading (#24753)
(cherry picked from commit 3a9876353df9e3e3699efdb63c1de96f8d897ab0)
---
.../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 235 +++++++++++++++------
.../offload/jcloud/impl/OffloadIndexEntryImpl.java | 15 ++
.../impl/BlobStoreBackedReadHandleImplTest.java | 212 +++++++++++++++++++
3 files changed, 400 insertions(+), 62 deletions(-)
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 916e33c3642..77bcfa5cdd8 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.mledger.OffloadedLedgerHandle;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
import
org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.TopicName;
@@ -79,7 +80,8 @@ public class BlobStoreBackedReadHandleImpl implements
ReadHandle, OffloadedLedge
private volatile long lastAccessTimestamp = System.currentTimeMillis();
- private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock
index,
+ @VisibleForTesting
+ BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
BackedInputStream inputStream,
ExecutorService executor,
OffsetsCache entryOffsetsCache) {
this.ledgerId = ledgerId;
@@ -121,24 +123,20 @@ public class BlobStoreBackedReadHandleImpl implements
ReadHandle, OffloadedLedge
return promise;
}
- @Override
- public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long
lastEntry) {
- if (log.isDebugEnabled()) {
- log.debug("Ledger {}: reading {} - {} ({} entries}",
- getId(), firstEntry, lastEntry, (1 + lastEntry -
firstEntry));
+ private class ReadTask implements Runnable {
+ private final long firstEntry;
+ private final long lastEntry;
+ private final CompletableFuture<LedgerEntries> promise;
+ private int seekedAndTryTimes = 0;
+
+ public ReadTask(long firstEntry, long lastEntry,
CompletableFuture<LedgerEntries> promise) {
+ this.firstEntry = firstEntry;
+ this.lastEntry = lastEntry;
+ this.promise = promise;
}
- CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
- // Ledger handles will be only marked idle when "pendingRead" is "0",
it is not needed to update
- // "lastAccessTimestamp" if "pendingRead" is larger than "0".
- // Rather than update "lastAccessTimestamp" when starts a reading,
updating it when a reading task is finished
- // is better.
- PENDING_READ_UPDATER.incrementAndGet(this);
- promise.whenComplete((__, ex) -> {
- lastAccessTimestamp = System.currentTimeMillis();
-
PENDING_READ_UPDATER.decrementAndGet(BlobStoreBackedReadHandleImpl.this);
- });
- executor.execute(() -> {
+ @Override
+ public void run() {
if (state == State.Closed) {
log.warn("Reading a closed read handler. Ledger ID: {}, Read
range: {}-{}",
ledgerId, firstEntry, lastEntry);
@@ -146,83 +144,191 @@ public class BlobStoreBackedReadHandleImpl implements
ReadHandle, OffloadedLedge
return;
}
- List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
- boolean seeked = false;
+ List<LedgerEntry> entryCollector = new ArrayList<LedgerEntry>();
try {
if (firstEntry > lastEntry
- || firstEntry < 0
- || lastEntry > getLastAddConfirmed()) {
+ || firstEntry < 0
+ || lastEntry > getLastAddConfirmed()) {
promise.completeExceptionally(new
BKException.BKIncorrectParameterException());
return;
}
long entriesToRead = (lastEntry - firstEntry) + 1;
- long nextExpectedId = firstEntry;
-
- // checking the data stream has enough data to read to avoid
throw EOF exception when reading data.
- // 12 bytes represent the stream have the length and entryID
to read.
- if (dataStream.available() < 12) {
- log.warn("There hasn't enough data to read, current
available data has {} bytes,"
- + " seek to the first entry {} to avoid EOF
exception", inputStream.available(), firstEntry);
- seekToEntry(firstEntry);
- }
+ long expectedEntryId = firstEntry;
+ seekToEntryOffset(firstEntry);
+ seekedAndTryTimes++;
while (entriesToRead > 0) {
long currentPosition = inputStream.getCurrentPosition();
int length = dataStream.readInt();
if (length < 0) { // hit padding or new block
- seekToEntry(nextExpectedId);
+ seekToEntryOffset(expectedEntryId);
continue;
}
long entryId = dataStream.readLong();
-
- if (entryId == nextExpectedId) {
+ if (entryId == expectedEntryId) {
entryOffsetsCache.put(ledgerId, entryId,
currentPosition);
ByteBuf buf =
PulsarByteBufAllocator.DEFAULT.buffer(length, length);
- entries.add(LedgerEntryImpl.create(ledgerId, entryId,
length, buf));
+ entryCollector.add(LedgerEntryImpl.create(ledgerId,
entryId, length, buf));
int toWrite = length;
while (toWrite > 0) {
toWrite -= buf.writeBytes(dataStream, toWrite);
}
entriesToRead--;
- nextExpectedId++;
- } else if (entryId > nextExpectedId && entryId <
lastEntry) {
- log.warn("The read entry {} is not the expected entry
{} but in the range of {} - {},"
- + " seeking to the right position", entryId,
nextExpectedId, nextExpectedId, lastEntry);
- seekToEntry(nextExpectedId);
- } else if (entryId < nextExpectedId
- &&
!index.getIndexEntryForEntry(nextExpectedId).equals(index.getIndexEntryForEntry(entryId)))
{
- log.warn("Read an unexpected entry id {} which is
smaller than the next expected entry id {}"
- + ", seeking to the right position", entryId,
nextExpectedId);
- seekToEntry(nextExpectedId);
- } else if (entryId > lastEntry) {
- // in the normal case, the entry id should increment
in order. But if there has random access in
- // the read method, we should allow to seek to the
right position and the entry id should
- // never over to the last entry again.
- if (!seeked) {
- seekToEntry(nextExpectedId);
- seeked = true;
- continue;
- }
- log.info("Expected to read {}, but read {}, which is
greater than last entry {}",
- nextExpectedId, entryId, lastEntry);
- throw new BKException.BKUnexpectedConditionException();
+ expectedEntryId++;
} else {
- long ignore = inputStream.skip(length);
+ handleUnexpectedEntryId(expectedEntryId, entryId);
}
}
-
- promise.complete(LedgerEntriesImpl.create(entries));
+ promise.complete(LedgerEntriesImpl.create(entryCollector));
} catch (Throwable t) {
- log.error("Failed to read entries {} - {} from the offloader
in ledger {}",
- firstEntry, lastEntry, ledgerId, t);
+ log.error("Failed to read entries {} - {} from the offloader
in ledger {}, current position of input"
+ + " stream is {}", firstEntry, lastEntry, ledgerId,
inputStream.getCurrentPosition(), t);
if (t instanceof KeyNotFoundException) {
promise.completeExceptionally(new
BKException.BKNoSuchLedgerExistsException());
} else {
promise.completeExceptionally(t);
}
- entries.forEach(LedgerEntry::close);
+ entryCollector.forEach(LedgerEntry::close);
+ }
+ }
+
+ // In the normal case, the entry id should increment in order. But if
there is random access in
+ // the read method, we should allow seeking to the right position, and
the entry id should
+ // never go past the last entry again.
+ private void handleUnexpectedEntryId(long expectedId, long actEntryId)
throws Exception {
+ LedgerMetadata ledgerMetadata = getLedgerMetadata();
+ OffloadIndexEntry offsetOfExpectedId =
index.getIndexEntryForEntry(expectedId);
+ OffloadIndexEntry offsetOfActId = actEntryId <=
getLedgerMetadata().getLastEntryId() && actEntryId >= 0
+ ? index.getIndexEntryForEntry(actEntryId) : null;
+ String logLine = String.format("Failed to read [ %s ~ %s ] of the
ledger %s."
+ + " Because got a incorrect entry id %s, the offset is %s."
+ + " The expected entry id is %s, the offset is %s."
+ + " Have seeked and retry read times: %s. LAC is %s.",
+ firstEntry, lastEntry, ledgerId,
+ actEntryId, offsetOfActId == null ? "null because it does
not exist"
+ : String.valueOf(offsetOfActId),
+ expectedId, String.valueOf(offsetOfExpectedId),
+ seekedAndTryTimes, ledgerMetadata != null ?
ledgerMetadata.getLastEntryId() : "unknown");
+ // If it still fails after retrying up to the computed limit, throw the
exception.
+ long maxTryTimes = Math.max(3, (lastEntry - firstEntry + 1) >> 2);
+ if (seekedAndTryTimes > maxTryTimes) {
+ log.error(logLine);
+ throw new BKException.BKUnexpectedConditionException();
+ } else {
+ log.warn(logLine);
+ }
+ seekToEntryOffset(expectedId);
+ seekedAndTryTimes++;
+ }
+
+ private void skipPreviousEntry(long startEntryId, long
expectedEntryId) throws IOException, BKException {
+ long nextExpectedEntryId = startEntryId;
+ while (nextExpectedEntryId < expectedEntryId) {
+ long offset = inputStream.getCurrentPosition();
+ int len = dataStream.readInt();
+ if (len < 0) {
+ LedgerMetadata ledgerMetadata = getLedgerMetadata();
+ OffloadIndexEntry offsetOfExpectedId =
index.getIndexEntryForEntry(expectedEntryId);
+ log.error("Failed to read [ {} ~ {} ] of the ledger {}."
+ + " Because failed to skip a previous entry {}, len:
{}, got a negative len."
+ + " The expected entry id is {}, the offset is {}."
+ + " Have seeked and retry read times: {}. LAC is {}.",
+ firstEntry, lastEntry, ledgerId,
+ nextExpectedEntryId, len,
+ expectedEntryId, String.valueOf(offsetOfExpectedId),
+ seekedAndTryTimes, ledgerMetadata != null ?
ledgerMetadata.getLastEntryId() : "unknown");
+ throw new BKException.BKUnexpectedConditionException();
+ }
+ long entryId = dataStream.readLong();
+ if (entryId == nextExpectedEntryId) {
+ long skipped = inputStream.skip(len);
+ if (skipped != len) {
+ LedgerMetadata ledgerMetadata = getLedgerMetadata();
+ OffloadIndexEntry offsetOfExpectedId =
index.getIndexEntryForEntry(expectedEntryId);
+ log.error("Failed to read [ {} ~ {} ] of the ledger
{}."
+ + " Because failed to skip a previous entry {},
offset: {}, len: {}, there is no more data."
+ + " The expected entry id is {}, the offset is {}."
+ + " Have seeked and retry read times: {}. LAC is
{}.",
+ firstEntry, lastEntry, ledgerId,
+ entryId, offset, len,
+ expectedEntryId,
String.valueOf(offsetOfExpectedId),
+ seekedAndTryTimes, ledgerMetadata != null ?
ledgerMetadata.getLastEntryId() : "unknown");
+ throw new BKException.BKUnexpectedConditionException();
+ }
+ nextExpectedEntryId++;
+ } else {
+ LedgerMetadata ledgerMetadata = getLedgerMetadata();
+ OffloadIndexEntry offsetOfExpectedId =
index.getIndexEntryForEntry(expectedEntryId);
+ log.error("Failed to read [ {} ~ {} ] of the ledger {}."
+ + " Because got a incorrect entry id {},."
+ + " The expected entry id is {}, the offset is {}."
+ + " Have seeked and retry read times: {}. LAC is {}.",
+ firstEntry, lastEntry, ledgerId,
+ entryId, expectedEntryId,
String.valueOf(offsetOfExpectedId),
+ seekedAndTryTimes, ledgerMetadata != null ?
ledgerMetadata.getLastEntryId() : "unknown");
+ throw new BKException.BKUnexpectedConditionException();
+ }
+ }
+ }
+
+ private void seekToEntryOffset(long expectedEntryId) throws
IOException, BKException {
+ // 1. Try to find the precise index.
+ // 1-1. Precise cached indexes.
+ Long cachedPreciseIndex = entryOffsetsCache.getIfPresent(ledgerId,
expectedEntryId);
+ if (cachedPreciseIndex != null) {
+ inputStream.seek(cachedPreciseIndex);
+ return;
+ }
+ // 1-2. Precise persistent indexes.
+ OffloadIndexEntry indexOfNearestEntry =
index.getIndexEntryForEntry(expectedEntryId);
+ if (indexOfNearestEntry.getEntryId() == expectedEntryId) {
+ inputStream.seek(indexOfNearestEntry.getDataOffset());
+ return;
}
+ // 2. Try to use the previous index. Since entry-0 must have a
precise index, we can skip checking
+ // whether "expectedEntryId" is larger than 0.
+ Long cachedPreviousKnownOffset =
entryOffsetsCache.getIfPresent(ledgerId, expectedEntryId - 1);
+ if (cachedPreviousKnownOffset != null) {
+ inputStream.seek(cachedPreviousKnownOffset);
+ skipPreviousEntry(expectedEntryId - 1, expectedEntryId);
+ return;
+ }
+ // 3. Use the persistent index of the nearest entry that is
smaller than "expectedEntryId".
+ // Because it is a sparse index, some entries need to be
skipped.
+ if (indexOfNearestEntry.getEntryId() < expectedEntryId) {
+ inputStream.seek(indexOfNearestEntry.getDataOffset());
+ skipPreviousEntry(indexOfNearestEntry.getEntryId(),
expectedEntryId);
+ } else {
+ LedgerMetadata ledgerMetadata = getLedgerMetadata();
+ log.error("Failed to read [ {} ~ {} ] of the ledger {}."
+ + " Because got a incorrect index {} of the entry {},
which is greater than expected."
+ + " Have seeked and retry read times: {}. LAC is {}.",
+ firstEntry, lastEntry, ledgerId,
+ String.valueOf(indexOfNearestEntry), expectedEntryId,
+ seekedAndTryTimes, ledgerMetadata != null ?
ledgerMetadata.getLastEntryId() : "unknown");
+ throw new BKException.BKUnexpectedConditionException();
+ }
+ }
+ }
+
+ @Override
+ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long
lastEntry) {
+ if (log.isDebugEnabled()) {
+ log.debug("Ledger {}: reading {} - {} ({} entries}",
+ getId(), firstEntry, lastEntry, (1 + lastEntry -
firstEntry));
+ }
+ CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
+
+ // Ledger handles will be only marked idle when "pendingRead" is "0",
it is not needed to update
+ // "lastAccessTimestamp" if "pendingRead" is larger than "0".
+ // Rather than update "lastAccessTimestamp" when starts a reading,
updating it when a reading task is finished
+ // is better.
+ PENDING_READ_UPDATER.incrementAndGet(this);
+ promise.whenComplete((__, ex) -> {
+ lastAccessTimestamp = System.currentTimeMillis();
+
PENDING_READ_UPDATER.decrementAndGet(BlobStoreBackedReadHandleImpl.this);
});
+ executor.execute(new ReadTask(firstEntry, lastEntry, promise));
return promise;
}
@@ -238,6 +344,11 @@ public class BlobStoreBackedReadHandleImpl implements
ReadHandle, OffloadedLedge
}
}
+ private void seekToEntry(OffloadIndexEntry offloadIndexEntry) throws
IOException {
+ long dataOffset = offloadIndexEntry.getDataOffset();
+ inputStream.seek(dataOffset);
+ }
+
@Override
public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long
firstEntry, long lastEntry) {
return readAsync(firstEntry, lastEntry);
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
index 20e9dd68cd2..2faffa7e25c 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+import java.util.Objects;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
/**
@@ -65,5 +66,19 @@ public class OffloadIndexEntryImpl implements
OffloadIndexEntry {
return String.format("[eid:%d, part:%d, offset:%d, doffset:%d]",
entryId, partId, offset, getDataOffset());
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof OffloadIndexEntryImpl that)) {
+ return false;
+ }
+ return entryId == that.entryId && partId == that.partId && offset ==
that.offset
+ && blockHeaderSize == that.blockHeaderSize;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(entryId, partId, offset, blockHeaderSize);
+ }
}
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplTest.java
new file mode 100644
index 00000000000..bf116e5aeca
--- /dev/null
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.LedgerMetadataBuilder;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.commons.lang3.tuple.Pair;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class BlobStoreBackedReadHandleImplTest {
+
+ private OffsetsCache offsetsCache = new OffsetsCache();
+
+ private ScheduledExecutorService executor =
Executors.newScheduledThreadPool(2);
+
+ @AfterClass
+ public void tearDown() throws Exception {
+ if (executor != null) {
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ if (offsetsCache != null) {
+ offsetsCache.close();
+ }
+ }
+
+ @AfterClass
+ public void clearCache() throws Exception {
+ offsetsCache.clear();
+ }
+
+ private String getExpectedEntryContent(int entryId) {
+ return "Entry " + entryId;
+ }
+
+ private Pair<BlobStoreBackedReadHandleImpl, ByteBuf> createReadHandle(
+ long ledgerId, int entries, boolean hasDirtyData) throws Exception
{
+ // Build data.
+ List<Pair<Integer, Integer>> offsets = new ArrayList<>();
+ int totalLen = 0;
+ ByteBuf data = ByteBufAllocator.DEFAULT.heapBuffer(1024);
+ data.writeInt(0);
+ data.writerIndex(128);
+ //data.readerIndex(128);
+ for (int i = 0; i < entries; i++) {
+ if (hasDirtyData && i == 1) {
+ data.writeBytes("dirty data".getBytes(UTF_8));
+ }
+ offsets.add(Pair.of(i, data.writerIndex()));
+ offsetsCache.put(ledgerId, i, data.writerIndex());
+ byte[] entryContent = getExpectedEntryContent(i).getBytes(UTF_8);
+ totalLen += entryContent.length;
+ data.writeInt(entryContent.length);
+ data.writeLong(i);
+ data.writeBytes(entryContent);
+ }
+ // Build metadata.
+ LedgerMetadata metadata = LedgerMetadataBuilder.create()
+ .withId(ledgerId)
+ .withEnsembleSize(1)
+ .withWriteQuorumSize(1)
+ .withAckQuorumSize(1)
+ .withDigestType(DigestType.CRC32C)
+ .withPassword("pwd".getBytes(UTF_8))
+ .withClosedState()
+ .withLastEntryId(entries)
+ .withLength(totalLen)
+ .newEnsembleEntry(0L,
Arrays.asList(BookieId.parse("127.0.0.1:3181")))
+ .build();
+ BackedInputStreamImpl inputStream = new BackedInputStreamImpl(data);
+ // Since we have written data to "offsetsCache", the index will never
be used.
+ OffloadIndexBlock mockIndex = mock(OffloadIndexBlock.class);
+ when(mockIndex.getLedgerMetadata()).thenReturn(metadata);
+ for (Pair<Integer, Integer> pair : offsets) {
+ when(mockIndex.getIndexEntryForEntry(pair.getLeft())).thenReturn(
+ OffloadIndexEntryImpl.of(pair.getLeft(), 0,
pair.getRight(), 0));
+ }
+ // Build obj.
+ return Pair.of(new BlobStoreBackedReadHandleImpl(ledgerId, mockIndex,
inputStream, executor, offsetsCache),
+ data);
+ }
+
+ private static class BackedInputStreamImpl extends BackedInputStream {
+
+ private ByteBuf data;
+
+ private BackedInputStreamImpl(ByteBuf data){
+ this.data = data;
+ }
+
+ @Override
+ public void seek(long position) {
+ data.readerIndex((int) position);
+ }
+
+ @Override
+ public void seekForward(long position) throws IOException {
+ data.readerIndex((int) position);
+ }
+
+ @Override
+ public long getCurrentPosition() {
+ return data.readerIndex();
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (data.readableBytes() == 0) {
+ throw new EOFException("The input-stream has no bytes to
read");
+ }
+ return data.readByte();
+ }
+
+ @Override
+ public int available() throws IOException {
+ return data.readableBytes();
+ }
+ }
+
+ @DataProvider
+ public Object[][] streamStartAt() {
+ return new Object[][] {
+ // It gives a 0 value of the entry length.
+ { 0, false },
+ // It gives a 0 value of the entry length.
+ { 1, false },
+ // The first entry starts at 128.
+ { 128, false },
+ // It gives a 0 value of the entry length.
+ { 0, true },
+ // It gives a 0 value of the entry length.
+ { 1, true },
+ // The first entry starts at 128.
+ { 128, true }
+ };
+ }
+
+ @Test(dataProvider = "streamStartAt")
+ public void testRead(int streamStartAt, boolean hasDirtyData) throws
Exception {
+ int entryCount = 5;
+ Pair<BlobStoreBackedReadHandleImpl, ByteBuf> ledgerDataPair =
+ createReadHandle(1, entryCount, hasDirtyData);
+ BlobStoreBackedReadHandleImpl ledger = ledgerDataPair.getLeft();
+ ByteBuf data = ledgerDataPair.getRight();
+ data.readerIndex(streamStartAt);
+ // Test reading each entry.
+ for (int i = 0; i < 5; i++) {
+ LedgerEntries entries = ledger.read(i, i);
+ assertEquals(new
String(entries.iterator().next().getEntryBytes()), getExpectedEntryContent(i));
+ }
+ // Test read all entries.
+ LedgerEntries entries1 = ledger.read(0, entryCount - 1);
+ Iterator<LedgerEntry> iterator1 = entries1.iterator();
+ for (int i = 0; i < entryCount; i++) {
+ assertEquals(new String(iterator1.next().getEntryBytes()),
getExpectedEntryContent(i));
+ }
+ // Test a special case.
+ // 1. Read from 0 to "lac - 1".
+ // 2. Any reading.
+ LedgerEntries entries2 = ledger.read(0, entryCount - 2);
+ Iterator<LedgerEntry> iterator2 = entries2.iterator();
+ for (int i = 0; i < entryCount - 1; i++) {
+ assertEquals(new String(iterator2.next().getEntryBytes()),
getExpectedEntryContent(i));
+ }
+ LedgerEntries entries3 = ledger.read(0, entryCount - 1);
+ Iterator<LedgerEntry> iterator3 = entries3.iterator();
+ for (int i = 0; i < entryCount; i++) {
+ assertEquals(new String(iterator3.next().getEntryBytes()),
getExpectedEntryContent(i));
+ }
+ // cleanup.
+ ledger.close();
+ }
+}