This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ddfd46a0b15 [fix][ml]Fix EOFException after enabled topics offloading 
(#24753)
ddfd46a0b15 is described below

commit ddfd46a0b15ded1f2aac02d33100086e2a77551e
Author: fengyubiao <[email protected]>
AuthorDate: Mon Sep 22 23:48:42 2025 +0800

    [fix][ml]Fix EOFException after enabled topics offloading (#24753)
    
    (cherry picked from commit 3a9876353df9e3e3699efdb63c1de96f8d897ab0)
---
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 235 +++++++++++++++------
 .../offload/jcloud/impl/OffloadIndexEntryImpl.java |  15 ++
 .../impl/BlobStoreBackedReadHandleImplTest.java    | 212 +++++++++++++++++++
 3 files changed, 400 insertions(+), 62 deletions(-)

diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 916e33c3642..77bcfa5cdd8 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.mledger.OffloadedLedgerHandle;
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.naming.TopicName;
@@ -79,7 +80,8 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle, OffloadedLedge
 
     private volatile long lastAccessTimestamp = System.currentTimeMillis();
 
-    private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock 
index,
+    @VisibleForTesting
+    BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
                                           BackedInputStream inputStream, 
ExecutorService executor,
                                           OffsetsCache entryOffsetsCache) {
         this.ledgerId = ledgerId;
@@ -121,24 +123,20 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle, OffloadedLedge
         return promise;
     }
 
-    @Override
-    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long 
lastEntry) {
-        if (log.isDebugEnabled()) {
-            log.debug("Ledger {}: reading {} - {} ({} entries}",
-                    getId(), firstEntry, lastEntry, (1 + lastEntry - 
firstEntry));
+    private class ReadTask implements Runnable {
+        private final long firstEntry;
+        private final long lastEntry;
+        private final CompletableFuture<LedgerEntries> promise;
+        private int seekedAndTryTimes = 0;
+
+        public ReadTask(long firstEntry, long lastEntry, 
CompletableFuture<LedgerEntries> promise) {
+            this.firstEntry = firstEntry;
+            this.lastEntry = lastEntry;
+            this.promise = promise;
         }
-        CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
 
-        // Ledger handles will be only marked idle when "pendingRead" is "0", 
it is not needed to update
-        // "lastAccessTimestamp" if "pendingRead" is larger than "0".
-        // Rather than update "lastAccessTimestamp" when starts a reading, 
updating it when a reading task is finished
-        // is better.
-        PENDING_READ_UPDATER.incrementAndGet(this);
-        promise.whenComplete((__, ex) -> {
-            lastAccessTimestamp = System.currentTimeMillis();
-            
PENDING_READ_UPDATER.decrementAndGet(BlobStoreBackedReadHandleImpl.this);
-        });
-        executor.execute(() -> {
+        @Override
+        public void run() {
             if (state == State.Closed) {
                 log.warn("Reading a closed read handler. Ledger ID: {}, Read 
range: {}-{}",
                         ledgerId, firstEntry, lastEntry);
@@ -146,83 +144,191 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle, OffloadedLedge
                 return;
             }
 
-            List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
-            boolean seeked = false;
+            List<LedgerEntry> entryCollector = new ArrayList<LedgerEntry>();
             try {
                 if (firstEntry > lastEntry
-                    || firstEntry < 0
-                    || lastEntry > getLastAddConfirmed()) {
+                        || firstEntry < 0
+                        || lastEntry > getLastAddConfirmed()) {
                     promise.completeExceptionally(new 
BKException.BKIncorrectParameterException());
                     return;
                 }
                 long entriesToRead = (lastEntry - firstEntry) + 1;
-                long nextExpectedId = firstEntry;
-
-                // checking the data stream has enough data to read to avoid 
throw EOF exception when reading data.
-                // 12 bytes represent the stream have the length and entryID 
to read.
-                if (dataStream.available() < 12) {
-                    log.warn("There hasn't enough data to read, current 
available data has {} bytes,"
-                        + " seek to the first entry {} to avoid EOF 
exception", inputStream.available(), firstEntry);
-                    seekToEntry(firstEntry);
-                }
+                long expectedEntryId = firstEntry;
+                seekToEntryOffset(firstEntry);
+                seekedAndTryTimes++;
 
                 while (entriesToRead > 0) {
                     long currentPosition = inputStream.getCurrentPosition();
                     int length = dataStream.readInt();
                     if (length < 0) { // hit padding or new block
-                        seekToEntry(nextExpectedId);
+                        seekToEntryOffset(expectedEntryId);
                         continue;
                     }
                     long entryId = dataStream.readLong();
-
-                    if (entryId == nextExpectedId) {
+                    if (entryId == expectedEntryId) {
                         entryOffsetsCache.put(ledgerId, entryId, 
currentPosition);
                         ByteBuf buf = 
PulsarByteBufAllocator.DEFAULT.buffer(length, length);
-                        entries.add(LedgerEntryImpl.create(ledgerId, entryId, 
length, buf));
+                        entryCollector.add(LedgerEntryImpl.create(ledgerId, 
entryId, length, buf));
                         int toWrite = length;
                         while (toWrite > 0) {
                             toWrite -= buf.writeBytes(dataStream, toWrite);
                         }
                         entriesToRead--;
-                        nextExpectedId++;
-                    } else if (entryId > nextExpectedId && entryId < 
lastEntry) {
-                        log.warn("The read entry {} is not the expected entry 
{} but in the range of {} - {},"
-                            + " seeking to the right position", entryId, 
nextExpectedId, nextExpectedId, lastEntry);
-                        seekToEntry(nextExpectedId);
-                    } else if (entryId < nextExpectedId
-                        && 
!index.getIndexEntryForEntry(nextExpectedId).equals(index.getIndexEntryForEntry(entryId)))
 {
-                        log.warn("Read an unexpected entry id {} which is 
smaller than the next expected entry id {}"
-                        + ", seeking to the right position", entryId, 
nextExpectedId);
-                        seekToEntry(nextExpectedId);
-                    } else if (entryId > lastEntry) {
-                        // in the normal case, the entry id should increment 
in order. But if there has random access in
-                        // the read method, we should allow to seek to the 
right position and the entry id should
-                        // never over to the last entry again.
-                        if (!seeked) {
-                            seekToEntry(nextExpectedId);
-                            seeked = true;
-                            continue;
-                        }
-                        log.info("Expected to read {}, but read {}, which is 
greater than last entry {}",
-                            nextExpectedId, entryId, lastEntry);
-                        throw new BKException.BKUnexpectedConditionException();
+                        expectedEntryId++;
                     } else {
-                        long ignore = inputStream.skip(length);
+                        handleUnexpectedEntryId(expectedEntryId, entryId);
                     }
                 }
-
-                promise.complete(LedgerEntriesImpl.create(entries));
+                promise.complete(LedgerEntriesImpl.create(entryCollector));
             } catch (Throwable t) {
-                log.error("Failed to read entries {} - {} from the offloader 
in ledger {}",
-                    firstEntry, lastEntry, ledgerId, t);
+                log.error("Failed to read entries {} - {} from the offloader 
in ledger {}, current position of input"
+                        + " stream is {}", firstEntry, lastEntry, ledgerId, 
inputStream.getCurrentPosition(), t);
                 if (t instanceof KeyNotFoundException) {
                     promise.completeExceptionally(new 
BKException.BKNoSuchLedgerExistsException());
                 } else {
                     promise.completeExceptionally(t);
                 }
-                entries.forEach(LedgerEntry::close);
+                entryCollector.forEach(LedgerEntry::close);
+            }
+        }
+
+        // in the normal case, the entry id should increment in order. But if 
there has random access in
+        // the read method, we should allow to seek to the right position and 
the entry id should
+        // never over to the last entry again.
+        private void handleUnexpectedEntryId(long expectedId, long actEntryId) 
throws Exception {
+            LedgerMetadata ledgerMetadata = getLedgerMetadata();
+            OffloadIndexEntry offsetOfExpectedId = 
index.getIndexEntryForEntry(expectedId);
+            OffloadIndexEntry offsetOfActId = actEntryId <= 
getLedgerMetadata().getLastEntryId() && actEntryId >= 0
+                    ? index.getIndexEntryForEntry(actEntryId) : null;
+            String logLine = String.format("Failed to read [ %s ~ %s ] of the 
ledger %s."
+                    + " Because got a incorrect entry id %s, the offset is %s."
+                    + " The expected entry id is %s, the offset is %s."
+                    + " Have seeked and retry read times: %s. LAC is %s.",
+                    firstEntry, lastEntry, ledgerId,
+                    actEntryId, offsetOfActId == null ? "null because it does 
not exist"
+                            : String.valueOf(offsetOfActId),
+                    expectedId, String.valueOf(offsetOfExpectedId),
+                    seekedAndTryTimes, ledgerMetadata != null ? 
ledgerMetadata.getLastEntryId() : "unknown");
+            // If it still fails after tried entries count times, throw the 
exception.
+            long maxTryTimes = Math.max(3, (lastEntry - firstEntry + 1) >> 2);
+            if (seekedAndTryTimes > maxTryTimes) {
+                log.error(logLine);
+                throw new BKException.BKUnexpectedConditionException();
+            } else {
+                log.warn(logLine);
+            }
+            seekToEntryOffset(expectedId);
+            seekedAndTryTimes++;
+        }
+
+        private void skipPreviousEntry(long startEntryId, long 
expectedEntryId) throws IOException, BKException {
+            long nextExpectedEntryId = startEntryId;
+            while (nextExpectedEntryId < expectedEntryId) {
+                long offset = inputStream.getCurrentPosition();
+                int len = dataStream.readInt();
+                if (len < 0) {
+                    LedgerMetadata ledgerMetadata = getLedgerMetadata();
+                    OffloadIndexEntry offsetOfExpectedId = 
index.getIndexEntryForEntry(expectedEntryId);
+                    log.error("Failed to read [ {} ~ {} ] of the ledger {}."
+                        + " Because failed to skip a previous entry {}, len: 
{}, got a negative len."
+                        + " The expected entry id is {}, the offset is {}."
+                        + " Have seeked and retry read times: {}. LAC is {}.",
+                        firstEntry, lastEntry, ledgerId,
+                        nextExpectedEntryId, len,
+                        expectedEntryId, String.valueOf(offsetOfExpectedId),
+                        seekedAndTryTimes, ledgerMetadata != null ? 
ledgerMetadata.getLastEntryId() : "unknown");
+                    throw new BKException.BKUnexpectedConditionException();
+                }
+                long entryId = dataStream.readLong();
+                if (entryId == nextExpectedEntryId) {
+                    long skipped = inputStream.skip(len);
+                    if (skipped != len) {
+                        LedgerMetadata ledgerMetadata = getLedgerMetadata();
+                        OffloadIndexEntry offsetOfExpectedId = 
index.getIndexEntryForEntry(expectedEntryId);
+                        log.error("Failed to read [ {} ~ {} ] of the ledger 
{}."
+                            + " Because failed to skip a previous entry {}, 
offset: {}, len: {}, there is no more data."
+                            + " The expected entry id is {}, the offset is {}."
+                            + " Have seeked and retry read times: {}. LAC is 
{}.",
+                            firstEntry, lastEntry, ledgerId,
+                            entryId, offset, len,
+                            expectedEntryId, 
String.valueOf(offsetOfExpectedId),
+                            seekedAndTryTimes, ledgerMetadata != null ? 
ledgerMetadata.getLastEntryId() : "unknown");
+                        throw new BKException.BKUnexpectedConditionException();
+                    }
+                    nextExpectedEntryId++;
+                } else {
+                    LedgerMetadata ledgerMetadata = getLedgerMetadata();
+                    OffloadIndexEntry offsetOfExpectedId = 
index.getIndexEntryForEntry(expectedEntryId);
+                    log.error("Failed to read [ {} ~ {} ] of the ledger {}."
+                        + " Because got a incorrect entry id {},."
+                        + " The expected entry id is {}, the offset is {}."
+                        + " Have seeked and retry read times: {}. LAC is {}.",
+                        firstEntry, lastEntry, ledgerId,
+                        entryId, expectedEntryId, 
String.valueOf(offsetOfExpectedId),
+                        seekedAndTryTimes, ledgerMetadata != null ? 
ledgerMetadata.getLastEntryId() : "unknown");
+                    throw new BKException.BKUnexpectedConditionException();
+                }
+            }
+        }
+
+        private void seekToEntryOffset(long expectedEntryId) throws 
IOException, BKException {
+            // 1. Try to find the precise index.
+            // 1-1. Precise cached indexes.
+            Long cachedPreciseIndex = entryOffsetsCache.getIfPresent(ledgerId, 
expectedEntryId);
+            if (cachedPreciseIndex != null) {
+                inputStream.seek(cachedPreciseIndex);
+                return;
+            }
+            // 1-2. Precise persistent indexes.
+            OffloadIndexEntry indexOfNearestEntry = 
index.getIndexEntryForEntry(expectedEntryId);
+            if (indexOfNearestEntry.getEntryId() == expectedEntryId) {
+                inputStream.seek(indexOfNearestEntry.getDataOffset());
+                return;
             }
+            // 2. Try to use the previous index. Since the entry-0 must have a 
precise index, we can skip to check
+            //    whether "expectedEntryId" is larger than 0;
+            Long cachedPreviousKnownOffset = 
entryOffsetsCache.getIfPresent(ledgerId, expectedEntryId - 1);
+            if (cachedPreviousKnownOffset != null) {
+                inputStream.seek(cachedPreviousKnownOffset);
+                skipPreviousEntry(expectedEntryId - 1, expectedEntryId);
+                return;
+            }
+            // 3. Use the persistent index of the nearest entry that is 
smaller than "expectedEntryId".
+            //    Because it is a sparse index, some entries need to be 
skipped.
+            if (indexOfNearestEntry.getEntryId() < expectedEntryId) {
+                inputStream.seek(indexOfNearestEntry.getDataOffset());
+                skipPreviousEntry(indexOfNearestEntry.getEntryId(), 
expectedEntryId);
+            } else {
+                LedgerMetadata ledgerMetadata = getLedgerMetadata();
+                log.error("Failed to read [ {} ~ {} ] of the ledger {}."
+                    + " Because got a incorrect index {} of the entry {}, 
which is greater than expected."
+                    + " Have seeked and retry read times: {}. LAC is {}.",
+                    firstEntry, lastEntry, ledgerId,
+                    String.valueOf(indexOfNearestEntry), expectedEntryId,
+                    seekedAndTryTimes, ledgerMetadata != null ? 
ledgerMetadata.getLastEntryId() : "unknown");
+                throw new BKException.BKUnexpectedConditionException();
+            }
+        }
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long 
lastEntry) {
+        if (log.isDebugEnabled()) {
+            log.debug("Ledger {}: reading {} - {} ({} entries}",
+                    getId(), firstEntry, lastEntry, (1 + lastEntry - 
firstEntry));
+        }
+        CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
+
+        // Ledger handles will be only marked idle when "pendingRead" is "0", 
it is not needed to update
+        // "lastAccessTimestamp" if "pendingRead" is larger than "0".
+        // Rather than update "lastAccessTimestamp" when starts a reading, 
updating it when a reading task is finished
+        // is better.
+        PENDING_READ_UPDATER.incrementAndGet(this);
+        promise.whenComplete((__, ex) -> {
+            lastAccessTimestamp = System.currentTimeMillis();
+            
PENDING_READ_UPDATER.decrementAndGet(BlobStoreBackedReadHandleImpl.this);
         });
+        executor.execute(new ReadTask(firstEntry, lastEntry, promise));
         return promise;
     }
 
@@ -238,6 +344,11 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle, OffloadedLedge
         }
     }
 
+    private void seekToEntry(OffloadIndexEntry offloadIndexEntry) throws 
IOException {
+        long dataOffset = offloadIndexEntry.getDataOffset();
+        inputStream.seek(dataOffset);
+    }
+
     @Override
     public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long 
firstEntry, long lastEntry) {
         return readAsync(firstEntry, lastEntry);
diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
index 20e9dd68cd2..2faffa7e25c 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
+import java.util.Objects;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
 
 /**
@@ -65,5 +66,19 @@ public class OffloadIndexEntryImpl implements 
OffloadIndexEntry {
         return String.format("[eid:%d, part:%d, offset:%d, doffset:%d]",
                 entryId, partId, offset, getDataOffset());
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof OffloadIndexEntryImpl that)) {
+            return false;
+        }
+        return entryId == that.entryId && partId == that.partId && offset == 
that.offset
+                && blockHeaderSize == that.blockHeaderSize;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(entryId, partId, offset, blockHeaderSize);
+    }
 }
 
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplTest.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplTest.java
new file mode 100644
index 00000000000..bf116e5aeca
--- /dev/null
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.LedgerMetadataBuilder;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.commons.lang3.tuple.Pair;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class BlobStoreBackedReadHandleImplTest {
+
+    private OffsetsCache offsetsCache = new OffsetsCache();
+
+    private ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(2);
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        if (executor != null) {
+            executor.shutdown();
+            executor.awaitTermination(5, TimeUnit.SECONDS);
+        }
+        if (offsetsCache != null) {
+            offsetsCache.close();
+        }
+    }
+
+    @AfterClass
+    public void clearCache() throws Exception {
+        offsetsCache.clear();
+    }
+
+    private String getExpectedEntryContent(int entryId) {
+        return "Entry " + entryId;
+    }
+
+    private Pair<BlobStoreBackedReadHandleImpl, ByteBuf> createReadHandle(
+            long ledgerId, int entries, boolean hasDirtyData) throws Exception 
{
+        // Build data.
+        List<Pair<Integer, Integer>> offsets = new ArrayList<>();
+        int totalLen = 0;
+        ByteBuf data = ByteBufAllocator.DEFAULT.heapBuffer(1024);
+        data.writeInt(0);
+        data.writerIndex(128);
+        //data.readerIndex(128);
+        for (int i = 0; i < entries; i++) {
+            if (hasDirtyData && i == 1) {
+                data.writeBytes("dirty data".getBytes(UTF_8));
+            }
+            offsets.add(Pair.of(i, data.writerIndex()));
+            offsetsCache.put(ledgerId, i, data.writerIndex());
+            byte[] entryContent = getExpectedEntryContent(i).getBytes(UTF_8);
+            totalLen += entryContent.length;
+            data.writeInt(entryContent.length);
+            data.writeLong(i);
+            data.writeBytes(entryContent);
+        }
+        // Build metadata.
+        LedgerMetadata metadata = LedgerMetadataBuilder.create()
+                .withId(ledgerId)
+                .withEnsembleSize(1)
+                .withWriteQuorumSize(1)
+                .withAckQuorumSize(1)
+                .withDigestType(DigestType.CRC32C)
+                .withPassword("pwd".getBytes(UTF_8))
+                .withClosedState()
+                .withLastEntryId(entries)
+                .withLength(totalLen)
+                .newEnsembleEntry(0L, 
Arrays.asList(BookieId.parse("127.0.0.1:3181")))
+                .build();
+        BackedInputStreamImpl inputStream = new BackedInputStreamImpl(data);
+        // Since we have written data to "offsetsCache", the index will never 
be used.
+        OffloadIndexBlock mockIndex = mock(OffloadIndexBlock.class);
+        when(mockIndex.getLedgerMetadata()).thenReturn(metadata);
+        for (Pair<Integer, Integer> pair : offsets) {
+            when(mockIndex.getIndexEntryForEntry(pair.getLeft())).thenReturn(
+                    OffloadIndexEntryImpl.of(pair.getLeft(), 0, 
pair.getRight(), 0));
+        }
+        // Build obj.
+        return Pair.of(new BlobStoreBackedReadHandleImpl(ledgerId, mockIndex, 
inputStream, executor, offsetsCache),
+                data);
+    }
+
+    private static class BackedInputStreamImpl extends BackedInputStream {
+
+        private ByteBuf data;
+
+        private BackedInputStreamImpl(ByteBuf data){
+            this.data = data;
+        }
+
+        @Override
+        public void seek(long position) {
+            data.readerIndex((int) position);
+        }
+
+        @Override
+        public void seekForward(long position) throws IOException {
+            data.readerIndex((int) position);
+        }
+
+        @Override
+        public long getCurrentPosition() {
+            return data.readerIndex();
+        }
+
+        @Override
+        public int read() throws IOException {
+            if (data.readableBytes() == 0) {
+                throw new EOFException("The input-stream has no bytes to 
read");
+            }
+            return data.readByte();
+        }
+
+        @Override
+        public int available() throws IOException {
+            return data.readableBytes();
+        }
+    }
+
+    @DataProvider
+    public Object[][] streamStartAt() {
+        return new Object[][] {
+            // It gives a 0 value of the entry length.
+            { 0, false },
+            // It gives a 0 value of the entry length.
+            { 1, false },
+            // The first entry starts at 128.
+            { 128, false },
+            // It gives a 0 value of the entry length.
+            { 0, true },
+            // It gives a 0 value of the entry length.
+            { 1, true },
+            // The first entry starts at 128.
+            { 128, true }
+        };
+    }
+
+    @Test(dataProvider = "streamStartAt")
+    public void testRead(int streamStartAt, boolean hasDirtyData) throws 
Exception {
+        int entryCount = 5;
+        Pair<BlobStoreBackedReadHandleImpl, ByteBuf> ledgerDataPair =
+                createReadHandle(1, entryCount, hasDirtyData);
+        BlobStoreBackedReadHandleImpl ledger = ledgerDataPair.getLeft();
+        ByteBuf data = ledgerDataPair.getRight();
+        data.readerIndex(streamStartAt);
+        // Teat read each entry.
+        for (int i = 0; i < 5; i++) {
+            LedgerEntries entries = ledger.read(i, i);
+            assertEquals(new 
String(entries.iterator().next().getEntryBytes()), getExpectedEntryContent(i));
+        }
+        // Test read all entries.
+        LedgerEntries entries1 = ledger.read(0, entryCount - 1);
+        Iterator<LedgerEntry> iterator1 = entries1.iterator();
+        for (int i = 0; i < entryCount; i++) {
+            assertEquals(new String(iterator1.next().getEntryBytes()), 
getExpectedEntryContent(i));
+        }
+        // Test a special case.
+        // 1. Read from 0 to "lac - 1".
+        // 2. Any reading.
+        LedgerEntries entries2 = ledger.read(0, entryCount - 2);
+        Iterator<LedgerEntry> iterator2 = entries2.iterator();
+        for (int i = 0; i < entryCount - 1; i++) {
+            assertEquals(new String(iterator2.next().getEntryBytes()), 
getExpectedEntryContent(i));
+        }
+        LedgerEntries entries3 = ledger.read(0, entryCount - 1);
+        Iterator<LedgerEntry> iterator3 = entries3.iterator();
+        for (int i = 0; i < entryCount; i++) {
+            assertEquals(new String(iterator3.next().getEntryBytes()), 
getExpectedEntryContent(i));
+        }
+        // cleanup.
+        ledger.close();
+    }
+}

Reply via email to