gianm commented on code in PR #19282:
URL: https://github.com/apache/druid/pull/19282#discussion_r3082840906
##########
processing/src/main/java/org/apache/druid/segment/loading/LoadSpec.java:
##########
@@ -38,6 +40,24 @@ public interface LoadSpec
*/
LoadSpecResult loadSegment(File destDir) throws SegmentLoadingException;
+ /**
+ * Open a {@link SegmentRangeReader} for reading byte ranges from the
segment file in deep storage. This enables
+ * partial segment downloads where only the needed portions of a segment
file are fetched.
+ * <p>
+ * Returns {@code null} if range reads are not supported by this storage
backend or for this particular segment
+ * (e.g. if the segment is stored in a compressed archive format that does
not support random access). When
+ * {@code null} is returned, callers should fall back to downloading the
full segment via {@link #loadSegment(File)}.
+ * <p>
+ * The caller is responsible for closing the returned reader when done.
Review Comment:
`SegmentRangeReader` isn't `Closeable`, so how is the caller supposed to close the
reader?
##########
processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java:
##########
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.loading.SegmentRangeReader;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A {@link SegmentFileMapper} that downloads internal files on demand from
deep storage via a
+ * {@link SegmentRangeReader}. This enables partial segment downloads where
only the files needed for a query are
+ * fetched, rather than downloading the entire segment.
+ * <p>
+ * Locally, this mapper mirrors the original V10 container structure: each
container from the segment file is
+ * represented as a local sparse file at its original size, and only the byte
ranges for downloaded internal files are
+ * populated. This means the number of local files (and mmaps) equals the
number of containers (typically 1-3 at
+ * 'standard' segment sizes) rather than the number of internal files (which
could be hundreds).
+ * <p>
+ * The {@link MappedByteBuffer} for each container is created once and then
the channel is closed immediately (same
+ * pattern as {@link SegmentFileMapperV10}). Writes of downloaded file data
use short-lived {@link RandomAccessFile}
+ * instances. The mmap reflects writes through the shared page cache.
+ * <p>
+ * State is persisted to disk so that the mapper can be restored after a
process restart without re-fetching metadata
+ * from deep storage. The raw V10 header bytes are written to a local file, and a compact bitmap region is appended to
+ * the end of that file to track which internal files have been downloaded (one bit per file, updated after each download). On
+ * subsequent calls, the metadata is parsed from the local file instead of
range-reading from deep storage.
+ * <p>
+ * External segment files are supported via child {@link
PartialSegmentFileMapperV10} instances, each targeting a
+ * different file in the segment's storage location.
+ * <p>
+ * Thread-safe for concurrent access from multiple queries. Per-file locks
prevent duplicate downloads of the same
+ * internal file.
+ *
+ * @see SegmentFileMapperV10
+ * @see SegmentRangeReader
+ */
+public class PartialSegmentFileMapperV10 implements SegmentFileMapper
+{
+  static final String METADATA_HEADER_SUFFIX = ".header";
+
+  /**
+   * Create (or restore) a lazy mapper for the main segment file with attached external file mappers. If persisted
+   * state exists locally from a previous session, metadata is read from disk. Otherwise, metadata is fetched from
+   * deep storage via range reads and persisted locally.
+   * <p>
+   * One child mapper is created per distinct external filename. Repeated names in {@code externals} are skipped, so
+   * no child mapper (and its mmaps) is created only to be overwritten in the map and leaked.
+   * <p>
+   * If creating any external mapper fails, the entry point and every external mapper created so far are closed, and
+   * the failure is rethrown via {@link CloseableUtils#closeAndWrapInCatch}.
+   *
+   * @param rangeReader    reader for byte ranges of the segment's files in deep storage
+   * @param jsonMapper     mapper used when parsing the segment metadata
+   * @param localCacheDir  local directory where headers and container files are kept
+   * @param targetFilename name of the main segment file
+   * @param externals      names of external segment files that each get their own child mapper
+   * @return the mapper for {@code targetFilename}, owning the external child mappers
+   * @throws IOException if the mapper for the main file cannot be created
+   */
+  public static PartialSegmentFileMapperV10 create(
+      SegmentRangeReader rangeReader,
+      ObjectMapper jsonMapper,
+      File localCacheDir,
+      String targetFilename,
+      List<String> externals
+  ) throws IOException
+  {
+    final PartialSegmentFileMapperV10 entryPoint = createForFile(
+        rangeReader,
+        jsonMapper,
+        localCacheDir,
+        targetFilename
+    );
+
+    final Map<String, PartialSegmentFileMapperV10> externalMappers = new HashMap<>();
+    try {
+      for (String filename : externals) {
+        // a duplicate name would otherwise replace (and leak) a mapper that was already created
+        if (!externalMappers.containsKey(filename)) {
+          externalMappers.put(filename, createForFile(rangeReader, jsonMapper, localCacheDir, filename));
+        }
+      }
+    }
+    catch (Throwable t) {
+      // release everything created so far, including the entry point, before propagating
+      final Closer closer = Closer.create();
+      closer.register(entryPoint);
+      closer.registerAll(externalMappers.values());
+      throw CloseableUtils.closeAndWrapInCatch(t, closer);
+    }
+
+    entryPoint.externalMappers.putAll(externalMappers);
+    return entryPoint;
+  }
+
+  /**
+   * Create (or restore) a mapper for a single segment file.
+   * <p>
+   * If {@code targetFilename + METADATA_HEADER_SUFFIX} exists in {@code localCacheDir}, it is parsed and its bitmap
+   * mapped. If that fails for any reason, the file is deleted and the header is fetched again from deep storage.
+   * Internal files whose bit is set in the bitmap are then marked as already downloaded, but only if their local
+   * container file still exists. Otherwise, the container would be recreated as an all-zero sparse file and those
+   * zeros would be served as file contents.
+   * <p>
+   * If restoring fails, the partially built mapper is closed (releasing its mmaps) before the failure propagates.
+   *
+   * @throws IOException if the header cannot be fetched or parsed, or a container cannot be initialized on restore
+   */
+  @VisibleForTesting
+  static PartialSegmentFileMapperV10 createForFile(
+      SegmentRangeReader rangeReader,
+      ObjectMapper jsonMapper,
+      File localCacheDir,
+      String targetFilename
+  ) throws IOException
+  {
+    FileUtils.mkdirp(localCacheDir);
+    final File headerFile = new File(localCacheDir, targetFilename + METADATA_HEADER_SUFFIX);
+
+    // try to load from an existing local file, re-fetching from deep storage if missing or corrupted
+    SegmentFileMetadataReader.Result result = null;
+    MappedByteBuffer bitmapBuffer = null;
+
+    if (headerFile.exists()) {
+      try {
+        result = parseHeaderFile(headerFile, jsonMapper);
+        bitmapBuffer = mmapBitmap(headerFile, result);
+      }
+      catch (Exception ignored) {
+        // deliberately best-effort: a corrupted file (partial write, truncated bitmap, bad JSON, etc.) is deleted
+        // and the header is re-fetched below
+        result = null;
+        headerFile.delete();
+      }
+    }
+
+    if (result == null) {
+      fetchAndPersistHeader(rangeReader, targetFilename, headerFile);
+      result = parseHeaderFile(headerFile, jsonMapper);
+      bitmapBuffer = mmapBitmap(headerFile, result);
+    }
+
+    final PartialSegmentFileMapperV10 mapper = new PartialSegmentFileMapperV10(
+        result.getMetadata(),
+        result.getHeaderSize(),
+        rangeReader,
+        targetFilename,
+        localCacheDir,
+        bitmapBuffer
+    );
+
+    try {
+      // restore downloaded files from the bitmap: bit i (LSB-first within each byte) belongs to sortedFileNames[i]
+      for (int i = 0; i < mapper.sortedFileNames.size(); i++) {
+        if ((bitmapBuffer.get(i / 8) & (1 << (i % 8))) == 0) {
+          continue;
+        }
+        final String name = mapper.sortedFileNames.get(i);
+        final SegmentInternalFileMetadata fileMetadata = result.getMetadata().getFiles().get(name);
+        if (fileMetadata == null) {
+          continue;
+        }
+        final int containerIndex = fileMetadata.getContainer();
+        if (mapper.containers[containerIndex] == null) {
+          // a set bit is only meaningful while the container file holding the bytes still exists; if it is gone,
+          // leave the file to be downloaded again. The name format must match ensureContainerInitialized.
+          final File containerFile = new File(
+              localCacheDir,
+              StringUtils.format("%s.container.%05d", targetFilename, containerIndex)
+          );
+          if (!containerFile.exists()) {
+            continue;
+          }
+          mapper.ensureContainerInitialized(containerIndex);
+        }
+        mapper.downloadedFiles.add(name);
+        mapper.downloadedBytes.addAndGet(fileMetadata.getSize());
+      }
+    }
+    catch (Throwable t) {
+      // the mapper already owns the bitmap mmap (and possibly container mmaps); release them before propagating
+      try {
+        mapper.close();
+      }
+      catch (RuntimeException suppressed) {
+        t.addSuppressed(suppressed);
+      }
+      throw t;
+    }
+
+    return mapper;
+  }
+
+  private final SegmentFileMetadata metadata;
+  private final long headerSize;
+  private final SegmentRangeReader rangeReader;
+  private final String targetFilename;
+  private final File localCacheDir;
+
+  // internal file names in a stable (sorted) order; a name's position in this list is its bit index in the bitmap
+  private final List<String> sortedFileNames;
+  private final Map<String, Integer> fileNameToIndex;
+
+  // per-container state; entries of containers/containerFiles are filled in lazily by ensureContainerInitialized
+  private final MappedByteBuffer[] containers;
+  private final File[] containerFiles;
+  private final ReentrantLock[] containerLocks;
+
+  // child mappers for external segment files, keyed by external filename
+  private final Map<String, PartialSegmentFileMapperV10> externalMappers = new HashMap<>();
+
+  // names of internal files whose bytes are present locally
+  private final Set<String> downloadedFiles = ConcurrentHashMap.newKeySet();
+  // per-file locks used while downloading a single internal file
+  private final ConcurrentHashMap<String, ReentrantLock> fileLocks = new ConcurrentHashMap<>();
+  private final ReentrantLock bitmapLock;
+  private final MappedByteBuffer bitmapBuffer;
+  private final AtomicLong downloadedBytes = new AtomicLong(0);
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private PartialSegmentFileMapperV10(
+      SegmentFileMetadata metadata,
+      long headerSize,
+      SegmentRangeReader rangeReader,
+      String targetFilename,
+      File localCacheDir,
+      MappedByteBuffer bitmapBuffer
+  )
+  {
+    this.metadata = metadata;
+    this.headerSize = headerSize;
+    this.rangeReader = rangeReader;
+    this.targetFilename = targetFilename;
+    this.localCacheDir = localCacheDir;
+    this.bitmapBuffer = bitmapBuffer;
+
+    // sorting gives every session the same name -> bit assignment for the persisted bitmap
+    final TreeSet<String> orderedNames = new TreeSet<>(metadata.getFiles().keySet());
+    this.sortedFileNames = new ArrayList<>(orderedNames);
+    this.fileNameToIndex = new HashMap<>();
+    int bit = 0;
+    for (String fileName : sortedFileNames) {
+      fileNameToIndex.put(fileName, bit++);
+    }
+
+    final int containerCount = metadata.getContainers().size();
+    this.containers = new MappedByteBuffer[containerCount];
+    this.containerFiles = new File[containerCount];
+    this.containerLocks = new ReentrantLock[containerCount];
+    for (int c = 0; c < containerCount; c++) {
+      containerLocks[c] = new ReentrantLock();
+    }
+    this.bitmapLock = new ReentrantLock();
+  }
+
+  /**
+   * Returns the parsed V10 metadata of the segment file this mapper targets.
+   */
+  public SegmentFileMetadata getSegmentFileMetadata()
+  {
+    return this.metadata;
+  }
+
+  /**
+   * Returns the names of every internal file described by the metadata, whether or not it has been downloaded yet.
+   */
+  @Override
+  public Set<String> getInternalFilenames()
+  {
+    final Set<String> names = this.metadata.getFiles().keySet();
+    return names;
+  }
+
+  /**
+   * Map an internal file, downloading it from deep storage first if it is not yet available locally.
+   *
+   * @param name internal file name
+   * @return a read-only view of the file's bytes sliced from its container mmap, or {@code null} if the segment has
+   * no internal file with this name
+   * @throws IOException if downloading the file fails
+   */
+  @Nullable
+  @Override
+  public ByteBuffer mapFile(String name) throws IOException
+  {
+    checkClosed();
+
+    final SegmentInternalFileMetadata fileMetadata = metadata.getFiles().get(name);
+    if (fileMetadata == null) {
+      return null;
+    }
+    ensureFileDownloaded(name, fileMetadata);
+
+    final ByteBuffer view = containers[fileMetadata.getContainer()].asReadOnlyBuffer();
+    final int start = Ints.checkedCast(fileMetadata.getStartOffset());
+    view.position(start);
+    final int end = Ints.checkedCast(fileMetadata.getStartOffset() + fileMetadata.getSize());
+    view.limit(end);
+    return view.slice();
+  }
+
+ @Nullable
+ @Override
+ public ByteBuffer mapExternalFile(String filename, String name) throws
IOException
+ {
+ checkClosed();
+ final PartialSegmentFileMapperV10 externalMapper =
externalMappers.get(filename);
+ if (externalMapper == null) {
+ throw DruidException.defensive("external file[%s] containing[%s] not
found", filename, name);
+ }
+ return externalMapper.mapFile(name);
+ }
+
+  /**
+   * Pre-download a set of internal files, so that later {@link #mapFile(String)} calls for them do not trigger
+   * individual downloads. This is useful for fetching all files of a projection in one pass. Files that are already
+   * downloaded are skipped, and names that are not in this segment's metadata are ignored.
+   *
+   * @throws IOException if downloading any of the files fails
+   */
+  public void ensureFilesAvailable(Set<String> fileNames) throws IOException
+  {
+    for (String fileName : fileNames) {
+      final SegmentInternalFileMetadata fileMetadata = metadata.getFiles().get(fileName);
+      if (fileMetadata == null) {
+        continue;
+      }
+      ensureFileDownloaded(fileName, fileMetadata);
+    }
+  }
+
+  /**
+   * Total bytes of internal files held locally by this mapper and all of its external child mappers. This includes
+   * bytes downloaded in this session and bytes of files restored from the persisted bitmap.
+   */
+  public long getDownloadedBytes()
+  {
+    return downloadedBytes.get()
+           + externalMappers.values().stream().mapToLong(PartialSegmentFileMapperV10::getDownloadedBytes).sum();
+  }
+
+  /**
+   * Unmap the bitmap and every initialized container, then close all external child mappers. Only the first call
+   * has any effect; later calls return immediately.
+   * <p>
+   * NOTE(review): buffers previously returned by {@link #mapFile} are views of these mmaps, so callers presumably
+   * must not touch them after close. Confirm with the owning segment's lifecycle.
+   */
+  @Override
+  public void close()
+  {
+    if (!closed.compareAndSet(false, true)) {
+      return;
+    }
+    final Closer closer = Closer.create();
+    closer.register(() -> ByteBufferUtils.unmap(bitmapBuffer));
+    for (final MappedByteBuffer container : containers) {
+      if (container == null) {
+        // never initialized, nothing to unmap
+        continue;
+      }
+      closer.register(() -> ByteBufferUtils.unmap(container));
+    }
+    closer.registerAll(externalMappers.values());
+    CloseableUtils.closeAndWrapExceptions(closer);
+  }
+
+  /**
+   * Throw a defensive {@link DruidException} if {@link #close()} has already been called.
+   */
+  private void checkClosed()
+  {
+    if (!closed.get()) {
+      return;
+    }
+    throw DruidException.defensive("PartialSegmentFileMapperV10 is closed");
+  }
+
+  /**
+   * Absolute byte offset of an internal file within the segment file in deep storage. This is the header size,
+   * plus the start of the file's container, plus the file's offset inside that container.
+   */
+  private long computeAbsoluteOffset(SegmentInternalFileMetadata fileMetadata)
+  {
+    final SegmentFileContainerMetadata owningContainer = metadata.getContainers().get(fileMetadata.getContainer());
+    final long containerStart = headerSize + owningContainer.getStartOffset();
+    return containerStart + fileMetadata.getStartOffset();
+  }
+
+  /**
+   * Make sure an internal file's bytes are present in its local container, downloading them if needed.
+   * <p>
+   * At most one thread downloads a given file at a time. A per-file lock lives in {@link #fileLocks} only while it
+   * is in use. Its holder removes it from the map <em>before</em> unlocking. A thread that acquires a lock that is
+   * no longer the one mapped for the name retries with the current one.
+   * <p>
+   * Unlocking first and removing afterwards is not safe. A thread waiting on the old lock and a newcomer with a fresh
+   * lock could then both proceed. After a failed attempt, this would mean concurrent duplicate downloads and
+   * double-counted {@link #downloadedBytes}.
+   *
+   * @throws IOException if the download or the local write fails
+   */
+  private void ensureFileDownloaded(String name, SegmentInternalFileMetadata fileMetadata) throws IOException
+  {
+    // fast path: already downloaded, nothing to do
+    while (!downloadedFiles.contains(name)) {
+      final ReentrantLock lock = fileLocks.computeIfAbsent(name, k -> new ReentrantLock());
+      lock.lock();
+      try {
+        if (fileLocks.get(name) != lock) {
+          // the previous holder retired this lock while we were waiting; retry with the current one
+          continue;
+        }
+        checkClosed();
+        if (downloadedFiles.contains(name)) {
+          return;
+        }
+        ensureContainerInitialized(fileMetadata.getContainer());
+        downloadFileToContainer(name, fileMetadata);
+        downloadedFiles.add(name);
+        markDownloadedInBitmap(name);
+        return;
+      }
+      finally {
+        // retire the lock while still holding it (no-op if it was already stale), then release it
+        fileLocks.remove(name, lock);
+        lock.unlock();
+      }
+    }
+  }
+
+  /**
+   * Initialize the local file and mmap for a container if not already done. Creates a file at the original
+   * container size (sparse where the filesystem supports it) and maps it read-only. The channel is closed right after
+   * mapping, because a mapping stays valid after the channel that created it is closed. This also avoids the risk of
+   * channel closure from thread interruption.
+   * <p>
+   * The "already initialized?" check is always made under the container's lock. {@link #containers} and
+   * {@link #containerFiles} are plain arrays, so an unlocked read would be a data race. A thread could observe a
+   * non-null buffer without also observing the matching {@link #containerFiles} entry, or the buffer's fully
+   * initialized state. Taking the lock provides the required happens-before edge. It is only reached from the
+   * download path and from restore in {@code createForFile}, so the extra locking is cheap.
+   * <p>
+   * Both array entries are published only after mapping succeeds, so a failed attempt leaves no partial state.
+   *
+   * @throws IOException if the local container file cannot be created, sized, or mapped
+   */
+  private void ensureContainerInitialized(int containerIndex) throws IOException
+  {
+    final ReentrantLock lock = containerLocks[containerIndex];
+    lock.lock();
+    try {
+      if (containers[containerIndex] != null) {
+        return;
+      }
+
+      final SegmentFileContainerMetadata containerMeta = metadata.getContainers().get(containerIndex);
+      final File localFile = new File(
+          localCacheDir,
+          StringUtils.format("%s.container.%05d", targetFilename, containerIndex)
+      );
+
+      try (RandomAccessFile raf = new RandomAccessFile(localFile, "rw");
+           FileChannel channel = raf.getChannel()) {
+        // size the file to the container's original length so every internal file's offset is in range
+        raf.setLength(containerMeta.getSize());
+        final MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, containerMeta.getSize());
+        containerFiles[containerIndex] = localFile;
+        containers[containerIndex] = mapped;
+      }
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Stream one internal file from deep storage into its slot in the local container file, then add its size to
+   * {@link #downloadedBytes}. Writing goes through a short-lived {@link RandomAccessFile}; the container mmap sees
+   * the written bytes through the shared page cache.
+   *
+   * @throws IOException if reading from deep storage or writing locally fails
+   * @throws DruidException if the range stream ends before {@code size} bytes were read
+   */
+  private void downloadFileToContainer(String name, SegmentInternalFileMetadata fileMetadata) throws IOException
+  {
+    final long fileStartInSegment = computeAbsoluteOffset(fileMetadata);
+    final long size = fileMetadata.getSize();
+
+    // copy in fixed-size chunks so the whole internal file never has to sit in heap
+    try (InputStream source = rangeReader.readRange(targetFilename, fileStartInSegment, size);
+         RandomAccessFile sink = new RandomAccessFile(containerFiles[fileMetadata.getContainer()], "rw")) {
+      sink.seek(fileMetadata.getStartOffset());
+      final byte[] chunk = new byte[8192];
+      for (long remaining = size; remaining > 0; ) {
+        final int got = source.read(chunk, 0, (int) Math.min(chunk.length, remaining));
+        if (got < 0) {
+          throw DruidException.defensive(
+              "unexpected end of stream for file[%s], expected[%s] more bytes",
+              name,
+              remaining
+          );
+        }
+        sink.write(chunk, 0, got);
+        remaining -= got;
+      }
+    }
+
+    downloadedBytes.addAndGet(size);
+  }
+
+  /**
+   * Set the bitmap bit of a downloaded internal file. Bit {@code i} lives in byte {@code i / 8} at position
+   * {@code i % 8}. The update is a single-byte read-modify-write on the mmap under {@link #bitmapLock}.
+   * Names without a bitmap index are ignored. No explicit {@code force()} is issued here, so persistence relies on
+   * the OS writing back the dirty mmap page.
+   */
+  private void markDownloadedInBitmap(String name)
+  {
+    final Integer bit = fileNameToIndex.get(name);
+    if (bit != null) {
+      final int byteOffset = bit / 8;
+      final int mask = 1 << (bit % 8);
+      bitmapLock.lock();
+      try {
+        bitmapBuffer.put(byteOffset, (byte) (bitmapBuffer.get(byteOffset) | mask));
+      }
+      finally {
+        bitmapLock.unlock();
+      }
+    }
+  }
+
+ /**
+ * Fetch the raw V10 header bytes from deep storage and write them to a
local file. The bitmap region is not
+ * included, it is created by {@link #mmapBitmap} after parsing. The file is
parseable by
+ * {@link SegmentFileMetadataReader#read(InputStream, ObjectMapper)}.
+ */
+ private static void fetchAndPersistHeader(
+ SegmentRangeReader rangeReader,
+ String targetFilename,
+ File headerFile
+ ) throws IOException
+ {
+ // read the fixed header to determine the metadata size
+ final byte[] fixedHeader = new byte[SegmentFileMetadataReader.HEADER_SIZE];
+ try (InputStream headerStream = rangeReader.readRange(targetFilename, 0,
fixedHeader.length)) {
+ int offset = 0;
+ while (offset < fixedHeader.length) {
+ int read = headerStream.read(fixedHeader, offset, fixedHeader.length -
offset);
+ if (read < 0) {
+ break;
+ }
+ offset += read;
+ }
+ }
+
+ final ByteBuffer headerBuf =
ByteBuffer.wrap(fixedHeader).order(ByteOrder.LITTLE_ENDIAN);
+ final int metaLength = headerBuf.getInt(2);
+ final CompressionStrategy compressionStrategy =
CompressionStrategy.forId(headerBuf.get(1));
+
+ // compute the remaining bytes after the fixed header
+ final long remainingBytes;
+ if (CompressionStrategy.NONE == compressionStrategy) {
+ remainingBytes = metaLength;
+ } else {
+ // upper bound: compressed length int + uncompressed size (compressed
will be <= uncompressed)
Review Comment:
What guarantees "compressed will be <= uncompressed"? In general, compressors can
increase data size if the data isn't compressible.
Is there something else that is deciding we should only compress metadata if
the compressed size is smaller than the uncompressed size?
If nothing is guaranteeing this, the amount to fetch is the compressed-length int plus
`LZ4Utils.maxCompressedLength(metaLength)` if it's LZ4 compressed, or
`Zstd.compressBound(metaLength)` if it's zstd.
##########
processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java:
##########
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.loading.SegmentRangeReader;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A {@link SegmentFileMapper} that downloads internal files on demand from
deep storage via a
+ * {@link SegmentRangeReader}. This enables partial segment downloads where
only the files needed for a query are
+ * fetched, rather than downloading the entire segment.
+ * <p>
+ * Locally, this mapper mirrors the original V10 container structure: each
container from the segment file is
+ * represented as a local sparse file at its original size, and only the byte
ranges for downloaded internal files are
+ * populated. This means the number of local files (and mmaps) equals the
number of containers (typically 1-3 at
+ * 'standard' segment sizes) rather than the number of internal files (which
could be hundreds).
+ * <p>
+ * The {@link MappedByteBuffer} for each container is created once and then
the channel is closed immediately (same
+ * pattern as {@link SegmentFileMapperV10}). Writes of downloaded file data
use short-lived {@link RandomAccessFile}
+ * instances. The mmap reflects writes through the shared page cache.
+ * <p>
+ * State is persisted to disk so that the mapper can be restored after a
process restart without re-fetching metadata
+ * from deep storage. The raw V10 header bytes are written to a local file, and a compact bitmap region is appended to
+ * the end of that file to track which internal files have been downloaded (one bit per file, updated after each download). On
+ * subsequent calls, the metadata is parsed from the local file instead of
range-reading from deep storage.
+ * <p>
+ * External segment files are supported via child {@link
PartialSegmentFileMapperV10} instances, each targeting a
+ * different file in the segment's storage location.
+ * <p>
+ * Thread-safe for concurrent access from multiple queries. Per-file locks
prevent duplicate downloads of the same
+ * internal file.
+ *
+ * @see SegmentFileMapperV10
+ * @see SegmentRangeReader
+ */
+public class PartialSegmentFileMapperV10 implements SegmentFileMapper
+{
+  static final String METADATA_HEADER_SUFFIX = ".header";
+
+  /**
+   * Create (or restore) a lazy mapper for the main segment file with attached external file mappers. If persisted
+   * state exists locally from a previous session, metadata is read from disk. Otherwise, metadata is fetched from
+   * deep storage via range reads and persisted locally.
+   * <p>
+   * One child mapper is created per distinct external filename. Repeated names in {@code externals} are skipped, so
+   * no child mapper (and its mmaps) is created only to be overwritten in the map and leaked.
+   * <p>
+   * If creating any external mapper fails, the entry point and every external mapper created so far are closed, and
+   * the failure is rethrown via {@link CloseableUtils#closeAndWrapInCatch}.
+   *
+   * @param rangeReader    reader for byte ranges of the segment's files in deep storage
+   * @param jsonMapper     mapper used when parsing the segment metadata
+   * @param localCacheDir  local directory where headers and container files are kept
+   * @param targetFilename name of the main segment file
+   * @param externals      names of external segment files that each get their own child mapper
+   * @return the mapper for {@code targetFilename}, owning the external child mappers
+   * @throws IOException if the mapper for the main file cannot be created
+   */
+  public static PartialSegmentFileMapperV10 create(
+      SegmentRangeReader rangeReader,
+      ObjectMapper jsonMapper,
+      File localCacheDir,
+      String targetFilename,
+      List<String> externals
+  ) throws IOException
+  {
+    final PartialSegmentFileMapperV10 entryPoint = createForFile(
+        rangeReader,
+        jsonMapper,
+        localCacheDir,
+        targetFilename
+    );
+
+    final Map<String, PartialSegmentFileMapperV10> externalMappers = new HashMap<>();
+    try {
+      for (String filename : externals) {
+        // a duplicate name would otherwise replace (and leak) a mapper that was already created
+        if (!externalMappers.containsKey(filename)) {
+          externalMappers.put(filename, createForFile(rangeReader, jsonMapper, localCacheDir, filename));
+        }
+      }
+    }
+    catch (Throwable t) {
+      // release everything created so far, including the entry point, before propagating
+      final Closer closer = Closer.create();
+      closer.register(entryPoint);
+      closer.registerAll(externalMappers.values());
+      throw CloseableUtils.closeAndWrapInCatch(t, closer);
+    }
+
+    entryPoint.externalMappers.putAll(externalMappers);
+    return entryPoint;
+  }
+
+  /**
+   * Create (or restore) a mapper for a single segment file.
+   * <p>
+   * If {@code targetFilename + METADATA_HEADER_SUFFIX} exists in {@code localCacheDir}, it is parsed and its bitmap
+   * mapped. If that fails for any reason, the file is deleted and the header is fetched again from deep storage.
+   * Internal files whose bit is set in the bitmap are then marked as already downloaded, but only if their local
+   * container file still exists. Otherwise, the container would be recreated as an all-zero sparse file and those
+   * zeros would be served as file contents.
+   * <p>
+   * If restoring fails, the partially built mapper is closed (releasing its mmaps) before the failure propagates.
+   *
+   * @throws IOException if the header cannot be fetched or parsed, or a container cannot be initialized on restore
+   */
+  @VisibleForTesting
+  static PartialSegmentFileMapperV10 createForFile(
+      SegmentRangeReader rangeReader,
+      ObjectMapper jsonMapper,
+      File localCacheDir,
+      String targetFilename
+  ) throws IOException
+  {
+    FileUtils.mkdirp(localCacheDir);
+    final File headerFile = new File(localCacheDir, targetFilename + METADATA_HEADER_SUFFIX);
+
+    // try to load from an existing local file, re-fetching from deep storage if missing or corrupted
+    SegmentFileMetadataReader.Result result = null;
+    MappedByteBuffer bitmapBuffer = null;
+
+    if (headerFile.exists()) {
+      try {
+        result = parseHeaderFile(headerFile, jsonMapper);
+        bitmapBuffer = mmapBitmap(headerFile, result);
+      }
+      catch (Exception ignored) {
+        // deliberately best-effort: a corrupted file (partial write, truncated bitmap, bad JSON, etc.) is deleted
+        // and the header is re-fetched below
+        result = null;
+        headerFile.delete();
+      }
+    }
+
+    if (result == null) {
+      fetchAndPersistHeader(rangeReader, targetFilename, headerFile);
+      result = parseHeaderFile(headerFile, jsonMapper);
+      bitmapBuffer = mmapBitmap(headerFile, result);
+    }
+
+    final PartialSegmentFileMapperV10 mapper = new PartialSegmentFileMapperV10(
+        result.getMetadata(),
+        result.getHeaderSize(),
+        rangeReader,
+        targetFilename,
+        localCacheDir,
+        bitmapBuffer
+    );
+
+    try {
+      // restore downloaded files from the bitmap: bit i (LSB-first within each byte) belongs to sortedFileNames[i]
+      for (int i = 0; i < mapper.sortedFileNames.size(); i++) {
+        if ((bitmapBuffer.get(i / 8) & (1 << (i % 8))) == 0) {
+          continue;
+        }
+        final String name = mapper.sortedFileNames.get(i);
+        final SegmentInternalFileMetadata fileMetadata = result.getMetadata().getFiles().get(name);
+        if (fileMetadata == null) {
+          continue;
+        }
+        final int containerIndex = fileMetadata.getContainer();
+        if (mapper.containers[containerIndex] == null) {
+          // a set bit is only meaningful while the container file holding the bytes still exists; if it is gone,
+          // leave the file to be downloaded again. The name format must match ensureContainerInitialized.
+          final File containerFile = new File(
+              localCacheDir,
+              StringUtils.format("%s.container.%05d", targetFilename, containerIndex)
+          );
+          if (!containerFile.exists()) {
+            continue;
+          }
+          mapper.ensureContainerInitialized(containerIndex);
+        }
+        mapper.downloadedFiles.add(name);
+        mapper.downloadedBytes.addAndGet(fileMetadata.getSize());
+      }
+    }
+    catch (Throwable t) {
+      // the mapper already owns the bitmap mmap (and possibly container mmaps); release them before propagating
+      try {
+        mapper.close();
+      }
+      catch (RuntimeException suppressed) {
+        t.addSuppressed(suppressed);
+      }
+      throw t;
+    }
+
+    return mapper;
+  }
+
+  private final SegmentFileMetadata metadata;
+  private final long headerSize;
+  private final SegmentRangeReader rangeReader;
+  private final String targetFilename;
+  private final File localCacheDir;
+
+  // internal file names in a stable (sorted) order; a name's position in this list is its bit index in the bitmap
+  private final List<String> sortedFileNames;
+  private final Map<String, Integer> fileNameToIndex;
+
+  // per-container state; entries of containers/containerFiles are filled in lazily by ensureContainerInitialized
+  private final MappedByteBuffer[] containers;
+  private final File[] containerFiles;
+  private final ReentrantLock[] containerLocks;
+
+  // child mappers for external segment files, keyed by external filename
+  private final Map<String, PartialSegmentFileMapperV10> externalMappers = new HashMap<>();
+
+  // names of internal files whose bytes are present locally
+  private final Set<String> downloadedFiles = ConcurrentHashMap.newKeySet();
+  // per-file locks used while downloading a single internal file
+  private final ConcurrentHashMap<String, ReentrantLock> fileLocks = new ConcurrentHashMap<>();
+  private final ReentrantLock bitmapLock;
+  private final MappedByteBuffer bitmapBuffer;
+  private final AtomicLong downloadedBytes = new AtomicLong(0);
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private PartialSegmentFileMapperV10(
+      SegmentFileMetadata metadata,
+      long headerSize,
+      SegmentRangeReader rangeReader,
+      String targetFilename,
+      File localCacheDir,
+      MappedByteBuffer bitmapBuffer
+  )
+  {
+    this.metadata = metadata;
+    this.headerSize = headerSize;
+    this.rangeReader = rangeReader;
+    this.targetFilename = targetFilename;
+    this.localCacheDir = localCacheDir;
+    this.bitmapBuffer = bitmapBuffer;
+
+    // sorting gives every session the same name -> bit assignment for the persisted bitmap
+    final TreeSet<String> orderedNames = new TreeSet<>(metadata.getFiles().keySet());
+    this.sortedFileNames = new ArrayList<>(orderedNames);
+    this.fileNameToIndex = new HashMap<>();
+    int bit = 0;
+    for (String fileName : sortedFileNames) {
+      fileNameToIndex.put(fileName, bit++);
+    }
+
+    final int containerCount = metadata.getContainers().size();
+    this.containers = new MappedByteBuffer[containerCount];
+    this.containerFiles = new File[containerCount];
+    this.containerLocks = new ReentrantLock[containerCount];
+    for (int c = 0; c < containerCount; c++) {
+      containerLocks[c] = new ReentrantLock();
+    }
+    this.bitmapLock = new ReentrantLock();
+  }
+
+  /**
+   * Returns the parsed V10 metadata of the segment file this mapper targets.
+   */
+  public SegmentFileMetadata getSegmentFileMetadata()
+  {
+    return this.metadata;
+  }
+
+  /**
+   * Returns the names of every internal file described by the metadata, whether or not it has been downloaded yet.
+   */
+  @Override
+  public Set<String> getInternalFilenames()
+  {
+    final Set<String> names = this.metadata.getFiles().keySet();
+    return names;
+  }
+
+  /**
+   * Map an internal file, downloading it from deep storage first if it is not yet available locally.
+   *
+   * @param name internal file name
+   * @return a read-only view of the file's bytes sliced from its container mmap, or {@code null} if the segment has
+   * no internal file with this name
+   * @throws IOException if downloading the file fails
+   */
+  @Nullable
+  @Override
+  public ByteBuffer mapFile(String name) throws IOException
+  {
+    checkClosed();
+
+    final SegmentInternalFileMetadata fileMetadata = metadata.getFiles().get(name);
+    if (fileMetadata == null) {
+      return null;
+    }
+    ensureFileDownloaded(name, fileMetadata);
+
+    final ByteBuffer view = containers[fileMetadata.getContainer()].asReadOnlyBuffer();
+    final int start = Ints.checkedCast(fileMetadata.getStartOffset());
+    view.position(start);
+    final int end = Ints.checkedCast(fileMetadata.getStartOffset() + fileMetadata.getSize());
+    view.limit(end);
+    return view.slice();
+  }
+
+ @Nullable
+ @Override
+ public ByteBuffer mapExternalFile(String filename, String name) throws
IOException
+ {
+ checkClosed();
+ final PartialSegmentFileMapperV10 externalMapper =
externalMappers.get(filename);
+ if (externalMapper == null) {
+ throw DruidException.defensive("external file[%s] containing[%s] not
found", filename, name);
+ }
+ return externalMapper.mapFile(name);
+ }
+
+  /**
+   * Pre-download a set of internal files, so that later {@link #mapFile(String)} calls for them do not trigger
+   * individual downloads. This is useful for fetching all files of a projection in one pass. Files that are already
+   * downloaded are skipped, and names that are not in this segment's metadata are ignored.
+   *
+   * @throws IOException if downloading any of the files fails
+   */
+  public void ensureFilesAvailable(Set<String> fileNames) throws IOException
+  {
+    for (String fileName : fileNames) {
+      final SegmentInternalFileMetadata fileMetadata = metadata.getFiles().get(fileName);
+      if (fileMetadata == null) {
+        continue;
+      }
+      ensureFileDownloaded(fileName, fileMetadata);
+    }
+  }
+
+  /**
+   * Returns the bytes downloaded (or restored from a previous session) for this file, plus the totals reported by all
+   * attached external mappers.
+   */
+  public long getDownloadedBytes()
+  {
+    final long own = downloadedBytes.get();
+    return own + externalMappers.values()
+                                .stream()
+                                .mapToLong(PartialSegmentFileMapperV10::getDownloadedBytes)
+                                .sum();
+  }
+
+  /**
+   * Unmaps the bitmap and every initialized container, then closes all external mappers. Only the first call has an
+   * effect; later calls return immediately.
+   */
+  @Override
+  public void close()
+  {
+    if (!closed.compareAndSet(false, true)) {
+      return;
+    }
+
+    final Closer resources = Closer.create();
+    resources.register(() -> ByteBufferUtils.unmap(bitmapBuffer));
+    for (final MappedByteBuffer containerBuffer : containers) {
+      if (containerBuffer == null) {
+        // container was never initialized, nothing to unmap
+        continue;
+      }
+      resources.register(() -> ByteBufferUtils.unmap(containerBuffer));
+    }
+    resources.registerAll(externalMappers.values());
+    CloseableUtils.closeAndWrapExceptions(resources);
+  }
+
+  /**
+   * Throws a defensive {@link DruidException} once {@link #close()} has been called.
+   */
+  private void checkClosed()
+  {
+    if (!closed.get()) {
+      return;
+    }
+    throw DruidException.defensive("PartialSegmentFileMapperV10 is closed");
+  }
+
+  /**
+   * Returns the byte offset of an internal file within the segment file in deep storage: the header size, plus the
+   * start offset of the file's container, plus the file's start offset within that container.
+   */
+  private long computeAbsoluteOffset(SegmentInternalFileMetadata fileMetadata)
+  {
+    final long containerStart = metadata.getContainers().get(fileMetadata.getContainer()).getStartOffset();
+    return headerSize + containerStart + fileMetadata.getStartOffset();
+  }
+
+ /**
+ * Make sure the named internal file's bytes are present in its local container file, downloading them from deep
+ * storage if the name is not in {@link #downloadedFiles} yet. Concurrent callers for the same name serialize on a
+ * per-name lock taken from {@link #fileLocks}; the set is re-checked under that lock so only one of them downloads.
+ *
+ * @throws IOException if the container cannot be initialized or the range read / local write fails
+ */
+ private void ensureFileDownloaded(String name, SegmentInternalFileMetadata
fileMetadata) throws IOException
+ {
+ // already downloaded, nothing to do
+ if (downloadedFiles.contains(name)) {
+ return;
+ }
+
+ // per-name lock; the finally block removes it from the map again so entries do not accumulate
+ final ReentrantLock lock = fileLocks.computeIfAbsent(name, k -> new
ReentrantLock());
+ lock.lock();
+ try {
+ // fail if the mapper was closed while this thread waited for the lock
+ checkClosed();
+
+ // another thread may have completed the download while this one waited for the lock
+ if (downloadedFiles.contains(name)) {
+ return;
+ }
+
+ ensureContainerInitialized(fileMetadata.getContainer());
+ downloadFileToContainer(name, fileMetadata);
+ // recorded only after the bytes are written, so the unlocked check above never skips an incomplete file
+ downloadedFiles.add(name);
+ markDownloadedInBitmap(name);
+ }
+ finally {
+ lock.unlock();
+ // NOTE(review): after a failed download, a thread already blocked on this lock can still acquire it once it is
+ // removed here, while a new caller gets a fresh lock from computeIfAbsent, so both may download the same file
+ // concurrently (duplicate range reads and double-counted downloadedBytes). Confirm this is acceptable.
+ fileLocks.remove(name, lock);
+ }
+ }
+
+  /**
+   * Initialize the local file and mmap for a container if not already done. Creates a file sized to the original
+   * container (sparse on filesystems that support it) and memory-maps it read-only. The channel is closed immediately
+   * after mapping; the mapping stays valid on its own, which avoids the risk of channel closure from thread
+   * interruption.
+   * <p>
+   * The null check on {@link #containers} is done while holding the container's lock. {@link #containers} and
+   * {@link #containerFiles} are plain (non-volatile) arrays, so an unsynchronized check would not guarantee that a
+   * thread observing a non-null mmap also observes {@link #containerFiles} or the mmap's internal state. Acquiring the
+   * lock gives every caller a happens-before edge with the initializing thread. The cost is negligible next to the
+   * deep storage range read that normally follows.
+   *
+   * @throws IOException if the local file cannot be created, resized, or mapped
+   */
+  private void ensureContainerInitialized(int containerIndex) throws IOException
+  {
+    final ReentrantLock containerLock = containerLocks[containerIndex];
+    containerLock.lock();
+    try {
+      if (containers[containerIndex] != null) {
+        return;
+      }
+
+      final SegmentFileContainerMetadata containerMeta = metadata.getContainers().get(containerIndex);
+      final File localFile = new File(
+          localCacheDir,
+          StringUtils.format("%s.container.%05d", targetFilename, containerIndex)
+      );
+
+      // size the file to the original container, mmap it, then close the channel immediately. If the file already
+      // exists from a previous session at this size, setLength leaves its contents in place.
+      try (RandomAccessFile raf = new RandomAccessFile(localFile, "rw");
+           FileChannel channel = raf.getChannel()) {
+        raf.setLength(containerMeta.getSize());
+        containerFiles[containerIndex] = localFile;
+        containers[containerIndex] = channel.map(
+            FileChannel.MapMode.READ_ONLY,
+            0,
+            containerMeta.getSize()
+        );
+      }
+    }
+    finally {
+      containerLock.unlock();
+    }
+  }
+
+  /**
+   * Streams an internal file's byte range from deep storage into its position in the local container file. It copies
+   * through a fixed 8 KiB buffer so the whole file is never held on heap. Writing uses a short-lived
+   * {@link RandomAccessFile}; per the class design, the container mmap sees the data through the shared page cache.
+   *
+   * @throws DruidException if the deep storage stream ends before {@code fileMetadata.getSize()} bytes were read
+   */
+  private void downloadFileToContainer(String name, SegmentInternalFileMetadata fileMetadata) throws IOException
+  {
+    final long offset = computeAbsoluteOffset(fileMetadata);
+    final long size = fileMetadata.getSize();
+
+    try (InputStream source = rangeReader.readRange(targetFilename, offset, size);
+         RandomAccessFile sink = new RandomAccessFile(containerFiles[fileMetadata.getContainer()], "rw")) {
+      sink.seek(fileMetadata.getStartOffset());
+      final byte[] chunk = new byte[8192];
+      long copied = 0;
+      while (copied < size) {
+        final int n = source.read(chunk, 0, (int) Math.min(chunk.length, size - copied));
+        if (n < 0) {
+          throw DruidException.defensive(
+              "unexpected end of stream for file[%s], expected[%s] more bytes",
+              name,
+              size - copied
+          );
+        }
+        sink.write(chunk, 0, n);
+        copied += n;
+      }
+    }
+
+    downloadedBytes.addAndGet(size);
+  }
+
+  /**
+   * Sets the bitmap bit for {@code name}. For the name's position {@code i} in the sorted file order, that is bit
+   * {@code i % 8} of byte {@code i / 8}. The read-modify-write of that byte happens under {@link #bitmapLock}. Names
+   * without an index are ignored. No explicit force() is issued; write-back of the mmap is left to the OS.
+   */
+  private void markDownloadedInBitmap(String name)
+  {
+    final Integer bitPosition = fileNameToIndex.get(name);
+    if (bitPosition != null) {
+      final int slot = bitPosition >> 3;
+      final int mask = 1 << (bitPosition & 7);
+      bitmapLock.lock();
+      try {
+        bitmapBuffer.put(slot, (byte) (bitmapBuffer.get(slot) | mask));
+      }
+      finally {
+        bitmapLock.unlock();
+      }
+    }
+  }
+
+ /**
+ * Fetch the raw V10 header bytes from deep storage and write them to a
local file. The bitmap region is not
+ * included, it is created by {@link #mmapBitmap} after parsing. The file is
parseable by
+ * {@link SegmentFileMetadataReader#read(InputStream, ObjectMapper)}.
+ */
+ private static void fetchAndPersistHeader(
+ SegmentRangeReader rangeReader,
+ String targetFilename,
+ File headerFile
+ ) throws IOException
+ {
+ // read the fixed header to determine the metadata size
+ final byte[] fixedHeader = new byte[SegmentFileMetadataReader.HEADER_SIZE];
+ try (InputStream headerStream = rangeReader.readRange(targetFilename, 0,
fixedHeader.length)) {
+ int offset = 0;
+ while (offset < fixedHeader.length) {
+ int read = headerStream.read(fixedHeader, offset, fixedHeader.length -
offset);
+ if (read < 0) {
+ break;
+ }
+ offset += read;
+ }
+ }
+
+ final ByteBuffer headerBuf =
ByteBuffer.wrap(fixedHeader).order(ByteOrder.LITTLE_ENDIAN);
+ final int metaLength = headerBuf.getInt(2);
+ final CompressionStrategy compressionStrategy =
CompressionStrategy.forId(headerBuf.get(1));
+
+ // compute the remaining bytes after the fixed header
+ final long remainingBytes;
+ if (CompressionStrategy.NONE == compressionStrategy) {
+ remainingBytes = metaLength;
+ } else {
+ // upper bound: compressed length int + uncompressed size (compressed
will be <= uncompressed)
+ remainingBytes = Integer.BYTES + metaLength;
+ }
+
+ // write fixed header + remaining metadata bytes to a local file
atomically (write to temp, then rename)
+ // to avoid leaving a partial file on disk if the process crashes mid-write
+ FileUtils.mkdirp(headerFile.getParentFile());
+ FileUtils.writeAtomically(headerFile, out -> {
+ out.write(fixedHeader);
+ try (InputStream remainingStream = rangeReader.readRange(
+ targetFilename,
+ SegmentFileMetadataReader.HEADER_SIZE,
+ remainingBytes
+ )) {
+ final byte[] buf = new byte[8192];
Review Comment:
This copy loop looks like it could be replaced with `ByteStreams.limit(remainingStream, remainingBytes).transferTo(out)`.
##########
processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java:
##########
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Ints;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.loading.SegmentRangeReader;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A {@link SegmentFileMapper} that downloads internal files on demand from
deep storage via a
+ * {@link SegmentRangeReader}. This enables partial segment downloads where
only the files needed for a query are
+ * fetched, rather than downloading the entire segment.
+ * <p>
+ * Locally, this mapper mirrors the original V10 container structure: each
container from the segment file is
+ * represented as a local sparse file at its original size, and only the byte
ranges for downloaded internal files are
+ * populated. This means the number of local files (and mmaps) equals the
number of containers (typically 1-3 at
+ * 'standard' segment sizes) rather than the number of internal files (which
could be hundreds).
+ * <p>
+ * The {@link MappedByteBuffer} for each container is created once and then
the channel is closed immediately (same
+ * pattern as {@link SegmentFileMapperV10}). Writes of downloaded file data
use short-lived {@link RandomAccessFile}
+ * instances. The mmap reflects writes through the shared page cache.
+ * <p>
+ * State is persisted to disk so that the mapper can be restored after a
process restart without re-fetching metadata
+ * from deep storage. The raw V10 header bytes are written to a local file,
and a compact bitmap file is appended to
+ * the end of it to track which internal files have been downloaded (one bit
per file, updated after each download). On
+ * subsequent calls, the metadata is parsed from the local file instead of
range-reading from deep storage.
+ * <p>
+ * External segment files are supported via child {@link
PartialSegmentFileMapperV10} instances, each targeting a
+ * different file in the segment's storage location.
+ * <p>
+ * Thread-safe for concurrent access from multiple queries. Per-file locks
prevent duplicate downloads of the same
+ * internal file.
+ *
+ * @see SegmentFileMapperV10
+ * @see SegmentRangeReader
+ */
+public class PartialSegmentFileMapperV10 implements SegmentFileMapper
+{
+ /**
+ * Suffix appended to the target filename to name the local file that holds the persisted V10 header bytes, with the
+ * download-tracking bitmap stored after them.
+ */
+ static final String METADATA_HEADER_SUFFIX = ".header";
+
+  /**
+   * Creates a mapper for the main segment file {@code targetFilename} and attaches one child mapper per entry of
+   * {@code externals}. Each mapper restores its state from {@code localCacheDir} when a persisted header exists there.
+   * Otherwise it range-reads its metadata from deep storage and persists it locally.
+   * <p>
+   * If any child mapper fails to initialize, the main mapper and every child created so far are closed, and the failure
+   * is propagated through {@link CloseableUtils#closeAndWrapInCatch}.
+   */
+  public static PartialSegmentFileMapperV10 create(
+      SegmentRangeReader rangeReader,
+      ObjectMapper jsonMapper,
+      File localCacheDir,
+      String targetFilename,
+      List<String> externals
+  ) throws IOException
+  {
+    final PartialSegmentFileMapperV10 mainMapper =
+        createForFile(rangeReader, jsonMapper, localCacheDir, targetFilename);
+
+    // collect children separately so a partial failure can be cleaned up before mainMapper knows about them
+    final Map<String, PartialSegmentFileMapperV10> children = new HashMap<>();
+    try {
+      for (String externalFilename : externals) {
+        final PartialSegmentFileMapperV10 child =
+            createForFile(rangeReader, jsonMapper, localCacheDir, externalFilename);
+        children.put(externalFilename, child);
+      }
+    }
+    catch (Throwable t) {
+      final Closer cleanup = Closer.create();
+      cleanup.register(mainMapper);
+      cleanup.registerAll(children.values());
+      throw CloseableUtils.closeAndWrapInCatch(t, cleanup);
+    }
+
+    mainMapper.externalMappers.putAll(children);
+    return mainMapper;
+  }
+
+  /**
+   * Creates (or restores) a mapper for a single segment file, without any external mappers attached.
+   * <p>
+   * If a header file from a previous session exists in {@code localCacheDir}, the metadata and the download bitmap are
+   * read from it. If it is missing, or cannot be parsed or mapped, it is deleted and the header is fetched again from
+   * deep storage. Internal files whose bit is set in the bitmap are treated as already downloaded, and their containers
+   * are mapped eagerly.
+   *
+   * @throws IOException if the header cannot be fetched, persisted or trimmed, or a container cannot be mapped. A
+   *                     mapper constructed before such a failure is closed before the exception propagates.
+   */
+  @VisibleForTesting
+  static PartialSegmentFileMapperV10 createForFile(
+      SegmentRangeReader rangeReader,
+      ObjectMapper jsonMapper,
+      File localCacheDir,
+      String targetFilename
+  ) throws IOException
+  {
+    FileUtils.mkdirp(localCacheDir);
+    final File headerFile = new File(localCacheDir, targetFilename + METADATA_HEADER_SUFFIX);
+
+    // try to load from existing local file, re-fetching from deep storage if missing or corrupted
+    SegmentFileMetadataReader.Result result = null;
+    MappedByteBuffer bitmapBuffer = null;
+
+    if (headerFile.exists()) {
+      try {
+        result = parseHeaderFile(headerFile, jsonMapper);
+        bitmapBuffer = mmapBitmap(headerFile, result);
+      }
+      catch (Exception e) {
+        // corrupted file (partial write, truncated bitmap, bad JSON, etc.): delete and re-fetch below. A failed delete
+        // is not fatal because the re-fetch rewrites the file atomically.
+        result = null;
+        headerFile.delete();
+      }
+    }
+
+    if (result == null) {
+      fetchAndPersistHeader(rangeReader, targetFilename, headerFile);
+      result = parseHeaderFile(headerFile, jsonMapper);
+      // The fetched file can be longer than the real header. For compressed metadata, fetchAndPersistHeader reads an
+      // upper bound that runs into the first container. mmapBitmap only extends the file when it is shorter than
+      // headerSize + bitmap length, so without trimming, those container bytes would be mapped as the bitmap. The
+      // restore loop below would then treat files as already downloaded. Trim to the header so the bitmap region is
+      // created by mmapBitmap rather than taken from container data.
+      // NOTE(review): a crash between the atomic write and this trim still leaves trailing bytes behind. Fully closing
+      // that window requires fetchAndPersistHeader to write exactly headerSize bytes.
+      try (RandomAccessFile raf = new RandomAccessFile(headerFile, "rw")) {
+        if (raf.length() > result.getHeaderSize()) {
+          raf.setLength(result.getHeaderSize());
+        }
+      }
+      bitmapBuffer = mmapBitmap(headerFile, result);
+    }
+
+    final PartialSegmentFileMapperV10 mapper = new PartialSegmentFileMapperV10(
+        result.getMetadata(),
+        result.getHeaderSize(),
+        rangeReader,
+        targetFilename,
+        localCacheDir,
+        bitmapBuffer
+    );
+
+    // restore downloaded files from the bitmap. On failure, close the mapper so the bitmap mmap and any containers
+    // mapped so far are released instead of leaked.
+    try {
+      for (int i = 0; i < mapper.sortedFileNames.size(); i++) {
+        final int byteIndex = i / 8;
+        final int bitIndex = i % 8;
+        if ((bitmapBuffer.get(byteIndex) & (1 << bitIndex)) != 0) {
+          final String name = mapper.sortedFileNames.get(i);
+          final SegmentInternalFileMetadata fileMetadata = result.getMetadata().getFiles().get(name);
+          if (fileMetadata != null) {
+            mapper.ensureContainerInitialized(fileMetadata.getContainer());
+            mapper.downloadedFiles.add(name);
+            mapper.downloadedBytes.addAndGet(fileMetadata.getSize());
+          }
+        }
+      }
+    }
+    catch (Throwable t) {
+      try {
+        mapper.close();
+      }
+      catch (Throwable closeFailure) {
+        t.addSuppressed(closeFailure);
+      }
+      throw t;
+    }
+
+    return mapper;
+  }
+
+ // parsed V10 metadata describing the containers and internal files of the target file
+ private final SegmentFileMetadata metadata;
+ // header bytes preceding the containers in the deep storage file; added to every range-read offset
+ private final long headerSize;
+ // source of byte-range reads against deep storage
+ private final SegmentRangeReader rangeReader;
+ // name of the segment file in deep storage; also the prefix of the local header and container file names
+ private final String targetFilename;
+ // directory holding the local header and container files
+ private final File localCacheDir;
+
+ // stable sorted ordering of file names for bitmap indexing
+ // (the bit at position i of the bitmap tracks sortedFileNames.get(i); fileNameToIndex is the reverse lookup)
+ private final List<String> sortedFileNames;
+ private final Map<String, Integer> fileNameToIndex;
+
+ // per-container state, lazily initialized
+ // NOTE(review): containers[i] and containerFiles[i] are written while holding containerLocks[i], but they are plain
+ // array slots that are also read without that lock (e.g. in mapFile), so cross-thread visibility relies on other
+ // happens-before edges.
+ private final MappedByteBuffer[] containers;
+ private final File[] containerFiles;
+ private final ReentrantLock[] containerLocks;
+
+ // external file mappers, keyed by external filename; filled by create() before the mapper is returned
+ private final Map<String, PartialSegmentFileMapperV10> externalMappers = new
HashMap<>();
+
+ // track which internal files have been downloaded
+ // (names whose bytes are fully written locally, including ones restored from the bitmap on startup)
+ private final Set<String> downloadedFiles = ConcurrentHashMap.newKeySet();
+ // per-name locks for in-flight downloads
+ private final ConcurrentHashMap<String, ReentrantLock> fileLocks = new
ConcurrentHashMap<>();
+ // guards read-modify-write of bytes in bitmapBuffer
+ private final ReentrantLock bitmapLock;
+ // read-write mmap of the bitmap region of the local header file
+ private final MappedByteBuffer bitmapBuffer;
+ // bytes downloaded or restored for this file only (external mappers keep their own counters)
+ private final AtomicLong downloadedBytes = new AtomicLong(0);
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  /**
+   * Wires up the mapper from already-parsed metadata and an already-mapped bitmap. No local container files are
+   * created or mapped here; that happens lazily per container.
+   */
+  private PartialSegmentFileMapperV10(
+      SegmentFileMetadata metadata,
+      long headerSize,
+      SegmentRangeReader rangeReader,
+      String targetFilename,
+      File localCacheDir,
+      MappedByteBuffer bitmapBuffer
+  )
+  {
+    this.metadata = metadata;
+    this.headerSize = headerSize;
+    this.rangeReader = rangeReader;
+    this.targetFilename = targetFilename;
+    this.localCacheDir = localCacheDir;
+    this.bitmapBuffer = bitmapBuffer;
+    this.bitmapLock = new ReentrantLock();
+
+    // bitmap positions follow the lexicographic order of internal file names, which keeps indexing stable
+    final TreeSet<String> orderedNames = new TreeSet<>(metadata.getFiles().keySet());
+    this.sortedFileNames = new ArrayList<>(orderedNames);
+    this.fileNameToIndex = new HashMap<>();
+    int position = 0;
+    for (String fileName : orderedNames) {
+      fileNameToIndex.put(fileName, position++);
+    }
+
+    // one mmap slot, local file slot, and initialization lock per container
+    final int containerCount = metadata.getContainers().size();
+    this.containers = new MappedByteBuffer[containerCount];
+    this.containerFiles = new File[containerCount];
+    this.containerLocks = new ReentrantLock[containerCount];
+    for (int c = 0; c < containerCount; c++) {
+      containerLocks[c] = new ReentrantLock();
+    }
+  }
+
+  /**
+   * Returns the parsed V10 metadata for the target segment file.
+   */
+  public SegmentFileMetadata getSegmentFileMetadata()
+  {
+    return this.metadata;
+  }
+
+  /**
+   * Returns the names of every internal file described by the metadata, whether or not it has been downloaded yet.
+   */
+  @Override
+  public Set<String> getInternalFilenames()
+  {
+    final SegmentFileMetadata fileMetadata = this.metadata;
+    return fileMetadata.getFiles().keySet();
+  }
+
+  /**
+   * Returns a read-only view of the named internal file, downloading it from deep storage first if necessary.
+   *
+   * @return the file contents, or {@code null} if no internal file with this name exists
+   */
+  @Nullable
+  @Override
+  public ByteBuffer mapFile(String name) throws IOException
+  {
+    checkClosed();
+
+    final SegmentInternalFileMetadata internalFile = metadata.getFiles().get(name);
+    if (internalFile == null) {
+      return null;
+    }
+    ensureFileDownloaded(name, internalFile);
+
+    // carve the file's byte range out of a read-only duplicate of its container mmap
+    final ByteBuffer window = containers[internalFile.getContainer()].asReadOnlyBuffer();
+    window.position(Ints.checkedCast(internalFile.getStartOffset()));
+    window.limit(Ints.checkedCast(internalFile.getStartOffset() + internalFile.getSize()));
+    return window.slice();
+  }
+
+ @Nullable
+ @Override
+ public ByteBuffer mapExternalFile(String filename, String name) throws
IOException
+ {
+ checkClosed();
+ final PartialSegmentFileMapperV10 externalMapper =
externalMappers.get(filename);
+ if (externalMapper == null) {
+ throw DruidException.defensive("external file[%s] containing[%s] not
found", filename, name);
+ }
+ return externalMapper.mapFile(name);
+ }
+
+  /**
+   * Downloads every listed internal file that is not available locally yet, so that later {@link #mapFile(String)}
+   * calls for them do not trigger individual downloads (for example, all files of a projection at once). Names that
+   * are not internal files of this segment are ignored; files already downloaded are skipped.
+   */
+  public void ensureFilesAvailable(Set<String> fileNames) throws IOException
+  {
+    for (String fileName : fileNames) {
+      final SegmentInternalFileMetadata internalFile = metadata.getFiles().get(fileName);
+      if (internalFile == null) {
+        continue;
+      }
+      ensureFileDownloaded(fileName, internalFile);
+    }
+  }
+
+  /**
+   * Returns the bytes downloaded (or restored from a previous session) for this file, plus the totals reported by all
+   * attached external mappers.
+   */
+  public long getDownloadedBytes()
+  {
+    final long own = downloadedBytes.get();
+    return own + externalMappers.values()
+                                .stream()
+                                .mapToLong(PartialSegmentFileMapperV10::getDownloadedBytes)
+                                .sum();
+  }
+
+  /**
+   * Unmaps the bitmap and every initialized container, then closes all external mappers. Only the first call has an
+   * effect; later calls return immediately.
+   */
+  @Override
+  public void close()
+  {
+    if (!closed.compareAndSet(false, true)) {
+      return;
+    }
+
+    final Closer resources = Closer.create();
+    resources.register(() -> ByteBufferUtils.unmap(bitmapBuffer));
+    for (final MappedByteBuffer containerBuffer : containers) {
+      if (containerBuffer == null) {
+        // container was never initialized, nothing to unmap
+        continue;
+      }
+      resources.register(() -> ByteBufferUtils.unmap(containerBuffer));
+    }
+    resources.registerAll(externalMappers.values());
+    CloseableUtils.closeAndWrapExceptions(resources);
+  }
+
+  /**
+   * Throws a defensive {@link DruidException} once {@link #close()} has been called.
+   */
+  private void checkClosed()
+  {
+    if (!closed.get()) {
+      return;
+    }
+    throw DruidException.defensive("PartialSegmentFileMapperV10 is closed");
+  }
+
+  /**
+   * Returns the byte offset of an internal file within the segment file in deep storage: the header size, plus the
+   * start offset of the file's container, plus the file's start offset within that container.
+   */
+  private long computeAbsoluteOffset(SegmentInternalFileMetadata fileMetadata)
+  {
+    final long containerStart = metadata.getContainers().get(fileMetadata.getContainer()).getStartOffset();
+    return headerSize + containerStart + fileMetadata.getStartOffset();
+  }
+
+ /**
+ * Make sure the named internal file's bytes are present in its local container file, downloading them from deep
+ * storage if the name is not in {@link #downloadedFiles} yet. Concurrent callers for the same name serialize on a
+ * per-name lock taken from {@link #fileLocks}; the set is re-checked under that lock so only one of them downloads.
+ *
+ * @throws IOException if the container cannot be initialized or the range read / local write fails
+ */
+ private void ensureFileDownloaded(String name, SegmentInternalFileMetadata
fileMetadata) throws IOException
+ {
+ // already downloaded, nothing to do
+ if (downloadedFiles.contains(name)) {
+ return;
+ }
+
+ // per-name lock; the finally block removes it from the map again so entries do not accumulate
+ final ReentrantLock lock = fileLocks.computeIfAbsent(name, k -> new
ReentrantLock());
+ lock.lock();
+ try {
+ // fail if the mapper was closed while this thread waited for the lock
+ checkClosed();
+
+ // another thread may have completed the download while this one waited for the lock
+ if (downloadedFiles.contains(name)) {
+ return;
+ }
+
+ ensureContainerInitialized(fileMetadata.getContainer());
+ downloadFileToContainer(name, fileMetadata);
+ // recorded only after the bytes are written, so the unlocked check above never skips an incomplete file
+ downloadedFiles.add(name);
+ markDownloadedInBitmap(name);
+ }
+ finally {
+ lock.unlock();
+ // NOTE(review): after a failed download, a thread already blocked on this lock can still acquire it once it is
+ // removed here, while a new caller gets a fresh lock from computeIfAbsent, so both may download the same file
+ // concurrently (duplicate range reads and double-counted downloadedBytes). Confirm this is acceptable.
+ fileLocks.remove(name, lock);
+ }
+ }
+
+  /**
+   * Initialize the local file and mmap for a container if not already done. Creates a file sized to the original
+   * container (sparse on filesystems that support it) and memory-maps it read-only. The channel is closed immediately
+   * after mapping; the mapping stays valid on its own, which avoids the risk of channel closure from thread
+   * interruption.
+   * <p>
+   * The null check on {@link #containers} is done while holding the container's lock. {@link #containers} and
+   * {@link #containerFiles} are plain (non-volatile) arrays, so an unsynchronized check would not guarantee that a
+   * thread observing a non-null mmap also observes {@link #containerFiles} or the mmap's internal state. Acquiring the
+   * lock gives every caller a happens-before edge with the initializing thread. The cost is negligible next to the
+   * deep storage range read that normally follows.
+   *
+   * @throws IOException if the local file cannot be created, resized, or mapped
+   */
+  private void ensureContainerInitialized(int containerIndex) throws IOException
+  {
+    final ReentrantLock containerLock = containerLocks[containerIndex];
+    containerLock.lock();
+    try {
+      if (containers[containerIndex] != null) {
+        return;
+      }
+
+      final SegmentFileContainerMetadata containerMeta = metadata.getContainers().get(containerIndex);
+      final File localFile = new File(
+          localCacheDir,
+          StringUtils.format("%s.container.%05d", targetFilename, containerIndex)
+      );
+
+      // size the file to the original container, mmap it, then close the channel immediately. If the file already
+      // exists from a previous session at this size, setLength leaves its contents in place.
+      try (RandomAccessFile raf = new RandomAccessFile(localFile, "rw");
+           FileChannel channel = raf.getChannel()) {
+        raf.setLength(containerMeta.getSize());
+        containerFiles[containerIndex] = localFile;
+        containers[containerIndex] = channel.map(
+            FileChannel.MapMode.READ_ONLY,
+            0,
+            containerMeta.getSize()
+        );
+      }
+    }
+    finally {
+      containerLock.unlock();
+    }
+  }
+
+  /**
+   * Streams an internal file's byte range from deep storage into its position in the local container file. It copies
+   * through a fixed 8 KiB buffer so the whole file is never held on heap. Writing uses a short-lived
+   * {@link RandomAccessFile}; per the class design, the container mmap sees the data through the shared page cache.
+   *
+   * @throws DruidException if the deep storage stream ends before {@code fileMetadata.getSize()} bytes were read
+   */
+  private void downloadFileToContainer(String name, SegmentInternalFileMetadata fileMetadata) throws IOException
+  {
+    final long offset = computeAbsoluteOffset(fileMetadata);
+    final long size = fileMetadata.getSize();
+
+    try (InputStream source = rangeReader.readRange(targetFilename, offset, size);
+         RandomAccessFile sink = new RandomAccessFile(containerFiles[fileMetadata.getContainer()], "rw")) {
+      sink.seek(fileMetadata.getStartOffset());
+      final byte[] chunk = new byte[8192];
+      long copied = 0;
+      while (copied < size) {
+        final int n = source.read(chunk, 0, (int) Math.min(chunk.length, size - copied));
+        if (n < 0) {
+          throw DruidException.defensive(
+              "unexpected end of stream for file[%s], expected[%s] more bytes",
+              name,
+              size - copied
+          );
+        }
+        sink.write(chunk, 0, n);
+        copied += n;
+      }
+    }
+
+    downloadedBytes.addAndGet(size);
+  }
+
+  /**
+   * Sets the bitmap bit for {@code name}. For the name's position {@code i} in the sorted file order, that is bit
+   * {@code i % 8} of byte {@code i / 8}. The read-modify-write of that byte happens under {@link #bitmapLock}. Names
+   * without an index are ignored. No explicit force() is issued; write-back of the mmap is left to the OS.
+   */
+  private void markDownloadedInBitmap(String name)
+  {
+    final Integer bitPosition = fileNameToIndex.get(name);
+    if (bitPosition != null) {
+      final int slot = bitPosition >> 3;
+      final int mask = 1 << (bitPosition & 7);
+      bitmapLock.lock();
+      try {
+        bitmapBuffer.put(slot, (byte) (bitmapBuffer.get(slot) | mask));
+      }
+      finally {
+        bitmapLock.unlock();
+      }
+    }
+  }
+
+  /**
+   * Fetch the raw V10 header bytes from deep storage and write them to a local file. The bitmap region is not
+   * included; it is created by {@link #mmapBitmap} after parsing. The file is parseable by
+   * {@link SegmentFileMetadataReader#read(InputStream, ObjectMapper)}.
+   * <p>
+   * The fixed header gives the metadata length but, for compressed metadata, not the exact compressed length. In that
+   * case an upper bound of {@code Integer.BYTES + metaLength} bytes is fetched, so the written file may extend past
+   * the real header into data from the first container.
+   *
+   * @throws DruidException if the fixed header is truncated or declares a negative metadata length
+   * @throws IOException    if reading from deep storage or writing the local file fails
+   */
+  private static void fetchAndPersistHeader(
+      SegmentRangeReader rangeReader,
+      String targetFilename,
+      File headerFile
+  ) throws IOException
+  {
+    // read the fixed header to determine the metadata size. A short read must fail here, because zero-filled header
+    // bytes would otherwise be parsed as a compression id and a metadata length.
+    final byte[] fixedHeader = new byte[SegmentFileMetadataReader.HEADER_SIZE];
+    final int headerBytesRead;
+    try (InputStream headerStream = rangeReader.readRange(targetFilename, 0, fixedHeader.length)) {
+      headerBytesRead = headerStream.readNBytes(fixedHeader, 0, fixedHeader.length);
+    }
+    if (headerBytesRead < fixedHeader.length) {
+      throw DruidException.defensive(
+          "unexpected end of stream for header of file[%s], expected[%s] bytes but got[%s]",
+          targetFilename,
+          fixedHeader.length,
+          headerBytesRead
+      );
+    }
+
+    final ByteBuffer headerBuf = ByteBuffer.wrap(fixedHeader).order(ByteOrder.LITTLE_ENDIAN);
+    final int metaLength = headerBuf.getInt(2);
+    final CompressionStrategy compressionStrategy = CompressionStrategy.forId(headerBuf.get(1));
+    if (metaLength < 0) {
+      throw DruidException.defensive("invalid metadata length[%s] in header of file[%s]", metaLength, targetFilename);
+    }
+
+    // compute the remaining bytes after the fixed header
+    final long remainingBytes;
+    if (CompressionStrategy.NONE == compressionStrategy) {
+      remainingBytes = metaLength;
+    } else {
+      // upper bound: compressed length int + uncompressed size
+      // NOTE(review): assumes compressed metadata is never larger than metaLength; confirm for small or
+      // incompressible metadata.
+      remainingBytes = Integer.BYTES + metaLength;
+    }
+
+    // write fixed header + remaining metadata bytes to a local file atomically (write to temp, then rename)
+    // to avoid leaving a partial file on disk if the process crashes mid-write
+    FileUtils.mkdirp(headerFile.getParentFile());
+    FileUtils.writeAtomically(headerFile, out -> {
+      out.write(fixedHeader);
+      try (InputStream remainingStream = rangeReader.readRange(
+          targetFilename,
+          SegmentFileMetadataReader.HEADER_SIZE,
+          remainingBytes
+      )) {
+        // copy at most remainingBytes. Ending early is tolerated here; a truncated header is expected to be rejected
+        // when the written file is parsed.
+        ByteStreams.limit(remainingStream, remainingBytes).transferTo(out);
+      }
+      return null;
+    });
+  }
+
+  /**
+   * Parses segment metadata from the local header file. Any parse failure (corrupt JSON, truncated header, etc.)
+   * propagates to the caller.
+   */
+  private static SegmentFileMetadataReader.Result parseHeaderFile(File headerFile, ObjectMapper jsonMapper)
+      throws IOException
+  {
+    try (InputStream in = new FileInputStream(headerFile)) {
+      return SegmentFileMetadataReader.read(in, jsonMapper);
+    }
+  }
+
+ /**
+ * Mmap the bitmap region of the header file as read-write. Extends the file
if the bitmap region doesn't exist yet.
+ * The channel is closed immediately after mapping.
+ */
+ private static MappedByteBuffer mmapBitmap(
+ File headerFile,
+ SegmentFileMetadataReader.Result result
+ ) throws IOException
+ {
+ final int numBitmapBytes = (result.getMetadata().getFiles().size() + 7) /
8;
+ final long expectedSize = result.getHeaderSize() + numBitmapBytes;
+ try (RandomAccessFile raf = new RandomAccessFile(headerFile, "rw");
+ FileChannel channel = raf.getChannel()) {
+ if (raf.length() < expectedSize) {
Review Comment:
Is it possible this will erroneously be false (and therefore we'll map some
stuff from the first container) if the header was compressed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]