This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3d095672e26 feat: rework SegmentFileBuilderV10 to organize internal
containers by projection to be friendly to partial loading/eviction (#19375)
3d095672e26 is described below
commit 3d095672e267fac6cba6c6b5feac118e0ea79674
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Apr 29 12:11:33 2026 -0700
feat: rework SegmentFileBuilderV10 to organize internal containers by
projection to be friendly to partial loading/eviction (#19375)
---
.../data/input/impl/AggregateProjectionSpec.java | 6 +-
.../java/util/common/io/smoosh/FileSmoosher.java | 97 +----
.../java/org/apache/druid/segment/IndexIO.java | 2 +-
.../org/apache/druid/segment/IndexMergerBase.java | 11 +-
.../org/apache/druid/segment/IndexMergerV10.java | 1 +
.../druid/segment/file/SegmentFileBuilder.java | 18 +
.../druid/segment/file/SegmentFileBuilderV10.java | 462 ++++++++++++++++++++-
.../druid/segment/projections/Projections.java | 17 +-
.../segment/file/SegmentFileBuilderV10Test.java | 368 ++++++++++++++++
9 files changed, 869 insertions(+), 113 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java
b/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java
index 778a6ae734f..a19a5e1a6e6 100644
---
a/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java
+++
b/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java
@@ -39,6 +39,7 @@ import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.projections.AggregateProjectionSchema;
+import org.apache.druid.segment.projections.Projections;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTimeZone;
@@ -99,10 +100,7 @@ public class AggregateProjectionSpec
@JsonProperty("aggregators") @Nullable AggregatorFactory[] aggregators
)
{
- if (name == null || name.isEmpty()) {
- throw InvalidInput.exception("projection name cannot be null or empty");
- }
- this.name = name;
+ this.name = Projections.validateProjectionName(name);
if (CollectionUtils.isNullOrEmpty(groupingColumns) && (aggregators == null
|| aggregators.length == 0)) {
throw InvalidInput.exception(
"projection[%s] groupingColumns and aggregators must not both be
null or empty",
diff --git
a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
index dd3842f1011..f6166e65648 100644
---
a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
+++
b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
@@ -35,11 +35,8 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.column.ColumnDescriptor;
import org.apache.druid.segment.file.SegmentFileBuilder;
import org.apache.druid.segment.file.SegmentFileChannel;
-import org.apache.druid.segment.file.SegmentFileContainerMetadata;
-import org.apache.druid.segment.file.SegmentInternalFileMetadata;
import org.apache.druid.utils.CloseableUtils;
-import javax.annotation.Nullable;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
@@ -99,35 +96,20 @@ public class FileSmoosher implements SegmentFileBuilder
private Outer currOut = null;
private boolean writerCurrentlyInUse = false;
- // helper for SegmentFileBuilderV10 to have control over naming of smoosh
output files; if this is non-null
- // meta.smoosh is not written
- @Nullable
- private final String outputFileName;
-
public FileSmoosher(
File baseDir
)
{
- this(baseDir, Integer.MAX_VALUE, null);
+ this(baseDir, Integer.MAX_VALUE);
}
public FileSmoosher(
File baseDir,
int maxChunkSize
)
- {
- this(baseDir, maxChunkSize, null);
- }
-
- public FileSmoosher(
- File baseDir,
- int maxChunkSize,
- @Nullable String outputFileName
- )
{
this.baseDir = baseDir;
this.maxChunkSize = maxChunkSize;
- this.outputFileName = outputFileName;
this.delegateFileNameMap = new HashMap<>();
Preconditions.checkArgument(maxChunkSize > 0, "maxChunkSize must be a
positive value.");
@@ -143,43 +125,6 @@ public class FileSmoosher implements SegmentFileBuilder
return new File(baseDir, StringUtils.format("%05d.%s", i, FILE_EXTENSION));
}
- static File makeChunkFile(File baseDir, String prefix, int i)
- {
- return new File(baseDir, StringUtils.format("%s-%05d.%s", prefix, i,
FILE_EXTENSION));
- }
-
- public List<File> getOutFiles()
- {
- return outFiles;
- }
-
- public List<SegmentFileContainerMetadata> getContainers()
- {
- List<SegmentFileContainerMetadata> smooshContainers = new ArrayList<>();
- long offset = 0;
- for (File f : outFiles) {
- smooshContainers.add(new SegmentFileContainerMetadata(offset,
f.length()));
- offset += f.length();
- }
- return smooshContainers;
- }
-
- public Map<String, SegmentInternalFileMetadata> getInternalFiles()
- {
- Map<String, SegmentInternalFileMetadata> smooshFileMetadata = new
TreeMap<>();
- for (Map.Entry<String, Metadata> entry : internalFiles.entrySet()) {
- smooshFileMetadata.put(
- entry.getKey(),
- new SegmentInternalFileMetadata(
- entry.getValue().getFileNum(),
- entry.getValue().getStartOffset(),
- entry.getValue().getEndOffset() -
entry.getValue().getStartOffset()
- )
- );
- }
- return smooshFileMetadata;
- }
-
@Override
public void addColumn(String name, ColumnDescriptor columnDescriptor)
{
@@ -456,26 +401,24 @@ public class FileSmoosher implements SegmentFileBuilder
currOut.close();
}
- if (outputFileName == null) {
- File metaFile = metaFile(baseDir);
-
- try (Writer out =
- new BufferedWriter(new OutputStreamWriter(new
FileOutputStream(metaFile), StandardCharsets.UTF_8))) {
- out.write(StringUtils.format("v1,%d,%d", maxChunkSize,
outFiles.size()));
+ File metaFile = metaFile(baseDir);
+
+ try (Writer out =
+ new BufferedWriter(new OutputStreamWriter(new
FileOutputStream(metaFile), StandardCharsets.UTF_8))) {
+ out.write(StringUtils.format("v1,%d,%d", maxChunkSize, outFiles.size()));
+ out.write("\n");
+
+ for (Map.Entry<String, Metadata> entry : internalFiles.entrySet()) {
+ final Metadata metadata = entry.getValue();
+ out.write(
+ JOINER.join(
+ entry.getKey(),
+ metadata.getFileNum(),
+ metadata.getStartOffset(),
+ metadata.getEndOffset()
+ )
+ );
out.write("\n");
-
- for (Map.Entry<String, Metadata> entry : internalFiles.entrySet()) {
- final Metadata metadata = entry.getValue();
- out.write(
- JOINER.join(
- entry.getKey(),
- metadata.getFileNum(),
- metadata.getStartOffset(),
- metadata.getEndOffset()
- )
- );
- out.write("\n");
- }
}
}
}
@@ -483,9 +426,7 @@ public class FileSmoosher implements SegmentFileBuilder
private Outer getNewCurrOut() throws FileNotFoundException
{
final int fileNum = outFiles.size();
- File outFile = outputFileName != null
- ? makeChunkFile(baseDir, outputFileName, fileNum)
- : makeChunkFile(baseDir, fileNum);
+ File outFile = makeChunkFile(baseDir, fileNum);
outFiles.add(outFile);
return new Outer(fileNum, outFile, maxChunkSize);
}
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java
b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
index e5acd4f73fb..f246421f841 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
@@ -1039,7 +1039,7 @@ public class IndexIO
final Map<String, Supplier<BaseColumnHolder>> projectionColumns = new
LinkedHashMap<>();
for (String column : projectionSpec.getSchema().getColumnNames()) {
- final String smooshName =
Projections.getProjectionSmooshFileName(projectionSpec.getSchema(), column);
+ final String smooshName =
Projections.getProjectionSegmentInternalFileName(projectionSpec.getSchema(),
column);
final ByteBuffer colBuffer = segmentFileMapper.mapFile(smooshName);
final ColumnDescriptor columnDescriptor =
metadata.getColumnDescriptors().get(smooshName);
if (columnDescriptor == null) {
diff --git
a/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java
b/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java
index 2f311015359..06cde93fa3d 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java
@@ -512,7 +512,7 @@ public abstract class IndexMergerBase implements IndexMerger
columnFormats.put(dimension, dimensionFormat);
DimensionHandler handler = dimensionFormat.getColumnHandler(dimension);
DimensionMergerV9 merger = handler.makeMerger(
- Projections.getProjectionSmooshFileName(spec.getSchema(),
dimension),
+ Projections.getProjectionSegmentInternalFileName(spec.getSchema(),
dimension),
indexSpec,
segmentWriteOutMedium,
dimensionFormat.toColumnCapabilities(),
@@ -543,7 +543,7 @@ public abstract class IndexMergerBase implements IndexMerger
metrics,
columnFormats,
indexSpec,
- Projections.getProjectionSmooshPrefix(spec.getSchema())
+
Projections.getProjectionSegmentInternalFilePrefix(spec.getSchema())
);
Function<List<TransformableRowIterator>, TimeAndDimsIterator>
rowMergerFn =
@@ -630,13 +630,14 @@ public abstract class IndexMergerBase implements
IndexMerger
final String section2 = "build projection[" + projectionSchema.getName()
+ "] inverted index and columns";
progress.startSection(section2);
+ segmentFileBuilder.startFileGroup(projectionSchema.getName());
if (projectionSchema.getTimeColumnName() != null) {
makeTimeColumn(
segmentFileBuilder,
progress,
timeWriter,
indexSpec,
- Projections.getProjectionSmooshFileName(spec.getSchema(),
projectionSchema.getTimeColumnName())
+ Projections.getProjectionSegmentInternalFileName(spec.getSchema(),
projectionSchema.getTimeColumnName())
);
}
makeMetricsColumns(
@@ -646,7 +647,7 @@ public abstract class IndexMergerBase implements IndexMerger
columnFormats,
metricWriters,
indexSpec,
- Projections.getProjectionSmooshPrefix(spec.getSchema())
+ Projections.getProjectionSegmentInternalFilePrefix(spec.getSchema())
);
for (int i = 0; i < dimensions.size(); i++) {
@@ -664,7 +665,7 @@ public abstract class IndexMergerBase implements IndexMerger
// use merger descriptor, merger either has values or handles it own
null column storage details
columnDesc = merger.makeColumnDescriptor();
}
- makeColumn(segmentFileBuilder,
Projections.getProjectionSmooshFileName(spec.getSchema(), dimension),
columnDesc);
+ makeColumn(segmentFileBuilder,
Projections.getProjectionSegmentInternalFileName(spec.getSchema(), dimension),
columnDesc);
}
progress.stopSection(section2);
diff --git
a/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
b/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
index 7d3661886a0..8c8a6d862d5 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
@@ -218,6 +218,7 @@ public class IndexMergerV10 extends IndexMergerBase
/************ Create Inverted Indexes and Finalize Build Columns
*************/
final String section = "build inverted index and columns";
progress.startSection(section);
+ v10Smoosher.startFileGroup(Projections.BASE_TABLE_PROJECTION_NAME);
makeTimeColumn(v10Smoosher, progress, timeWriter, indexSpec, basePrefix
+ ColumnHolder.TIME_COLUMN_NAME);
makeMetricsColumns(
v10Smoosher,
diff --git
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilder.java
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilder.java
index 5d5ac10e1d4..6d5aea47374 100644
---
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilder.java
+++
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilder.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.file;
import org.apache.druid.segment.column.ColumnDescriptor;
+import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
@@ -45,6 +46,23 @@ public interface SegmentFileBuilder extends Closeable
*/
void addColumn(String name, ColumnDescriptor columnDescriptor);
+ /**
+ * Declare that subsequent writes belong to a named group of files that should be stored together.
+ * <p>
+ * This is only a hint about physical layout: it does not constrain the names of files added afterwards, and
+ * implementations are free to ignore it entirely. The default implementation is a no-op, which suits formats that do
+ * not organize data into coarse-grained groupings. Projections are the primary caller today, but the mechanism is
+ * generic and equally applicable to grouping internal metadata, data shared across columns, and so on.
+ * <p>
+ * Callers should invoke this before writing each group's files; passing {@code null} clears the current group.
+ * Callers should not invoke this while a writer returned by {@link #addWithChannel} is still open (implementations
+ * may reject such calls).
+ *
+ * @param groupName name of the group that subsequent writes belong to, or {@code null} to clear the current group
+ * @see SegmentFileBuilderV10#startFileGroup(String) for the V10 semantics
+ */
+ default void startFileGroup(@Nullable String groupName)
+ {
+ }
+
/**
* Add a {@link File} to the segment file as the specified name
*/
diff --git
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java
index 6986a2759fe..0557fc2a4bd 100644
---
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java
+++
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java
@@ -23,13 +23,19 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.primitives.Ints;
import org.apache.druid.error.DruidException;
import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.column.ColumnDescriptor;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.projections.ProjectionMetadata;
+import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
import java.io.File;
@@ -39,20 +45,41 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.TreeMap;
/**
- * {@link SegmentFileBuilder} for V10 format segments. Right now, this uses a
{@link FileSmoosher} underneath to build
- * V9 smoosh files and collect the metadata about the offsets in those
containers, and then appends them into the V10
- * consolidated segment file after the header and {@link SegmentFileMetadata}
is written.
+ * {@link SegmentFileBuilder} for V10 format segments. Files are written into
'container' chunk files in {@link #baseDir}
+ * and are concatenated after the header and {@link SegmentFileMetadata} on
{@link #close()} to produce the final
+ * consolidated segment file.
* <p>
* V10 file format:
* | version (byte) | meta compression (byte) | meta length (int) | meta json
| container 0 | ... | container n |
+ * <p>
+ * Containers are scoped to at most one declared file group. Callers declare
which group they are writing via
+ * {@link #startFileGroup(String)} before writing its files; a new container
is started when the declared group
+ * changes or the current container would exceed {@link #maxContainerSize}. A
group whose total size exceeds the max
+ * container size spans multiple containers, all tagged with the same group.
This gives readers a clean 1:1 (or 1:N)
+ * mapping between groups and containers, which supports per-group partial
loading without any read-side reorganization.
+ * Projections are the primary caller today, but the mechanism is equally
usable for other organizational needs
+ * (shared data across columns, internal metadata, etc.).
+ * <p>
+ * Callers that never invoke {@link #startFileGroup(String)} are mapped to a
null-group container.
+ * <p>
+ * Much of the logic here was ported from {@link
org.apache.druid.java.util.common.io.smoosh.FileSmoosher} of the V9
+ * format and there is a fair bit of overlap. In fact, the initial
implementation of this class wrapped a V9 smoosher
+ * to build the files before combining them into the V10 format. The main
difference is that V9 fills each container to
+ * the max while here we organize with file groups.
*/
public class SegmentFileBuilderV10 implements SegmentFileBuilder
{
+ private static final Logger LOG = new Logger(SegmentFileBuilderV10.class);
+
public static SegmentFileBuilderV10 create(ObjectMapper jsonMapper, File
baseDir)
{
return create(jsonMapper, baseDir, CompressionStrategy.NONE);
@@ -72,12 +99,32 @@ public class SegmentFileBuilderV10 implements
SegmentFileBuilder
private final ObjectMapper jsonMapper;
private final String outputFileName;
private final File baseDir;
- private final long maxChunkSize;
+ private final long maxContainerSize;
private final CompressionStrategy metadataCompression;
- private final FileSmoosher smoosher;
private final Map<String, SegmentFileBuilderV10> externalSegmentFileBuilders;
private final Map<String, ColumnDescriptor> columns = new TreeMap<>();
+ private final List<ContainerWriter> containers = new ArrayList<>();
+ private final Map<String, SegmentInternalFileMetadata> internalFiles = new
TreeMap<>();
+
+ // Nested addWithChannel calls (for example a serializer that, while being
written, emits sub-files for its own
+ // columnar parts) can't write into the current container concurrently with
the outer writer. These nested writes are
+ // redirected to temporary files and merged back into container(s) once the
outer writer completes. Each entry
+ // carries the file group that was active when the delegate was created so
that the merge routes it into the
+ // correct container even if the active group has since changed.
+ private final List<DelegateEntry> completedDelegates = new ArrayList<>();
+ private final List<DelegateEntry> inProgressDelegates = new ArrayList<>();
+ private long delegateFileCounter = 0;
+
+ @Nullable
+ private ContainerWriter currentContainer = null;
+ private boolean writerCurrentlyInUse = false;
+ // The file group declared by the most recent {@link #startFileGroup} call.
Writes are routed into containers
+ // tagged with this group. Remains {@code null} if the caller never declares
one, in which case all writes share
+ // a single null-group container.
+ @Nullable
+ private String currentFileGroup = null;
+
@Nullable
private String interval = null;
@Nullable
@@ -89,35 +136,141 @@ public class SegmentFileBuilderV10 implements
SegmentFileBuilder
ObjectMapper jsonMapper,
String outputFileName,
File baseDir,
- long maxChunkSize,
+ long maxContainerSize,
CompressionStrategy metadataCompression
)
{
this.jsonMapper = jsonMapper;
this.outputFileName = outputFileName;
this.baseDir = baseDir;
- this.maxChunkSize = maxChunkSize;
+ this.maxContainerSize = maxContainerSize;
this.metadataCompression = metadataCompression;
- this.smoosher = new FileSmoosher(baseDir, Ints.checkedCast(maxChunkSize),
outputFileName);
this.externalSegmentFileBuilders = new TreeMap<>();
}
@Override
public void add(String name, File fileToAdd) throws IOException
{
- smoosher.add(name, fileToAdd);
+    // Copy the whole file into a freshly reserved internal file. transferTo may move fewer bytes than requested, so
+    // keep going until everything has been copied; a non-positive result means no progress can be made.
+    try (FileInputStream inputStream = new FileInputStream(fileToAdd);
+         FileChannel source = inputStream.getChannel()) {
+      final long totalBytes = source.size();
+      try (SegmentFileChannel sink = addWithChannel(name, totalBytes)) {
+        for (long copied = 0; copied < totalBytes; ) {
+          final long chunk = source.transferTo(copied, totalBytes - copied, sink);
+          if (chunk <= 0) {
+            throw new IOE("Unable to transfer bytes from file[%s] at position[%,d]", fileToAdd, copied);
+          }
+          copied += chunk;
+        }
+      }
+    }
}
@Override
public void add(String name, ByteBuffer bufferToAdd) throws IOException
{
- smoosher.add(name, bufferToAdd);
+    // Reserve exactly the remaining bytes, then loop until the buffer is drained. The channel contract does not
+    // guarantee that a single write() consumes the whole buffer, and closing the returned channel after a short write
+    // can be rejected as potential corruption.
+    try (SegmentFileChannel out = addWithChannel(name, bufferToAdd.remaining())) {
+      Channels.writeFully(out, bufferToAdd);
+    }
}
+ /**
+ * Reserve a new internal file called {@code name} of exactly {@code size} bytes and return a channel to write it.
+ * <p>
+ * Rejects names containing a comma, names that were already added, and sizes larger than {@link #maxContainerSize}.
+ * If another writer from this builder is still open, the bytes are redirected to a temporary delegate file (see
+ * {@link #delegateChannel}) that is merged into a container later. Otherwise the bytes go straight into the current
+ * container for the active file group, and {@link #ensureContainer} may roll over to a new container first.
+ * <p>
+ * The file is only recorded in {@link #internalFiles} when the returned channel is closed after exactly
+ * {@code size} bytes were written.
+ */
@Override
- public SegmentFileChannel addWithChannel(String name, long size) throws IOException
+ public SegmentFileChannel addWithChannel(final String name, final long size) throws IOException
{
- return smoosher.addWithChannel(name, size);
+ // NOTE(review): the comma restriction appears to be carried over from FileSmoosher, whose meta.smoosh file
+ // joins entries with JOINER; confirm whether V10 (JSON metadata) still needs it.
+ if (name.contains(",")) {
+ throw new IAE("Cannot have a comma in the name of a file, got[%s].", name);
+ }
+ if (internalFiles.containsKey(name)) {
+ throw new IAE("Cannot add files of the same name, already have [%s]", name);
+ }
+ // A single internal file can never span containers, so it must fit in an empty one.
+ if (size > maxContainerSize) {
+ throw DruidException.forPersona(DruidException.Persona.ADMIN)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(
+ "Serialized buffer size[%,d] for column[%s] exceeds the maximum[%,d]. "
+ + "Consider adjusting the tuningConfig - for example, reduce maxRowsPerSegment, "
+ + "or partition your data further.",
+ size, name, maxContainerSize
+ );
+ }
+
+ // If an outer writer is mid-write we can't append to the current container concurrently, route through a temp
+ // file that will be merged back into a container once the outer writer releases.
+ if (writerCurrentlyInUse) {
+ return delegateChannel(name, size);
+ }
+
+ // Make sure the current container belongs to the active group and has room, then remember where this file starts.
+ ensureContainer(currentFileGroup, size);
+ final ContainerWriter target = currentContainer;
+ final long startOffset = target.currOffset;
+ // Any further addWithChannel call is delegated until this writer is closed.
+ writerCurrentlyInUse = true;
+
+ return new SegmentFileChannel()
+ {
+ private boolean open = true;
+ private long bytesWritten = 0;
+
+ @Override
+ public int write(ByteBuffer src) throws IOException
+ {
+ return Ints.checkedCast(verifySize(target.write(src)));
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
+ {
+ return verifySize(target.write(srcs, offset, length));
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs) throws IOException
+ {
+ return verifySize(target.write(srcs));
+ }
+
+ // Called after each chunk has already been written to the container. It checks that nobody else moved the
+ // container offset, and that the caller has not exceeded the size it declared.
+ private long verifySize(long bytesWrittenInChunk)
+ {
+ bytesWritten += bytesWrittenInChunk;
+
+ if (bytesWritten != target.currOffset - startOffset) {
+ throw new ISE("Perhaps there is some concurrent modification going on?");
+ }
+ if (bytesWritten > size) {
+ throw new ISE("Wrote[%,d] bytes for something of size[%,d]. Liar!!!", bytesWritten, size);
+ }
+
+ return bytesWrittenInChunk;
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ return open;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if (!open) {
+ return;
+ }
+ open = false;
+ // Released before validation and merging: mergeDelegatedFiles() replays delegates through add(), which would
+ // otherwise be delegated again instead of being written into a container.
+ writerCurrentlyInUse = false;
+
+ if (bytesWritten != target.currOffset - startOffset) {
+ throw new ISE("Perhaps there is some concurrent modification going on?");
+ }
+ if (bytesWritten != size) {
+ throw new IOE("Expected [%,d] bytes, only saw [%,d], potential corruption?", size, bytesWritten);
+ }
+ internalFiles.put(
+ name,
+ new SegmentInternalFileMetadata(target.fileNum, startOffset, target.currOffset - startOffset)
+ );
+ // Nested writes that finished while this writer was open can now be copied into containers.
+ mergeDelegatedFiles();
+ }
+ };
}
@Override
@@ -125,7 +278,7 @@ public class SegmentFileBuilderV10 implements
SegmentFileBuilder
{
return externalSegmentFileBuilders.computeIfAbsent(
externalFile,
- (k) -> new SegmentFileBuilderV10(jsonMapper, externalFile, baseDir,
maxChunkSize, metadataCompression)
+ (k) -> new SegmentFileBuilderV10(jsonMapper, externalFile, baseDir,
maxContainerSize, metadataCompression)
);
}
@@ -135,6 +288,31 @@ public class SegmentFileBuilderV10 implements
SegmentFileBuilder
this.columns.put(name, columnDescriptor);
}
+ /**
+ * Declare the file group that subsequent writes belong to. Writes are routed into a container tagged with the
+ * declared group. A new container is started when the group changes or when the incoming file won't fit in the
+ * current one. A group whose total size exceeds {@link #maxContainerSize} is split across multiple consecutive
+ * containers, all tagged with the same group. Passing {@code null} clears the current group, and subsequent writes
+ * are then routed into a null-group container until the next call.
+ * <p>
+ * This method only records the group name. The container switch itself happens lazily, on the next write.
+ * <p>
+ * Current V10-specific limitations worth knowing:
+ * <ul>
+ * <li>Groups cannot be re-entered. Once a different group (or {@code null}) has been declared and written to, the
+ * previous group's container is closed and no more files can be appended to it. Writes for a re-declared group
+ * open a fresh container, so that group's files end up in non-contiguous containers. If all of a group's files
+ * must land in the same container(s), write them contiguously.</li>
+ * <li>Throws if called while a writer returned by {@link #addWithChannel} is still open.</li>
+ * </ul>
+ *
+ * @param groupName the group for subsequent writes, or {@code null} to clear the current group
+ */
+ @Override
+ public void startFileGroup(@Nullable String groupName)
+ {
+ // Switching groups mid-write would let the open writer's container and the declared group disagree.
+ if (writerCurrentlyInUse) {
+ throw DruidException.defensive("Cannot start file group[%s] while a writer is in progress", groupName);
+ }
+ this.currentFileGroup = groupName;
+ }
+
public void addInterval(String interval)
{
this.interval = interval;
@@ -153,7 +331,9 @@ public class SegmentFileBuilderV10 implements
SegmentFileBuilder
@Override
public void abort()
{
- smoosher.abort();
+ if (currentContainer != null) {
+ CloseableUtils.closeAndWrapExceptions(currentContainer);
+ }
}
@Override
@@ -163,11 +343,22 @@ public class SegmentFileBuilderV10 implements
SegmentFileBuilder
externalBuilder.close();
}
- smoosher.close();
+ if (!completedDelegates.isEmpty() || !inProgressDelegates.isEmpty()) {
+ abort();
+ throw new ISE(
+ "[%d] writers in progress and [%d] completed writers needs to be
closed before closing builder.",
+ inProgressDelegates.size(),
+ completedDelegates.size()
+ );
+ }
- SegmentFileMetadata segmentFileMetadata = new SegmentFileMetadata(
- smoosher.getContainers(),
- smoosher.getInternalFiles(),
+ if (currentContainer != null) {
+ currentContainer.close();
+ }
+
+ final SegmentFileMetadata segmentFileMetadata = new SegmentFileMetadata(
+ buildContainerMetadata(),
+ internalFiles,
interval,
columns.isEmpty() ? null : columns,
projections,
@@ -222,7 +413,8 @@ public class SegmentFileBuilderV10 implements
SegmentFileBuilder
Channels.writeFully(channel, compressed);
}
- for (File f : smoosher.getOutFiles()) {
+ for (ContainerWriter container : containers) {
+ final File f = container.file;
try (FileInputStream fis = new FileInputStream(f)) {
byte[] buffer = new byte[4096];
int bytesRead;
@@ -230,13 +422,239 @@ public class SegmentFileBuilderV10 implements
SegmentFileBuilder
outputStream.write(buffer, 0, bytesRead);
}
}
- // delete all the old 00000.smoosh
+ // delete all the old container files
DruidException.conditionalDefensive(
f.delete(),
- "Failed to delete temporary file[%s]",
+ "Failed to delete temporary container file[%s]",
f
);
}
}
}
+
+  /**
+   * Describe every container as a (start offset, length) pair. Offsets start at 0 for the first container and follow
+   * the order in which containers were created, which is also the order close() appends them to the output file.
+   */
+  private List<SegmentFileContainerMetadata> buildContainerMetadata()
+  {
+    final int count = containers.size();
+    final List<SegmentFileContainerMetadata> metadata = new ArrayList<>(count);
+    long nextStart = 0;
+    for (int i = 0; i < count; i++) {
+      final long containerLength = containers.get(i).file.length();
+      metadata.add(new SegmentFileContainerMetadata(nextStart, containerLength));
+      nextStart += containerLength;
+    }
+    return metadata;
+  }
+
+ /**
+ * Ensure that {@link #currentContainer} is ready to accept {@code size}
bytes of a file belonging to {@code group}.
+ * Rolls the current container and starts a new one when:
+ * <ul>
+ * <li>there is no current container, or</li>
+ * <li>the current container is for a different group, or</li>
+ * <li>the current container cannot fit the incoming bytes within {@link
#maxContainerSize}.</li>
+ * </ul>
+ */
+ private void ensureContainer(@Nullable String group, long size) throws
IOException
+ {
+ if (currentContainer == null
+ || !Objects.equals(currentContainer.group, group)
+ || !currentContainer.canFit(size)) {
+ if (currentContainer != null) {
+ currentContainer.close();
+ }
+ currentContainer = openNewContainer(group);
+ containers.add(currentContainer);
+ }
+ }
+
+  /**
+   * Create the writer for the next container. Its number is the index it will occupy in {@link #containers}, and it
+   * is backed by a file named {@code <outputFileName>-NNNNN.container} in {@link #baseDir}, which is created if
+   * missing.
+   */
+  private ContainerWriter openNewContainer(@Nullable String group) throws IOException
+  {
+    final int containerNumber = containers.size();
+    final String containerFileName = StringUtils.format("%s-%05d.container", outputFileName, containerNumber);
+    FileUtils.mkdirp(baseDir);
+    return new ContainerWriter(containerNumber, new File(baseDir, containerFileName), group, maxContainerSize);
+  }
+
+  /**
+   * Open a writer for an internal file requested while another writer from this builder is still open.
+   * <p>
+   * The bytes go to a temporary file in {@link #baseDir}. When the returned channel is closed, the file is queued for
+   * merging into a container by {@link #mergeDelegatedFiles()}. The merge runs immediately if no outer writer is
+   * active at that point, and otherwise when the outer writer closes.
+   *
+   * @param name internal file name, already validated by {@link #addWithChannel}
+   * @param size exact number of bytes the caller must write before closing the returned channel
+   */
+  private SegmentFileChannel delegateChannel(final String name, final long size) throws IOException
+  {
+    // Prefixed with outputFileName so delegate files from a main builder and its externals (which share baseDir)
+    // cannot collide, since main and external always have distinct output file names.
+    final String delegateName = StringUtils.format("%s-delegate-%d", outputFileName, delegateFileCounter++);
+    final File tmpFile = new File(baseDir, delegateName);
+    // Open the temp file before registering the entry, so that a failed open does not leave a phantom "in progress"
+    // writer behind, which would make close() of the builder fail later with a misleading message.
+    final FileChannel channel = FileChannel.open(
+        tmpFile.toPath(),
+        StandardOpenOption.WRITE,
+        StandardOpenOption.CREATE,
+        StandardOpenOption.TRUNCATE_EXISTING
+    );
+    // Snapshot the active group now so that if this delegate is merged after the outer writer has advanced past
+    // the group it was created under, it still routes into the correct container.
+    final DelegateEntry entry = new DelegateEntry(tmpFile, name, currentFileGroup);
+    inProgressDelegates.add(entry);
+
+    return new SegmentFileChannel()
+    {
+      private long bytesWritten = 0;
+
+      @Override
+      public int write(ByteBuffer src) throws IOException
+      {
+        return Ints.checkedCast(addBytes(channel.write(src)));
+      }
+
+      @Override
+      public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
+      {
+        return addBytes(channel.write(srcs, offset, length));
+      }
+
+      @Override
+      public long write(ByteBuffer[] srcs) throws IOException
+      {
+        return addBytes(channel.write(srcs));
+      }
+
+      private long addBytes(long n)
+      {
+        if (n > size - bytesWritten) {
+          throw new ISE(
+              "Wrote more bytes[%,d] than expected[%,d] for delegated file[%s]",
+              bytesWritten + n, size, name
+          );
+        }
+        bytesWritten += n;
+        return n;
+      }
+
+      @Override
+      public boolean isOpen()
+      {
+        return channel.isOpen();
+      }
+
+      @Override
+      public void close() throws IOException
+      {
+        // A second close must not queue the entry again, because a repeated merge would fail on the duplicate name
+        // or on the already-deleted temp file.
+        if (!channel.isOpen()) {
+          return;
+        }
+        channel.close();
+        inProgressDelegates.remove(entry);
+        // Same contract as the direct (non-delegated) writer: exactly the declared size must be written. Without
+        // this check a short write would be silently accepted, because the merge re-adds the temp file using its
+        // actual on-disk length.
+        if (bytesWritten != size) {
+          if (!tmpFile.delete()) {
+            LOG.warn("Unable to delete delegate file[%s]", tmpFile);
+          }
+          throw new IOE(
+              "Expected [%,d] bytes for delegated file[%s], only saw [%,d], potential corruption?",
+              size,
+              name,
+              bytesWritten
+          );
+        }
+        completedDelegates.add(entry);
+        if (!writerCurrentlyInUse) {
+          mergeDelegatedFiles();
+        }
+      }
+    };
+  }
+
+  /**
+   * Copy every completed delegate temp file into a container by replaying it through {@link #add(String, File)}, then
+   * delete the temp file. This must only run while no outer writer holds the builder.
+   * <p>
+   * For each replayed file, {@link #currentFileGroup} is temporarily set to the group captured when the nested write
+   * was requested. The file therefore lands in a container for that group, not in whichever group is active at merge
+   * time. The caller's group is restored afterwards, even if a replay fails.
+   */
+  private void mergeDelegatedFiles() throws IOException
+  {
+    if (!completedDelegates.isEmpty()) {
+      // Detach the pending batch first. Each replayed add() closes a regular writer, which calls back into this
+      // method, and that nested call must not see (and re-add) the same entries.
+      final List<DelegateEntry> batch = new ArrayList<>(completedDelegates);
+      completedDelegates.clear();
+      final String groupBeforeMerge = currentFileGroup;
+      try {
+        for (DelegateEntry pending : batch) {
+          currentFileGroup = pending.group;
+          add(pending.name, pending.file);
+          final boolean deleted = pending.file.delete();
+          if (!deleted) {
+            LOG.warn("Unable to delete delegate file[%s]", pending.file);
+          }
+        }
+      }
+      finally {
+        currentFileGroup = groupBeforeMerge;
+      }
+    }
+  }
+
+ /**
+ * A nested write that was redirected to a temporary file until it can be merged into a container.
+ *
+ * @param file temporary file in {@link #baseDir} holding the written bytes
+ * @param name internal file name the bytes are registered under once merged
+ * @param group file group that was active when the nested write was requested; may be {@code null}
+ */
+ private record DelegateEntry(File file, String name, @Nullable String group)
+ {
+ }
+
+ /**
+ * Low-level writer for a single container chunk file. One container holds
internal files from at most one group.
+ */
+ private static class ContainerWriter implements GatheringByteChannel
+ {
+ private final int fileNum;
+ private final File file;
+ @Nullable
+ private final String group;
+ private final long maxSize;
+ private final Closer closer = Closer.create();
+ private final GatheringByteChannel channel;
+ private long currOffset = 0;
+
+ ContainerWriter(int fileNum, File file, @Nullable String group, long
maxSize) throws IOException
+ {
+ this.fileNum = fileNum;
+ this.file = file;
+ this.group = group;
+ this.maxSize = maxSize;
+ final FileOutputStream outStream = closer.register(new
FileOutputStream(file));
+ this.channel = closer.register(outStream.getChannel());
+ }
+
+ boolean canFit(long size)
+ {
+ // overflow-safe form of currOffset + size <= maxSize for non-negative
currOffset/size/maxSize
+ return size <= maxSize - currOffset;
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException
+ {
+ return Ints.checkedCast(recordWrite(channel.write(src)));
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length) throws
IOException
+ {
+ return recordWrite(channel.write(srcs, offset, length));
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs) throws IOException
+ {
+ return recordWrite(channel.write(srcs));
+ }
+
+ private long recordWrite(long n)
+ {
+ if (n > maxSize - currOffset) {
+ throw new ISE("Wrote more bytes[%,d] than available[%,d]", n, maxSize
- currOffset);
+ }
+ currOffset += n;
+ return n;
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ return channel.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ closer.close();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Created container file[%s] for group[%s] of size[%,d] bytes.",
+ file.getAbsolutePath(),
+ group,
+ file.length()
+ );
+ }
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
index 0415d540e24..c81549f41c2 100644
---
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
+++
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
@@ -57,6 +57,17 @@ public class Projections
private static final ConcurrentHashMap<byte[], Boolean> PERIOD_GRAN_CACHE =
new ConcurrentHashMap<>();
+ public static String validateProjectionName(@Nullable String name)
+ {
+ if (name == null || name.isEmpty()) {
+ throw InvalidInput.exception("projection name cannot be null or empty");
+ }
+ if (name.startsWith("__")) {
+ throw InvalidInput.exception("projection cannot use reserved name[%s]",
BASE_TABLE_PROJECTION_NAME);
+ }
+ return name;
+ }
+
@Nullable
public static <T> QueryableProjection<T> findMatchingProjection(
CursorBuildSpec cursorBuildSpec,
@@ -509,12 +520,12 @@ public class Projections
return projectionSpec.getSchema().getName() + "/";
}
- public static String getProjectionSmooshFileName(ProjectionSchema schema,
String columnName)
+ public static String getProjectionSegmentInternalFileName(ProjectionSchema
schema, String columnName)
{
- return getProjectionSmooshPrefix(schema) + columnName;
+ return getProjectionSegmentInternalFilePrefix(schema) + columnName;
}
- public static String getProjectionSmooshPrefix(ProjectionSchema
projectionSchema)
+ public static String getProjectionSegmentInternalFilePrefix(ProjectionSchema
projectionSchema)
{
return projectionSchema.getName() + "/";
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/file/SegmentFileBuilderV10Test.java
b/processing/src/test/java/org/apache/druid/segment/file/SegmentFileBuilderV10Test.java
new file mode 100644
index 00000000000..89c232973cc
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/segment/file/SegmentFileBuilderV10Test.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.io.Files;
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.TestHelper;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+class SegmentFileBuilderV10Test
+{
+ private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper();
+
+ @TempDir
+ File tempDir;
+
+ @Test
+ void testOneContainerPerProjection() throws IOException
+ {
+ final File baseDir = new File(tempDir, "base_" +
ThreadLocalRandom.current().nextInt());
+ FileUtils.mkdirp(baseDir);
+
+ // matches the production usage pattern in IndexMergerV10: call
startFileGroup then write that projection's
+ // columns, then move on to the next projection.
+ final String[] projections = {"__base", "projA", "projB"};
+ final int colCount = 3;
+ try (SegmentFileBuilderV10 builder =
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+ for (String projection : projections) {
+ builder.startFileGroup(projection);
+ for (int col = 0; col < colCount; col++) {
+ final String name = projection + "/col" + col;
+ final File tmpFile = new File(tempDir,
StringUtils.format("%s-%s.bin", projection, col));
+ Files.write(Ints.toByteArray(name.hashCode()), tmpFile);
+ builder.add(name, tmpFile);
+ }
+ }
+ }
+
+ final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME);
+ try (SegmentFileMapperV10 mapper =
SegmentFileMapperV10.create(segmentFile, JSON_MAPPER)) {
+ final SegmentFileMetadata metadata = mapper.getSegmentFileMetadata();
+
+ Assertions.assertEquals(projections.length,
metadata.getContainers().size());
+ Assertions.assertEquals(projections.length * 3,
metadata.getFiles().size());
+ assertNoContainerMixesProjections(metadata);
+
+ assertColumns(projections, colCount, mapper);
+ }
+ }
+
+ @Test
+ void testProjectionNameWithSlashRoutesCorrectly() throws IOException
+ {
+ final File baseDir = new File(tempDir, "base_" +
ThreadLocalRandom.current().nextInt());
+ FileUtils.mkdirp(baseDir);
+
+ final String slashyProjection = "nested/projection";
+ final int colCount = 3;
+ try (SegmentFileBuilderV10 builder =
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+ builder.startFileGroup("__base");
+ for (int col = 0; col < colCount; col++) {
+ final String name = "__base/col" + col;
+ final File tmpFile = new File(tempDir,
StringUtils.format("base-%s.bin", col));
+ Files.write(Ints.toByteArray(name.hashCode()), tmpFile);
+ builder.add(name, tmpFile);
+ }
+ builder.startFileGroup(slashyProjection);
+ for (int col = 0; col < colCount; col++) {
+ final String name = slashyProjection + "/col" + col;
+ final File tmpFile = new File(tempDir,
StringUtils.format("slashy-%s.bin", col));
+ Files.write(Ints.toByteArray(name.hashCode()), tmpFile);
+ builder.add(name, tmpFile);
+ }
+ }
+
+ final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME);
+ try (SegmentFileMapperV10 mapper =
SegmentFileMapperV10.create(segmentFile, JSON_MAPPER)) {
+ final SegmentFileMetadata metadata = mapper.getSegmentFileMetadata();
+ // 2 projections, 2 containers, even though the slashy name's first '/'
would have parsed as projection "nested"
+ Assertions.assertEquals(2, metadata.getContainers().size());
+ Assertions.assertEquals(2 * colCount, metadata.getFiles().size());
+
+ // round-trip both sets of files
+ for (int col = 0; col < colCount; col++) {
+ final String baseName = "__base/col" + col;
+ final ByteBuffer baseBuf = mapper.mapFile(baseName);
+ Assertions.assertNotNull(baseBuf, baseName);
+ Assertions.assertEquals(baseName.hashCode(), baseBuf.getInt(),
baseName);
+
+ final String slashyName = slashyProjection + "/col" + col;
+ final ByteBuffer slashyBuf = mapper.mapFile(slashyName);
+ Assertions.assertNotNull(slashyBuf, slashyName);
+ Assertions.assertEquals(slashyName.hashCode(), slashyBuf.getInt(),
slashyName);
+ }
+ }
+ }
+
+ @Test
+ void testStartFileGroupWhileWriterInUseThrows() throws IOException
+ {
+ final File baseDir = new File(tempDir, "base_" +
ThreadLocalRandom.current().nextInt());
+ FileUtils.mkdirp(baseDir);
+
+ try (SegmentFileBuilderV10 builder =
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+ builder.startFileGroup("__base");
+ try (SegmentFileChannel outer = builder.addWithChannel("__base/col0",
4)) {
+ Assertions.assertThrows(RuntimeException.class, () ->
builder.startFileGroup("projA"));
+ outer.write(ByteBuffer.wrap(new byte[]{1, 2, 3, 4}));
+ }
+ }
+ }
+
+ @Test
+ void testExternalBuilderAlsoSplitsContainersByProjection() throws IOException
+ {
+ final String externalName = "external.segment";
+ final File baseDir = new File(tempDir, "base_" +
ThreadLocalRandom.current().nextInt());
+ FileUtils.mkdirp(baseDir);
+
+ final String[] mainProjections = {"__base", "projA", "projB"};
+ final String[] externalProjections = {"extProjX", "extProjY"};
+ final int colCount = 3;
+
+ try (SegmentFileBuilderV10 builder =
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+ for (String projection : mainProjections) {
+ builder.startFileGroup(projection);
+ for (int col = 0; col < colCount; col++) {
+ final String name = projection + "/col" + col;
+ final File tmpFile = new File(tempDir,
StringUtils.format("main-%s-%s.bin", projection, col));
+ Files.write(Ints.toByteArray(name.hashCode()), tmpFile);
+ builder.add(name, tmpFile);
+ }
+ }
+
+ // getExternalBuilder returns the SegmentFileBuilder interface but under
the hood produces an independent V10
+ // sub-file with its own header + containers. Projection-per-container
splitting must apply there too.
+ final SegmentFileBuilder external =
builder.getExternalBuilder(externalName);
+ for (String projection : externalProjections) {
+ external.startFileGroup(projection);
+ for (int col = 0; col < colCount; col++) {
+ final String name = projection + "/col" + (col + 1000);
+ final File tmpFile = new File(tempDir,
StringUtils.format("ext-%s-%s.bin", projection, col));
+ Files.write(Ints.toByteArray(name.hashCode()), tmpFile);
+ external.add(name, tmpFile);
+ }
+ }
+ }
+
+ final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME);
+ final File externalFile = new File(baseDir, externalName);
+ Assertions.assertTrue(segmentFile.exists(), "main v10 file missing");
+ Assertions.assertTrue(externalFile.exists(), "external v10 file missing");
+
+ // the external file on its own is a well-formed V10 sub-segment, load it
directly to check its container layout.
+ try (SegmentFileMapperV10 externalOnly =
SegmentFileMapperV10.create(externalFile, JSON_MAPPER)) {
+ final SegmentFileMetadata externalMetadata =
externalOnly.getSegmentFileMetadata();
+ Assertions.assertEquals(externalProjections.length,
externalMetadata.getContainers().size());
+ Assertions.assertEquals(externalProjections.length * colCount,
externalMetadata.getFiles().size());
+ assertNoContainerMixesProjections(externalMetadata);
+ }
+
+ // loaded together: main file checks its own containers and the external
is attached for mapExternalFile().
+ try (SegmentFileMapperV10 mapper =
SegmentFileMapperV10.create(segmentFile, JSON_MAPPER, List.of(externalName))) {
+ final SegmentFileMetadata mainMetadata = mapper.getSegmentFileMetadata();
+ Assertions.assertEquals(mainProjections.length,
mainMetadata.getContainers().size());
+ Assertions.assertEquals(mainProjections.length * colCount,
mainMetadata.getFiles().size());
+ assertNoContainerMixesProjections(mainMetadata);
+
+ assertColumns(mainProjections, colCount, mapper);
+
+ for (String projection : externalProjections) {
+ for (int col = 0; col < colCount; col++) {
+ final String name = projection + "/col" + (col + 1000);
+ final ByteBuffer buf = mapper.mapExternalFile(externalName, name);
+ Assertions.assertNotNull(buf, name);
+ Assertions.assertEquals(name.hashCode(), buf.getInt(), name);
+ }
+ }
+ }
+ }
+
+ @Test
+ void testNestedAddWithChannelDelegatesPerBuilder() throws IOException
+ {
+ // exercises the delegate-temp-file path on both the main and external
builders: while an outer addWithChannel is
+ // mid-write on a builder, a nested addWithChannel on the same builder
must route through a temp file and then be
+ // merged back in at outer-close. Main and external each drive this
independently, and since they share baseDir,
+ // their delegate file names must not collide.
+ final String externalName = "external.segment";
+ final File baseDir = new File(tempDir, "base_" +
ThreadLocalRandom.current().nextInt());
+ FileUtils.mkdirp(baseDir);
+
+ final byte[] outerBytes = new byte[]{1, 2, 3, 4};
+ final byte[] nestedBytes = new byte[]{5, 6, 7, 8};
+
+ try (SegmentFileBuilderV10 builder =
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+ builder.startFileGroup("__base");
+ try (SegmentFileChannel outer = builder.addWithChannel("__base/outer",
outerBytes.length)) {
+ // nested write while outer is in use → forced into delegate temp file
+ try (SegmentFileChannel nested =
builder.addWithChannel("__base/nested", nestedBytes.length)) {
+ nested.write(ByteBuffer.wrap(nestedBytes));
+ }
+ outer.write(ByteBuffer.wrap(outerBytes));
+ }
+
+ final SegmentFileBuilder external =
builder.getExternalBuilder(externalName);
+ external.startFileGroup("extProj");
+ try (SegmentFileChannel extOuter =
external.addWithChannel("extProj/outer", outerBytes.length)) {
+ try (SegmentFileChannel extNested =
external.addWithChannel("extProj/nested", nestedBytes.length)) {
+ extNested.write(ByteBuffer.wrap(nestedBytes));
+ }
+ extOuter.write(ByteBuffer.wrap(outerBytes));
+ }
+ }
+
+ final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME);
+ try (SegmentFileMapperV10 mapper =
SegmentFileMapperV10.create(segmentFile, JSON_MAPPER, List.of(externalName))) {
+ assertBytes(mapper.mapFile("__base/outer"), outerBytes);
+ assertBytes(mapper.mapFile("__base/nested"), nestedBytes);
+ assertBytes(mapper.mapExternalFile(externalName, "extProj/outer"),
outerBytes);
+ assertBytes(mapper.mapExternalFile(externalName, "extProj/nested"),
nestedBytes);
+ }
+ }
+
+ @Test
+ void testNestedDelegateClosedAfterOuterRoutesToOriginalGroup() throws
IOException
+ {
+ // doing something like this is weird and probably should happen in
practice, but if a nested write was requested
+ // while file group "groupA" was active; even if the caller switches to
"groupB" before finally closing the nested
+ // channel, the delegated bytes must still land in groupA's container, not
groupB's. Otherwise the grouping breaks,
+ // and files from other groups end up in the same container.
+ final File baseDir = new File(tempDir, "base_" +
ThreadLocalRandom.current().nextInt());
+ FileUtils.mkdirp(baseDir);
+
+ final byte[] outerBytes = new byte[]{1, 2, 3, 4};
+ final byte[] nestedBytes = new byte[]{5, 6, 7, 8};
+ final byte[] groupBBytes = new byte[]{9, 10, 11, 12};
+
+ try (SegmentFileBuilderV10 builder =
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+ builder.startFileGroup("groupA");
+
+ final SegmentFileChannel outer = builder.addWithChannel("groupA/outer",
outerBytes.length);
+ final SegmentFileChannel nested =
builder.addWithChannel("groupA/nested", nestedBytes.length);
+ nested.write(ByteBuffer.wrap(nestedBytes));
+
+ // close the outer first so writerCurrentlyInUse clears while the nested
delegate is still open
+ outer.write(ByteBuffer.wrap(outerBytes));
+ outer.close();
+
+ // switch group before closing the still-open nested delegate; merge
must use the snapshotted "groupA"
+ builder.startFileGroup("groupB");
+ nested.close();
+
+ // and a real groupB file so we can verify groupB's container is
independent of the nested file
+ try (SegmentFileChannel groupBFile =
builder.addWithChannel("groupB/file", groupBBytes.length)) {
+ groupBFile.write(ByteBuffer.wrap(groupBBytes));
+ }
+ }
+
+ final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME);
+ try (SegmentFileMapperV10 mapper =
SegmentFileMapperV10.create(segmentFile, JSON_MAPPER)) {
+ final SegmentFileMetadata metadata = mapper.getSegmentFileMetadata();
+
+ // the nested file was requested under groupA, so it must share groupA's
container with groupA/outer
+ // and must NOT be in groupB's container alongside groupB/file.
+ final int outerContainer =
metadata.getFiles().get("groupA/outer").getContainer();
+ final int nestedContainer =
metadata.getFiles().get("groupA/nested").getContainer();
+ final int groupBContainer =
metadata.getFiles().get("groupB/file").getContainer();
+ Assertions.assertEquals(outerContainer, nestedContainer, "nested
delegate landed in the wrong container");
+ Assertions.assertNotEquals(groupBContainer, nestedContainer, "nested
delegate leaked into groupB's container");
+
+ assertBytes(mapper.mapFile("groupA/outer"), outerBytes);
+ assertBytes(mapper.mapFile("groupA/nested"), nestedBytes);
+ assertBytes(mapper.mapFile("groupB/file"), groupBBytes);
+ }
+ }
+
+ @Test
+ void testUnprefixedFilesShareSingleContainer() throws IOException
+ {
+ final File baseDir = new File(tempDir, "base_" +
ThreadLocalRandom.current().nextInt());
+ FileUtils.mkdirp(baseDir);
+
+ try (SegmentFileBuilderV10 builder =
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+ for (int i = 0; i < 5; ++i) {
+ final File tmpFile = new File(tempDir,
StringUtils.format("plain-%s.bin", i));
+ Files.write(Ints.toByteArray(i), tmpFile);
+ builder.add(String.valueOf(i), tmpFile);
+ }
+ }
+
+ final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME);
+ try (SegmentFileMapperV10 mapper =
SegmentFileMapperV10.create(segmentFile, JSON_MAPPER)) {
+ Assertions.assertEquals(1,
mapper.getSegmentFileMetadata().getContainers().size());
+ }
+ }
+
+ private static void assertBytes(ByteBuffer actual, byte[] expected)
+ {
+ Assertions.assertNotNull(actual);
+ Assertions.assertEquals(expected.length, actual.remaining());
+ final byte[] got = new byte[expected.length];
+ actual.get(got);
+ Assertions.assertArrayEquals(expected, got);
+ }
+
+ private static void assertNoContainerMixesProjections(SegmentFileMetadata
metadata)
+ {
+ for (int containerIdx = 0; containerIdx < metadata.getContainers().size();
containerIdx++) {
+ final Set<String> projectionsInContainer = new HashSet<>();
+ for (Map.Entry<String, SegmentInternalFileMetadata> entry :
metadata.getFiles().entrySet()) {
+ if (entry.getValue().getContainer() == containerIdx) {
+ final int slash = entry.getKey().indexOf('/');
+ projectionsInContainer.add(slash < 0 ? "" :
entry.getKey().substring(0, slash));
+ }
+ }
+ Assertions.assertEquals(
+ 1,
+ projectionsInContainer.size(),
+ "container[" + containerIdx + "] mixes projections: " +
projectionsInContainer
+ );
+ }
+ }
+
+ private static void assertColumns(String[] projections, int colCount,
SegmentFileMapperV10 mapper) throws IOException
+ {
+ for (String projection : projections) {
+ for (int col = 0; col < colCount; col++) {
+ final String name = projection + "/col" + col;
+ final ByteBuffer buf = mapper.mapFile(name);
+ Assertions.assertNotNull(buf, name);
+ Assertions.assertEquals(name.hashCode(), buf.getInt(), name);
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]