This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c6a84b8ec Improved segment build time for Lucene text index realtime 
to offline conversion (#12744)
2c6a84b8ec is described below

commit 2c6a84b8ec840ed679c0a98230c518522b6591dd
Author: Christopher Peck <[email protected]>
AuthorDate: Mon Apr 15 09:34:37 2024 -0700

    Improved segment build time for Lucene text index realtime to offline 
conversion (#12744)
    
    * reuse mutable lucene index during segment conversion
    
    * realtime segment conversion only
    
    * add RealtimeSegmentConverter test for index reuse path
    
    * clarify naming
    
    * fix missed renaming
    
    * address comments, close all resources
---
 .../indexsegment/mutable/MutableSegmentImpl.java   |  13 ++
 .../converter/RealtimeSegmentConverter.java        |   3 +
 .../invertedindex/RealtimeLuceneTextIndex.java     |  13 +-
 .../creator/impl/SegmentColumnarIndexCreator.java  |   5 +-
 .../impl/SegmentIndexCreationDriverImpl.java       |  45 +++++-
 .../creator/impl/text/LuceneTextIndexCreator.java  | 153 ++++++++++++++++++++-
 .../index/readers/text/LuceneTextIndexReader.java  |   2 +-
 .../converter/RealtimeSegmentConverterTest.java    | 153 ++++++++++++++++++++-
 .../segment/store/FilePerIndexDirectoryTest.java   |  11 +-
 .../store/SingleFileIndexDirectoryTest.java        |   8 +-
 .../segment/spi/creator/IndexCreationContext.java  |  48 ++++++-
 .../pinot/segment/spi/creator/SegmentCreator.java  |   3 +-
 .../spi/creator/SegmentGeneratorConfig.java        |   9 ++
 .../pinot/segment/spi/index/TextIndexConfig.java   |   2 +-
 .../segment/spi/index/mutable/MutableIndex.java    |   8 ++
 15 files changed, 442 insertions(+), 34 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 55e0aec072..b336b30f20 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -932,6 +932,19 @@ public class MutableSegmentImpl implements MutableSegment {
     }
   }
 
+  /**
+   * Calls commit() on all mutable indexes. This is used in preparation for 
realtime segment conversion.
+   * .commit() can be implemented per index to perform any required actions 
before using mutable segment
+   * artifacts to optimize immutable segment build.
+   */
+  public void commit() {
+    for (IndexContainer indexContainer : _indexContainerMap.values()) {
+      for (MutableIndex mutableIndex : 
indexContainer._mutableIndexes.values()) {
+        mutableIndex.commit();
+      }
+    }
+  }
+
   @Override
   public void destroy() {
     _logger.info("Trying to close RealtimeSegmentImpl : {}", _segmentName);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
index ffb9bfc23f..0bf8fe571f 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
@@ -118,6 +118,9 @@ public class RealtimeSegmentConverter {
     genConfig.setNullHandlingEnabled(_nullHandlingEnabled);
     genConfig.setSegmentZKPropsConfig(_segmentZKPropsConfig);
 
+    // flush any artifacts to disk to improve mutable to immutable segment 
conversion
+    _realtimeSegmentImpl.commit();
+
     SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
     try (PinotSegmentRecordReader recordReader = new 
PinotSegmentRecordReader()) {
       int[] sortedDocIds = _columnIndicesForRealtimeTable.getSortedColumn() != 
null
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
index a71d2663ed..8d2e43c8a5 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
@@ -78,7 +78,7 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
       // for realtime
       _indexCreator =
           new LuceneTextIndexCreator(column, new 
File(segmentIndexDir.getAbsolutePath() + "/" + segmentName),
-              false /* commitOnClose */, config);
+              false /* commitOnClose */, true, null, config);
       IndexWriter indexWriter = _indexCreator.getIndexWriter();
       _searcherManager = new SearcherManager(indexWriter, false, false, null);
       _analyzer = _indexCreator.getIndexWriter().getConfig().getAnalyzer();
@@ -181,6 +181,17 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
     return actualDocIDs;
   }
 
+  @Override
+  public void commit() {
+    try {
+      _indexCreator.getIndexWriter().commit();
+    } catch (Exception e) {
+      LOGGER.error("Failed to commit the realtime lucene text index for column 
{}, exception {}", _column,
+          e.getMessage());
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override
   public void close() {
     try {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 2d7909b407..168490635a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -106,7 +106,8 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
 
   @Override
   public void init(SegmentGeneratorConfig segmentCreationSpec, 
SegmentIndexCreationInfo segmentIndexCreationInfo,
-      TreeMap<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema 
schema, File outDir)
+      TreeMap<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema 
schema, File outDir,
+      @Nullable int[] immutableToMutableIdMap)
       throws Exception {
     _docIdCounter = 0;
     _config = segmentCreationSpec;
@@ -158,6 +159,8 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
           .onHeap(segmentCreationSpec.isOnHeap())
           .withForwardIndexDisabled(forwardIndexDisabled)
           .withTextCommitOnClose(true)
+          .withImmutableToMutableIdMap(immutableToMutableIdMap)
+          .withRealtimeConversion(segmentCreationSpec.isRealtimeConversion())
           .build();
       //@formatter:on
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index e99d89c8b4..ecfea58ca7 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -36,6 +36,7 @@ import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
+import 
org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource;
 import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
 import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
 import 
org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
@@ -191,6 +192,11 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
       ((RecordReaderSegmentCreationDataSource) 
dataSource).setTransformPipeline(transformPipeline);
     }
 
+    // Optimization for realtime segment conversion
+    if (dataSource instanceof RealtimeSegmentSegmentCreationDataSource) {
+      _config.setRealtimeConversion(true);
+    }
+
     // Initialize stats collection
     _segmentStats = dataSource.gatherStats(
         new StatsCollectorConfig(config.getTableConfig(), _dataSchema, 
config.getSegmentPartitionConfig()));
@@ -218,6 +224,23 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
     LOGGER.debug("tempIndexDir:{}", _tempIndexDir);
   }
 
+  /**
+   * Generate a mutable docId to immutable docId mapping from the sortedDocIds 
iteration order
+   *
+   * @param sortedDocIds used to map sortedDocIds[immutableId] = mutableId 
(based on RecordReader iteration order)
+   * @return int[] used to map output[mutableId] = immutableId, or null if 
sortedDocIds is null
+   */
+  private int[] getImmutableToMutableIdMap(@Nullable int[] sortedDocIds) {
+    if (sortedDocIds == null) {
+      return null;
+    }
+    int[] res = new int[sortedDocIds.length];
+    for (int i = 0; i < res.length; i++) {
+      res[sortedDocIds[i]] = i;
+    }
+    return res;
+  }
+
   @Override
   public void build()
       throws Exception {
@@ -229,10 +252,19 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
 
     int incompleteRowsFound = 0;
     try {
+      // TODO: Eventually pull the doc Id sorting logic out of Record Reader 
so that all row oriented logic can be
+      //    removed from this code.
+      int[] immutableToMutableIdMap = null;
+      if (_recordReader instanceof PinotSegmentRecordReader) {
+        immutableToMutableIdMap =
+            getImmutableToMutableIdMap(((PinotSegmentRecordReader) 
_recordReader).getSortedDocIds());
+      }
+
       // Initialize the index creation using the per-column statistics 
information
       // TODO: _indexCreationInfoMap holds the reference to all unique values 
on heap (ColumnIndexCreationInfo ->
       //       ColumnStatistics) throughout the segment creation. Find a way 
to release the memory early.
-      _indexCreator.init(_config, _segmentIndexCreationInfo, 
_indexCreationInfoMap, _dataSchema, _tempIndexDir);
+      _indexCreator.init(_config, _segmentIndexCreationInfo, 
_indexCreationInfoMap, _dataSchema, _tempIndexDir,
+          immutableToMutableIdMap);
 
       // Build the index
       _recordReader.rewind();
@@ -299,19 +331,22 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
     LOGGER.info("Collected stats for {} documents", _totalDocs);
 
     try {
+      // TODO: Eventually pull the doc Id sorting logic out of Record Reader 
so that all row oriented logic can be
+      //    removed from this code.
+      int[] sortedDocIds = ((PinotSegmentRecordReader) 
_recordReader).getSortedDocIds();
+      int[] immutableToMutableIdMap = getImmutableToMutableIdMap(sortedDocIds);
+
       // Initialize the index creation using the per-column statistics 
information
       // TODO: _indexCreationInfoMap holds the reference to all unique values 
on heap (ColumnIndexCreationInfo ->
       //       ColumnStatistics) throughout the segment creation. Find a way 
to release the memory early.
-      _indexCreator.init(_config, _segmentIndexCreationInfo, 
_indexCreationInfoMap, _dataSchema, _tempIndexDir);
+      _indexCreator.init(_config, _segmentIndexCreationInfo, 
_indexCreationInfoMap, _dataSchema, _tempIndexDir,
+          immutableToMutableIdMap);
 
       // Build the indexes
       LOGGER.info("Start building Index by column");
 
       TreeSet<String> columns = _dataSchema.getPhysicalColumnNames();
 
-      // TODO: Eventually pull the doc Id sorting logic out of Record Reader 
so that all row oriented logic can be
-      //    removed from this code.
-      int[] sortedDocIds = ((PinotSegmentRecordReader) 
_recordReader).getSortedDocIds();
       for (String col : columns) {
         _indexCreator.indexColumn(col, sortedDocIds, indexSegment);
       }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
index f14cf62bc6..49306d9404 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
@@ -20,8 +20,11 @@ package 
org.apache.pinot.segment.local.segment.creator.impl.text;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteOrder;
 import java.util.Arrays;
 import java.util.HashSet;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.CharArraySet;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
@@ -29,8 +32,11 @@ import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.StoredField;
 import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex;
@@ -41,6 +47,10 @@ import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
 import org.apache.pinot.segment.spi.index.TextIndexConfig;
 import 
org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -49,12 +59,15 @@ import 
org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCr
  * and realtime from {@link RealtimeLuceneTextIndex}
  */
 public class LuceneTextIndexCreator extends AbstractTextIndexCreator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LuceneTextIndexCreator.class);
   public static final String LUCENE_INDEX_DOC_ID_COLUMN_NAME = "DocID";
 
   private final String _textColumn;
-  private final Directory _indexDirectory;
-  private final IndexWriter _indexWriter;
-
+  private final boolean _commitOnClose;
+  private final boolean _reuseMutableIndex;
+  private final File _indexFile;
+  private Directory _indexDirectory;
+  private IndexWriter _indexWriter;
   private int _nextDocId = 0;
 
   public static HashSet<String> getDefaultEnglishStopWordsSet() {
@@ -75,6 +88,7 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
    * @param segmentIndexDir segment index directory
    * @param commit true if the index should be committed (at the end after all 
documents have
    *               been added), false if index should not be committed
+   * @param immutableToMutableIdMap immutableToMutableIdMap from segment 
conversion
    * Note on commit:
    *               Once {@link SegmentColumnarIndexCreator}
    *               finishes indexing all documents/rows for the segment, we 
need to commit and close
@@ -90,14 +104,19 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
    *               to offline), we close this lucene index writer to release 
resources but don't commit.
    * @param config the text index config
    */
-  public LuceneTextIndexCreator(String column, File segmentIndexDir, boolean 
commit, TextIndexConfig config) {
+  public LuceneTextIndexCreator(String column, File segmentIndexDir, boolean 
commit, boolean realtimeConversion,
+      @Nullable int[] immutableToMutableIdMap, TextIndexConfig config) {
     _textColumn = column;
+    _commitOnClose = commit;
+
+    // to reuse the mutable index, it must be (1) not the realtime index, i.e. 
commit is set to true
+    // and (2) happens during realtime segment conversion
+    _reuseMutableIndex = commit && realtimeConversion;
     String luceneAnalyzerClass = config.getLuceneAnalyzerClass();
     try {
       // segment generation is always in V1 and later we convert (as part of 
post creation processing)
       // to V3 if segmentVersion is set to V3 in SegmentGeneratorConfig.
-      File indexFile = getV1TextIndexFile(segmentIndexDir);
-      _indexDirectory = FSDirectory.open(indexFile.toPath());
+      _indexFile = getV1TextIndexFile(segmentIndexDir);
 
       Analyzer luceneAnalyzer;
       if (luceneAnalyzerClass.isEmpty() || 
luceneAnalyzerClass.equals(StandardAnalyzer.class.getName())) {
@@ -111,6 +130,15 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
       indexWriterConfig.setRAMBufferSizeMB(config.getLuceneMaxBufferSizeMB());
       indexWriterConfig.setCommitOnClose(commit);
       indexWriterConfig.setUseCompoundFile(config.isLuceneUseCompoundFile());
+
+      if (_reuseMutableIndex) {
+        LOGGER.info("Reusing the realtime lucene index for segment {} and 
column {}", segmentIndexDir, column);
+        
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
+        convertMutableSegment(segmentIndexDir, immutableToMutableIdMap, 
indexWriterConfig);
+        return;
+      }
+
+      _indexDirectory = FSDirectory.open(_indexFile.toPath());
       _indexWriter = new IndexWriter(_indexDirectory, indexWriterConfig);
     } catch (ReflectiveOperationException e) {
       throw new RuntimeException(
@@ -122,15 +150,102 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
   }
 
   public LuceneTextIndexCreator(IndexCreationContext context, TextIndexConfig 
indexConfig) {
-    this(context.getFieldSpec().getName(), context.getIndexDir(), 
context.isTextCommitOnClose(), indexConfig);
+    this(context.getFieldSpec().getName(), context.getIndexDir(), 
context.isTextCommitOnClose(),
+        context.isRealtimeConversion(), context.getImmutableToMutableIdMap(), 
indexConfig);
   }
 
   public IndexWriter getIndexWriter() {
     return _indexWriter;
   }
 
+  /**
+   * Copy the mutable lucene index files to create an immutable lucene index
+   * @param segmentIndexDir segment index directory
+   * @param immutableToMutableIdMap immutableToMutableIdMap from segment 
conversion
+   * @param indexWriterConfig indexWriterConfig
+   */
+  private void convertMutableSegment(File segmentIndexDir, @Nullable int[] 
immutableToMutableIdMap,
+      IndexWriterConfig indexWriterConfig) {
+    try {
+      // Copy the mutable index to the v1 index location
+      File dest = getV1TextIndexFile(segmentIndexDir);
+      File mutableDir = getMutableIndexDir(segmentIndexDir);
+      FileUtils.copyDirectory(mutableDir, dest);
+
+      // Remove the copied write.lock file
+      File writeLock = new File(dest, "write.lock");
+      FileUtils.delete(writeLock);
+
+      // Call .forceMerge(1) on the copied index as the mutable index will 
likely contain many Lucene segments
+      try (Directory destDirectory = FSDirectory.open(dest.toPath());
+          IndexWriter indexWriter = new IndexWriter(destDirectory, 
indexWriterConfig)) {
+        indexWriter.forceMerge(1, true);
+        indexWriter.commit();
+
+        buildMappingFile(segmentIndexDir, _textColumn, destDirectory, 
immutableToMutableIdMap);
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to build the mapping file during 
segment conversion: " + e);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to convert the mutable lucene index: 
" + e);
+    }
+  }
+
+  /**
+   * Generate the mapping file from mutable Pinot docId (stored within the 
Lucene index) to immutable Pinot docId using
+   * the immutableToMutableIdMap from segment conversion
+   * @param segmentIndexDir segment index directory
+   * @param column column name
+   * @param directory directory of the index
+   * @param immutableToMutableIdMap immutableToMutableIdMap from segment 
conversion
+   */
+  private void buildMappingFile(File segmentIndexDir, String column, Directory 
directory,
+      @Nullable int[] immutableToMutableIdMap)
+      throws IOException {
+    IndexReader indexReader = DirectoryReader.open(directory);
+    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
+
+    int numDocs = indexSearcher.getIndexReader().numDocs();
+    int length = Integer.BYTES * numDocs;
+    File docIdMappingFile = new 
File(SegmentDirectoryPaths.findSegmentDirectory(segmentIndexDir),
+        column + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION);
+    String desc = "Text index docId mapping buffer: " + column;
+    try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(docIdMappingFile, /* 
readOnly */ false, 0, length,
+        ByteOrder.LITTLE_ENDIAN, desc)) {
+      try {
+        // If immutableToMutableIdMap is null, then docIds should not change 
between the mutable and immutable segments.
+        // Therefore, the mapping file can be built without doing an 
additional docId conversion
+        if (immutableToMutableIdMap == null) {
+          for (int i = 0; i < numDocs; i++) {
+            Document document = indexSearcher.doc(i);
+            int pinotDocId = 
Integer.parseInt(document.get(LuceneTextIndexCreator.LUCENE_INDEX_DOC_ID_COLUMN_NAME));
+            buffer.putInt(i * Integer.BYTES, pinotDocId);
+          }
+          return;
+        }
+
+        for (int i = 0; i < numDocs; i++) {
+          Document document = indexSearcher.doc(i);
+          int mutablePinotDocId =
+              
Integer.parseInt(document.get(LuceneTextIndexCreator.LUCENE_INDEX_DOC_ID_COLUMN_NAME));
+          int immutablePinotDocId = immutableToMutableIdMap[mutablePinotDocId];
+          buffer.putInt(i * Integer.BYTES, immutablePinotDocId);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "Caught exception while building mutable to immutable doc id 
mapping for text index column: " + column, e);
+      }
+    } finally {
+      indexReader.close();
+    }
+  }
+
   @Override
   public void add(String document) {
+    if (_reuseMutableIndex) {
+      return; // no-op
+    }
+
     // text index on SV column
     Document docToIndex = new Document();
     docToIndex.add(new TextField(_textColumn, document, Field.Store.NO));
@@ -145,6 +260,10 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
 
   @Override
   public void add(String[] documents, int length) {
+    if (_reuseMutableIndex) {
+      return; // no-op
+    }
+
     Document docToIndex = new Document();
 
     // Whenever multiple fields with the same name appear in one document, 
both the
@@ -165,6 +284,9 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
 
   @Override
   public void seal() {
+    if (_reuseMutableIndex) {
+      return;  // no-op
+    }
     try {
       // Do this one time operation of combining the multiple lucene index 
files (if any)
       // into a single index file. Based on flush threshold and size of data, 
Lucene
@@ -190,12 +312,20 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
   @Override
   public void close()
       throws IOException {
+    if (_reuseMutableIndex) {
+      return;  // no-op
+    }
     try {
       // based on the commit flag set in IndexWriterConfig, this will decide 
to commit or not
       _indexWriter.close();
       _indexDirectory.close();
     } catch (Exception e) {
       throw new RuntimeException("Caught exception while closing the Lucene 
index for column: " + _textColumn, e);
+    } finally {
+      // remove leftover write.lock file, as well as artifacts from .commit() 
being called on the realtime index
+      if (!_commitOnClose) {
+        FileUtils.deleteQuietly(_indexFile);
+      }
     }
   }
 
@@ -203,4 +333,13 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
     String luceneIndexDirectory = _textColumn + 
V1Constants.Indexes.LUCENE_V9_TEXT_INDEX_FILE_EXTENSION;
     return new File(indexDir, luceneIndexDirectory);
   }
+
+  private File getMutableIndexDir(File indexDir) {
+    // tmpSegmentName format: tmp-tableName__9__1__20240227T0254Z-1709002522086
+    String tmpSegmentName = indexDir.getParentFile().getName();
+    String segmentName = 
tmpSegmentName.substring(tmpSegmentName.indexOf("tmp-") + 4, 
tmpSegmentName.lastIndexOf('-'));
+    String mutableDir = indexDir.getParentFile().getParentFile().getParent() + 
"/consumers/" + segmentName + "/"
+        + _textColumn + 
V1Constants.Indexes.LUCENE_V9_TEXT_INDEX_FILE_EXTENSION;
+    return new File(mutableDir);
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java
index 3a0efabe8c..07eb52f88b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java
@@ -171,7 +171,7 @@ public class LuceneTextIndexReader implements 
TextIndexReader {
       return docIds;
     } catch (Exception e) {
       String msg =
-          "Caught excepttion while searching the text index for column:" + 
_column + " search query:" + searchQuery;
+          "Caught exception while searching the text index for column:" + 
_column + " search query:" + searchQuery;
       throw new RuntimeException(msg, e);
     }
   }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
index ded9e85b69..e4ed4bb396 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
@@ -24,6 +24,8 @@ import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -36,6 +38,7 @@ import 
org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
 import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
 import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
+import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndexSearcherPool;
 import 
org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
@@ -44,9 +47,12 @@ import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.TextIndexConfig;
 import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
@@ -58,6 +64,8 @@ import org.apache.pinot.spi.data.TimeGranularitySpec;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -101,7 +109,7 @@ public class RealtimeSegmentConverterTest {
       throws Exception {
     File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
     TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
             
.setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1)).setSortedColumn(LONG_COLUMN1)
             .setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
             .setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2))
@@ -167,7 +175,7 @@ public class RealtimeSegmentConverterTest {
       throws Exception {
     File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
     TableConfig tableConfig =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+        new TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
             .setTimeColumnName(DATE_TIME_COLUMN)
             .setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1, 
LONG_COLUMN1))
             .setSortedColumn(LONG_COLUMN1)
@@ -252,7 +260,7 @@ public class RealtimeSegmentConverterTest {
       throws Exception {
     File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
     TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
             
.setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1)).setSortedColumn(LONG_COLUMN1)
             .setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
             .setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2))
@@ -319,7 +327,7 @@ public class RealtimeSegmentConverterTest {
       throws Exception {
     File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
     TableConfig tableConfig =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+        new TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
             .setTimeColumnName(DATE_TIME_COLUMN)
             .setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1, 
LONG_COLUMN1))
             .setSortedColumn(LONG_COLUMN1)
@@ -433,6 +441,130 @@ public class RealtimeSegmentConverterTest {
     }
   }
 
+  @DataProvider
+  public static Object[][] reuseParams() {
+    List<Boolean> enabledColumnMajorSegmentBuildParams = Arrays.asList(false, 
true);
+    String[] sortedColumnParams = new String[]{null, STRING_COLUMN1};
+
+    return enabledColumnMajorSegmentBuildParams.stream().flatMap(
+            columnMajor -> Arrays.stream(sortedColumnParams).map(sortedColumn 
-> new Object[]{columnMajor,
+                sortedColumn}))
+        .toArray(Object[][]::new);
+  }
+
+  // Tests realtime-to-offline conversion of a table whose text index reuses the mutable (Lucene) index artifacts.
+  // With a sorted column the converted docIds are reordered, so the text index lookups at the end check that the
+  // reused index answers with immutable docIds. Both segments are destroyed in the finally block so off-heap buffers
+  // and the mutable text index are released even when an assertion fails.
+  @Test(dataProvider = "reuseParams")
+  public void testSegmentBuilderWithReuse(boolean columnMajorSegmentBuilder, String sortedColumn)
+      throws Exception {
+    File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
+    FieldConfig textIndexFieldConfig =
+        new FieldConfig.Builder(STRING_COLUMN1).withEncodingType(FieldConfig.EncodingType.RAW)
+            .withIndexTypes(Collections.singletonList(FieldConfig.IndexType.TEXT)).build();
+    List<FieldConfig> fieldConfigList = Collections.singletonList(textIndexFieldConfig);
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
+            .setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1))
+            .setSortedColumn(sortedColumn).setColumnMajorSegmentBuilderEnabled(columnMajorSegmentBuilder)
+            .setFieldConfigList(fieldConfigList).build();
+    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(STRING_COLUMN1, FieldSpec.DataType.STRING)
+        .addDateTime(DATE_TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+
+    String tableNameWithType = tableConfig.getTableName();
+    String segmentName = "testTable__0__0__123456";
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    TextIndexConfig textIndexConfig =
+        new TextIndexConfig(false, null, null, false, false, Collections.emptyList(), Collections.emptyList(), false,
+            500, null, false);
+
+    RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
+        new RealtimeSegmentConfig.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
+            .setStreamName(tableNameWithType).setSchema(schema).setTimeColumnName(DATE_TIME_COLUMN).setCapacity(1000)
+            .setIndex(Sets.newHashSet(STRING_COLUMN1), StandardIndexes.inverted(), IndexConfig.ENABLED)
+            .setIndex(Sets.newHashSet(STRING_COLUMN1), StandardIndexes.text(), textIndexConfig)
+            .setFieldConfigList(fieldConfigList).setSegmentZKMetadata(getSegmentZKMetadata(segmentName))
+            .setOffHeap(true).setMemoryManager(new DirectMemoryManager(segmentName))
+            .setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new File(tmpDir, "stats")))
+            .setConsumerDir(new File(tmpDir, "consumers").getAbsolutePath());
+
+    // create mutable segment impl
+    RealtimeLuceneTextIndexSearcherPool.init(1);
+    MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
+    ImmutableSegmentImpl segmentFile = null;
+    try {
+      List<GenericRow> rows = generateTestDataForReusePath();
+      for (GenericRow row : rows) {
+        mutableSegmentImpl.index(row, null);
+      }
+
+      // build converted segment
+      File outputDir = new File(new File(tmpDir, segmentName), "tmp-" + segmentName + "-" + System.currentTimeMillis());
+      SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
+      segmentZKPropsConfig.setStartOffset("1");
+      segmentZKPropsConfig.setEndOffset("100");
+      ColumnIndicesForRealtimeTable cdc = new ColumnIndicesForRealtimeTable(sortedColumn,
+          indexingConfig.getInvertedIndexColumns(), Collections.singletonList(STRING_COLUMN1), null,
+          indexingConfig.getNoDictionaryColumns(), indexingConfig.getVarLengthDictionaryColumns());
+      RealtimeSegmentConverter converter =
+          new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig, outputDir.getAbsolutePath(), schema,
+              tableNameWithType, tableConfig, segmentName, cdc, false);
+      converter.build(SegmentVersion.v3, null);
+
+      File indexDir = new File(outputDir, segmentName);
+      SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+      assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
+      assertEquals(segmentMetadata.getTotalDocs(), rows.size());
+      assertEquals(segmentMetadata.getTimeColumn(), DATE_TIME_COLUMN);
+      assertEquals(segmentMetadata.getTimeUnit(), TimeUnit.MILLISECONDS);
+
+      long expectedStartTime = (long) rows.get(0).getValue(DATE_TIME_COLUMN);
+      assertEquals(segmentMetadata.getStartTime(), expectedStartTime);
+      long expectedEndTime = (long) rows.get(rows.size() - 1).getValue(DATE_TIME_COLUMN);
+      assertEquals(segmentMetadata.getEndTime(), expectedEndTime);
+
+      assertTrue(segmentMetadata.getAllColumns().containsAll(schema.getColumnNames()));
+      assertEquals(segmentMetadata.getStartOffset(), "1");
+      assertEquals(segmentMetadata.getEndOffset(), "100");
+
+      // read converted segment
+      SegmentLocalFSDirectory segmentDir = new SegmentLocalFSDirectory(indexDir, segmentMetadata, ReadMode.mmap);
+      SegmentDirectory.Reader segmentReader = segmentDir.createReader();
+
+      Map<String, ColumnIndexContainer> indexContainerMap = new HashMap<>();
+      Map<String, ColumnMetadata> columnMetadataMap = segmentMetadata.getColumnMetadataMap();
+      IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
+      for (Map.Entry<String, ColumnMetadata> entry : columnMetadataMap.entrySet()) {
+        indexContainerMap.put(entry.getKey(),
+            new PhysicalColumnIndexContainer(segmentReader, entry.getValue(), indexLoadingConfig));
+      }
+      segmentFile = new ImmutableSegmentImpl(segmentDir, segmentMetadata, indexContainerMap, null);
+
+      // test forward index contents; when sorted on STRING_COLUMN1 the rows come back in reverse insertion order
+      GenericRow readRow = new GenericRow();
+      for (int docId = 0; docId < rows.size(); docId++) {
+        GenericRow row = sortedColumn == null ? rows.get(docId) : rows.get(rows.size() - docId - 1);
+        segmentFile.getRecord(docId, readRow);
+        assertEquals(readRow.getValue(STRING_COLUMN1), row.getValue(STRING_COLUMN1));
+        assertEquals(readRow.getValue(DATE_TIME_COLUMN), row.getValue(DATE_TIME_COLUMN));
+      }
+
+      // test docId conversion
+      TextIndexReader textIndexReader = segmentFile.getIndex(STRING_COLUMN1, StandardIndexes.text());
+      if (sortedColumn == null) {
+        assertEquals(textIndexReader.getDocIds("str-8"), ImmutableRoaringBitmap.bitmapOf(0));
+        assertEquals(textIndexReader.getDocIds("str-4"), ImmutableRoaringBitmap.bitmapOf(4));
+      } else {
+        assertEquals(textIndexReader.getDocIds("str-8"), ImmutableRoaringBitmap.bitmapOf(7));
+        assertEquals(textIndexReader.getDocIds("str-4"), ImmutableRoaringBitmap.bitmapOf(3));
+      }
+    } finally {
+      // release both segments even if conversion or an assertion above failed
+      if (segmentFile != null) {
+        segmentFile.destroy();
+      }
+      mutableSegmentImpl.destroy();
+    }
+  }
+
   private List<GenericRow> generateTestData() {
     LinkedList<GenericRow> rows = new LinkedList<>();
 
@@ -457,6 +589,19 @@ public class RealtimeSegmentConverterTest {
     return rows;
   }
 
+  private List<GenericRow> generateTestDataForReusePath() {
+    List<GenericRow> rows = new LinkedList<>();
+
+    for (int i = 0; i < 8; i++) {
+      GenericRow row = new GenericRow();
+      row.putValue(STRING_COLUMN1, "str" + (i - 8));
+      row.putValue(DATE_TIME_COLUMN, 1697814309L + i);
+      rows.add(row);
+    }
+
+    return rows;
+  }
+
   private SegmentZKMetadata getSegmentZKMetadata(String segmentName) {
     SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
     segmentZKMetadata.setCreationTime(System.currentTimeMillis());
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
index f60de6d12d..38eae8436e 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
@@ -204,8 +204,8 @@ public class FilePerIndexDirectoryTest {
     TextIndexConfig config =
             new TextIndexConfig(false, null, null, false, false, null, null, 
true, 500, null, false);
     try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap);
-         LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true, config);
-        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true, config)) {
+        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true, false, null, config);
+        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true, false, null, config)) {
       PinotDataBuffer buf = fpi.newBuffer("col1", StandardIndexes.forward(), 
1024);
       buf.putInt(0, 1);
 
@@ -236,8 +236,7 @@ public class FilePerIndexDirectoryTest {
           new File(TEMP_DIR, "foo" + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION).exists());
     }
     assertTrue(new File(TEMP_DIR, "bar" + 
V1Constants.Indexes.LUCENE_V9_TEXT_INDEX_FILE_EXTENSION).exists());
-    assertTrue(
-        new File(TEMP_DIR, "bar" + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION).exists());
+    assertTrue(new File(TEMP_DIR, "bar" + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION).exists());
 
     // Read indices back and check the content.
     try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap)) {
@@ -268,8 +267,8 @@ public class FilePerIndexDirectoryTest {
             new TextIndexConfig(false, null, null, false, false, null, null, 
true, 500, null, false);
     // Write sth to buffers and flush them to index files on disk
     try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap);
-        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true, config);
-        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true, config)) {
+        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true, false, null, config);
+        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true, false, null, config)) {
       PinotDataBuffer buf = fpi.newBuffer("col1", StandardIndexes.forward(), 
1024);
       buf.putInt(0, 111);
       buf = fpi.newBuffer("col2", StandardIndexes.dictionary(), 1024);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
index 2849466636..7f0dcebb05 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
@@ -237,8 +237,8 @@ public class SingleFileIndexDirectoryTest {
     TextIndexConfig config =
             new TextIndexConfig(false, null, null, false, false, null, null, 
true, 500, null, false);
     try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap);
-        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true, config);
-        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true, config)) {
+        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true, false, null, config);
+        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true, false, null, config)) {
       PinotDataBuffer buf = sfd.newBuffer("col1", StandardIndexes.forward(), 
1024);
       buf.putInt(0, 1);
 
@@ -343,8 +343,8 @@ public class SingleFileIndexDirectoryTest {
     TextIndexConfig config =
             new TextIndexConfig(false, null, null, false, false, null, null, 
true, 500, null, false);
     try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap);
-        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true, config);
-        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true, config)) {
+        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true, false, null, config);
+        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true, false, null, config)) {
       PinotDataBuffer buf = sfd.newBuffer("col1", StandardIndexes.forward(), 
1024);
       buf.putInt(0, 111);
       buf = sfd.newBuffer("col2", StandardIndexes.dictionary(), 1024);
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
index 52df382efa..3ebe041e87 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
@@ -91,6 +91,20 @@ public interface IndexCreationContext {
   boolean isTextCommitOnClose();
 
   ColumnStatistics getColumnStatistics();
+
+  /**
+   * Flags whether this index is being created as part of realtime segment conversion.
+   *
+   * @return {@code true} if the index creation happens during realtime segment conversion
+   */
+  boolean isRealtimeConversion();
+
+  /**
+   * Returns the immutableToMutableIdMap docId mapping generated in {@link SegmentIndexCreationDriver}.
+   *
+   * <p>This lets index creation during realtime segment conversion reuse mutable index artifacts and translate
+   * their mutable docIds into the docIds of the converted (immutable) segment. Judging by the name, the array is
+   * indexed by immutable docId and holds the matching mutable docId (NOTE(review): confirm against the driver).
+   *
+   * @return the immutable-to-mutable docId mapping, or {@code null} if none was supplied when building the context
+   */
+  int[] getImmutableToMutableIdMap();
 
   final class Builder {
     private ColumnStatistics _columnStatistics;
@@ -112,6 +126,8 @@ public interface IndexCreationContext {
     private boolean _optimizedDictionary;
     private boolean _fixedLength;
     private boolean _textCommitOnClose;
+    private boolean _realtimeConversion = false;
+    private int[] _immutableToMutableIdMap;
 
     public Builder withColumnIndexCreationInfo(ColumnIndexCreationInfo 
columnIndexCreationInfo) {
       return 
withLengthOfLongestEntry(columnIndexCreationInfo.getLengthOfLongestEntry())
@@ -229,11 +245,22 @@ public interface IndexCreationContext {
       return this;
     }
 
+    /**
+     * Records whether the index is created during realtime segment conversion.
+     */
+    public Builder withRealtimeConversion(boolean realtimeConversion) {
+      this._realtimeConversion = realtimeConversion;
+      return this;
+    }
+
+    /**
+     * Records the immutable-to-mutable docId mapping; the array is stored as given (not copied) and may be null.
+     */
+    public Builder withImmutableToMutableIdMap(int[] immutableToMutableIdMap) {
+      this._immutableToMutableIdMap = immutableToMutableIdMap;
+      return this;
+    }
+
     public Common build() {
       return new Common(Objects.requireNonNull(_indexDir), 
_lengthOfLongestEntry, _maxNumberOfMultiValueElements,
           _maxRowLengthInBytes, _onHeap, Objects.requireNonNull(_fieldSpec), 
_sorted, _cardinality,
           _totalNumberOfEntries, _totalDocs, _hasDictionary, _minValue, 
_maxValue, _forwardIndexDisabled,
-          _sortedUniqueElementsArray, _optimizedDictionary, _fixedLength, 
_textCommitOnClose, _columnStatistics);
+          _sortedUniqueElementsArray, _optimizedDictionary, _fixedLength, 
_textCommitOnClose, _columnStatistics,
+          _realtimeConversion, _immutableToMutableIdMap);
     }
 
     public Builder withSortedUniqueElementsArray(Object 
sortedUniqueElementsArray) {
@@ -267,13 +294,16 @@ public interface IndexCreationContext {
     private final boolean _fixedLength;
     private final boolean _textCommitOnClose;
     private final ColumnStatistics _columnStatistics;
+    private final boolean _realtimeConversion;
+    private final int[] _immutableToMutableIdMap;
 
     public Common(File indexDir, int lengthOfLongestEntry,
         int maxNumberOfMultiValueElements, int maxRowLengthInBytes, boolean 
onHeap,
         FieldSpec fieldSpec, boolean sorted, int cardinality, int 
totalNumberOfEntries,
         int totalDocs, boolean hasDictionary, Comparable<?> minValue, 
Comparable<?> maxValue,
-        boolean forwardIndexDisabled, Object sortedUniqueElementsArray, 
boolean optimizeDictionary,
-        boolean fixedLength, boolean textCommitOnClose, ColumnStatistics 
columnStatistics) {
+        boolean forwardIndexDisabled, Object sortedUniqueElementsArray, 
boolean optimizeDictionary, boolean fixedLength,
+        boolean textCommitOnClose, ColumnStatistics columnStatistics, boolean 
realtimeConversion,
+        int[] immutableToMutableIdMap) {
       _indexDir = indexDir;
       _lengthOfLongestEntry = lengthOfLongestEntry;
       _maxNumberOfMultiValueElements = maxNumberOfMultiValueElements;
@@ -293,6 +323,8 @@ public interface IndexCreationContext {
       _fixedLength = fixedLength;
       _textCommitOnClose = textCommitOnClose;
       _columnStatistics = columnStatistics;
+      _realtimeConversion = realtimeConversion;
+      _immutableToMutableIdMap = immutableToMutableIdMap;
     }
 
     public FieldSpec getFieldSpec() {
@@ -378,5 +410,15 @@ public interface IndexCreationContext {
     public ColumnStatistics getColumnStatistics() {
       return _columnStatistics;
     }
+
+    @Override
+    public boolean isRealtimeConversion() {
+      // value captured from the Builder at construction time
+      return this._realtimeConversion;
+    }
+
+    @Override
+    public int[] getImmutableToMutableIdMap() {
+      // returns the Builder-supplied array itself (no defensive copy); may be null
+      return this._immutableToMutableIdMap;
+    }
   }
 }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java
index 9adda03b72..dce1d5b1d4 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java
@@ -46,7 +46,8 @@ public interface SegmentCreator extends Closeable, 
Serializable {
    * @throws Exception
    */
   void init(SegmentGeneratorConfig segmentCreationSpec, 
SegmentIndexCreationInfo segmentIndexCreationInfo,
-      TreeMap<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema 
schema, File outDir)
+      TreeMap<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema 
schema, File outDir,
+      @Nullable int[] immutableToMutableIdMap)
       throws Exception;
 
   /**
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index 6305dcd852..5381bdc430 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -120,6 +120,7 @@ public class SegmentGeneratorConfig implements Serializable 
{
   private boolean _optimizeDictionary = false;
   private boolean _optimizeDictionaryForMetrics = false;
   private double _noDictionarySizeRatioThreshold = 
IndexingConfig.DEFAULT_NO_DICTIONARY_SIZE_RATIO_THRESHOLD;
+  private boolean _realtimeConversion = false;
   private final Map<String, FieldIndexConfigs> _indexConfigsByColName;
 
   // constructed from FieldConfig
@@ -723,6 +724,14 @@ public class SegmentGeneratorConfig implements 
Serializable {
     return _noDictionarySizeRatioThreshold;
   }
 
+  /**
+   * @return whether this segment is being generated by converting a realtime segment ({@code false} by default)
+   */
+  public boolean isRealtimeConversion() {
+    return this._realtimeConversion;
+  }
+
+  /**
+   * Marks whether this segment is being generated by converting a realtime segment.
+   */
+  public void setRealtimeConversion(boolean realtimeConversion) {
+    this._realtimeConversion = realtimeConversion;
+  }
+
   public void setNoDictionarySizeRatioThreshold(double 
noDictionarySizeRatioThreshold) {
     _noDictionarySizeRatioThreshold = noDictionarySizeRatioThreshold;
   }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/TextIndexConfig.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/TextIndexConfig.java
index afbf7eb876..6c400a16db 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/TextIndexConfig.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/TextIndexConfig.java
@@ -232,7 +232,7 @@ public class TextIndexConfig extends IndexConfig {
         && _fstType == that._fstType && Objects.equals(_rawValueForTextIndex, 
that._rawValueForTextIndex)
         && Objects.equals(_stopWordsInclude, that._stopWordsInclude) && 
Objects.equals(_stopWordsExclude,
         that._stopWordsExclude) && _luceneUseCompoundFile == 
that._luceneUseCompoundFile
-        && _luceneMaxBufferSizeMB == that._luceneMaxBufferSizeMB;
+        && _luceneMaxBufferSizeMB == that._luceneMaxBufferSizeMB && 
_luceneAnalyzerClass == that._luceneAnalyzerClass;
   }
 
   @Override
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java
index 2a9c4d9a56..dc3bdc9869 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java
@@ -60,4 +60,12 @@ public interface MutableIndex extends IndexReader {
    * @param docId The document id of the given row. A non-negative value.
    */
   void add(@Nonnull Object[] values, @Nullable int[] dictIds, int docId);
+
+  /**
+   * Commits the mutable index artifacts to disk. This is used in preparation for realtime segment conversion.
+   * commit() should be implemented to perform any required actions before using mutable segment artifacts to
+   * optimize realtime segment conversion.
+   *
+   * <p>The default implementation does nothing, so indexes with no artifacts to persist need not override it.
+   */
+  default void commit() {
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to