This is an automated email from the ASF dual-hosted git repository.

gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d095672e26 feat: rework SegmentFileBuilderV10 to organize internal 
containers by projection to be friendly to partial loading/eviction (#19375)
3d095672e26 is described below

commit 3d095672e267fac6cba6c6b5feac118e0ea79674
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Apr 29 12:11:33 2026 -0700

    feat: rework SegmentFileBuilderV10 to organize internal containers by 
projection to be friendly to partial loading/eviction (#19375)
---
 .../data/input/impl/AggregateProjectionSpec.java   |   6 +-
 .../java/util/common/io/smoosh/FileSmoosher.java   |  97 +----
 .../java/org/apache/druid/segment/IndexIO.java     |   2 +-
 .../org/apache/druid/segment/IndexMergerBase.java  |  11 +-
 .../org/apache/druid/segment/IndexMergerV10.java   |   1 +
 .../druid/segment/file/SegmentFileBuilder.java     |  18 +
 .../druid/segment/file/SegmentFileBuilderV10.java  | 462 ++++++++++++++++++++-
 .../druid/segment/projections/Projections.java     |  17 +-
 .../segment/file/SegmentFileBuilderV10Test.java    | 368 ++++++++++++++++
 9 files changed, 869 insertions(+), 113 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java
 
b/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java
index 778a6ae734f..a19a5e1a6e6 100644
--- 
a/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java
+++ 
b/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java
@@ -39,6 +39,7 @@ import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.projections.AggregateProjectionSchema;
+import org.apache.druid.segment.projections.Projections;
 import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTimeZone;
 
@@ -99,10 +100,7 @@ public class AggregateProjectionSpec
       @JsonProperty("aggregators") @Nullable AggregatorFactory[] aggregators
   )
   {
-    if (name == null || name.isEmpty()) {
-      throw InvalidInput.exception("projection name cannot be null or empty");
-    }
-    this.name = name;
+    this.name = Projections.validateProjectionName(name);
     if (CollectionUtils.isNullOrEmpty(groupingColumns) && (aggregators == null 
|| aggregators.length == 0)) {
       throw InvalidInput.exception(
           "projection[%s] groupingColumns and aggregators must not both be 
null or empty",
diff --git 
a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
 
b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
index dd3842f1011..f6166e65648 100644
--- 
a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
+++ 
b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java
@@ -35,11 +35,8 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.column.ColumnDescriptor;
 import org.apache.druid.segment.file.SegmentFileBuilder;
 import org.apache.druid.segment.file.SegmentFileChannel;
-import org.apache.druid.segment.file.SegmentFileContainerMetadata;
-import org.apache.druid.segment.file.SegmentInternalFileMetadata;
 import org.apache.druid.utils.CloseableUtils;
 
-import javax.annotation.Nullable;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -99,35 +96,20 @@ public class FileSmoosher implements SegmentFileBuilder
   private Outer currOut = null;
   private boolean writerCurrentlyInUse = false;
 
-  // helper for SegmentFileBuilderV10 to have control over naming of smoosh 
output files; if this is non-null
-  // meta.smoosh is not written
-  @Nullable
-  private final String outputFileName;
-
   public FileSmoosher(
       File baseDir
   )
   {
-    this(baseDir, Integer.MAX_VALUE, null);
+    this(baseDir, Integer.MAX_VALUE);
   }
 
   public FileSmoosher(
       File baseDir,
       int maxChunkSize
   )
-  {
-    this(baseDir, maxChunkSize, null);
-  }
-
-  public FileSmoosher(
-      File baseDir,
-      int maxChunkSize,
-      @Nullable String outputFileName
-  )
   {
     this.baseDir = baseDir;
     this.maxChunkSize = maxChunkSize;
-    this.outputFileName = outputFileName;
     this.delegateFileNameMap = new HashMap<>();
 
     Preconditions.checkArgument(maxChunkSize > 0, "maxChunkSize must be a 
positive value.");
@@ -143,43 +125,6 @@ public class FileSmoosher implements SegmentFileBuilder
     return new File(baseDir, StringUtils.format("%05d.%s", i, FILE_EXTENSION));
   }
 
-  static File makeChunkFile(File baseDir, String prefix, int i)
-  {
-    return new File(baseDir, StringUtils.format("%s-%05d.%s", prefix, i, 
FILE_EXTENSION));
-  }
-
-  public List<File> getOutFiles()
-  {
-    return outFiles;
-  }
-
-  public List<SegmentFileContainerMetadata> getContainers()
-  {
-    List<SegmentFileContainerMetadata> smooshContainers = new ArrayList<>();
-    long offset = 0;
-    for (File f : outFiles) {
-      smooshContainers.add(new SegmentFileContainerMetadata(offset, 
f.length()));
-      offset += f.length();
-    }
-    return smooshContainers;
-  }
-
-  public Map<String, SegmentInternalFileMetadata> getInternalFiles()
-  {
-    Map<String, SegmentInternalFileMetadata> smooshFileMetadata = new 
TreeMap<>();
-    for (Map.Entry<String, Metadata> entry : internalFiles.entrySet()) {
-      smooshFileMetadata.put(
-          entry.getKey(),
-          new SegmentInternalFileMetadata(
-              entry.getValue().getFileNum(),
-              entry.getValue().getStartOffset(),
-              entry.getValue().getEndOffset() - 
entry.getValue().getStartOffset()
-          )
-      );
-    }
-    return smooshFileMetadata;
-  }
-
   @Override
   public void addColumn(String name, ColumnDescriptor columnDescriptor)
   {
@@ -456,26 +401,24 @@ public class FileSmoosher implements SegmentFileBuilder
       currOut.close();
     }
 
-    if (outputFileName == null) {
-      File metaFile = metaFile(baseDir);
-
-      try (Writer out =
-               new BufferedWriter(new OutputStreamWriter(new 
FileOutputStream(metaFile), StandardCharsets.UTF_8))) {
-        out.write(StringUtils.format("v1,%d,%d", maxChunkSize, 
outFiles.size()));
+    File metaFile = metaFile(baseDir);
+
+    try (Writer out =
+             new BufferedWriter(new OutputStreamWriter(new 
FileOutputStream(metaFile), StandardCharsets.UTF_8))) {
+      out.write(StringUtils.format("v1,%d,%d", maxChunkSize, outFiles.size()));
+      out.write("\n");
+
+      for (Map.Entry<String, Metadata> entry : internalFiles.entrySet()) {
+        final Metadata metadata = entry.getValue();
+        out.write(
+            JOINER.join(
+                entry.getKey(),
+                metadata.getFileNum(),
+                metadata.getStartOffset(),
+                metadata.getEndOffset()
+            )
+        );
         out.write("\n");
-
-        for (Map.Entry<String, Metadata> entry : internalFiles.entrySet()) {
-          final Metadata metadata = entry.getValue();
-          out.write(
-              JOINER.join(
-                  entry.getKey(),
-                  metadata.getFileNum(),
-                  metadata.getStartOffset(),
-                  metadata.getEndOffset()
-              )
-          );
-          out.write("\n");
-        }
       }
     }
   }
@@ -483,9 +426,7 @@ public class FileSmoosher implements SegmentFileBuilder
   private Outer getNewCurrOut() throws FileNotFoundException
   {
     final int fileNum = outFiles.size();
-    File outFile = outputFileName != null
-                   ? makeChunkFile(baseDir, outputFileName, fileNum)
-                   : makeChunkFile(baseDir, fileNum);
+    File outFile = makeChunkFile(baseDir, fileNum);
     outFiles.add(outFile);
     return new Outer(fileNum, outFile, maxChunkSize);
   }
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java 
b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
index e5acd4f73fb..f246421f841 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
@@ -1039,7 +1039,7 @@ public class IndexIO
       final Map<String, Supplier<BaseColumnHolder>> projectionColumns = new 
LinkedHashMap<>();
 
       for (String column : projectionSpec.getSchema().getColumnNames()) {
-        final String smooshName = 
Projections.getProjectionSmooshFileName(projectionSpec.getSchema(), column);
+        final String smooshName = 
Projections.getProjectionSegmentInternalFileName(projectionSpec.getSchema(), 
column);
         final ByteBuffer colBuffer = segmentFileMapper.mapFile(smooshName);
         final ColumnDescriptor columnDescriptor = 
metadata.getColumnDescriptors().get(smooshName);
         if (columnDescriptor == null) {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java 
b/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java
index 2f311015359..06cde93fa3d 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java
@@ -512,7 +512,7 @@ public abstract class IndexMergerBase implements IndexMerger
         columnFormats.put(dimension, dimensionFormat);
         DimensionHandler handler = dimensionFormat.getColumnHandler(dimension);
         DimensionMergerV9 merger = handler.makeMerger(
-            Projections.getProjectionSmooshFileName(spec.getSchema(), 
dimension),
+            Projections.getProjectionSegmentInternalFileName(spec.getSchema(), 
dimension),
             indexSpec,
             segmentWriteOutMedium,
             dimensionFormat.toColumnCapabilities(),
@@ -543,7 +543,7 @@ public abstract class IndexMergerBase implements IndexMerger
               metrics,
               columnFormats,
               indexSpec,
-              Projections.getProjectionSmooshPrefix(spec.getSchema())
+              
Projections.getProjectionSegmentInternalFilePrefix(spec.getSchema())
           );
 
       Function<List<TransformableRowIterator>, TimeAndDimsIterator> 
rowMergerFn =
@@ -630,13 +630,14 @@ public abstract class IndexMergerBase implements 
IndexMerger
 
       final String section2 = "build projection[" + projectionSchema.getName() 
+ "] inverted index and columns";
       progress.startSection(section2);
+      segmentFileBuilder.startFileGroup(projectionSchema.getName());
       if (projectionSchema.getTimeColumnName() != null) {
         makeTimeColumn(
             segmentFileBuilder,
             progress,
             timeWriter,
             indexSpec,
-            Projections.getProjectionSmooshFileName(spec.getSchema(), 
projectionSchema.getTimeColumnName())
+            Projections.getProjectionSegmentInternalFileName(spec.getSchema(), 
projectionSchema.getTimeColumnName())
         );
       }
       makeMetricsColumns(
@@ -646,7 +647,7 @@ public abstract class IndexMergerBase implements IndexMerger
           columnFormats,
           metricWriters,
           indexSpec,
-          Projections.getProjectionSmooshPrefix(spec.getSchema())
+          Projections.getProjectionSegmentInternalFilePrefix(spec.getSchema())
       );
 
       for (int i = 0; i < dimensions.size(); i++) {
@@ -664,7 +665,7 @@ public abstract class IndexMergerBase implements IndexMerger
           // use merger descriptor, merger either has values or handles it own 
null column storage details
           columnDesc = merger.makeColumnDescriptor();
         }
-        makeColumn(segmentFileBuilder, 
Projections.getProjectionSmooshFileName(spec.getSchema(), dimension), 
columnDesc);
+        makeColumn(segmentFileBuilder, 
Projections.getProjectionSegmentInternalFileName(spec.getSchema(), dimension), 
columnDesc);
       }
 
       progress.stopSection(section2);
diff --git 
a/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java 
b/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
index 7d3661886a0..8c8a6d862d5 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
@@ -218,6 +218,7 @@ public class IndexMergerV10 extends IndexMergerBase
       /************ Create Inverted Indexes and Finalize Build Columns 
*************/
       final String section = "build inverted index and columns";
       progress.startSection(section);
+      v10Smoosher.startFileGroup(Projections.BASE_TABLE_PROJECTION_NAME);
       makeTimeColumn(v10Smoosher, progress, timeWriter, indexSpec, basePrefix 
+ ColumnHolder.TIME_COLUMN_NAME);
       makeMetricsColumns(
           v10Smoosher,
diff --git 
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilder.java
 
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilder.java
index 5d5ac10e1d4..6d5aea47374 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilder.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilder.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.file;
 
 import org.apache.druid.segment.column.ColumnDescriptor;
 
+import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -45,6 +46,23 @@ public interface SegmentFileBuilder extends Closeable
    */
   void addColumn(String name, ColumnDescriptor columnDescriptor);
 
+  /**
+   * Declare that subsequent writes belong to a named group of files that 
should be stored together. This is a hint
+   * about physical layout; it does not constrain the names of files 
subsequently added, and implementations are free
+   * to ignore it entirely (the default is a no-op for formats that don't 
organize data into coarse-grained
+   * groupings). Projections are the primary caller today, but the mechanism 
is generic; it's equally applicable to
+   * grouping internal metadata, data shared across columns, etc.
+   * <p>
+   * Callers should invoke this before writing each group's files; passing 
{@code null} clears the current group.
+   * Callers should not invoke this while a writer returned by {@link 
#addWithChannel} is still open (implementations
+   * may reject such calls).
+   *
+   * @see SegmentFileBuilderV10#startFileGroup(String) for the V10 semantics
+   */
+  default void startFileGroup(@Nullable String groupName)
+  {
+  }
+
   /**
    * Add a {@link File} to the segment file as the specified name
    */
diff --git 
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java
 
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java
index 6986a2759fe..0557fc2a4bd 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java
@@ -23,13 +23,19 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.primitives.Ints;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.column.ColumnDescriptor;
 import org.apache.druid.segment.data.BitmapSerdeFactory;
 import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.projections.ProjectionMetadata;
+import org.apache.druid.utils.CloseableUtils;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -39,20 +45,41 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.TreeMap;
 
 /**
- * {@link SegmentFileBuilder} for V10 format segments. Right now, this uses a 
{@link FileSmoosher} underneath to build
- * V9 smoosh files and collect the metadata about the offsets in those 
containers, and then appends them into the V10
- * consolidated segment file after the header and {@link SegmentFileMetadata} 
is written.
+ * {@link SegmentFileBuilder} for V10 format segments. Files are written into 
'container' chunk files in {@link #baseDir}
+ * and are concatenated after the header and {@link SegmentFileMetadata} on 
{@link #close()} to produce the final
+ * consolidated segment file.
  * <p>
  * V10 file format:
  * | version (byte) | meta compression (byte) | meta length (int) | meta json 
| container 0 | ... | container n |
+ * <p>
+ * Containers are scoped to at most one declared file group. Callers declare 
which group they are writing via
+ * {@link #startFileGroup(String)} before writing its files; a new container 
is started when the declared group
+ * changes or the current container would exceed {@link #maxContainerSize}. A 
group whose total size exceeds the max
+ * container size spans multiple containers, all tagged with the same group. 
This gives readers a clean 1:1 (or 1:N)
+ * mapping between groups and containers, which supports per-group partial 
loading without any read-side reorganization.
+ * Projections are the primary caller today, but the mechanism is equally 
usable for other organizational needs
+ * (shared data across columns, internal metadata, etc.).
+ * <p>
+ * Callers that never invoke {@link #startFileGroup(String)} are mapped to a 
null-group container.
+ * <p>
+ * Much of the logic here was ported from {@link 
org.apache.druid.java.util.common.io.smoosh.FileSmoosher} of the V9
+ * format and there is a fair bit of overlap. In fact, the initial 
implementation of this class wrapped a V9 smoosher
+ * to build the files before combining them into the V10 format. The main 
difference is that V9 fills each container to
+ * the max while here we organize with file groups.
  */
 public class SegmentFileBuilderV10 implements SegmentFileBuilder
 {
+  private static final Logger LOG = new Logger(SegmentFileBuilderV10.class);
+
   public static SegmentFileBuilderV10 create(ObjectMapper jsonMapper, File 
baseDir)
   {
     return create(jsonMapper, baseDir, CompressionStrategy.NONE);
@@ -72,12 +99,32 @@ public class SegmentFileBuilderV10 implements 
SegmentFileBuilder
   private final ObjectMapper jsonMapper;
   private final String outputFileName;
   private final File baseDir;
-  private final long maxChunkSize;
+  private final long maxContainerSize;
   private final CompressionStrategy metadataCompression;
-  private final FileSmoosher smoosher;
   private final Map<String, SegmentFileBuilderV10> externalSegmentFileBuilders;
   private final Map<String, ColumnDescriptor> columns = new TreeMap<>();
 
+  private final List<ContainerWriter> containers = new ArrayList<>();
+  private final Map<String, SegmentInternalFileMetadata> internalFiles = new 
TreeMap<>();
+
+  // Nested addWithChannel calls (for example a serializer that, while being 
written, emits sub-files for its own
+  // columnar parts) can't write into the current container concurrently with 
the outer writer. These nested writes are
+  // redirected to temporary files and merged back into container(s) once the 
outer writer completes. Each entry
+  // carries the file group that was active when the delegate was created so 
that the merge routes it into the
+  // correct container even if the active group has since changed.
+  private final List<DelegateEntry> completedDelegates = new ArrayList<>();
+  private final List<DelegateEntry> inProgressDelegates = new ArrayList<>();
+  private long delegateFileCounter = 0;
+
+  @Nullable
+  private ContainerWriter currentContainer = null;
+  private boolean writerCurrentlyInUse = false;
+  // The file group declared by the most recent {@link #startFileGroup} call. 
Writes are routed into containers
+  // tagged with this group. Remains {@code null} if the caller never declares 
one, in which case all writes share
+  // a single null-group container.
+  @Nullable
+  private String currentFileGroup = null;
+
   @Nullable
   private String interval = null;
   @Nullable
@@ -89,35 +136,141 @@ public class SegmentFileBuilderV10 implements 
SegmentFileBuilder
       ObjectMapper jsonMapper,
       String outputFileName,
       File baseDir,
-      long maxChunkSize,
+      long maxContainerSize,
       CompressionStrategy metadataCompression
   )
   {
     this.jsonMapper = jsonMapper;
     this.outputFileName = outputFileName;
     this.baseDir = baseDir;
-    this.maxChunkSize = maxChunkSize;
+    this.maxContainerSize = maxContainerSize;
     this.metadataCompression = metadataCompression;
-    this.smoosher = new FileSmoosher(baseDir, Ints.checkedCast(maxChunkSize), 
outputFileName);
     this.externalSegmentFileBuilders = new TreeMap<>();
   }
 
   @Override
   public void add(String name, File fileToAdd) throws IOException
   {
-    smoosher.add(name, fileToAdd);
+    try (FileInputStream fis = new FileInputStream(fileToAdd);
+         FileChannel src = fis.getChannel()) {
+      final long size = src.size();
+      try (SegmentFileChannel out = addWithChannel(name, size)) {
+        long position = 0;
+        while (position < size) {
+          final long transferred = src.transferTo(position, size - position, 
out);
+          if (transferred <= 0) {
+            throw new IOE("Unable to transfer bytes from file[%s] at 
position[%,d]", fileToAdd, position);
+          }
+          position += transferred;
+        }
+      }
+    }
   }
 
   @Override
   public void add(String name, ByteBuffer bufferToAdd) throws IOException
   {
-    smoosher.add(name, bufferToAdd);
+    try (SegmentFileChannel out = addWithChannel(name, 
bufferToAdd.remaining())) {
+      out.write(bufferToAdd);
+    }
   }
 
   @Override
-  public SegmentFileChannel addWithChannel(String name, long size) throws 
IOException
+  public SegmentFileChannel addWithChannel(final String name, final long size) 
throws IOException
   {
-    return smoosher.addWithChannel(name, size);
+    if (name.contains(",")) {
+      throw new IAE("Cannot have a comma in the name of a file, got[%s].", 
name);
+    }
+    if (internalFiles.containsKey(name)) {
+      throw new IAE("Cannot add files of the same name, already have [%s]", 
name);
+    }
+    if (size > maxContainerSize) {
+      throw DruidException.forPersona(DruidException.Persona.ADMIN)
+                          .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                          .build(
+                              "Serialized buffer size[%,d] for column[%s] 
exceeds the maximum[%,d]. "
+                              + "Consider adjusting the tuningConfig - for 
example, reduce maxRowsPerSegment, "
+                              + "or partition your data further.",
+                              size, name, maxContainerSize
+                          );
+    }
+
+    // If an outer writer is mid-write we can't append to the current 
container concurrently, so route through a temp
+    // file that will be merged back into a container once the outer writer 
releases.
+    if (writerCurrentlyInUse) {
+      return delegateChannel(name, size);
+    }
+
+    ensureContainer(currentFileGroup, size);
+    final ContainerWriter target = currentContainer;
+    final long startOffset = target.currOffset;
+    writerCurrentlyInUse = true;
+
+    return new SegmentFileChannel()
+    {
+      private boolean open = true;
+      private long bytesWritten = 0;
+
+      @Override
+      public int write(ByteBuffer src) throws IOException
+      {
+        return Ints.checkedCast(verifySize(target.write(src)));
+      }
+
+      @Override
+      public long write(ByteBuffer[] srcs, int offset, int length) throws 
IOException
+      {
+        return verifySize(target.write(srcs, offset, length));
+      }
+
+      @Override
+      public long write(ByteBuffer[] srcs) throws IOException
+      {
+        return verifySize(target.write(srcs));
+      }
+
+      private long verifySize(long bytesWrittenInChunk)
+      {
+        bytesWritten += bytesWrittenInChunk;
+
+        if (bytesWritten != target.currOffset - startOffset) {
+          throw new ISE("Perhaps there is some concurrent modification going 
on?");
+        }
+        if (bytesWritten > size) {
+          throw new ISE("Wrote[%,d] bytes for something of size[%,d].  
Liar!!!", bytesWritten, size);
+        }
+
+        return bytesWrittenInChunk;
+      }
+
+      @Override
+      public boolean isOpen()
+      {
+        return open;
+      }
+
+      @Override
+      public void close() throws IOException
+      {
+        if (!open) {
+          return;
+        }
+        open = false;
+        writerCurrentlyInUse = false;
+
+        if (bytesWritten != target.currOffset - startOffset) {
+          throw new ISE("Perhaps there is some concurrent modification going 
on?");
+        }
+        if (bytesWritten != size) {
+          throw new IOE("Expected [%,d] bytes, only saw [%,d], potential 
corruption?", size, bytesWritten);
+        }
+        internalFiles.put(
+            name,
+            new SegmentInternalFileMetadata(target.fileNum, startOffset, 
target.currOffset - startOffset)
+        );
+        mergeDelegatedFiles();
+      }
+    };
   }
 
   @Override
@@ -125,7 +278,7 @@ public class SegmentFileBuilderV10 implements 
SegmentFileBuilder
   {
     return externalSegmentFileBuilders.computeIfAbsent(
         externalFile,
-        (k) -> new SegmentFileBuilderV10(jsonMapper, externalFile, baseDir, 
maxChunkSize, metadataCompression)
+        (k) -> new SegmentFileBuilderV10(jsonMapper, externalFile, baseDir, 
maxContainerSize, metadataCompression)
     );
   }
 
@@ -135,6 +288,31 @@ public class SegmentFileBuilderV10 implements 
SegmentFileBuilder
     this.columns.put(name, columnDescriptor);
   }
 
+  /**
+   * Declare the file group that subsequent writes belong to. Writes are 
routed into a container tagged with the
+   * declared group; a new container is rolled when the group changes or the 
incoming file won't fit. A group whose
+   * total size exceeds {@link #maxContainerSize} is split across multiple 
consecutive containers, all tagged with
+   * the same group. Passing {@code null} clears the current group; subsequent 
writes are then routed into a
+   * null-group container until the next call.
+   * <p>
+   * Current V10-specific limitations worth knowing:
+   * <ul>
+   *   <li>Groups cannot be re-entered. Once a different group (or {@code 
null}) has been declared, the previous
+   *       group's container is closed, and you cannot go back and append more 
files to it; any such writes would
+   *       open a fresh container for the re-declared group, so the group's 
files would end up in non-contiguous
+   *       containers. If all of a group's files must land in the same 
container(s), write them contiguously.</li>
+   *   <li>Throws if called while a writer returned by {@link #addWithChannel} 
is still open.</li>
+   * </ul>
+   */
+  @Override
+  public void startFileGroup(@Nullable String groupName)
+  {
+    if (writerCurrentlyInUse) {
+      throw DruidException.defensive("Cannot start file group[%s] while a 
writer is in progress", groupName);
+    }
+    this.currentFileGroup = groupName;
+  }
+
   public void addInterval(String interval)
   {
     this.interval = interval;
@@ -153,7 +331,9 @@ public class SegmentFileBuilderV10 implements 
SegmentFileBuilder
   @Override
   public void abort()
   {
-    smoosher.abort();
+    if (currentContainer != null) {
+      CloseableUtils.closeAndWrapExceptions(currentContainer);
+    }
   }
 
   @Override
@@ -163,11 +343,22 @@ public class SegmentFileBuilderV10 implements 
SegmentFileBuilder
       externalBuilder.close();
     }
 
-    smoosher.close();
+    if (!completedDelegates.isEmpty() || !inProgressDelegates.isEmpty()) {
+      abort();
+      throw new ISE(
+          "[%d] writers in progress and [%d] completed writers needs to be 
closed before closing builder.",
+          inProgressDelegates.size(),
+          completedDelegates.size()
+      );
+    }
 
-    SegmentFileMetadata segmentFileMetadata = new SegmentFileMetadata(
-        smoosher.getContainers(),
-        smoosher.getInternalFiles(),
+    if (currentContainer != null) {
+      currentContainer.close();
+    }
+
+    final SegmentFileMetadata segmentFileMetadata = new SegmentFileMetadata(
+        buildContainerMetadata(),
+        internalFiles,
         interval,
         columns.isEmpty() ? null : columns,
         projections,
@@ -222,7 +413,8 @@ public class SegmentFileBuilderV10 implements 
SegmentFileBuilder
         Channels.writeFully(channel, compressed);
       }
 
-      for (File f : smoosher.getOutFiles()) {
+      for (ContainerWriter container : containers) {
+        final File f = container.file;
         try (FileInputStream fis = new FileInputStream(f)) {
           byte[] buffer = new byte[4096];
           int bytesRead;
@@ -230,13 +422,239 @@ public class SegmentFileBuilderV10 implements 
SegmentFileBuilder
             outputStream.write(buffer, 0, bytesRead);
           }
         }
-        // delete all the old 00000.smoosh
+        // delete all the old container files
         DruidException.conditionalDefensive(
             f.delete(),
-            "Failed to delete temporary file[%s]",
+            "Failed to delete temporary container file[%s]",
             f
         );
       }
     }
   }
+
+  private List<SegmentFileContainerMetadata> buildContainerMetadata()
+  {
+    final List<SegmentFileContainerMetadata> result = new 
ArrayList<>(containers.size());
+    long offset = 0;
+    for (ContainerWriter container : containers) {
+      final long length = container.file.length();
+      result.add(new SegmentFileContainerMetadata(offset, length));
+      offset += length;
+    }
+    return result;
+  }
+
+  /**
+   * Ensure that {@link #currentContainer} is ready to accept {@code size} 
bytes of a file belonging to {@code group}.
+   * Rolls the current container and starts a new one when:
+   * <ul>
+   *   <li>there is no current container, or</li>
+   *   <li>the current container is for a different group, or</li>
+   *   <li>the current container cannot fit the incoming bytes within {@link 
#maxContainerSize}.</li>
+   * </ul>
+   */
+  private void ensureContainer(@Nullable String group, long size) throws 
IOException
+  {
+    if (currentContainer == null
+        || !Objects.equals(currentContainer.group, group)
+        || !currentContainer.canFit(size)) {
+      if (currentContainer != null) {
+        currentContainer.close();
+      }
+      currentContainer = openNewContainer(group);
+      containers.add(currentContainer);
+    }
+  }
+
+  /**
+   * Opens a writer for the next container file. The file lives in
+   * {@code baseDir} (after {@code FileUtils.mkdirp} has been called on it) and
+   * is named from {@code outputFileName} plus the number of containers opened
+   * so far, which is also passed to the writer as its file number. The
+   * returned writer is not added to the container list here.
+   */
+  private ContainerWriter openNewContainer(@Nullable String group)
+      throws IOException
+  {
+    FileUtils.mkdirp(baseDir);
+    final int containerNumber = containers.size();
+    final String containerFileName =
+        StringUtils.format("%s-%05d.container", outputFileName, containerNumber);
+    return new ContainerWriter(
+        containerNumber,
+        new File(baseDir, containerFileName),
+        group,
+        maxContainerSize
+    );
+  }
+
+  /**
+   * Opens a channel that buffers the internal file {@code name} in a temporary
+   * file under {@code baseDir} instead of writing to a container directly.
+   * The entry is tracked in {@code inProgressDelegates} until the channel is
+   * closed, then moved to {@code completedDelegates}; closing also runs
+   * {@link #mergeDelegatedFiles()} unless {@code writerCurrentlyInUse} is set.
+   *
+   * @param name name of the internal file being written
+   * @param size maximum number of bytes the caller may write; exceeding it
+   *             makes the write methods throw an {@link ISE}
+   * @return a channel backed by the temporary file; closing it more than once
+   *         has no further effect
+   * @throws IOException if the temporary file cannot be opened
+   */
+  private SegmentFileChannel delegateChannel(final String name, final long 
size) throws IOException
+  {
+    // Prefixed with outputFileName so delegate files from a main builder and 
its externals (which share baseDir)
+    // cannot collide, since main and external always have distinct output 
file names.
+    final String delegateName = StringUtils.format("%s-delegate-%d", 
outputFileName, delegateFileCounter++);
+    final File tmpFile = new File(baseDir, delegateName);
+    // Snapshot the active group now so that if this delegate is merged after 
the outer writer has advanced past
+    // the group it was created under, it still routes into the correct 
container.
+    final DelegateEntry entry = new DelegateEntry(tmpFile, name, 
currentFileGroup);
+    inProgressDelegates.add(entry);
+
+    return new SegmentFileChannel()
+    {
+      private final FileChannel channel = FileChannel.open(
+          tmpFile.toPath(),
+          StandardOpenOption.WRITE,
+          StandardOpenOption.CREATE,
+          StandardOpenOption.TRUNCATE_EXISTING
+      );
+
+      private long bytesWritten = 0;
+
+      @Override
+      public int write(ByteBuffer src) throws IOException
+      {
+        return Ints.checkedCast(addBytes(channel.write(src)));
+      }
+
+      @Override
+      public long write(ByteBuffer[] srcs, int offset, int length) throws 
IOException
+      {
+        return addBytes(channel.write(srcs, offset, length));
+      }
+
+      @Override
+      public long write(ByteBuffer[] srcs) throws IOException
+      {
+        return addBytes(channel.write(srcs));
+      }
+
+      private long addBytes(long n)
+      {
+        if (n > size - bytesWritten) {
+          throw new ISE(
+              "Wrote more bytes[%,d] than expected[%,d] for delegated 
file[%s]",
+              bytesWritten + n, size, name
+          );
+        }
+        bytesWritten += n;
+        return n;
+      }
+
+      @Override
+      public boolean isOpen()
+      {
+        return channel.isOpen();
+      }
+
+      @Override
+      public void close() throws IOException
+      {
+        // Channel#close must have no effect on an already closed channel.
+        // Without this guard a second close() (e.g. an explicit close()
+        // followed by try-with-resources) would queue the entry again, so
+        // the same temp file would be replayed into add() twice.
+        if (!channel.isOpen()) {
+          return;
+        }
+        channel.close();
+        completedDelegates.add(entry);
+        inProgressDelegates.remove(entry);
+        if (!writerCurrentlyInUse) {
+          mergeDelegatedFiles();
+        }
+      }
+    };
+  }
+
+  /**
+   * Replays every completed delegate temp file through {@link #add} so its
+   * bytes end up in a container, then deletes the temp file (logging a
+   * warning if the delete fails). Intended to run only while no outer writer
+   * holds the builder. For the duration of each replay the entry's
+   * snapshotted group is installed as {@code currentFileGroup}, so the file
+   * is routed by the group that was active when the nested write was
+   * requested rather than the group active at merge time; the previous group
+   * is restored afterwards, even when a replay fails.
+   */
+  private void mergeDelegatedFiles() throws IOException
+  {
+    if (!completedDelegates.isEmpty()) {
+      // work on a snapshot and empty the shared collection up front
+      final List<DelegateEntry> pending = new ArrayList<>(completedDelegates);
+      completedDelegates.clear();
+      final String groupBeforeMerge = currentFileGroup;
+      try {
+        for (DelegateEntry delegate : pending) {
+          currentFileGroup = delegate.group();
+          add(delegate.name(), delegate.file());
+          final boolean deleted = delegate.file().delete();
+          if (!deleted) {
+            LOG.warn("Unable to delete delegate file[%s]", delegate.file());
+          }
+        }
+      }
+      finally {
+        currentFileGroup = groupBeforeMerge;
+      }
+    }
+  }
+
+  /**
+   * A nested write that was redirected to a temporary file: the temp
+   * {@code file} holding the bytes, the internal file {@code name} to add
+   * them under, and the file {@code group} (may be {@code null}) that was
+   * current when the delegate channel was created.
+   */
+  private record DelegateEntry(File file, String name, @Nullable String group)
+  {
+  }
+
+  /**
+   * Channel that writes the bytes of a single container chunk file. Each
+   * writer is tagged with the one group it was opened for and keeps count of
+   * the bytes written so far, failing once more bytes have been written than
+   * its maximum size allows.
+   */
+  private static class ContainerWriter implements GatheringByteChannel
+  {
+    private final int fileNum;
+    private final File file;
+    @Nullable
+    private final String group;
+    private final long maxSize;
+    private final Closer closer = Closer.create();
+    private final GatheringByteChannel channel;
+    private long currOffset = 0;
+
+    ContainerWriter(int fileNum, File file, @Nullable String group, long maxSize)
+        throws IOException
+    {
+      this.fileNum = fileNum;
+      this.file = file;
+      this.group = group;
+      this.maxSize = maxSize;
+      // both the stream and its channel are handed to the closer
+      final FileOutputStream stream = new FileOutputStream(file);
+      closer.register(stream);
+      this.channel = closer.register(stream.getChannel());
+    }
+
+    /**
+     * Whether {@code size} more bytes still fit within the maximum size.
+     */
+    boolean canFit(long size)
+    {
+      // compare against the remaining room instead of summing, so the check
+      // cannot overflow for non-negative currOffset/size/maxSize
+      final long remaining = maxSize - currOffset;
+      return remaining >= size;
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException
+    {
+      final long written = recordWrite(channel.write(src));
+      return Ints.checkedCast(written);
+    }
+
+    @Override
+    public long write(ByteBuffer[] srcs, int offset, int length)
+        throws IOException
+    {
+      final long written = channel.write(srcs, offset, length);
+      return recordWrite(written);
+    }
+
+    @Override
+    public long write(ByteBuffer[] srcs) throws IOException
+    {
+      final long written = channel.write(srcs);
+      return recordWrite(written);
+    }
+
+    /**
+     * Accounts for {@code n} bytes that were just written, failing if they
+     * exceed the room that was left in this container.
+     */
+    private long recordWrite(long n)
+    {
+      final long available = maxSize - currOffset;
+      if (n > available) {
+        throw new ISE("Wrote more bytes[%,d] than available[%,d]", n, available);
+      }
+      currOffset += n;
+      return n;
+    }
+
+    @Override
+    public boolean isOpen()
+    {
+      return channel.isOpen();
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+      closer.close();
+      if (!LOG.isDebugEnabled()) {
+        return;
+      }
+      LOG.debug(
+          "Created container file[%s] for group[%s] of size[%,d] bytes.",
+          file.getAbsolutePath(),
+          group,
+          file.length()
+      );
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
 
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
index 0415d540e24..c81549f41c2 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java
@@ -57,6 +57,17 @@ public class Projections
 
   private static final ConcurrentHashMap<byte[], Boolean> PERIOD_GRAN_CACHE = 
new ConcurrentHashMap<>();
 
+  /**
+   * Checks that {@code name} is usable as a projection name and returns it.
+   * A name is rejected when it is null, empty, or starts with {@code "__"}.
+   *
+   * @param name candidate projection name
+   * @return {@code name} unchanged when it passes validation
+   * @throws RuntimeException the exception built by
+   *         {@code InvalidInput.exception} when the name is rejected
+   */
+  public static String validateProjectionName(@Nullable String name)
+  {
+    if (name == null || name.isEmpty()) {
+      throw InvalidInput.exception("projection name cannot be null or empty");
+    }
+    // Every "__"-prefixed name is rejected, not only the base table name, so
+    // report the offending name and the reserved prefix instead of always
+    // printing BASE_TABLE_PROJECTION_NAME.
+    if (name.startsWith("__")) {
+      throw InvalidInput.exception(
+          "projection name[%s] cannot start with reserved prefix[__]",
+          name
+      );
+    }
+    return name;
+  }
+
   @Nullable
   public static <T> QueryableProjection<T> findMatchingProjection(
       CursorBuildSpec cursorBuildSpec,
@@ -509,12 +520,12 @@ public class Projections
     return projectionSpec.getSchema().getName() + "/";
   }
 
-  public static String getProjectionSmooshFileName(ProjectionSchema schema, 
String columnName)
+  public static String getProjectionSegmentInternalFileName(ProjectionSchema 
schema, String columnName)
   {
-    return getProjectionSmooshPrefix(schema) + columnName;
+    return getProjectionSegmentInternalFilePrefix(schema) + columnName;
   }
 
-  public static String getProjectionSmooshPrefix(ProjectionSchema 
projectionSchema)
+  public static String getProjectionSegmentInternalFilePrefix(ProjectionSchema 
projectionSchema)
   {
     return projectionSchema.getName() + "/";
   }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/file/SegmentFileBuilderV10Test.java
 
b/processing/src/test/java/org/apache/druid/segment/file/SegmentFileBuilderV10Test.java
new file mode 100644
index 00000000000..89c232973cc
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/file/SegmentFileBuilderV10Test.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.io.Files;
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.TestHelper;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Tests for how {@link SegmentFileBuilderV10} assigns internal files to
+ * containers: one container per file group, the same splitting for external
+ * builders, and routing of nested (delegated) writes.
+ */
+class SegmentFileBuilderV10Test
+{
+  private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper();
+
+  @TempDir
+  File tempDir;
+
+  @Test
+  void testOneContainerPerProjection() throws IOException
+  {
+    final File baseDir = new File(tempDir, "base_" + 
ThreadLocalRandom.current().nextInt());
+    FileUtils.mkdirp(baseDir);
+
+    // matches the production usage pattern in IndexMergerV10: call 
startFileGroup then write that projection's
+    // columns, then move on to the next projection.
+    final String[] projections = {"__base", "projA", "projB"};
+    final int colCount = 3;
+    try (SegmentFileBuilderV10 builder = 
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+      for (String projection : projections) {
+        builder.startFileGroup(projection);
+        for (int col = 0; col < colCount; col++) {
+          final String name = projection + "/col" + col;
+          final File tmpFile = new File(tempDir, 
StringUtils.format("%s-%s.bin", projection, col));
+          Files.write(Ints.toByteArray(name.hashCode()), tmpFile);
+          builder.add(name, tmpFile);
+        }
+      }
+    }
+
+    final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME);
+    try (SegmentFileMapperV10 mapper = 
SegmentFileMapperV10.create(segmentFile, JSON_MAPPER)) {
+      final SegmentFileMetadata metadata = mapper.getSegmentFileMetadata();
+
+      Assertions.assertEquals(projections.length, 
metadata.getContainers().size());
+      Assertions.assertEquals(projections.length * 3, 
metadata.getFiles().size());
+      assertNoContainerMixesProjections(metadata);
+
+      assertColumns(projections, colCount, mapper);
+    }
+  }
+
+  @Test
+  void testProjectionNameWithSlashRoutesCorrectly() throws IOException
+  {
+    final File baseDir = new File(tempDir, "base_" + 
ThreadLocalRandom.current().nextInt());
+    FileUtils.mkdirp(baseDir);
+
+    final String slashyProjection = "nested/projection";
+    final int colCount = 3;
+    try (SegmentFileBuilderV10 builder = 
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+      builder.startFileGroup("__base");
+      for (int col = 0; col < colCount; col++) {
+        final String name = "__base/col" + col;
+        final File tmpFile = new File(tempDir, 
StringUtils.format("base-%s.bin", col));
+        Files.write(Ints.toByteArray(name.hashCode()), tmpFile);
+        builder.add(name, tmpFile);
+      }
+      builder.startFileGroup(slashyProjection);
+      for (int col = 0; col < colCount; col++) {
+        final String name = slashyProjection + "/col" + col;
+        final File tmpFile = new File(tempDir, 
StringUtils.format("slashy-%s.bin", col));
+        Files.write(Ints.toByteArray(name.hashCode()), tmpFile);
+        builder.add(name, tmpFile);
+      }
+    }
+
+    final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME);
+    try (SegmentFileMapperV10 mapper = 
SegmentFileMapperV10.create(segmentFile, JSON_MAPPER)) {
+      final SegmentFileMetadata metadata = mapper.getSegmentFileMetadata();
+      // 2 projections, 2 containers, even though the slashy name's first '/' 
would have parsed as projection "nested"
+      Assertions.assertEquals(2, metadata.getContainers().size());
+      Assertions.assertEquals(2 * colCount, metadata.getFiles().size());
+
+      // round-trip both sets of files
+      for (int col = 0; col < colCount; col++) {
+        final String baseName = "__base/col" + col;
+        final ByteBuffer baseBuf = mapper.mapFile(baseName);
+        Assertions.assertNotNull(baseBuf, baseName);
+        Assertions.assertEquals(baseName.hashCode(), baseBuf.getInt(), 
baseName);
+
+        final String slashyName = slashyProjection + "/col" + col;
+        final ByteBuffer slashyBuf = mapper.mapFile(slashyName);
+        Assertions.assertNotNull(slashyBuf, slashyName);
+        Assertions.assertEquals(slashyName.hashCode(), slashyBuf.getInt(), 
slashyName);
+      }
+    }
+  }
+
+  @Test
+  void testStartFileGroupWhileWriterInUseThrows() throws IOException
+  {
+    final File baseDir = new File(tempDir, "base_" + 
ThreadLocalRandom.current().nextInt());
+    FileUtils.mkdirp(baseDir);
+
+    try (SegmentFileBuilderV10 builder = 
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+      builder.startFileGroup("__base");
+      try (SegmentFileChannel outer = builder.addWithChannel("__base/col0", 
4)) {
+        Assertions.assertThrows(RuntimeException.class, () -> 
builder.startFileGroup("projA"));
+        outer.write(ByteBuffer.wrap(new byte[]{1, 2, 3, 4}));
+      }
+    }
+  }
+
+  @Test
+  void testExternalBuilderAlsoSplitsContainersByProjection() throws IOException
+  {
+    final String externalName = "external.segment";
+    final File baseDir = new File(tempDir, "base_" + 
ThreadLocalRandom.current().nextInt());
+    FileUtils.mkdirp(baseDir);
+
+    final String[] mainProjections = {"__base", "projA", "projB"};
+    final String[] externalProjections = {"extProjX", "extProjY"};
+    final int colCount = 3;
+
+    try (SegmentFileBuilderV10 builder = 
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+      for (String projection : mainProjections) {
+        builder.startFileGroup(projection);
+        for (int col = 0; col < colCount; col++) {
+          final String name = projection + "/col" + col;
+          final File tmpFile = new File(tempDir, 
StringUtils.format("main-%s-%s.bin", projection, col));
+          Files.write(Ints.toByteArray(name.hashCode()), tmpFile);
+          builder.add(name, tmpFile);
+        }
+      }
+
+      // getExternalBuilder returns the SegmentFileBuilder interface but under 
the hood produces an independent V10
+      // sub-file with its own header + containers. Projection-per-container 
splitting must apply there too.
+      final SegmentFileBuilder external = 
builder.getExternalBuilder(externalName);
+      for (String projection : externalProjections) {
+        external.startFileGroup(projection);
+        for (int col = 0; col < colCount; col++) {
+          final String name = projection + "/col" + (col + 1000);
+          final File tmpFile = new File(tempDir, 
StringUtils.format("ext-%s-%s.bin", projection, col));
+          Files.write(Ints.toByteArray(name.hashCode()), tmpFile);
+          external.add(name, tmpFile);
+        }
+      }
+    }
+
+    final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME);
+    final File externalFile = new File(baseDir, externalName);
+    Assertions.assertTrue(segmentFile.exists(), "main v10 file missing");
+    Assertions.assertTrue(externalFile.exists(), "external v10 file missing");
+
+    // the external file on its own is a well-formed V10 sub-segment, load it 
directly to check its container layout.
+    try (SegmentFileMapperV10 externalOnly = 
SegmentFileMapperV10.create(externalFile, JSON_MAPPER)) {
+      final SegmentFileMetadata externalMetadata = 
externalOnly.getSegmentFileMetadata();
+      Assertions.assertEquals(externalProjections.length, 
externalMetadata.getContainers().size());
+      Assertions.assertEquals(externalProjections.length * colCount, 
externalMetadata.getFiles().size());
+      assertNoContainerMixesProjections(externalMetadata);
+    }
+
+    // loaded together: main file checks its own containers and the external 
is attached for mapExternalFile().
+    try (SegmentFileMapperV10 mapper = 
SegmentFileMapperV10.create(segmentFile, JSON_MAPPER, List.of(externalName))) {
+      final SegmentFileMetadata mainMetadata = mapper.getSegmentFileMetadata();
+      Assertions.assertEquals(mainProjections.length, 
mainMetadata.getContainers().size());
+      Assertions.assertEquals(mainProjections.length * colCount, 
mainMetadata.getFiles().size());
+      assertNoContainerMixesProjections(mainMetadata);
+
+      assertColumns(mainProjections, colCount, mapper);
+
+      for (String projection : externalProjections) {
+        for (int col = 0; col < colCount; col++) {
+          final String name = projection + "/col" + (col + 1000);
+          final ByteBuffer buf = mapper.mapExternalFile(externalName, name);
+          Assertions.assertNotNull(buf, name);
+          Assertions.assertEquals(name.hashCode(), buf.getInt(), name);
+        }
+      }
+    }
+  }
+
+  @Test
+  void testNestedAddWithChannelDelegatesPerBuilder() throws IOException
+  {
+    // exercises the delegate-temp-file path on both the main and external 
builders: while an outer addWithChannel is
+    // mid-write on a builder, a nested addWithChannel on the same builder 
must route through a temp file and then be
+    // merged back in at outer-close. Main and external each drive this 
independently, and since they share baseDir,
+    // their delegate file names must not collide.
+    final String externalName = "external.segment";
+    final File baseDir = new File(tempDir, "base_" + 
ThreadLocalRandom.current().nextInt());
+    FileUtils.mkdirp(baseDir);
+
+    final byte[] outerBytes = new byte[]{1, 2, 3, 4};
+    final byte[] nestedBytes = new byte[]{5, 6, 7, 8};
+
+    try (SegmentFileBuilderV10 builder = 
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+      builder.startFileGroup("__base");
+      try (SegmentFileChannel outer = builder.addWithChannel("__base/outer", 
outerBytes.length)) {
+        // nested write while outer is in use → forced into delegate temp file
+        try (SegmentFileChannel nested = 
builder.addWithChannel("__base/nested", nestedBytes.length)) {
+          nested.write(ByteBuffer.wrap(nestedBytes));
+        }
+        outer.write(ByteBuffer.wrap(outerBytes));
+      }
+
+      final SegmentFileBuilder external = 
builder.getExternalBuilder(externalName);
+      external.startFileGroup("extProj");
+      try (SegmentFileChannel extOuter = 
external.addWithChannel("extProj/outer", outerBytes.length)) {
+        try (SegmentFileChannel extNested = 
external.addWithChannel("extProj/nested", nestedBytes.length)) {
+          extNested.write(ByteBuffer.wrap(nestedBytes));
+        }
+        extOuter.write(ByteBuffer.wrap(outerBytes));
+      }
+    }
+
+    final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME);
+    try (SegmentFileMapperV10 mapper = 
SegmentFileMapperV10.create(segmentFile, JSON_MAPPER, List.of(externalName))) {
+      assertBytes(mapper.mapFile("__base/outer"), outerBytes);
+      assertBytes(mapper.mapFile("__base/nested"), nestedBytes);
+      assertBytes(mapper.mapExternalFile(externalName, "extProj/outer"), 
outerBytes);
+      assertBytes(mapper.mapExternalFile(externalName, "extProj/nested"), 
nestedBytes);
+    }
+  }
+
+  @Test
+  void testNestedDelegateClosedAfterOuterRoutesToOriginalGroup() throws 
IOException
+  {
+    // doing something like this is weird and probably should not happen in 
practice, but if a nested write was requested
+    // while file group "groupA" was active; even if the caller switches to 
"groupB" before finally closing the nested
+    // channel, the delegated bytes must still land in groupA's container, not 
groupB's. Otherwise the grouping breaks,
+    // and files from other groups end up in the same container.
+    final File baseDir = new File(tempDir, "base_" + 
ThreadLocalRandom.current().nextInt());
+    FileUtils.mkdirp(baseDir);
+
+    final byte[] outerBytes = new byte[]{1, 2, 3, 4};
+    final byte[] nestedBytes = new byte[]{5, 6, 7, 8};
+    final byte[] groupBBytes = new byte[]{9, 10, 11, 12};
+
+    try (SegmentFileBuilderV10 builder = 
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+      builder.startFileGroup("groupA");
+
+      final SegmentFileChannel outer = builder.addWithChannel("groupA/outer", 
outerBytes.length);
+      final SegmentFileChannel nested = 
builder.addWithChannel("groupA/nested", nestedBytes.length);
+      nested.write(ByteBuffer.wrap(nestedBytes));
+
+      // close the outer first so writerCurrentlyInUse clears while the nested 
delegate is still open
+      outer.write(ByteBuffer.wrap(outerBytes));
+      outer.close();
+
+      // switch group before closing the still-open nested delegate; merge 
must use the snapshotted "groupA"
+      builder.startFileGroup("groupB");
+      nested.close();
+
+      // and a real groupB file so we can verify groupB's container is 
independent of the nested file
+      try (SegmentFileChannel groupBFile = 
builder.addWithChannel("groupB/file", groupBBytes.length)) {
+        groupBFile.write(ByteBuffer.wrap(groupBBytes));
+      }
+    }
+
+    final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME);
+    try (SegmentFileMapperV10 mapper = 
SegmentFileMapperV10.create(segmentFile, JSON_MAPPER)) {
+      final SegmentFileMetadata metadata = mapper.getSegmentFileMetadata();
+
+      // the nested file was requested under groupA, so it must share groupA's 
container with groupA/outer
+      // and must NOT be in groupB's container alongside groupB/file.
+      final int outerContainer = 
metadata.getFiles().get("groupA/outer").getContainer();
+      final int nestedContainer = 
metadata.getFiles().get("groupA/nested").getContainer();
+      final int groupBContainer = 
metadata.getFiles().get("groupB/file").getContainer();
+      Assertions.assertEquals(outerContainer, nestedContainer, "nested 
delegate landed in the wrong container");
+      Assertions.assertNotEquals(groupBContainer, nestedContainer, "nested 
delegate leaked into groupB's container");
+
+      assertBytes(mapper.mapFile("groupA/outer"), outerBytes);
+      assertBytes(mapper.mapFile("groupA/nested"), nestedBytes);
+      assertBytes(mapper.mapFile("groupB/file"), groupBBytes);
+    }
+  }
+
+  @Test
+  void testUnprefixedFilesShareSingleContainer() throws IOException
+  {
+    final File baseDir = new File(tempDir, "base_" + 
ThreadLocalRandom.current().nextInt());
+    FileUtils.mkdirp(baseDir);
+
+    try (SegmentFileBuilderV10 builder = 
SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) {
+      for (int i = 0; i < 5; ++i) {
+        final File tmpFile = new File(tempDir, 
StringUtils.format("plain-%s.bin", i));
+        Files.write(Ints.toByteArray(i), tmpFile);
+        builder.add(String.valueOf(i), tmpFile);
+      }
+    }
+
+    final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME);
+    try (SegmentFileMapperV10 mapper = 
SegmentFileMapperV10.create(segmentFile, JSON_MAPPER)) {
+      Assertions.assertEquals(1, 
mapper.getSegmentFileMetadata().getContainers().size());
+    }
+  }
+
+  /**
+   * Asserts that {@code actual} is non-null, has exactly
+   * {@code expected.length} bytes remaining, and that those bytes equal
+   * {@code expected}. Reading advances the position of {@code actual}.
+   */
+  private static void assertBytes(ByteBuffer actual, byte[] expected)
+  {
+    Assertions.assertNotNull(actual);
+    Assertions.assertEquals(expected.length, actual.remaining());
+    final byte[] got = new byte[expected.length];
+    actual.get(got);
+    Assertions.assertArrayEquals(expected, got);
+  }
+
+  /**
+   * Asserts that the files of every container share one name prefix, where
+   * the prefix is the text before the first '/' of the file name (or the
+   * empty string when the name has no '/'). A container with no files fails
+   * the check as well, since it has zero prefixes rather than one.
+   */
+  private static void assertNoContainerMixesProjections(SegmentFileMetadata 
metadata)
+  {
+    for (int containerIdx = 0; containerIdx < metadata.getContainers().size(); 
containerIdx++) {
+      final Set<String> projectionsInContainer = new HashSet<>();
+      for (Map.Entry<String, SegmentInternalFileMetadata> entry : 
metadata.getFiles().entrySet()) {
+        if (entry.getValue().getContainer() == containerIdx) {
+          final int slash = entry.getKey().indexOf('/');
+          projectionsInContainer.add(slash < 0 ? "" : 
entry.getKey().substring(0, slash));
+        }
+      }
+      Assertions.assertEquals(
+          1,
+          projectionsInContainer.size(),
+          "container[" + containerIdx + "] mixes projections: " + 
projectionsInContainer
+      );
+    }
+  }
+
+  /**
+   * Maps every {@code <projection>/col<i>} file for {@code i} in
+   * {@code [0, colCount)} and asserts that its first int equals the hash code
+   * of the file name, which is what the tests write as file content.
+   */
+  private static void assertColumns(String[] projections, int colCount, 
SegmentFileMapperV10 mapper) throws IOException
+  {
+    for (String projection : projections) {
+      for (int col = 0; col < colCount; col++) {
+        final String name = projection + "/col" + col;
+        final ByteBuffer buf = mapper.mapFile(name);
+        Assertions.assertNotNull(buf, name);
+        Assertions.assertEquals(name.hashCode(), buf.getInt(), name);
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to