This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2c6a84b8ec Improved segment build time for Lucene text index realtime
to offline conversion (#12744)
2c6a84b8ec is described below
commit 2c6a84b8ec840ed679c0a98230c518522b6591dd
Author: Christopher Peck <[email protected]>
AuthorDate: Mon Apr 15 09:34:37 2024 -0700
Improved segment build time for Lucene text index realtime to offline
conversion (#12744)
* reuse mutable lucene index during segment conversion
* realtime segment conversion only
* add RealtimeSegmentConverter test for index reuse path
* clarify naming
* fix missed renaming
* address comments, close all resources
---
.../indexsegment/mutable/MutableSegmentImpl.java | 13 ++
.../converter/RealtimeSegmentConverter.java | 3 +
.../invertedindex/RealtimeLuceneTextIndex.java | 13 +-
.../creator/impl/SegmentColumnarIndexCreator.java | 5 +-
.../impl/SegmentIndexCreationDriverImpl.java | 45 +++++-
.../creator/impl/text/LuceneTextIndexCreator.java | 153 ++++++++++++++++++++-
.../index/readers/text/LuceneTextIndexReader.java | 2 +-
.../converter/RealtimeSegmentConverterTest.java | 153 ++++++++++++++++++++-
.../segment/store/FilePerIndexDirectoryTest.java | 11 +-
.../store/SingleFileIndexDirectoryTest.java | 8 +-
.../segment/spi/creator/IndexCreationContext.java | 48 ++++++-
.../pinot/segment/spi/creator/SegmentCreator.java | 3 +-
.../spi/creator/SegmentGeneratorConfig.java | 9 ++
.../pinot/segment/spi/index/TextIndexConfig.java | 2 +-
.../segment/spi/index/mutable/MutableIndex.java | 8 ++
15 files changed, 442 insertions(+), 34 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 55e0aec072..b336b30f20 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -932,6 +932,19 @@ public class MutableSegmentImpl implements MutableSegment {
}
}
+ /**
+ * Calls commit() on all mutable indexes. This is used in preparation for
realtime segment conversion.
+ * .commit() can be implemented per index to perform any required actions
before using mutable segment
+ * artifacts to optimize immutable segment build.
+ */
+ public void commit() {
+ for (IndexContainer indexContainer : _indexContainerMap.values()) {
+ for (MutableIndex mutableIndex :
indexContainer._mutableIndexes.values()) {
+ mutableIndex.commit();
+ }
+ }
+ }
+
@Override
public void destroy() {
_logger.info("Trying to close RealtimeSegmentImpl : {}", _segmentName);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
index ffb9bfc23f..0bf8fe571f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
@@ -118,6 +118,9 @@ public class RealtimeSegmentConverter {
genConfig.setNullHandlingEnabled(_nullHandlingEnabled);
genConfig.setSegmentZKPropsConfig(_segmentZKPropsConfig);
+ // flush any artifacts to disk to improve mutable to immutable segment
conversion
+ _realtimeSegmentImpl.commit();
+
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
try (PinotSegmentRecordReader recordReader = new
PinotSegmentRecordReader()) {
int[] sortedDocIds = _columnIndicesForRealtimeTable.getSortedColumn() !=
null
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
index a71d2663ed..8d2e43c8a5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
@@ -78,7 +78,7 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex {
// for realtime
_indexCreator =
new LuceneTextIndexCreator(column, new
File(segmentIndexDir.getAbsolutePath() + "/" + segmentName),
- false /* commitOnClose */, config);
+ false /* commitOnClose */, true, null, config);
IndexWriter indexWriter = _indexCreator.getIndexWriter();
_searcherManager = new SearcherManager(indexWriter, false, false, null);
_analyzer = _indexCreator.getIndexWriter().getConfig().getAnalyzer();
@@ -181,6 +181,17 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex {
return actualDocIDs;
}
+ @Override
+ public void commit() {
+ try {
+ _indexCreator.getIndexWriter().commit();
+ } catch (Exception e) {
+ LOGGER.error("Failed to commit the realtime lucene text index for column
{}, exception {}", _column,
+ e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
public void close() {
try {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 2d7909b407..168490635a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -106,7 +106,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
@Override
public void init(SegmentGeneratorConfig segmentCreationSpec,
SegmentIndexCreationInfo segmentIndexCreationInfo,
- TreeMap<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema
schema, File outDir)
+ TreeMap<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema
schema, File outDir,
+ @Nullable int[] immutableToMutableIdMap)
throws Exception {
_docIdCounter = 0;
_config = segmentCreationSpec;
@@ -158,6 +159,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
.onHeap(segmentCreationSpec.isOnHeap())
.withForwardIndexDisabled(forwardIndexDisabled)
.withTextCommitOnClose(true)
+ .withImmutableToMutableIdMap(immutableToMutableIdMap)
+ .withRealtimeConversion(segmentCreationSpec.isRealtimeConversion())
.build();
//@formatter:on
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index e99d89c8b4..ecfea58ca7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -36,6 +36,7 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
+import
org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource;
import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import
org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
@@ -191,6 +192,11 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
((RecordReaderSegmentCreationDataSource)
dataSource).setTransformPipeline(transformPipeline);
}
+ // Optimization for realtime segment conversion
+ if (dataSource instanceof RealtimeSegmentSegmentCreationDataSource) {
+ _config.setRealtimeConversion(true);
+ }
+
// Initialize stats collection
_segmentStats = dataSource.gatherStats(
new StatsCollectorConfig(config.getTableConfig(), _dataSchema,
config.getSegmentPartitionConfig()));
@@ -218,6 +224,23 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
LOGGER.debug("tempIndexDir:{}", _tempIndexDir);
}
+ /**
+ * Generate a mutable docId to immutable docId mapping from the sortedDocIds
iteration order
+ *
+ * @param sortedDocIds used to map sortedDocIds[immutableId] = mutableId
(based on RecordReader iteration order)
+ * @return int[] used to map output[mutableId] = immutableId, or null if
sortedDocIds is null
+ */
+ private int[] getImmutableToMutableIdMap(@Nullable int[] sortedDocIds) {
+ if (sortedDocIds == null) {
+ return null;
+ }
+ int[] res = new int[sortedDocIds.length];
+ for (int i = 0; i < res.length; i++) {
+ res[sortedDocIds[i]] = i;
+ }
+ return res;
+ }
+
@Override
public void build()
throws Exception {
@@ -229,10 +252,19 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
int incompleteRowsFound = 0;
try {
+ // TODO: Eventually pull the doc Id sorting logic out of Record Reader
so that all row oriented logic can be
+ // removed from this code.
+ int[] immutableToMutableIdMap = null;
+ if (_recordReader instanceof PinotSegmentRecordReader) {
+ immutableToMutableIdMap =
+ getImmutableToMutableIdMap(((PinotSegmentRecordReader)
_recordReader).getSortedDocIds());
+ }
+
// Initialize the index creation using the per-column statistics
information
// TODO: _indexCreationInfoMap holds the reference to all unique values
on heap (ColumnIndexCreationInfo ->
// ColumnStatistics) throughout the segment creation. Find a way
to release the memory early.
- _indexCreator.init(_config, _segmentIndexCreationInfo,
_indexCreationInfoMap, _dataSchema, _tempIndexDir);
+ _indexCreator.init(_config, _segmentIndexCreationInfo,
_indexCreationInfoMap, _dataSchema, _tempIndexDir,
+ immutableToMutableIdMap);
// Build the index
_recordReader.rewind();
@@ -299,19 +331,22 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
LOGGER.info("Collected stats for {} documents", _totalDocs);
try {
+ // TODO: Eventually pull the doc Id sorting logic out of Record Reader
so that all row oriented logic can be
+ // removed from this code.
+ int[] sortedDocIds = ((PinotSegmentRecordReader)
_recordReader).getSortedDocIds();
+ int[] immutableToMutableIdMap = getImmutableToMutableIdMap(sortedDocIds);
+
// Initialize the index creation using the per-column statistics
information
// TODO: _indexCreationInfoMap holds the reference to all unique values
on heap (ColumnIndexCreationInfo ->
// ColumnStatistics) throughout the segment creation. Find a way
to release the memory early.
- _indexCreator.init(_config, _segmentIndexCreationInfo,
_indexCreationInfoMap, _dataSchema, _tempIndexDir);
+ _indexCreator.init(_config, _segmentIndexCreationInfo,
_indexCreationInfoMap, _dataSchema, _tempIndexDir,
+ immutableToMutableIdMap);
// Build the indexes
LOGGER.info("Start building Index by column");
TreeSet<String> columns = _dataSchema.getPhysicalColumnNames();
- // TODO: Eventually pull the doc Id sorting logic out of Record Reader
so that all row oriented logic can be
- // removed from this code.
- int[] sortedDocIds = ((PinotSegmentRecordReader)
_recordReader).getSortedDocIds();
for (String col : columns) {
_indexCreator.indexColumn(col, sortedDocIds, indexSegment);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
index f14cf62bc6..49306d9404 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
@@ -20,8 +20,11 @@ package org.apache.pinot.segment.local.segment.creator.impl.text;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.HashSet;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.CharArraySet;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
@@ -29,8 +32,11 @@ import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex;
@@ -41,6 +47,10 @@ import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.index.TextIndexConfig;
import
org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -49,12 +59,15 @@ import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCr
* and realtime from {@link RealtimeLuceneTextIndex}
*/
public class LuceneTextIndexCreator extends AbstractTextIndexCreator {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LuceneTextIndexCreator.class);
public static final String LUCENE_INDEX_DOC_ID_COLUMN_NAME = "DocID";
private final String _textColumn;
- private final Directory _indexDirectory;
- private final IndexWriter _indexWriter;
-
+ private final boolean _commitOnClose;
+ private final boolean _reuseMutableIndex;
+ private final File _indexFile;
+ private Directory _indexDirectory;
+ private IndexWriter _indexWriter;
private int _nextDocId = 0;
public static HashSet<String> getDefaultEnglishStopWordsSet() {
@@ -75,6 +88,7 @@ public class LuceneTextIndexCreator extends AbstractTextIndexCreator {
* @param segmentIndexDir segment index directory
* @param commit true if the index should be committed (at the end after all
documents have
* been added), false if index should not be committed
+ * @param immutableToMutableIdMap immutableToMutableIdMap from segment
conversion
* Note on commit:
* Once {@link SegmentColumnarIndexCreator}
* finishes indexing all documents/rows for the segment, we
need to commit and close
@@ -90,14 +104,19 @@ public class LuceneTextIndexCreator extends AbstractTextIndexCreator {
* to offline), we close this lucene index writer to release
resources but don't commit.
* @param config the text index config
*/
- public LuceneTextIndexCreator(String column, File segmentIndexDir, boolean
commit, TextIndexConfig config) {
+ public LuceneTextIndexCreator(String column, File segmentIndexDir, boolean
commit, boolean realtimeConversion,
+ @Nullable int[] immutableToMutableIdMap, TextIndexConfig config) {
_textColumn = column;
+ _commitOnClose = commit;
+
+ // to reuse the mutable index, it must be (1) not the realtime index, i.e.
commit is set to false
+ // and (2) happens during realtime segment conversion
+ _reuseMutableIndex = commit && realtimeConversion;
String luceneAnalyzerClass = config.getLuceneAnalyzerClass();
try {
// segment generation is always in V1 and later we convert (as part of
post creation processing)
// to V3 if segmentVersion is set to V3 in SegmentGeneratorConfig.
- File indexFile = getV1TextIndexFile(segmentIndexDir);
- _indexDirectory = FSDirectory.open(indexFile.toPath());
+ _indexFile = getV1TextIndexFile(segmentIndexDir);
Analyzer luceneAnalyzer;
if (luceneAnalyzerClass.isEmpty() ||
luceneAnalyzerClass.equals(StandardAnalyzer.class.getName())) {
@@ -111,6 +130,15 @@ public class LuceneTextIndexCreator extends AbstractTextIndexCreator {
indexWriterConfig.setRAMBufferSizeMB(config.getLuceneMaxBufferSizeMB());
indexWriterConfig.setCommitOnClose(commit);
indexWriterConfig.setUseCompoundFile(config.isLuceneUseCompoundFile());
+
+ if (_reuseMutableIndex) {
+ LOGGER.info("Reusing the realtime lucene index for segment {} and
column {}", segmentIndexDir, column);
+
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
+ convertMutableSegment(segmentIndexDir, immutableToMutableIdMap,
indexWriterConfig);
+ return;
+ }
+
+ _indexDirectory = FSDirectory.open(_indexFile.toPath());
_indexWriter = new IndexWriter(_indexDirectory, indexWriterConfig);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(
@@ -122,15 +150,102 @@ public class LuceneTextIndexCreator extends AbstractTextIndexCreator {
}
public LuceneTextIndexCreator(IndexCreationContext context, TextIndexConfig
indexConfig) {
- this(context.getFieldSpec().getName(), context.getIndexDir(),
context.isTextCommitOnClose(), indexConfig);
+ this(context.getFieldSpec().getName(), context.getIndexDir(),
context.isTextCommitOnClose(),
+ context.isRealtimeConversion(), context.getImmutableToMutableIdMap(),
indexConfig);
}
public IndexWriter getIndexWriter() {
return _indexWriter;
}
+ /**
+ * Copy the mutable lucene index files to create an immutable lucene index
+ * @param segmentIndexDir segment index directory
+ * @param immutableToMutableIdMap immutableToMutableIdMap from segment
conversion
+ * @param indexWriterConfig indexWriterConfig
+ */
+ private void convertMutableSegment(File segmentIndexDir, @Nullable int[]
immutableToMutableIdMap,
+ IndexWriterConfig indexWriterConfig) {
+ try {
+ // Copy the mutable index to the v1 index location
+ File dest = getV1TextIndexFile(segmentIndexDir);
+ File mutableDir = getMutableIndexDir(segmentIndexDir);
+ FileUtils.copyDirectory(mutableDir, dest);
+
+ // Remove the copied write.lock file
+ File writeLock = new File(dest, "write.lock");
+ FileUtils.delete(writeLock);
+
+ // Call .forceMerge(1) on the copied index as the mutable index will
likely contain many Lucene segments
+ try (Directory destDirectory = FSDirectory.open(dest.toPath());
+ IndexWriter indexWriter = new IndexWriter(destDirectory,
indexWriterConfig)) {
+ indexWriter.forceMerge(1, true);
+ indexWriter.commit();
+
+ buildMappingFile(segmentIndexDir, _textColumn, destDirectory,
immutableToMutableIdMap);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to build the mapping file during
segment conversion: " + e);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to convert the mutable lucene index:
" + e);
+ }
+ }
+
+ /**
+ * Generate the mapping file from mutable Pinot docId (stored within the
Lucene index) to immutable Pinot docId using
+ * the immutableToMutableIdMap from segment conversion
+ * @param segmentIndexDir segment index directory
+ * @param column column name
+ * @param directory directory of the index
+ * @param immutableToMutableIdMap immutableToMutableIdMap from segment
conversion
+ */
+ private void buildMappingFile(File segmentIndexDir, String column, Directory
directory,
+ @Nullable int[] immutableToMutableIdMap)
+ throws IOException {
+ IndexReader indexReader = DirectoryReader.open(directory);
+ IndexSearcher indexSearcher = new IndexSearcher(indexReader);
+
+ int numDocs = indexSearcher.getIndexReader().numDocs();
+ int length = Integer.BYTES * numDocs;
+ File docIdMappingFile = new
File(SegmentDirectoryPaths.findSegmentDirectory(segmentIndexDir),
+ column +
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION);
+ String desc = "Text index docId mapping buffer: " + column;
+ try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(docIdMappingFile, /*
readOnly */ false, 0, length,
+ ByteOrder.LITTLE_ENDIAN, desc)) {
+ try {
+ // If immutableToMutableIdMap is null, then docIds should not change
between the mutable and immutable segments.
+ // Therefore, the mapping file can be built without doing an
additional docId conversion
+ if (immutableToMutableIdMap == null) {
+ for (int i = 0; i < numDocs; i++) {
+ Document document = indexSearcher.doc(i);
+ int pinotDocId =
Integer.parseInt(document.get(LuceneTextIndexCreator.LUCENE_INDEX_DOC_ID_COLUMN_NAME));
+ buffer.putInt(i * Integer.BYTES, pinotDocId);
+ }
+ return;
+ }
+
+ for (int i = 0; i < numDocs; i++) {
+ Document document = indexSearcher.doc(i);
+ int mutablePinotDocId =
+
Integer.parseInt(document.get(LuceneTextIndexCreator.LUCENE_INDEX_DOC_ID_COLUMN_NAME));
+ int immutablePinotDocId = immutableToMutableIdMap[mutablePinotDocId];
+ buffer.putInt(i * Integer.BYTES, immutablePinotDocId);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Caught exception while building mutable to immutable doc id
mapping for text index column: " + column, e);
+ }
+ } finally {
+ indexReader.close();
+ }
+ }
+
@Override
public void add(String document) {
+ if (_reuseMutableIndex) {
+ return; // no-op
+ }
+
// text index on SV column
Document docToIndex = new Document();
docToIndex.add(new TextField(_textColumn, document, Field.Store.NO));
@@ -145,6 +260,10 @@ public class LuceneTextIndexCreator extends AbstractTextIndexCreator {
@Override
public void add(String[] documents, int length) {
+ if (_reuseMutableIndex) {
+ return; // no-op
+ }
+
Document docToIndex = new Document();
// Whenever multiple fields with the same name appear in one document,
both the
@@ -165,6 +284,9 @@ public class LuceneTextIndexCreator extends AbstractTextIndexCreator {
@Override
public void seal() {
+ if (_reuseMutableIndex) {
+ return; // no-op
+ }
try {
// Do this one time operation of combining the multiple lucene index
files (if any)
// into a single index file. Based on flush threshold and size of data,
Lucene
@@ -190,12 +312,20 @@ public class LuceneTextIndexCreator extends AbstractTextIndexCreator {
@Override
public void close()
throws IOException {
+ if (_reuseMutableIndex) {
+ return; // no-op
+ }
try {
// based on the commit flag set in IndexWriterConfig, this will decide
to commit or not
_indexWriter.close();
_indexDirectory.close();
} catch (Exception e) {
throw new RuntimeException("Caught exception while closing the Lucene
index for column: " + _textColumn, e);
+ } finally {
+ // remove leftover write.lock file, as well as artifacts from .commit()
being called on the realtime index
+ if (!_commitOnClose) {
+ FileUtils.deleteQuietly(_indexFile);
+ }
}
}
@@ -203,4 +333,13 @@ public class LuceneTextIndexCreator extends AbstractTextIndexCreator {
String luceneIndexDirectory = _textColumn +
V1Constants.Indexes.LUCENE_V9_TEXT_INDEX_FILE_EXTENSION;
return new File(indexDir, luceneIndexDirectory);
}
+
+ private File getMutableIndexDir(File indexDir) {
+ // tmpSegmentName format: tmp-tableName__9__1__20240227T0254Z-1709002522086
+ String tmpSegmentName = indexDir.getParentFile().getName();
+ String segmentName =
tmpSegmentName.substring(tmpSegmentName.indexOf("tmp-") + 4,
tmpSegmentName.lastIndexOf('-'));
+ String mutableDir = indexDir.getParentFile().getParentFile().getParent() +
"/consumers/" + segmentName + "/"
+ + _textColumn +
V1Constants.Indexes.LUCENE_V9_TEXT_INDEX_FILE_EXTENSION;
+ return new File(mutableDir);
+ }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java
index 3a0efabe8c..07eb52f88b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java
@@ -171,7 +171,7 @@ public class LuceneTextIndexReader implements TextIndexReader {
return docIds;
} catch (Exception e) {
String msg =
- "Caught excepttion while searching the text index for column:" +
_column + " search query:" + searchQuery;
+ "Caught exception while searching the text index for column:" +
_column + " search query:" + searchQuery;
throw new RuntimeException(msg, e);
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
index ded9e85b69..e4ed4bb396 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
@@ -24,6 +24,8 @@ import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -36,6 +38,7 @@ import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
+import
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndexSearcherPool;
import
org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
@@ -44,9 +47,12 @@ import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.TextIndexConfig;
import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
@@ -58,6 +64,8 @@ import org.apache.pinot.spi.data.TimeGranularitySpec;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -101,7 +109,7 @@ public class RealtimeSegmentConverterTest {
throws Exception {
File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
TableConfig tableConfig =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
+ new
TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
.setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1)).setSortedColumn(LONG_COLUMN1)
.setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
.setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2))
@@ -167,7 +175,7 @@ public class RealtimeSegmentConverterTest {
throws Exception {
File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
TableConfig tableConfig =
- new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+ new TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
.setTimeColumnName(DATE_TIME_COLUMN)
.setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1,
LONG_COLUMN1))
.setSortedColumn(LONG_COLUMN1)
@@ -252,7 +260,7 @@ public class RealtimeSegmentConverterTest {
throws Exception {
File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
TableConfig tableConfig =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
+ new
TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
.setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1)).setSortedColumn(LONG_COLUMN1)
.setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
.setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2))
@@ -319,7 +327,7 @@ public class RealtimeSegmentConverterTest {
throws Exception {
File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
TableConfig tableConfig =
- new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+ new TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
.setTimeColumnName(DATE_TIME_COLUMN)
.setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1,
LONG_COLUMN1))
.setSortedColumn(LONG_COLUMN1)
@@ -433,6 +441,130 @@ public class RealtimeSegmentConverterTest {
}
}
+ @DataProvider
+ public static Object[][] reuseParams() {
+ List<Boolean> enabledColumnMajorSegmentBuildParams = Arrays.asList(false,
true);
+ String[] sortedColumnParams = new String[]{null, STRING_COLUMN1};
+
+ return enabledColumnMajorSegmentBuildParams.stream().flatMap(
+ columnMajor -> Arrays.stream(sortedColumnParams).map(sortedColumn
-> new Object[]{columnMajor,
+ sortedColumn}))
+ .toArray(Object[][]::new);
+ }
+
+ // Test the realtime segment conversion of a table with a text index that reuses mutable index artifacts during
+ // conversion. Parameterized (see reuseParams) over row-major vs column-major segment building and over an
+ // unsorted table vs a table sorted on STRING_COLUMN1, which reverses document order in the converted segment.
+ // NOTE(review): mutableSegmentImpl, segmentReader/segmentFile and tmpDir are never destroyed, closed or deleted
+ // in this test; consider releasing them in a finally block.
+ @Test(dataProvider = "reuseParams")
+ public void testSegmentBuilderWithReuse(boolean columnMajorSegmentBuilder,
String sortedColumn)
+ throws Exception {
+ File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
+ // STRING_COLUMN1 is RAW-encoded and carries a TEXT index
+ FieldConfig textIndexFieldConfig =
+ new
FieldConfig.Builder(STRING_COLUMN1).withEncodingType(FieldConfig.EncodingType.RAW)
+
.withIndexTypes(Collections.singletonList(FieldConfig.IndexType.TEXT)).build();
+ List<FieldConfig> fieldConfigList =
Collections.singletonList(textIndexFieldConfig);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
+ .setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1))
+
.setSortedColumn(sortedColumn).setColumnMajorSegmentBuilderEnabled(columnMajorSegmentBuilder)
+ .setFieldConfigList(fieldConfigList).build();
+ Schema schema = new
Schema.SchemaBuilder().addSingleValueDimension(STRING_COLUMN1,
FieldSpec.DataType.STRING)
+ .addDateTime(DATE_TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+
+ String tableNameWithType = tableConfig.getTableName();
+ String segmentName = "testTable__0__0__123456";
+ IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+ // Text index config handed to the mutable segment through the RealtimeSegmentConfig below
+ TextIndexConfig textIndexConfig =
+ new TextIndexConfig(false, null, null, false, false,
Collections.emptyList(), Collections.emptyList(), false,
+ 500, null, false);
+
+ RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
+ new
RealtimeSegmentConfig.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
+
.setStreamName(tableNameWithType).setSchema(schema).setTimeColumnName(DATE_TIME_COLUMN).setCapacity(1000)
+ .setIndex(Sets.newHashSet(STRING_COLUMN1),
StandardIndexes.inverted(), IndexConfig.ENABLED)
+ .setIndex(Sets.newHashSet(STRING_COLUMN1), StandardIndexes.text(),
textIndexConfig)
+
.setFieldConfigList(fieldConfigList).setSegmentZKMetadata(getSegmentZKMetadata(segmentName))
+ .setOffHeap(true).setMemoryManager(new
DirectMemoryManager(segmentName))
+ .setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new
File(tmpDir, "stats")))
+ .setConsumerDir(new File(tmpDir, "consumers").getAbsolutePath());
+
+ // Create the mutable (consuming) segment and index the test rows into it. The searcher pool is initialized
+ // first, presumably because the realtime Lucene text index depends on it (NOTE(review): confirm).
+ RealtimeLuceneTextIndexSearcherPool.init(1);
+ MutableSegmentImpl mutableSegmentImpl = new
MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
+ List<GenericRow> rows = generateTestDataForReusePath();
+
+ for (GenericRow row : rows) {
+ mutableSegmentImpl.index(row, null);
+ }
+
+ // Convert the mutable segment into an immutable v3 segment under outputDir. STRING_COLUMN1 is passed as the
+ // text index column so the converter can reuse the mutable Lucene index instead of rebuilding it.
+ File outputDir = new File(new File(tmpDir, segmentName), "tmp-" +
segmentName + "-" + System.currentTimeMillis());
+ SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
+ segmentZKPropsConfig.setStartOffset("1");
+ segmentZKPropsConfig.setEndOffset("100");
+ ColumnIndicesForRealtimeTable cdc = new
ColumnIndicesForRealtimeTable(sortedColumn,
+ indexingConfig.getInvertedIndexColumns(),
Collections.singletonList(STRING_COLUMN1), null,
+ indexingConfig.getNoDictionaryColumns(),
indexingConfig.getVarLengthDictionaryColumns());
+ RealtimeSegmentConverter converter =
+ new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig,
outputDir.getAbsolutePath(), schema,
+ tableNameWithType, tableConfig, segmentName, cdc, false);
+ converter.build(SegmentVersion.v3, null);
+
+ // Verify segment metadata: version, doc count, time column/unit, time range and stream offsets
+ File indexDir = new File(outputDir, segmentName);
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+ assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
+ assertEquals(segmentMetadata.getTotalDocs(), rows.size());
+ assertEquals(segmentMetadata.getTimeColumn(), DATE_TIME_COLUMN);
+ assertEquals(segmentMetadata.getTimeUnit(), TimeUnit.MILLISECONDS);
+
+ // The generated rows have strictly increasing timestamps, so the first/last rows bound the time range
+ long expectedStartTime = (long) rows.get(0).getValue(DATE_TIME_COLUMN);
+ assertEquals(segmentMetadata.getStartTime(), expectedStartTime);
+ long expectedEndTime = (long) rows.get(rows.size() -
1).getValue(DATE_TIME_COLUMN);
+ assertEquals(segmentMetadata.getEndTime(), expectedEndTime);
+
+
assertTrue(segmentMetadata.getAllColumns().containsAll(schema.getColumnNames()));
+ assertEquals(segmentMetadata.getStartOffset(), "1");
+ assertEquals(segmentMetadata.getEndOffset(), "100");
+
+ // Load the converted segment from disk (mmap) into an ImmutableSegmentImpl
+ SegmentLocalFSDirectory segmentDir = new SegmentLocalFSDirectory(indexDir,
segmentMetadata, ReadMode.mmap);
+ SegmentDirectory.Reader segmentReader = segmentDir.createReader();
+
+ Map<String, ColumnIndexContainer> indexContainerMap = new HashMap<>();
+ Map<String, ColumnMetadata> columnMetadataMap =
segmentMetadata.getColumnMetadataMap();
+ IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null,
tableConfig);
+ for (Map.Entry<String, ColumnMetadata> entry :
columnMetadataMap.entrySet()) {
+ indexContainerMap.put(entry.getKey(),
+ new PhysicalColumnIndexContainer(segmentReader, entry.getValue(),
indexLoadingConfig));
+ }
+ ImmutableSegmentImpl segmentFile = new ImmutableSegmentImpl(segmentDir,
segmentMetadata, indexContainerMap, null);
+
+ // Verify forward index contents. When sorted on STRING_COLUMN1 the documents appear in reverse ingestion
+ // order: the values were ingested as "str-8".."str-1" and sort ascending as "str-1".."str-8".
+ GenericRow readRow = new GenericRow();
+ int docId = 0;
+ for (int i = 0; i < rows.size(); i++) {
+ GenericRow row;
+ if (sortedColumn == null) {
+ row = rows.get(i);
+ } else {
+ row = rows.get(rows.size() - i - 1);
+ }
+
+ segmentFile.getRecord(docId, readRow);
+ assertEquals(readRow.getValue(STRING_COLUMN1),
row.getValue(STRING_COLUMN1));
+ assertEquals(readRow.getValue(DATE_TIME_COLUMN),
row.getValue(DATE_TIME_COLUMN));
+ docId += 1;
+ }
+
+ // Verify the text index answers with docIds of the converted (immutable) segment, i.e. the mutable docIds
+ // were remapped according to the sort order during conversion
+ TextIndexReader textIndexReader = segmentFile.getIndex(STRING_COLUMN1,
StandardIndexes.text());
+ if (sortedColumn == null) {
+ assertEquals(textIndexReader.getDocIds("str-8"),
ImmutableRoaringBitmap.bitmapOf(0));
+ assertEquals(textIndexReader.getDocIds("str-4"),
ImmutableRoaringBitmap.bitmapOf(4));
+ } else {
+ assertEquals(textIndexReader.getDocIds("str-8"),
ImmutableRoaringBitmap.bitmapOf(7));
+ assertEquals(textIndexReader.getDocIds("str-4"),
ImmutableRoaringBitmap.bitmapOf(3));
+ }
+ }
+
private List<GenericRow> generateTestData() {
LinkedList<GenericRow> rows = new LinkedList<>();
@@ -457,6 +589,19 @@ public class RealtimeSegmentConverterTest {
return rows;
}
+  /**
+   * Builds eight rows whose STRING_COLUMN1 values run from "str-8" down to "str-1" while DATE_TIME_COLUMN increases
+   * by one per row, so sorting on STRING_COLUMN1 yields the reverse of ingestion order.
+   */
+  private List<GenericRow> generateTestDataForReusePath() {
+    List<GenericRow> testRows = new LinkedList<>();
+    long baseTime = 1697814309L;
+    for (int rowIdx = 0; rowIdx < 8; rowIdx++) {
+      GenericRow testRow = new GenericRow();
+      // "str-" + (8 - rowIdx) yields exactly the same strings as "str" + (rowIdx - 8): "str-8" .. "str-1"
+      testRow.putValue(STRING_COLUMN1, "str-" + (8 - rowIdx));
+      testRow.putValue(DATE_TIME_COLUMN, baseTime + rowIdx);
+      testRows.add(testRow);
+    }
+    return testRows;
+  }
+
private SegmentZKMetadata getSegmentZKMetadata(String segmentName) {
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
segmentZKMetadata.setCreationTime(System.currentTimeMillis());
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
index f60de6d12d..38eae8436e 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
@@ -204,8 +204,8 @@ public class FilePerIndexDirectoryTest {
TextIndexConfig config =
new TextIndexConfig(false, null, null, false, false, null, null,
true, 500, null, false);
try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR,
_segmentMetadata, ReadMode.mmap);
- LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo",
TEMP_DIR, true, config);
- LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar",
TEMP_DIR, true, config)) {
+ LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo",
TEMP_DIR, true, false, null, config);
+ LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar",
TEMP_DIR, true, false, null, config)) {
PinotDataBuffer buf = fpi.newBuffer("col1", StandardIndexes.forward(),
1024);
buf.putInt(0, 1);
@@ -236,8 +236,7 @@ public class FilePerIndexDirectoryTest {
new File(TEMP_DIR, "foo" +
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION).exists());
}
assertTrue(new File(TEMP_DIR, "bar" +
V1Constants.Indexes.LUCENE_V9_TEXT_INDEX_FILE_EXTENSION).exists());
- assertTrue(
- new File(TEMP_DIR, "bar" +
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION).exists());
+ assertTrue(new File(TEMP_DIR, "bar" +
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION).exists());
// Read indices back and check the content.
try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR,
_segmentMetadata, ReadMode.mmap)) {
@@ -268,8 +267,8 @@ public class FilePerIndexDirectoryTest {
new TextIndexConfig(false, null, null, false, false, null, null,
true, 500, null, false);
// Write sth to buffers and flush them to index files on disk
try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR,
_segmentMetadata, ReadMode.mmap);
- LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo",
TEMP_DIR, true, config);
- LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar",
TEMP_DIR, true, config)) {
+ LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo",
TEMP_DIR, true, false, null, config);
+ LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar",
TEMP_DIR, true, false, null, config)) {
PinotDataBuffer buf = fpi.newBuffer("col1", StandardIndexes.forward(),
1024);
buf.putInt(0, 111);
buf = fpi.newBuffer("col2", StandardIndexes.dictionary(), 1024);
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
index 2849466636..7f0dcebb05 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
@@ -237,8 +237,8 @@ public class SingleFileIndexDirectoryTest {
TextIndexConfig config =
new TextIndexConfig(false, null, null, false, false, null, null,
true, 500, null, false);
try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR,
_segmentMetadata, ReadMode.mmap);
- LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo",
TEMP_DIR, true, config);
- LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar",
TEMP_DIR, true, config)) {
+ LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo",
TEMP_DIR, true, false, null, config);
+ LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar",
TEMP_DIR, true, false, null, config)) {
PinotDataBuffer buf = sfd.newBuffer("col1", StandardIndexes.forward(),
1024);
buf.putInt(0, 1);
@@ -343,8 +343,8 @@ public class SingleFileIndexDirectoryTest {
TextIndexConfig config =
new TextIndexConfig(false, null, null, false, false, null, null,
true, 500, null, false);
try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR,
_segmentMetadata, ReadMode.mmap);
- LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo",
TEMP_DIR, true, config);
- LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar",
TEMP_DIR, true, config)) {
+ LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo",
TEMP_DIR, true, false, null, config);
+ LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar",
TEMP_DIR, true, false, null, config)) {
PinotDataBuffer buf = sfd.newBuffer("col1", StandardIndexes.forward(),
1024);
buf.putInt(0, 111);
buf = sfd.newBuffer("col2", StandardIndexes.dictionary(), 1024);
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
index 52df382efa..3ebe041e87 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
@@ -91,6 +91,20 @@ public interface IndexCreationContext {
boolean isTextCommitOnClose();
ColumnStatistics getColumnStatistics();
+ /**
+  * Returns whether this index is being created while converting a realtime (mutable) segment into an
+  * immutable segment.
+  *
+  * @return {@code true} if the index is created during realtime segment conversion, {@code false} otherwise
+  */
+ boolean isRealtimeConversion();
+
+ /**
+  * Returns the immutableToMutableIdMap docId mapping generated in {@link SegmentIndexCreationDriver}.
+  *
+  * <p>This lets index creation during realtime segment conversion take advantage of the mapping between
+  * immutable and mutable docIds, e.g. to reuse mutable index artifacts instead of rebuilding them.
+  * NOTE(review): presumably entry {@code i} is the mutable docId of immutable docId {@code i} — confirm.
+  *
+  * @return the immutable-to-mutable docId mapping, or {@code null} when none was supplied to the builder
+  */
+ int[] getImmutableToMutableIdMap();
final class Builder {
private ColumnStatistics _columnStatistics;
@@ -112,6 +126,8 @@ public interface IndexCreationContext {
private boolean _optimizedDictionary;
private boolean _fixedLength;
private boolean _textCommitOnClose;
+ private boolean _realtimeConversion = false;
+ private int[] _immutableToMutableIdMap;
public Builder withColumnIndexCreationInfo(ColumnIndexCreationInfo
columnIndexCreationInfo) {
return
withLengthOfLongestEntry(columnIndexCreationInfo.getLengthOfLongestEntry())
@@ -229,11 +245,22 @@ public interface IndexCreationContext {
return this;
}
+    /**
+     * Sets whether the index is being created during realtime segment conversion.
+     *
+     * @param realtimeConversion {@code true} when converting a realtime (mutable) segment
+     * @return this builder
+     */
+    public Builder withRealtimeConversion(boolean realtimeConversion) {
+      this._realtimeConversion = realtimeConversion;
+      return this;
+    }
+
+    /**
+     * Sets the immutable-to-mutable docId mapping exposed to index creators; the array is stored, not copied.
+     *
+     * @param immutableToMutableIdMap the docId mapping, may be {@code null}
+     * @return this builder
+     */
+    public Builder withImmutableToMutableIdMap(int[] immutableToMutableIdMap) {
+      this._immutableToMutableIdMap = immutableToMutableIdMap;
+      return this;
+    }
+
+ /**
+  * Builds the {@link Common} context from the current builder state. {@code _indexDir} and {@code _fieldSpec}
+  * must have been set ({@link Objects#requireNonNull} throws otherwise); every other argument is passed through
+  * as currently set on the builder.
+  */
public Common build() {
return new Common(Objects.requireNonNull(_indexDir),
_lengthOfLongestEntry, _maxNumberOfMultiValueElements,
_maxRowLengthInBytes, _onHeap, Objects.requireNonNull(_fieldSpec),
_sorted, _cardinality,
_totalNumberOfEntries, _totalDocs, _hasDictionary, _minValue,
_maxValue, _forwardIndexDisabled,
- _sortedUniqueElementsArray, _optimizedDictionary, _fixedLength,
_textCommitOnClose, _columnStatistics);
+ _sortedUniqueElementsArray, _optimizedDictionary, _fixedLength,
_textCommitOnClose, _columnStatistics,
+ // Positional: must follow the order of the trailing Common constructor parameters
+ _realtimeConversion, _immutableToMutableIdMap);
}
public Builder withSortedUniqueElementsArray(Object
sortedUniqueElementsArray) {
@@ -267,13 +294,16 @@ public interface IndexCreationContext {
private final boolean _fixedLength;
private final boolean _textCommitOnClose;
private final ColumnStatistics _columnStatistics;
+ private final boolean _realtimeConversion;
+ private final int[] _immutableToMutableIdMap;
public Common(File indexDir, int lengthOfLongestEntry,
int maxNumberOfMultiValueElements, int maxRowLengthInBytes, boolean
onHeap,
FieldSpec fieldSpec, boolean sorted, int cardinality, int
totalNumberOfEntries,
int totalDocs, boolean hasDictionary, Comparable<?> minValue,
Comparable<?> maxValue,
- boolean forwardIndexDisabled, Object sortedUniqueElementsArray,
boolean optimizeDictionary,
- boolean fixedLength, boolean textCommitOnClose, ColumnStatistics
columnStatistics) {
+ boolean forwardIndexDisabled, Object sortedUniqueElementsArray,
boolean optimizeDictionary, boolean fixedLength,
+ boolean textCommitOnClose, ColumnStatistics columnStatistics, boolean
realtimeConversion,
+ int[] immutableToMutableIdMap) {
_indexDir = indexDir;
_lengthOfLongestEntry = lengthOfLongestEntry;
_maxNumberOfMultiValueElements = maxNumberOfMultiValueElements;
@@ -293,6 +323,8 @@ public interface IndexCreationContext {
_fixedLength = fixedLength;
_textCommitOnClose = textCommitOnClose;
_columnStatistics = columnStatistics;
+ _realtimeConversion = realtimeConversion;
+ _immutableToMutableIdMap = immutableToMutableIdMap;
}
public FieldSpec getFieldSpec() {
@@ -378,5 +410,15 @@ public interface IndexCreationContext {
public ColumnStatistics getColumnStatistics() {
return _columnStatistics;
}
+
+    /**
+     * @return whether this context was built for realtime segment conversion
+     */
+    @Override
+    public boolean isRealtimeConversion() {
+      return this._realtimeConversion;
+    }
+
+    /**
+     * @return the immutable-to-mutable docId mapping given at construction, possibly {@code null}
+     */
+    @Override
+    public int[] getImmutableToMutableIdMap() {
+      // Shared, not copied: callers receive the same array instance that was passed to the constructor
+      return this._immutableToMutableIdMap;
+    }
}
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java
index 9adda03b72..dce1d5b1d4 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java
@@ -46,7 +46,8 @@ public interface SegmentCreator extends Closeable,
Serializable {
* @throws Exception
*/
void init(SegmentGeneratorConfig segmentCreationSpec,
SegmentIndexCreationInfo segmentIndexCreationInfo,
- TreeMap<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema
schema, File outDir)
+ TreeMap<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema
schema, File outDir,
+ // immutableToMutableIdMap: optional docId mapping between the segment being built and the source mutable
+ // segment (presumably indexed by immutable docId), used to reuse mutable index artifacts; may be null
+ @Nullable int[] immutableToMutableIdMap)
throws Exception;
/**
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index 6305dcd852..5381bdc430 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -120,6 +120,7 @@ public class SegmentGeneratorConfig implements Serializable
{
private boolean _optimizeDictionary = false;
private boolean _optimizeDictionaryForMetrics = false;
private double _noDictionarySizeRatioThreshold =
IndexingConfig.DEFAULT_NO_DICTIONARY_SIZE_RATIO_THRESHOLD;
+ private boolean _realtimeConversion = false;
private final Map<String, FieldIndexConfigs> _indexConfigsByColName;
// constructed from FieldConfig
@@ -723,6 +724,14 @@ public class SegmentGeneratorConfig implements
Serializable {
return _noDictionarySizeRatioThreshold;
}
+  /**
+   * Returns whether this segment generation converts a realtime (mutable) segment.
+   *
+   * @return {@code true} when converting a realtime segment; {@code false} (the default) otherwise
+   */
+  public boolean isRealtimeConversion() {
+    return this._realtimeConversion;
+  }
+
+  /**
+   * Marks whether this segment generation converts a realtime (mutable) segment.
+   *
+   * @param realtimeConversion {@code true} when the segment is generated from a realtime segment
+   */
+  public void setRealtimeConversion(boolean realtimeConversion) {
+    this._realtimeConversion = realtimeConversion;
+  }
+
public void setNoDictionarySizeRatioThreshold(double
noDictionarySizeRatioThreshold) {
_noDictionarySizeRatioThreshold = noDictionarySizeRatioThreshold;
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/TextIndexConfig.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/TextIndexConfig.java
index afbf7eb876..6c400a16db 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/TextIndexConfig.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/TextIndexConfig.java
@@ -232,7 +232,7 @@ public class TextIndexConfig extends IndexConfig {
&& _fstType == that._fstType && Objects.equals(_rawValueForTextIndex,
that._rawValueForTextIndex)
&& Objects.equals(_stopWordsInclude, that._stopWordsInclude) &&
Objects.equals(_stopWordsExclude,
that._stopWordsExclude) && _luceneUseCompoundFile ==
that._luceneUseCompoundFile
- && _luceneMaxBufferSizeMB == that._luceneMaxBufferSizeMB;
+ && _luceneMaxBufferSizeMB == that._luceneMaxBufferSizeMB &&
_luceneAnalyzerClass == that._luceneAnalyzerClass;
}
@Override
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java
index 2a9c4d9a56..dc3bdc9869 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java
@@ -60,4 +60,12 @@ public interface MutableIndex extends IndexReader {
* @param docId The document id of the given row. A non-negative value.
*/
void add(@Nonnull Object[] values, @Nullable int[] dictIds, int docId);
+
+  /**
+   * Commits the mutable index artifacts to disk in preparation for realtime segment conversion.
+   *
+   * <p>Indexes whose artifacts are reused to optimize realtime segment conversion should override this to perform
+   * any work required before those artifacts are used. The default implementation does nothing.
+   */
+  default void commit() {
+    // No-op by default; only indexes whose artifacts are reused during conversion need to override this.
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]