This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new dbc8221b462 feat: introduce SegmentRangeReader interface and
PartialSegmentFileMapperV10 (#19282)
dbc8221b462 is described below
commit dbc8221b462d8738d26507406b48b8f137228ac3
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Apr 17 02:16:39 2026 -0700
feat: introduce SegmentRangeReader interface and
PartialSegmentFileMapperV10 (#19282)
changes:
* adds the building blocks for supporting partial segment downloads when
using the vsf segment cache
* adds a new `SegmentRangeReader` extension point interface for byte-range
reads from segment files in deep storage
* adds `PartialSegmentFileMapperV10`, a `SegmentFileMapper` implementation
that downloads internal files on demand from deep storage via
`SegmentRangeReader`; it is not wired to anything yet other than tests
* extracts `SegmentFileMetadataReader`, a shared utility for parsing the
V10 header + metadata from any `InputStream`, out of
`SegmentFileMapperV10.create()` so it can be shared with
`PartialSegmentFileMapperV10`
* adds an `openRangeReader()` method to `LoadSpec` with a default
implementation that returns null
* `SegmentFileMetadata` now interns the string keys of its files and column
descriptor maps using `SmooshedFileMapper.STRING_INTERNER`
---
.../segment/file/PartialSegmentFileMapperV10.java | 563 +++++++++++++++++++
.../druid/segment/file/SegmentFileBuilderV10.java | 32 +-
.../druid/segment/file/SegmentFileMapperV10.java | 118 ++--
.../druid/segment/file/SegmentFileMetadata.java | 19 +-
.../segment/file/SegmentFileMetadataReader.java | 170 ++++++
.../org/apache/druid/segment/loading/LoadSpec.java | 20 +
.../druid/segment/loading/SegmentRangeReader.java | 52 ++
.../file/PartialSegmentFileMapperV10Test.java | 612 +++++++++++++++++++++
8 files changed, 1489 insertions(+), 97 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java
b/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java
new file mode 100644
index 00000000000..79d52ab384f
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Ints;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.loading.SegmentRangeReader;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A {@link SegmentFileMapper} that downloads internal files on demand from
deep storage via a
+ * {@link SegmentRangeReader}. This enables partial segment downloads where
only the files needed for a query are
+ * fetched, rather than downloading the entire segment.
+ * <p>
+ * Locally, this mapper mirrors the original V10 container structure: each
container from the segment file is
+ * represented as a local sparse file at its original size, and only the byte
ranges for downloaded internal files are
+ * populated. This means the number of local files (and mmaps) equals the
number of containers (typically 1-3 at
+ * 'standard' segment sizes) rather than the number of internal files (which
could be hundreds).
+ * <p>
+ * The {@link MappedByteBuffer} for each container is created once and then
the channel is closed immediately (same
+ * pattern as {@link SegmentFileMapperV10}). Writes of downloaded file data
use short-lived {@link RandomAccessFile}
+ * instances. The mmap reflects writes through the shared page cache.
+ * <p>
+ * State is persisted to disk so that the mapper can be restored after a
process restart without re-fetching metadata
+ * from deep storage. The raw V10 header bytes are written to a local file,
and a compact bitmap file is appended to
+ * the end of it to track which internal files have been downloaded (one bit
per file, updated after each download). On
+ * subsequent calls, the metadata is parsed from the local file instead of
range-reading from deep storage.
+ * <p>
+ * External segment files are supported via child {@link
PartialSegmentFileMapperV10} instances, each targeting a
+ * different file in the segment's storage location.
+ * <p>
+ * Thread-safe for concurrent access from multiple queries. Per-file locks
prevent duplicate downloads of the same
+ * internal file.
+ *
+ * @see SegmentFileMapperV10
+ * @see SegmentRangeReader
+ */
+public class PartialSegmentFileMapperV10 implements SegmentFileMapper
+{
+ static final String METADATA_HEADER_SUFFIX = ".header";
+
+ /**
+ * Create (or restore) a lazy mapper for the main segment file with attached
external file mappers. If persisted state
+ * exists locally from a previous session, metadata is read from disk.
Otherwise, metadata is fetched from deep
+ * storage via range reads and persisted locally.
+ */
+ public static PartialSegmentFileMapperV10 create(
+ SegmentRangeReader rangeReader,
+ ObjectMapper jsonMapper,
+ File localCacheDir,
+ String targetFilename,
+ List<String> externals
+ ) throws IOException
+ {
+ final PartialSegmentFileMapperV10 entryPoint = createForFile(
+ rangeReader,
+ jsonMapper,
+ localCacheDir,
+ targetFilename
+ );
+
+ final Map<String, PartialSegmentFileMapperV10> externalMappers = new
HashMap<>();
+ try {
+ for (String filename : externals) {
+ externalMappers.put(
+ filename,
+ createForFile(rangeReader, jsonMapper, localCacheDir, filename)
+ );
+ }
+ }
+ catch (Throwable t) {
+ Closer closer = Closer.create();
+ closer.register(entryPoint);
+ closer.registerAll(externalMappers.values());
+ throw CloseableUtils.closeAndWrapInCatch(t, closer);
+ }
+
+ entryPoint.externalMappers.putAll(externalMappers);
+ return entryPoint;
+ }
+
+ @VisibleForTesting
+ static PartialSegmentFileMapperV10 createForFile(
+ SegmentRangeReader rangeReader,
+ ObjectMapper jsonMapper,
+ File localCacheDir,
+ String targetFilename
+ ) throws IOException
+ {
+ FileUtils.mkdirp(localCacheDir);
+ final File headerFile = new File(localCacheDir, targetFilename +
METADATA_HEADER_SUFFIX);
+
+ // try to load from existing local file, re-fetching from deep storage if
missing or corrupted
+ SegmentFileMetadataReader.Result result = null;
+ MappedByteBuffer bitmapBuffer = null;
+
+ if (headerFile.exists()) {
+ try {
+ result = parseHeaderFile(headerFile, jsonMapper);
+ bitmapBuffer = mmapBitmap(headerFile, result);
+ }
+ catch (Exception e) {
+ // corrupted file (partial write, truncated bitmap, bad JSON, etc.) —
delete and re-fetch
+ result = null;
+ headerFile.delete();
+ }
+ }
+
+ if (result == null) {
+ fetchAndPersistHeader(rangeReader, targetFilename, headerFile);
+ result = parseHeaderFile(headerFile, jsonMapper);
+ bitmapBuffer = mmapBitmap(headerFile, result);
+ }
+
+ final PartialSegmentFileMapperV10 mapper = new PartialSegmentFileMapperV10(
+ result.getMetadata(),
+ result.getHeaderSize(),
+ rangeReader,
+ targetFilename,
+ localCacheDir,
+ bitmapBuffer
+ );
+
+ // restore downloaded files from the bitmap
+ for (int i = 0; i < mapper.sortedFileNames.size(); i++) {
+ final int byteIndex = i / 8;
+ final int bitIndex = i % 8;
+ if ((bitmapBuffer.get(byteIndex) & (1 << bitIndex)) != 0) {
+ final String name = mapper.sortedFileNames.get(i);
+ final SegmentInternalFileMetadata fileMetadata =
result.getMetadata().getFiles().get(name);
+ if (fileMetadata != null) {
+ mapper.ensureContainerInitialized(fileMetadata.getContainer());
+ mapper.downloadedFiles.add(name);
+ mapper.downloadedBytes.addAndGet(fileMetadata.getSize());
+ }
+ }
+ }
+
+ return mapper;
+ }
+
+ private final SegmentFileMetadata metadata;
+ private final long headerSize;
+ private final SegmentRangeReader rangeReader;
+ private final String targetFilename;
+ private final File localCacheDir;
+
+ // stable sorted ordering of file names for bitmap indexing
+ private final List<String> sortedFileNames;
+ private final Map<String, Integer> fileNameToIndex;
+
+ // per-container state, lazily initialized
+ private final MappedByteBuffer[] containers;
+ private final File[] containerFiles;
+ private final ReentrantLock[] containerLocks;
+
+ // external file mappers
+ private final Map<String, PartialSegmentFileMapperV10> externalMappers = new
HashMap<>();
+
+ // track which internal files have been downloaded
+ private final Set<String> downloadedFiles = ConcurrentHashMap.newKeySet();
+ private final ConcurrentHashMap<String, ReentrantLock> fileLocks = new
ConcurrentHashMap<>();
+ private final ReentrantLock bitmapLock;
+ private final MappedByteBuffer bitmapBuffer;
+ private final AtomicLong downloadedBytes = new AtomicLong(0);
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ private PartialSegmentFileMapperV10(
+ SegmentFileMetadata metadata,
+ long headerSize,
+ SegmentRangeReader rangeReader,
+ String targetFilename,
+ File localCacheDir,
+ MappedByteBuffer bitmapBuffer
+ )
+ {
+ this.metadata = metadata;
+ this.headerSize = headerSize;
+ this.rangeReader = rangeReader;
+ this.targetFilename = targetFilename;
+ this.localCacheDir = localCacheDir;
+ this.bitmapBuffer = bitmapBuffer;
+
+ // build stable file name ordering for bitmap indexing
+ this.sortedFileNames = new ArrayList<>(new
TreeSet<>(metadata.getFiles().keySet()));
+ this.fileNameToIndex = new HashMap<>();
+ for (int i = 0; i < sortedFileNames.size(); i++) {
+ fileNameToIndex.put(sortedFileNames.get(i), i);
+ }
+
+ final int numContainers = metadata.getContainers().size();
+ this.containers = new MappedByteBuffer[numContainers];
+ this.containerFiles = new File[numContainers];
+ this.containerLocks = new ReentrantLock[numContainers];
+ for (int i = 0; i < numContainers; i++) {
+ this.containerLocks[i] = new ReentrantLock();
+ }
+ this.bitmapLock = new ReentrantLock();
+ }
+
+ public SegmentFileMetadata getSegmentFileMetadata()
+ {
+ return metadata;
+ }
+
+ @Override
+ public Set<String> getInternalFilenames()
+ {
+ return metadata.getFiles().keySet();
+ }
+
+ @Nullable
+ @Override
+ public ByteBuffer mapFile(String name) throws IOException
+ {
+ checkClosed();
+
+ final SegmentInternalFileMetadata fileMetadata =
metadata.getFiles().get(name);
+ if (fileMetadata == null) {
+ return null;
+ }
+
+ ensureFileDownloaded(name, fileMetadata);
+
+ // slice from the container mmap
+ final MappedByteBuffer container = containers[fileMetadata.getContainer()];
+ final ByteBuffer view = container.asReadOnlyBuffer();
+ view.position(Ints.checkedCast(fileMetadata.getStartOffset()))
+ .limit(Ints.checkedCast(fileMetadata.getStartOffset() +
fileMetadata.getSize()));
+ return view.slice();
+ }
+
+ @Nullable
+ @Override
+ public ByteBuffer mapExternalFile(String filename, String name) throws
IOException
+ {
+ checkClosed();
+ final PartialSegmentFileMapperV10 externalMapper =
externalMappers.get(filename);
+ if (externalMapper == null) {
+ throw DruidException.defensive("external file[%s] containing[%s] not
found", filename, name);
+ }
+ return externalMapper.mapFile(name);
+ }
+
+ /**
+ * Pre-download a set of internal files so that subsequent {@link
#mapFile(String)} calls for these files will not
+ * trigger individual downloads. Files that are already downloaded are
skipped. This is useful for batch-downloading
+ * all files for a projection at once.
+ */
+ public void ensureFilesAvailable(Set<String> fileNames) throws IOException
+ {
+ for (String name : fileNames) {
+ final SegmentInternalFileMetadata fileMetadata =
metadata.getFiles().get(name);
+ if (fileMetadata != null) {
+ ensureFileDownloaded(name, fileMetadata);
+ }
+ }
+ }
+
+ /**
+ * Total bytes downloaded so far across all internal files, including
external mappers.
+ */
+ public long getDownloadedBytes()
+ {
+ long total = downloadedBytes.get();
+ for (PartialSegmentFileMapperV10 ext : externalMappers.values()) {
+ total += ext.getDownloadedBytes();
+ }
+ return total;
+ }
+
+ @Override
+ public void close()
+ {
+ if (closed.compareAndSet(false, true)) {
+ final Closer closer = Closer.create();
+ closer.register(() -> ByteBufferUtils.unmap(bitmapBuffer));
+ for (MappedByteBuffer buffer : containers) {
+ if (buffer != null) {
+ closer.register(() -> ByteBufferUtils.unmap(buffer));
+ }
+ }
+ closer.registerAll(externalMappers.values());
+ CloseableUtils.closeAndWrapExceptions(closer);
+ }
+ }
+
+ private void checkClosed()
+ {
+ if (closed.get()) {
+ throw DruidException.defensive("PartialSegmentFileMapperV10 is closed");
+ }
+ }
+
+ /**
+ * Compute the absolute byte offset of an internal file within the segment
file in deep storage.
+ */
+ private long computeAbsoluteOffset(SegmentInternalFileMetadata fileMetadata)
+ {
+ final SegmentFileContainerMetadata container =
metadata.getContainers().get(fileMetadata.getContainer());
+ return headerSize + container.getStartOffset() +
fileMetadata.getStartOffset();
+ }
+
+ private void ensureFileDownloaded(String name, SegmentInternalFileMetadata
fileMetadata) throws IOException
+ {
+ // already downloaded, nothing to do
+ if (downloadedFiles.contains(name)) {
+ return;
+ }
+
+ final ReentrantLock lock = fileLocks.computeIfAbsent(name, k -> new
ReentrantLock());
+ lock.lock();
+ try {
+ checkClosed();
+
+ if (downloadedFiles.contains(name)) {
+ return;
+ }
+
+ ensureContainerInitialized(fileMetadata.getContainer());
+ downloadFileToContainer(name, fileMetadata);
+ downloadedFiles.add(name);
+ markDownloadedInBitmap(name);
+ }
+ finally {
+ lock.unlock();
+ fileLocks.remove(name, lock);
+ }
+ }
+
+ /**
+ * Initialize a local container file if not already done. Creates a sparse
file at the original container size
+ * and memory-maps it. The channel is closed immediately after mapping, the
mmap persists independently, backed by
+ * the kernel page cache. This avoids the risk of channel closure from
thread interruption.
+ */
+ private void ensureContainerInitialized(int containerIndex) throws
IOException
+ {
+ if (containers[containerIndex] != null) {
+ return;
+ }
+
+ containerLocks[containerIndex].lock();
+ try {
+ if (containers[containerIndex] != null) {
+ return;
+ }
+
+ final SegmentFileContainerMetadata containerMeta =
metadata.getContainers().get(containerIndex);
+ final File localFile = new File(
+ localCacheDir,
+ StringUtils.format("%s.container.%05d", targetFilename,
containerIndex)
+ );
+
+ // create sparse file at original container size, mmap it, then close
the channel immediately.
+ // set containerFiles before containers so that when another thread sees
containers[i] != null
+ // (the fast-path check), containerFiles[i] is guaranteed to be set
already.
+ try (RandomAccessFile raf = new RandomAccessFile(localFile, "rw");
FileChannel channel = raf.getChannel()) {
+ raf.setLength(containerMeta.getSize());
+ containerFiles[containerIndex] = localFile;
+ containers[containerIndex] = channel.map(
+ FileChannel.MapMode.READ_ONLY,
+ 0,
+ containerMeta.getSize()
+ );
+ }
+ }
+ finally {
+ containerLocks[containerIndex].unlock();
+ }
+ }
+
+ /**
+ * Download an internal file from deep storage and write it to the correct
position in its local container file.
+ * Uses a short-lived {@link RandomAccessFile} for writing. The mmap sees
the written data through the shared page
+ * cache.
+ */
+ private void downloadFileToContainer(String name,
SegmentInternalFileMetadata fileMetadata) throws IOException
+ {
+ final long absoluteOffset = computeAbsoluteOffset(fileMetadata);
+ final long size = fileMetadata.getSize();
+
+ // stream directly from deep storage to the local container file to avoid
holding the entire file in heap
+ try (InputStream is = rangeReader.readRange(targetFilename,
absoluteOffset, size);
+ RandomAccessFile raf = new
RandomAccessFile(containerFiles[fileMetadata.getContainer()], "rw")) {
+ raf.seek(fileMetadata.getStartOffset());
+ final byte[] buf = new byte[8192];
+ long remaining = size;
+ while (remaining > 0) {
+ final int toRead = (int) Math.min(buf.length, remaining);
+ final int read = is.read(buf, 0, toRead);
+ if (read < 0) {
+ throw DruidException.defensive(
+ "unexpected end of stream for file[%s], expected[%s] more bytes",
+ name,
+ remaining
+ );
+ }
+ raf.write(buf, 0, read);
+ remaining -= read;
+ }
+ }
+
+ downloadedBytes.addAndGet(size);
+ }
+
+ /**
+ * Set the bit for a downloaded file in the bitmap. Single-byte
read-modify-write on the mmap under
+ * {@link #bitmapLock}. The OS flushes the mmap to disk.
+ */
+ private void markDownloadedInBitmap(String name)
+ {
+ final Integer index = fileNameToIndex.get(name);
+ if (index == null) {
+ return;
+ }
+ final int byteIndex = index / 8;
+ final int bitMask = 1 << (index % 8);
+
+ bitmapLock.lock();
+ try {
+ final byte existing = bitmapBuffer.get(byteIndex);
+ bitmapBuffer.put(byteIndex, (byte) (existing | bitMask));
+ }
+ finally {
+ bitmapLock.unlock();
+ }
+ }
+
+ /**
+ * Fetch the raw V10 header bytes from deep storage and write them to a
local file. The bitmap region is not
+ * included, it is created by {@link #mmapBitmap} after parsing. The file is
parseable by
+ * {@link SegmentFileMetadataReader#read(InputStream, ObjectMapper)}.
+ */
+ private static void fetchAndPersistHeader(
+ SegmentRangeReader rangeReader,
+ String targetFilename,
+ File headerFile
+ ) throws IOException
+ {
+ // read the fixed header to determine the metadata size, plus extra int
possibly containing compressed length if
+ // compression is enabled, else worst case only a few extra bytes
+ final byte[] fixedHeader = new byte[SegmentFileMetadataReader.HEADER_SIZE
+ Integer.BYTES];
+ try (InputStream headerStream = rangeReader.readRange(targetFilename, 0,
fixedHeader.length)) {
+ int offset = 0;
+ while (offset < fixedHeader.length) {
+ int read = headerStream.read(fixedHeader, offset, fixedHeader.length -
offset);
+ if (read < 0) {
+ break;
+ }
+ offset += read;
+ }
+ }
+
+ final ByteBuffer headerBuf =
ByteBuffer.wrap(fixedHeader).order(ByteOrder.LITTLE_ENDIAN);
+ final int metaLength = headerBuf.getInt(2);
+ final CompressionStrategy compressionStrategy =
CompressionStrategy.forId(headerBuf.get(1));
+
+ // compute the remaining bytes, either metaLength, or if compressed, read
the extra int we read
+ final long remainingBytes;
+ final int actualHeaderSize;
+ if (CompressionStrategy.NONE == compressionStrategy) {
+ remainingBytes = metaLength;
+ actualHeaderSize = SegmentFileMetadataReader.HEADER_SIZE;
+ } else {
+ remainingBytes = headerBuf.getInt(SegmentFileMetadataReader.HEADER_SIZE);
+ actualHeaderSize = fixedHeader.length;
+ }
+
+ // write fixed header + remaining metadata bytes to a local file
atomically (write to temp, then rename)
+ // to avoid leaving a partial file on disk if the process crashes mid-write
+ FileUtils.mkdirp(headerFile.getParentFile());
+ FileUtils.writeAtomically(headerFile, out -> {
+ out.write(fixedHeader, 0, actualHeaderSize);
+ try (InputStream remainingStream = rangeReader.readRange(
+ targetFilename,
+ actualHeaderSize,
+ remainingBytes
+ )) {
+ ByteStreams.limit(remainingStream, remainingBytes).transferTo(out);
+ }
+ return null;
+ });
+ }
+
+ /**
+ * Parse metadata from the header file. Throws on any parse failure (corrupt
JSON, truncated header, etc.).
+ */
+ private static SegmentFileMetadataReader.Result parseHeaderFile(
+ File headerFile,
+ ObjectMapper jsonMapper
+ ) throws IOException
+ {
+ try (FileInputStream fis = new FileInputStream(headerFile)) {
+ return SegmentFileMetadataReader.read(fis, jsonMapper);
+ }
+ }
+
+ /**
+ * Mmap the bitmap region of the header file as read-write. Extends the file
if the bitmap region doesn't exist yet.
+ * The channel is closed immediately after mapping.
+ */
+ private static MappedByteBuffer mmapBitmap(
+ File headerFile,
+ SegmentFileMetadataReader.Result result
+ ) throws IOException
+ {
+ final int numBitmapBytes = (result.getMetadata().getFiles().size() + 7) /
8;
+ final long expectedSize = result.getHeaderSize() + numBitmapBytes;
+ try (RandomAccessFile raf = new RandomAccessFile(headerFile, "rw");
+ FileChannel channel = raf.getChannel()) {
+ if (raf.length() < expectedSize) {
+ raf.setLength(expectedSize);
+ }
+ return channel.map(FileChannel.MapMode.READ_WRITE,
result.getHeaderSize(), numBitmapBytes);
+ }
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java
index 1020c0d273c..6986a2759fe 100644
---
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java
+++
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java
@@ -181,16 +181,9 @@ public class SegmentFileBuilderV10 implements
SegmentFileBuilder
final FileChannel channel = closer.register(outputStream.getChannel());
final ByteBuffer intBuffer =
ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
- outputStream.write(new byte[]{IndexIO.V10_VERSION,
metadataCompression.getId()});
- // write uncompressed metadata length
- intBuffer.putInt(metadataBytes.length);
- intBuffer.flip();
- outputStream.write(intBuffer.array());
-
- if (CompressionStrategy.NONE == metadataCompression) {
- // no compression, just write the plain metadata bytes
- outputStream.write(metadataBytes);
- } else {
+ // compress the data if specified, however we throw it out if
compression is larger than uncompressed
+ final ByteBuffer compressed;
+ if (CompressionStrategy.NONE != metadataCompression) {
// compress the data using the strategy, write the compressed length,
then the compressed blob
final CompressionStrategy.Compressor compressor =
metadataCompression.getCompressor();
final ByteBuffer inBuffer =
compressor.allocateInBuffer(metadataBytes.length, closer)
@@ -200,8 +193,25 @@ public class SegmentFileBuilderV10 implements
SegmentFileBuilder
final ByteBuffer outBuffer =
compressor.allocateOutBuffer(metadataBytes.length, closer)
.order(ByteOrder.nativeOrder());
- final ByteBuffer compressed = compressor.compress(inBuffer, outBuffer);
+ compressed = compressor.compress(inBuffer, outBuffer);
+ } else {
+ compressed = null;
+ }
+ final boolean shouldCompress = compressed != null && (4 +
compressed.remaining()) < metadataBytes.length;
+ outputStream.write(new byte[]{
+ IndexIO.V10_VERSION,
+ shouldCompress ? metadataCompression.getId() :
CompressionStrategy.NONE.getId()
+ });
+ // write uncompressed metadata length
+ intBuffer.putInt(metadataBytes.length);
+ intBuffer.flip();
+ outputStream.write(intBuffer.array());
+
+ if (CompressionStrategy.NONE == metadataCompression || !shouldCompress) {
+ // no compression, just write the plain metadata bytes
+ outputStream.write(metadataBytes);
+ } else {
// write compression length
intBuffer.position(0);
intBuffer.putInt(compressed.remaining());
diff --git
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileMapperV10.java
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileMapperV10.java
index 2a35a17158f..042ab5ba084 100644
---
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileMapperV10.java
+++
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileMapperV10.java
@@ -25,8 +25,6 @@ import com.google.common.primitives.Ints;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.ByteBufferUtils;
import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
@@ -35,7 +33,6 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashMap;
@@ -55,7 +52,7 @@ public class SegmentFileMapperV10 implements SegmentFileMapper
/**
* Create a v10 {@link SegmentFileMapper} with 'external' attached v10
segment files
*
- * @param segmentFile v10 segment file with name {@link
IndexIO#V10_FILE_NAME}
+ * @param segmentFile v10 segment file with name {@link
org.apache.druid.segment.IndexIO#V10_FILE_NAME}
* @param mapper json mapper to deserialize metadata
* @param externals list of 'external' v10 segment files to attach to this
mapper and files that can be referenced
* using {@link #mapExternalFile(String, String)}
@@ -101,91 +98,44 @@ public class SegmentFileMapperV10 implements
SegmentFileMapper
ObjectMapper mapper
) throws IOException
{
+ final SegmentFileMetadataReader.Result metadataResult;
try (FileInputStream fis = new FileInputStream(segmentFile)) {
- // version (byte) | metadata compression (byte) | metadata length (int)
- final byte[] header = new byte[1 + 1 + Integer.BYTES];
- int read = fis.read(header);
- if (read < header.length) {
- throw DruidException.defensive("expected at least [%s] bytes, but only
read [%s]", header.length, read);
- }
- final ByteBuffer headerBuffer =
ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN);
-
- if (headerBuffer.get(0) != IndexIO.V10_VERSION) {
- throw DruidException.defensive("not v10, got[%s] instead",
headerBuffer.get(0));
- }
-
- final byte compression = headerBuffer.get(1);
- final CompressionStrategy compressionStrategy =
CompressionStrategy.forId(compression);
-
- final int metaLength = headerBuffer.getInt(2);
- final byte[] meta = new byte[metaLength];
-
- final int startOffset;
- if (CompressionStrategy.NONE == compressionStrategy) {
- startOffset = header.length + meta.length;
- read = fis.read(meta);
- if (read < meta.length) {
- throw DruidException.defensive("read[%s] which is less than expected
metadata length[%s]", read, metaLength);
- }
- } else {
- final byte[] compressedLengthBytes = new byte[Integer.BYTES];
- read = fis.read(compressedLengthBytes);
- if (read != Integer.BYTES) {
- throw DruidException.defensive("read[%s] which is less than expected
[%s]", read, Integer.BYTES);
- }
- final ByteBuffer compressedLengthBuffer =
ByteBuffer.wrap(compressedLengthBytes).order(ByteOrder.LITTLE_ENDIAN);
- final int compressedLength = compressedLengthBuffer.getInt(0);
- startOffset = header.length + Integer.BYTES + compressedLength;
-
- final byte[] compressed = new byte[compressedLength];
- read = fis.read(compressed);
- if (read < compressed.length) {
- throw DruidException.defensive(
- "read[%s] which is less than expected compressed metadata
length[%s]",
- read,
- compressedLength
- );
- }
-
- final ByteBuffer inBuffer =
ByteBuffer.wrap(compressed).order(ByteOrder.LITTLE_ENDIAN);
- final ByteBuffer outBuffer =
ByteBuffer.wrap(meta).order(ByteOrder.LITTLE_ENDIAN);
- final CompressionStrategy.Decompressor decompressor =
compressionStrategy.getDecompressor();
- decompressor.decompress(inBuffer, compressedLength, outBuffer);
- }
-
- final SegmentFileMetadata metadata = mapper.readValue(meta,
SegmentFileMetadata.class);
- final List<MappedByteBuffer> containers =
Lists.newArrayListWithCapacity(metadata.getContainers().size());
+ metadataResult = SegmentFileMetadataReader.read(fis, mapper);
+ }
- // eagerly map all container buffers so we can ensure they all share the
same file descriptor without needing to
- // maintain an open channel (which could be closed during an interrupt
for example)
- try (RandomAccessFile f = new RandomAccessFile(segmentFile, "r");
- FileChannel channel = f.getChannel()) {
- for (SegmentFileContainerMetadata containerMetadata :
metadata.getContainers()) {
- containers.add(
- channel.map(
- FileChannel.MapMode.READ_ONLY,
- startOffset + containerMetadata.getStartOffset(),
- containerMetadata.getSize()
- )
- );
- }
+ final SegmentFileMetadata metadata = metadataResult.getMetadata();
+ final long startOffset = metadataResult.getHeaderSize();
+ final List<MappedByteBuffer> containers =
Lists.newArrayListWithCapacity(metadata.getContainers().size());
+
+ // eagerly map all container buffers so we can ensure they all share the
same file descriptor without needing to
+ // maintain an open channel (which could be closed during an interrupt for
example)
+ try (RandomAccessFile f = new RandomAccessFile(segmentFile, "r");
+ FileChannel channel = f.getChannel()) {
+ for (SegmentFileContainerMetadata containerMetadata :
metadata.getContainers()) {
+ containers.add(
+ channel.map(
+ FileChannel.MapMode.READ_ONLY,
+ startOffset + containerMetadata.getStartOffset(),
+ containerMetadata.getSize()
+ )
+ );
}
- catch (IOException e) {
- Closer closer = Closer.create();
- for (MappedByteBuffer buffer : containers) {
- closer.register(() -> ByteBufferUtils.unmap(buffer));
- }
- CloseableUtils.closeAndWrapExceptions(closer);
- throw DruidException.defensive(e, "Problem mapping segment file[%s]",
segmentFile.getAbsolutePath());
+ }
+ catch (IOException e) {
+ Closer closer = Closer.create();
+ for (MappedByteBuffer buffer : containers) {
+ closer.register(() -> ByteBufferUtils.unmap(buffer));
}
-
- return new SegmentFileMapperV10(
- segmentFile,
- metadata,
- List.copyOf(containers),
- Map.of()
- );
+ CloseableUtils.closeAndWrapExceptions(closer);
+ throw DruidException.defensive(e, "Problem mapping segment file[%s]",
segmentFile.getAbsolutePath());
}
+
+ return new SegmentFileMapperV10(
+ segmentFile,
+ metadata,
+ List.copyOf(containers),
+ Map.of()
+ );
}
private final File segmentFile;
diff --git
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileMetadata.java
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileMetadata.java
index d1e3b91197a..18635220a22 100644
---
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileMetadata.java
+++
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileMetadata.java
@@ -22,11 +22,13 @@ package org.apache.druid.segment.file;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.segment.column.ColumnDescriptor;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.projections.ProjectionMetadata;
import javax.annotation.Nullable;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -74,13 +76,26 @@ public class SegmentFileMetadata
)
{
this.containers = containers;
- this.files = files;
+ this.files = internKeys(files);
this.interval = interval;
- this.columnDescriptors = columnDescriptors;
+ this.columnDescriptors = internKeys(columnDescriptors);
this.projections = projections;
this.bitmapEncoding = bitmapEncoding;
}
+ @Nullable
+ private static <V> Map<String, V> internKeys(@Nullable Map<String, V> map)
+ {
+ if (map == null) {
+ return null;
+ }
+ final Map<String, V> interned = new HashMap<>();
+ for (Map.Entry<String, V> entry : map.entrySet()) {
+ interned.put(SmooshedFileMapper.STRING_INTERNER.intern(entry.getKey()),
entry.getValue());
+ }
+ return interned;
+ }
+
@JsonProperty
public List<SegmentFileContainerMetadata> getContainers()
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileMetadataReader.java
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileMetadataReader.java
new file mode 100644
index 00000000000..a31d83876a9
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileMetadataReader.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.data.CompressionStrategy;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * Shared utility for reading the V10 segment file header and metadata. Both
{@link SegmentFileMapperV10} (eager,
+ * from a local file) and {@link PartialSegmentFileMapperV10} (lazy, from a
range-read stream) need to parse the same
+ * header format.
+ * <p>
+ * V10 file format header:
+ * {@code | version (byte) | meta compression (byte) | meta length (int) |
[compressed length (int)] | meta json |}
+ * <p>
+ * When metadata is uncompressed, {@code meta length} is the size of the JSON
bytes that follow directly.
+ * When compressed, an additional 4-byte {@code compressed length} precedes
the compressed bytes, and
+ * {@code meta length} is the uncompressed size.
+ */
+public class SegmentFileMetadataReader
+{
+ /**
+ * Size of the fixed portion of the V10 header: version (1) + compression
(1) + meta length (4)
+ */
+ public static final int HEADER_SIZE = 1 + 1 + Integer.BYTES;
+
+ /**
+ * Result of reading the V10 header and metadata from a stream.
+ */
+ public static class Result
+ {
+ private final SegmentFileMetadata metadata;
+ private final long headerSize;
+
+ public Result(SegmentFileMetadata metadata, long headerSize)
+ {
+ this.metadata = metadata;
+ this.headerSize = headerSize;
+ }
+
+ /**
+ * The parsed segment file metadata.
+ */
+ public SegmentFileMetadata getMetadata()
+ {
+ return metadata;
+ }
+
+ /**
+ * The total size of the header in bytes (everything before the first
container). This is needed to compute
+ * absolute byte offsets of internal files within the segment file:
+ * {@code headerSize + container.startOffset + file.startOffset}
+ */
+ public long getHeaderSize()
+ {
+ return headerSize;
+ }
+ }
+
+ /**
+ * Read the V10 metadata from an {@link InputStream}. The stream must be
positioned at the start of the V10 file
+ * (i.e. at the version byte). After this method returns, the stream will
have been read through the end of the
+ * metadata section.
+ *
+ * @param in input stream positioned at the start of the V10 file
+ * @param mapper object mapper for deserializing {@link SegmentFileMetadata}
+ * @return the parsed metadata and the total header size in bytes
+ */
+ public static Result read(InputStream in, ObjectMapper mapper) throws
IOException
+ {
+ final byte[] header = new byte[HEADER_SIZE];
+ int read = readFully(in, header);
+ if (read < header.length) {
+ throw DruidException.defensive("expected at least [%s] bytes, but only
read [%s]", header.length, read);
+ }
+ final ByteBuffer headerBuffer =
ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN);
+
+ if (headerBuffer.get(0) != IndexIO.V10_VERSION) {
+ throw DruidException.defensive("not v10, got[%s] instead",
headerBuffer.get(0));
+ }
+
+ final byte compression = headerBuffer.get(1);
+ final CompressionStrategy compressionStrategy =
CompressionStrategy.forId(compression);
+
+ final int metaLength = headerBuffer.getInt(2);
+ final byte[] meta = new byte[metaLength];
+
+ final long headerSize;
+ if (CompressionStrategy.NONE == compressionStrategy) {
+ headerSize = HEADER_SIZE + metaLength;
+ read = readFully(in, meta);
+ if (read < meta.length) {
+ throw DruidException.defensive("read[%s] which is less than expected
metadata length[%s]", read, metaLength);
+ }
+ } else {
+ final byte[] compressedLengthBytes = new byte[Integer.BYTES];
+ read = readFully(in, compressedLengthBytes);
+ if (read != Integer.BYTES) {
+ throw DruidException.defensive("read[%s] which is less than expected
[%s]", read, Integer.BYTES);
+ }
+ final ByteBuffer compressedLengthBuffer =
ByteBuffer.wrap(compressedLengthBytes).order(ByteOrder.LITTLE_ENDIAN);
+ final int compressedLength = compressedLengthBuffer.getInt(0);
+ headerSize = HEADER_SIZE + Integer.BYTES + compressedLength;
+
+ final byte[] compressed = new byte[compressedLength];
+ read = readFully(in, compressed);
+ if (read < compressed.length) {
+ throw DruidException.defensive(
+ "read[%s] which is less than expected compressed metadata
length[%s]",
+ read,
+ compressedLength
+ );
+ }
+
+ final ByteBuffer inBuffer =
ByteBuffer.wrap(compressed).order(ByteOrder.LITTLE_ENDIAN);
+ final ByteBuffer outBuffer =
ByteBuffer.wrap(meta).order(ByteOrder.LITTLE_ENDIAN);
+ final CompressionStrategy.Decompressor decompressor =
compressionStrategy.getDecompressor();
+ decompressor.decompress(inBuffer, compressedLength, outBuffer);
+ }
+
+ final SegmentFileMetadata metadata = mapper.readValue(meta,
SegmentFileMetadata.class);
+ return new Result(metadata, headerSize);
+ }
+
+ /**
+ * Read bytes from the stream, retrying until the buffer is full or the
stream is exhausted.
+ * {@link InputStream#read(byte[])} is not guaranteed to fill the buffer in
a single call.
+ */
+ private static int readFully(InputStream in, byte[] buf) throws IOException
+ {
+ int offset = 0;
+ while (offset < buf.length) {
+ int read = in.read(buf, offset, buf.length - offset);
+ if (read < 0) {
+ break;
+ }
+ offset += read;
+ }
+ return offset;
+ }
+
+ private SegmentFileMetadataReader()
+ {
+ // utility class
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/segment/loading/LoadSpec.java
b/processing/src/main/java/org/apache/druid/segment/loading/LoadSpec.java
index bc60ad91f13..a5c1b7284d3 100644
--- a/processing/src/main/java/org/apache/druid/segment/loading/LoadSpec.java
+++ b/processing/src/main/java/org/apache/druid/segment/loading/LoadSpec.java
@@ -22,7 +22,9 @@ package org.apache.druid.segment.loading;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.guice.annotations.ExtensionPoint;
+import javax.annotation.Nullable;
import java.io.File;
+import java.io.IOException;
/**
* A means of pulling segment files into a destination directory
@@ -38,6 +40,24 @@ public interface LoadSpec
*/
LoadSpecResult loadSegment(File destDir) throws SegmentLoadingException;
+ /**
+ * Open a {@link SegmentRangeReader} for reading byte ranges from the
segment file in deep storage. This enables
+ * partial segment downloads where only the needed portions of a segment
file are fetched.
+ * <p>
+ * Returns {@code null} if range reads are not supported by this storage
backend or for this particular segment
+ * (e.g. if the segment is stored in a compressed archive format that does
not support random access). When
+ * {@code null} is returned, callers should fall back to downloading the
full segment via {@link #loadSegment(File)}.
+ * <p>
+ * The caller is responsible for closing any streams opened by the returned
reader when done.
+ *
+ * @return a {@link SegmentRangeReader} for this segment, or {@code null} if
range reads are not supported
+ */
+ @Nullable
+ default SegmentRangeReader openRangeReader() throws IOException
+ {
+ return null;
+ }
+
// Hold interesting data about the results of the segment load
class LoadSpecResult
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/loading/SegmentRangeReader.java
b/processing/src/main/java/org/apache/druid/segment/loading/SegmentRangeReader.java
new file mode 100644
index 00000000000..f6bf422c56e
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/segment/loading/SegmentRangeReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import java.io.IOException;
+import java.io.InputStream;
+
/**
 * Extension point that provides byte-range access to the files of a segment stored in deep storage, so a segment can
 * be downloaded partially rather than in full. Each storage backend (S3, local, GCS, HDFS, etc.) may supply its own
 * implementation.
 * <p>
 * {@code filename} selects a file within the segment's storage location. It is
 * {@link org.apache.druid.segment.IndexIO#V10_FILE_NAME} for the main segment file, or the external file's own name
 * for an external segment file.
 * <p>
 * Implementations must be thread-safe. Concurrent queries may request range reads for different parts of the same
 * segment file at the same time.
 *
 * @see LoadSpec#openRangeReader()
 */
public interface SegmentRangeReader
{
  /**
   * Reads one contiguous range of bytes from a file in the segment's storage location.
   *
   * @param filename name of the file to read, relative to the segment's storage location
   * @param offset   absolute byte offset within the file where the range begins
   * @param length   upper bound on the number of bytes to read; fewer bytes come back when {@code offset + length}
   *                 runs past the end of the file
   * @return a stream of at most {@code length} bytes starting at {@code offset}, which the caller must close
   * @throws IOException if the range cannot be read
   */
  InputStream readRange(String filename, long offset, long length) throws IOException;
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10Test.java
b/processing/src/test/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10Test.java
new file mode 100644
index 00000000000..790ba10ece7
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10Test.java
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.io.Files;
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.loading.SegmentRangeReader;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class PartialSegmentFileMapperV10Test
+{
+ private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper();
+
+ @TempDir
+ File tempDir;
+
+ @Test
+ void testMapFileDownloadsOnDemand() throws IOException
+ {
+ final File segmentFile = buildTestSegment(20, CompressionStrategy.NONE);
+ final File cacheDir = newCacheDir("demand");
+
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentFile.getParentFile());
+
+ try (PartialSegmentFileMapperV10 mapper = createMapper(rangeReader,
cacheDir)) {
+ // reset count after create fetched metadata
+ rangeReader.resetCount();
+
+ // access a single file - should trigger exactly one download
+ ByteBuffer buf = mapper.mapFile("5");
+ Assertions.assertNotNull(buf);
+ Assertions.assertEquals(0, buf.position());
+ Assertions.assertEquals(4, buf.remaining());
+ Assertions.assertEquals(5, buf.getInt());
+ Assertions.assertEquals(1, rangeReader.getReadCount());
+ Assertions.assertEquals(4, mapper.getDownloadedBytes());
+
+ // access the same file again - should NOT trigger another download
+ ByteBuffer buf2 = mapper.mapFile("5");
+ Assertions.assertNotNull(buf2);
+ Assertions.assertEquals(5, buf2.getInt());
+ Assertions.assertEquals(1, rangeReader.getReadCount());
+ }
+ }
+
+ @Test
+ void testMapFileCompressedMetadata() throws IOException
+ {
+ final File segmentFile = buildTestSegment(20, CompressionStrategy.ZSTD);
+ final File cacheDir = newCacheDir("compressed");
+
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentFile.getParentFile());
+
+ try (PartialSegmentFileMapperV10 mapper = createMapper(rangeReader,
cacheDir)) {
+ for (int i = 0; i < 20; ++i) {
+ ByteBuffer buf = mapper.mapFile(String.valueOf(i));
+ Assertions.assertNotNull(buf);
+ Assertions.assertEquals(i, buf.getInt());
+ }
+ }
+ }
+
+ @Test
+ void testMapFileReturnsNullForMissingFile() throws IOException
+ {
+ final File segmentFile = buildTestSegment(5, CompressionStrategy.NONE);
+ final File cacheDir = newCacheDir("missing");
+
+ final DirectoryBackedRangeReader rangeReader = new
DirectoryBackedRangeReader(segmentFile.getParentFile());
+
+ try (PartialSegmentFileMapperV10 mapper = createMapper(rangeReader,
cacheDir)) {
+ Assertions.assertNull(mapper.mapFile("nonexistent"));
+ }
+ }
+
+ @Test
+ void testGetInternalFilenames() throws IOException
+ {
+ final File segmentFile = buildTestSegment(10, CompressionStrategy.NONE);
+ final File cacheDir = newCacheDir("filenames");
+
+ final DirectoryBackedRangeReader rangeReader = new
DirectoryBackedRangeReader(segmentFile.getParentFile());
+
+ try (PartialSegmentFileMapperV10 mapper = createMapper(rangeReader,
cacheDir)) {
+ Set<String> expected = new HashSet<>();
+ for (int i = 0; i < 10; ++i) {
+ expected.add(String.valueOf(i));
+ }
+ Assertions.assertEquals(expected, mapper.getInternalFilenames());
+ }
+ }
+
+ @Test
+ void testEnsureFilesAvailable() throws IOException
+ {
+ final File segmentFile = buildTestSegment(10, CompressionStrategy.NONE);
+ final File cacheDir = newCacheDir("ensure");
+
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentFile.getParentFile());
+
+ try (PartialSegmentFileMapperV10 mapper = createMapper(rangeReader,
cacheDir)) {
+ rangeReader.resetCount();
+
+ Set<String> filesToLoad = Set.of("2", "5", "7");
+ mapper.ensureFilesAvailable(filesToLoad);
+
+ // should have downloaded exactly 3 files
+ Assertions.assertEquals(3, rangeReader.getReadCount());
+ Assertions.assertEquals(12, mapper.getDownloadedBytes());
+
+ // accessing these files should not trigger additional downloads
+ for (String name : filesToLoad) {
+ ByteBuffer buf = mapper.mapFile(name);
+ Assertions.assertNotNull(buf);
+ Assertions.assertEquals(Integer.parseInt(name), buf.getInt());
+ }
+ Assertions.assertEquals(3, rangeReader.getReadCount());
+ }
+ }
+
+ @Test
+ void testSparseContainerFiles() throws IOException
+ {
+ final File segmentFile = buildTestSegment(10, CompressionStrategy.NONE);
+ final File cacheDir = newCacheDir("sparse");
+
+ final DirectoryBackedRangeReader rangeReader = new
DirectoryBackedRangeReader(segmentFile.getParentFile());
+
+ try (PartialSegmentFileMapperV10 mapper = createMapper(rangeReader,
cacheDir)) {
+ // download just 2 files
+ mapper.mapFile("3");
+ mapper.mapFile("7");
+
+ // verify local container files exist (all files share one container in
this test)
+ File containerFile = new File(cacheDir, IndexIO.V10_FILE_NAME +
".container.00000");
+ Assertions.assertTrue(containerFile.exists());
+
+ // the container should be at the full original container size (sparse
on supported filesystems)
+ try (FileInputStream fis = new FileInputStream(segmentFile)) {
+ SegmentFileMetadataReader.Result metadataResult =
SegmentFileMetadataReader.read(fis, JSON_MAPPER);
+ SegmentFileContainerMetadata containerMeta =
metadataResult.getMetadata().getContainers().get(0);
+ Assertions.assertEquals(containerMeta.getSize(),
containerFile.length());
+ }
+ }
+ }
+
+ @Test
+ void testConcurrentMapFile() throws Exception
+ {
+ final File segmentFile = buildTestSegment(20, CompressionStrategy.NONE);
+ final File cacheDir = newCacheDir("concurrent");
+
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentFile.getParentFile());
+
+ try (PartialSegmentFileMapperV10 mapper = createMapper(rangeReader,
cacheDir)) {
+ rangeReader.resetCount();
+
+ final int numThreads = 8;
+ final ExecutorService exec = Execs.multiThreaded(numThreads,
"lazy-test-%d");
+ try {
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final List<Future<Void>> futures = new ArrayList<>();
+
+ // all threads try to access all 20 files concurrently
+ for (int t = 0; t < numThreads; t++) {
+ futures.add(exec.submit(() -> {
+ startLatch.await();
+ for (int i = 0; i < 20; ++i) {
+ ByteBuffer buf = mapper.mapFile(String.valueOf(i));
+ Assertions.assertNotNull(buf);
+ Assertions.assertEquals(4, buf.remaining());
+ Assertions.assertEquals(i, buf.getInt());
+ }
+ return null;
+ }));
+ }
+
+ startLatch.countDown();
+
+ for (Future<Void> f : futures) {
+ f.get();
+ }
+
+ // each file should have been downloaded exactly once despite
concurrent access
+ Assertions.assertEquals(20, rangeReader.getReadCount());
+ Assertions.assertEquals(80, mapper.getDownloadedBytes());
+ }
+ finally {
+ exec.shutdownNow();
+ }
+ }
+ }
+
+ @Test
+ void testProjectionStyleFileNames() throws IOException
+ {
+ // test with names like "projectionName/columnName" which is how V10
projections name their files
+ final File baseDir = new File(tempDir, "base_" +
ThreadLocalRandom.current().nextInt());
+ FileUtils.mkdirp(baseDir);
+
+ try (SegmentFileBuilderV10 builder =
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+ for (int i = 0; i < 5; ++i) {
+ File tmpFile = new File(tempDir, StringUtils.format("col-%s.bin", i));
+ Files.write(Ints.toByteArray(i * 100), tmpFile);
+ builder.add(StringUtils.format("myProjection/col_%d", i), tmpFile);
+ }
+ }
+
+ final File cacheDir = newCacheDir("proj_names");
+ final DirectoryBackedRangeReader rangeReader = new
DirectoryBackedRangeReader(baseDir);
+
+ try (PartialSegmentFileMapperV10 mapper = createMapper(rangeReader,
cacheDir)) {
+ for (int i = 0; i < 5; ++i) {
+ ByteBuffer buf =
mapper.mapFile(StringUtils.format("myProjection/col_%d", i));
+ Assertions.assertNotNull(buf);
+ Assertions.assertEquals(i * 100, buf.getInt());
+ }
+ }
+ }
+
+ @Test
+ void testMatchesEagerMapper() throws IOException
+ {
+ // verify that the lazy mapper produces identical ByteBuffer contents as
the eager mapper
+ final File segmentFile = buildTestSegment(20, CompressionStrategy.NONE);
+ final File cacheDir = newCacheDir("match_eager");
+
+ final DirectoryBackedRangeReader rangeReader = new
DirectoryBackedRangeReader(segmentFile.getParentFile());
+ try (SegmentFileMapperV10 eager = SegmentFileMapperV10.create(segmentFile,
JSON_MAPPER);
+ PartialSegmentFileMapperV10 lazy = createMapper(rangeReader, cacheDir)
+ ) {
+ Assertions.assertEquals(eager.getInternalFilenames(),
lazy.getInternalFilenames());
+ for (String name : eager.getInternalFilenames()) {
+ ByteBuffer eagerBuf = eager.mapFile(name);
+ ByteBuffer lazyBuf = lazy.mapFile(name);
+ Assertions.assertNotNull(eagerBuf);
+ Assertions.assertNotNull(lazyBuf);
+ Assertions.assertEquals(eagerBuf, lazyBuf);
+ }
+ }
+
+ }
+
+ @Test
+ void testExternalFiles() throws IOException
+ {
+ final String externalName = "external.segment";
+ final File baseDir = new File(tempDir, "base_" +
ThreadLocalRandom.current().nextInt());
+ FileUtils.mkdirp(baseDir);
+
+ try (SegmentFileBuilderV10 builder =
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+ for (int i = 0; i < 10; ++i) {
+ File tmpFile = new File(tempDir, StringUtils.format("main-%s.bin", i));
+ Files.write(Ints.toByteArray(i), tmpFile);
+ builder.add(StringUtils.format("%d", i), tmpFile);
+ }
+ SegmentFileBuilder external = builder.getExternalBuilder(externalName);
+ for (int i = 10; i < 20; ++i) {
+ File tmpFile = new File(tempDir, StringUtils.format("ext-%s.bin", i));
+ Files.write(Ints.toByteArray(i), tmpFile);
+ external.add(StringUtils.format("%d", i), tmpFile);
+ }
+ }
+
+ final File cacheDir = newCacheDir("ext");
+ final DirectoryBackedRangeReader rangeReader = new
DirectoryBackedRangeReader(baseDir);
+
+ try (PartialSegmentFileMapperV10 mapper =
PartialSegmentFileMapperV10.create(
+ rangeReader,
+ JSON_MAPPER,
+ cacheDir,
+ IndexIO.V10_FILE_NAME,
+ List.of(externalName)
+ )) {
+ // verify main file internal files
+ for (int i = 0; i < 10; ++i) {
+ ByteBuffer buf = mapper.mapFile(String.valueOf(i));
+ Assertions.assertNotNull(buf);
+ Assertions.assertEquals(i, buf.getInt());
+ }
+
+ // verify external file internal files via mapExternalFile
+ for (int i = 10; i < 20; ++i) {
+ ByteBuffer buf = mapper.mapExternalFile(externalName,
String.valueOf(i));
+ Assertions.assertNotNull(buf);
+ Assertions.assertEquals(i, buf.getInt());
+ }
+ }
+ }
+
+ @Test
+ void testExternalFilesMatchEagerMapper() throws IOException
+ {
+ final String externalName = "external.segment";
+ final File baseDir = new File(tempDir, "base_" +
ThreadLocalRandom.current().nextInt());
+ FileUtils.mkdirp(baseDir);
+
+ try (SegmentFileBuilderV10 builder =
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+ for (int i = 0; i < 5; ++i) {
+ File tmpFile = new File(tempDir, StringUtils.format("main-%s.bin", i));
+ Files.write(Ints.toByteArray(i), tmpFile);
+ builder.add(StringUtils.format("%d", i), tmpFile);
+ }
+ SegmentFileBuilder external = builder.getExternalBuilder(externalName);
+ for (int i = 5; i < 10; ++i) {
+ File tmpFile = new File(tempDir, StringUtils.format("ext-%s.bin", i));
+ Files.write(Ints.toByteArray(i), tmpFile);
+ external.add(StringUtils.format("%d", i), tmpFile);
+ }
+ }
+
+ final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME);
+ final File cacheDir = newCacheDir("ext_match");
+ final DirectoryBackedRangeReader rangeReader = new
DirectoryBackedRangeReader(baseDir);
+
+ try (SegmentFileMapperV10 eager = SegmentFileMapperV10.create(segmentFile,
JSON_MAPPER, List.of(externalName));
+ PartialSegmentFileMapperV10 lazy = PartialSegmentFileMapperV10.create(
+ rangeReader,
+ JSON_MAPPER,
+ cacheDir,
+ IndexIO.V10_FILE_NAME,
+ List.of(externalName)
+ )
+ ) {
+ // verify main files match
+ for (int i = 0; i < 5; ++i) {
+ ByteBuffer eagerBuf = eager.mapFile(String.valueOf(i));
+ ByteBuffer lazyBuf = lazy.mapFile(String.valueOf(i));
+ Assertions.assertNotNull(eagerBuf);
+ Assertions.assertNotNull(lazyBuf);
+ Assertions.assertEquals(eagerBuf, lazyBuf);
+ }
+
+ // verify external files match
+ for (int i = 5; i < 10; ++i) {
+ ByteBuffer eagerBuf = eager.mapExternalFile(externalName,
String.valueOf(i));
+ ByteBuffer lazyBuf = lazy.mapExternalFile(externalName,
String.valueOf(i));
+ Assertions.assertNotNull(eagerBuf);
+ Assertions.assertNotNull(lazyBuf);
+ Assertions.assertEquals(eagerBuf, lazyBuf);
+ }
+ }
+ }
+
+ @Test
+ void testCreatePersistsAndRestores() throws IOException
+ {
+ final File segmentFile = buildTestSegment(10, CompressionStrategy.NONE);
+ final File cacheDir = newCacheDir("persist");
+
+ final DirectoryBackedRangeReader rangeReader = new
DirectoryBackedRangeReader(segmentFile.getParentFile());
+
+ // fetches from range reader and persists header + bitmap
+ try (PartialSegmentFileMapperV10 mapper = createMapper(rangeReader,
cacheDir)) {
+ mapper.mapFile("2");
+ mapper.mapFile("5");
+ mapper.mapFile("8");
+ Assertions.assertEquals(12, mapper.getDownloadedBytes());
+ }
+
+ // verify persisted header file exists (contains metadata + bitmap)
+ Assertions.assertTrue(
+ new File(cacheDir, IndexIO.V10_FILE_NAME +
PartialSegmentFileMapperV10.METADATA_HEADER_SUFFIX).exists()
+ );
+
+ // reads metadata from local header file, restores downloaded files from
bitmap
+ final DirectoryBackedRangeReader freshReader = new
DirectoryBackedRangeReader(segmentFile.getParentFile());
+ try (PartialSegmentFileMapperV10 restored = createMapper(freshReader,
cacheDir)) {
+ Assertions.assertEquals(12, restored.getDownloadedBytes());
+
+ // previously downloaded files should be available
+ ByteBuffer buf2 = restored.mapFile("2");
+ Assertions.assertNotNull(buf2);
+ Assertions.assertEquals(2, buf2.getInt());
+
+ ByteBuffer buf5 = restored.mapFile("5");
+ Assertions.assertNotNull(buf5);
+ Assertions.assertEquals(5, buf5.getInt());
+
+ ByteBuffer buf8 = restored.mapFile("8");
+ Assertions.assertNotNull(buf8);
+ Assertions.assertEquals(8, buf8.getInt());
+
+ // downloading a new file should still work after restore
+ ByteBuffer buf0 = restored.mapFile("0");
+ Assertions.assertNotNull(buf0);
+ Assertions.assertEquals(0, buf0.getInt());
+ Assertions.assertEquals(16, restored.getDownloadedBytes());
+ }
+ }
+
+ @Test
+ void testCreateWithExternals() throws IOException
+ {
+ final String externalName = "external.segment";
+ final File baseDir = new File(tempDir, "base_" +
ThreadLocalRandom.current().nextInt());
+ FileUtils.mkdirp(baseDir);
+
+ try (SegmentFileBuilderV10 builder =
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+ for (int i = 0; i < 5; ++i) {
+ File tmpFile = new File(tempDir, StringUtils.format("main-%s.bin", i));
+ Files.write(Ints.toByteArray(i), tmpFile);
+ builder.add(StringUtils.format("%d", i), tmpFile);
+ }
+ SegmentFileBuilder external = builder.getExternalBuilder(externalName);
+ for (int i = 5; i < 10; ++i) {
+ File tmpFile = new File(tempDir, StringUtils.format("ext-%s.bin", i));
+ Files.write(Ints.toByteArray(i), tmpFile);
+ external.add(StringUtils.format("%d", i), tmpFile);
+ }
+ }
+
+ final File cacheDir = newCacheDir("ext_persist");
+ final DirectoryBackedRangeReader rangeReader = new
DirectoryBackedRangeReader(baseDir);
+
+ // create with externals, download some files
+ try (PartialSegmentFileMapperV10 mapper =
PartialSegmentFileMapperV10.create(
+ rangeReader,
+ JSON_MAPPER,
+ cacheDir,
+ IndexIO.V10_FILE_NAME,
+ List.of(externalName)
+ )) {
+ mapper.mapFile("1");
+ mapper.mapExternalFile(externalName, "7");
+ }
+
+ // restore, previously downloaded files should be available
+ final DirectoryBackedRangeReader freshReader = new
DirectoryBackedRangeReader(baseDir);
+ try (PartialSegmentFileMapperV10 restored =
PartialSegmentFileMapperV10.create(
+ freshReader,
+ JSON_MAPPER,
+ cacheDir,
+ IndexIO.V10_FILE_NAME,
+ List.of(externalName)
+ )) {
+ ByteBuffer buf1 = restored.mapFile("1");
+ Assertions.assertNotNull(buf1);
+ Assertions.assertEquals(1, buf1.getInt());
+
+ ByteBuffer buf7 = restored.mapExternalFile(externalName, "7");
+ Assertions.assertNotNull(buf7);
+ Assertions.assertEquals(7, buf7.getInt());
+ }
+ }
+
+ @Test
+ void testCorruptHeaderFileRecovery() throws IOException
+ {
+ final File segmentFile = buildTestSegment(10, CompressionStrategy.NONE);
+ final File cacheDir = newCacheDir("corrupt");
+
+ final DirectoryBackedRangeReader rangeReader = new
DirectoryBackedRangeReader(segmentFile.getParentFile());
+
+ // populate normally
+ try (PartialSegmentFileMapperV10 mapper = createMapper(rangeReader,
cacheDir)) {
+ mapper.mapFile("3");
+ }
+
+ // corrupt the header file
+ final File headerFile = new File(cacheDir, IndexIO.V10_FILE_NAME +
PartialSegmentFileMapperV10.METADATA_HEADER_SUFFIX);
+ Assertions.assertTrue(headerFile.exists());
+ try (RandomAccessFile raf = new RandomAccessFile(headerFile, "rw")) {
+ raf.setLength(2); // truncate to something unparseable
+ }
+
+ // should detect corruption, re-fetch from deep storage, and work normally
+ // (previously downloaded files are lost since bitmap was corrupted too,
but new downloads work)
+ try (PartialSegmentFileMapperV10 recovered = createMapper(rangeReader,
cacheDir)) {
+ Assertions.assertEquals(0, recovered.getDownloadedBytes());
+
+ ByteBuffer buf = recovered.mapFile("3");
+ Assertions.assertNotNull(buf);
+ Assertions.assertEquals(3, buf.getInt());
+ }
+ }
+
+ private File newCacheDir(String name) throws IOException
+ {
+ final File dir = new File(tempDir, name + "_" +
ThreadLocalRandom.current().nextInt());
+ FileUtils.mkdirp(dir);
+ return dir;
+ }
+
+ private File buildTestSegment(int numFiles, CompressionStrategy compression)
throws IOException
+ {
+ final File baseDir = new File(tempDir, "base_" +
ThreadLocalRandom.current().nextInt());
+ FileUtils.mkdirp(baseDir);
+
+ try (SegmentFileBuilderV10 builder =
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir, compression)) {
+ for (int i = 0; i < numFiles; ++i) {
+ File tmpFile = new File(tempDir, StringUtils.format("smoosh-%s.bin",
i));
+ Files.write(Ints.toByteArray(i), tmpFile);
+ builder.add(StringUtils.format("%d", i), tmpFile);
+ }
+ }
+
+ return new File(baseDir, IndexIO.V10_FILE_NAME);
+ }
+
+ private static PartialSegmentFileMapperV10 createMapper(
+ SegmentRangeReader rangeReader,
+ File localCacheDir
+ ) throws IOException
+ {
+ return PartialSegmentFileMapperV10.createForFile(
+ rangeReader,
+ JSON_MAPPER,
+ localCacheDir,
+ IndexIO.V10_FILE_NAME
+ );
+ }
+
+ /**
+ * A {@link SegmentRangeReader} backed by a directory of files, supporting
both main and external file reads.
+ */
+ static class DirectoryBackedRangeReader implements SegmentRangeReader
+ {
+ private final File directory;
+
+ DirectoryBackedRangeReader(File directory)
+ {
+ this.directory = directory;
+ }
+
+ @Override
+ public InputStream readRange(String filename, long offset, long length)
throws IOException
+ {
+ File target = new File(directory, filename);
+ try (RandomAccessFile raf = new RandomAccessFile(target, "r")) {
+ final int available = (int) Math.min(length, Math.max(0, raf.length()
- offset));
+ byte[] data = new byte[available];
+ raf.seek(offset);
+ raf.readFully(data);
+ return new ByteArrayInputStream(data);
+ }
+ }
+ }
+
+ /**
+ * A {@link DirectoryBackedRangeReader} that counts range reads (excluding
metadata fetches).
+ */
+ static class CountingRangeReader extends DirectoryBackedRangeReader
+ {
+ private final AtomicInteger readCount = new AtomicInteger(0);
+
+ CountingRangeReader(File directory)
+ {
+ super(directory);
+ }
+
+ int getReadCount()
+ {
+ return readCount.get();
+ }
+
+ void resetCount()
+ {
+ readCount.set(0);
+ }
+
+ @Override
+ public InputStream readRange(String filename, long offset, long length)
throws IOException
+ {
+ readCount.incrementAndGet();
+ return super.readRange(filename, offset, length);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]