This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6dd84b52d52 Add bad data handling for some IndexCreator::add()
functions to skip record or add dummy record (#16094)
6dd84b52d52 is described below
commit 6dd84b52d528526a45f783aa34414be5a4dee71f
Author: Sonam Mandal <[email protected]>
AuthorDate: Mon Jul 14 17:23:44 2025 -0700
Add bad data handling for some IndexCreator::add() functions to skip record
or add dummy record (#16094)
* Add bad-data handling to some IndexCreator::add() functions so they skip the
record or add a dummy record
* Pass tableNameWithType so the updated metric includes the table name
* Address review comments
* Use continueOnError flag for FST and native text index creators
* Compare the returned flattened record to the default
JsonUtils.SKIPPED_FLATTENED_RECORD and update the metric if they are equal
* Use _continueOnError even for JSON index
* Update H3 index creator to use continueOnError flag
* Address review comments
---
.../accounting/ResourceManagerAccountingTest.java | 3 +-
.../impl/inv/geospatial/BaseH3IndexCreator.java | 19 ++--
.../impl/inv/geospatial/OffHeapH3IndexCreator.java | 5 +-
.../impl/inv/geospatial/OnHeapH3IndexCreator.java | 5 +-
.../impl/inv/json/BaseJsonIndexCreator.java | 51 +++++++++-
.../impl/inv/json/OffHeapJsonIndexCreator.java | 5 +-
.../impl/inv/json/OnHeapJsonIndexCreator.java | 5 +-
.../impl/inv/text/LuceneFSTIndexCreator.java | 55 +++++++++--
.../creator/impl/text/NativeTextIndexCreator.java | 39 +++++++-
.../local/segment/index/h3/H3IndexType.java | 4 +-
.../local/segment/index/json/JsonIndexType.java | 5 +-
.../local/segment/index/text/TextIndexType.java | 3 +-
.../pinot/segment/local/utils/MetricUtils.java | 37 +++----
.../segment/local/segment/index/H3IndexTest.java | 64 +++++++++++-
.../segment/local/segment/index/JsonIndexTest.java | 89 +++++++++++++++--
.../index/creator/HnswVectorIndexCreatorTest.java | 95 ++++++++++++++++++
.../index/creator/LuceneFSTIndexCreatorTest.java | 72 ++++++++++++--
.../index/creator/LuceneTextIndexCreatorTest.java | 107 +++++++++++++++++++++
.../index/creator/NativeTextIndexCreatorTest.java | 87 ++++++++++++++++-
.../segment/store/FilePerIndexDirectoryTest.java | 2 +-
.../segment/spi/index/creator/FSTIndexCreator.java | 3 +-
.../spi/index/creator/JsonIndexCreator.java | 9 +-
.../java/org/apache/pinot/spi/utils/JsonUtils.java | 6 +-
23 files changed, 680 insertions(+), 90 deletions(-)
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
index b72cc621a97..38ab43ba220 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -520,7 +520,8 @@ public class ResourceManagerAccountingTest {
File indexDir = new File(FileUtils.getTempDirectory(),
"testJsonIndexExtractMapOOM");
FileUtils.forceMkdir(indexDir);
String colName = "col";
- try (JsonIndexCreator offHeapIndexCreator = new
OffHeapJsonIndexCreator(indexDir, colName, new JsonIndexConfig());
+ try (JsonIndexCreator offHeapIndexCreator = new
OffHeapJsonIndexCreator(indexDir, colName, "myTable_OFFLINE",
+ false, new JsonIndexConfig());
MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl(new
JsonIndexConfig(), "table__0__1", "col")) {
// build json indexes
for (int i = 0; i < 1000000; i++) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/BaseH3IndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/BaseH3IndexCreator.java
index b3861073619..888d5cbb004 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/BaseH3IndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/BaseH3IndexCreator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial;
+import com.google.common.base.Preconditions;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
@@ -26,16 +27,14 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.metrics.ServerMeter;
-import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.segment.index.h3.H3IndexType;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.local.utils.H3Utils;
+import org.apache.pinot.segment.local.utils.MetricUtils;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
import org.apache.pinot.segment.spi.index.reader.H3IndexResolution;
@@ -72,6 +71,7 @@ public abstract class BaseH3IndexCreator implements
GeoSpatialIndexCreator {
static final String BITMAP_VALUE_FILE_NAME = "bitmap.value.buf";
final String _tableNameWithType;
+ final boolean _continueOnError;
final File _indexFile;
final File _tempDir;
final File _dictionaryFile;
@@ -88,9 +88,11 @@ public abstract class BaseH3IndexCreator implements
GeoSpatialIndexCreator {
int _nextDocId;
- BaseH3IndexCreator(File indexDir, String columnName, String
tableNameWithType, H3IndexResolution resolution)
+ BaseH3IndexCreator(File indexDir, String columnName, String
tableNameWithType, boolean continueOnError,
+ H3IndexResolution resolution)
throws IOException {
_tableNameWithType = tableNameWithType;
+ _continueOnError = continueOnError;
_indexFile = new File(indexDir, columnName +
V1Constants.Indexes.H3_INDEX_FILE_EXTENSION);
_tempDir = new File(indexDir, columnName + TEMP_DIR_SUFFIX);
if (_tempDir.exists()) {
@@ -116,13 +118,14 @@ public abstract class BaseH3IndexCreator implements
GeoSpatialIndexCreator {
@Override
public void add(@Nullable Geometry geometry)
throws IOException {
- if (geometry == null || !(geometry instanceof Point)) {
- String metricKeyName =
- _tableNameWithType + "-" +
H3IndexType.INDEX_DISPLAY_NAME.toUpperCase(Locale.US) + "-indexingError";
- ServerMetrics.get().addMeteredTableValue(metricKeyName,
ServerMeter.INDEXING_FAILURES, 1);
+ if (_continueOnError && !(geometry instanceof Point)) {
+ MetricUtils.updateIndexingErrorMetric(_tableNameWithType,
H3IndexType.INDEX_DISPLAY_NAME);
_nextDocId++;
return;
}
+ Preconditions.checkState(geometry != null, "Null geometry record found and
continueOnError is disabled");
+ Preconditions.checkState(geometry instanceof Point, "H3 index can only be
applied to Point, got: %s",
+ geometry.getGeometryType());
Coordinate coordinate = geometry.getCoordinate();
// TODO: support multiple resolutions
long h3Id = H3Utils.H3_CORE.latLngToCell(coordinate.y, coordinate.x,
_lowestResolution);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OffHeapH3IndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OffHeapH3IndexCreator.java
index 35240582ca0..72fb3ddf066 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OffHeapH3IndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OffHeapH3IndexCreator.java
@@ -61,9 +61,10 @@ public class OffHeapH3IndexCreator extends
BaseH3IndexCreator {
private long _postingListChunkOffset;
- public OffHeapH3IndexCreator(File indexDir, String columnName, String
tableNameWithType, H3IndexResolution resolution)
+ public OffHeapH3IndexCreator(File indexDir, String columnName, String
tableNameWithType, boolean continueOnError,
+ H3IndexResolution resolution)
throws IOException {
- super(indexDir, columnName, tableNameWithType, resolution);
+ super(indexDir, columnName, tableNameWithType, continueOnError,
resolution);
_postingListFile = new File(_tempDir, POSTING_LIST_FILE_NAME);
_postingListOutputStream = new DataOutputStream(new
BufferedOutputStream(new FileOutputStream(_postingListFile)));
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OnHeapH3IndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OnHeapH3IndexCreator.java
index c9583a45d75..c02b83e1a17 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OnHeapH3IndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OnHeapH3IndexCreator.java
@@ -34,9 +34,10 @@ import org.roaringbitmap.RoaringBitmapWriter;
*/
public class OnHeapH3IndexCreator extends BaseH3IndexCreator {
- public OnHeapH3IndexCreator(File indexDir, String columnName, String
tableNameWithType, H3IndexResolution resolution)
+ public OnHeapH3IndexCreator(File indexDir, String columnName, String
tableNameWithType, boolean continueOnError,
+ H3IndexResolution resolution)
throws IOException {
- super(indexDir, columnName, tableNameWithType, resolution);
+ super(indexDir, columnName, tableNameWithType, continueOnError,
resolution);
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
index 5c3409fe45f..91479775b9f 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.segment.creator.impl.inv.json;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
@@ -31,6 +32,8 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.index.json.JsonIndexType;
+import org.apache.pinot.segment.local.utils.MetricUtils;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
import org.apache.pinot.segment.spi.memory.CleanerUtil;
@@ -62,6 +65,8 @@ public abstract class BaseJsonIndexCreator implements
JsonIndexCreator {
static final String DICTIONARY_FILE_NAME = "dictionary.buf";
static final String INVERTED_INDEX_FILE_NAME = "inverted.index.buf";
+ final String _tableNameWithType;
+ final boolean _continueOnError;
final JsonIndexConfig _jsonIndexConfig;
final File _indexFile;
final File _tempDir;
@@ -74,8 +79,11 @@ public abstract class BaseJsonIndexCreator implements
JsonIndexCreator {
int _nextFlattenedDocId;
int _maxValueLength;
- BaseJsonIndexCreator(File indexDir, String columnName, JsonIndexConfig
jsonIndexConfig)
+ BaseJsonIndexCreator(File indexDir, String columnName, String
tableNameWithType, boolean continueOnError,
+ JsonIndexConfig jsonIndexConfig)
throws IOException {
+ _tableNameWithType = tableNameWithType;
+ _continueOnError = continueOnError;
_jsonIndexConfig = jsonIndexConfig;
_indexFile = new File(indexDir, columnName +
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
_tempDir = new File(indexDir, columnName + TEMP_DIR_SUFFIX);
@@ -91,7 +99,46 @@ public abstract class BaseJsonIndexCreator implements
JsonIndexCreator {
@Override
public void add(String jsonString)
throws IOException {
- addFlattenedRecords(JsonUtils.flatten(jsonString, _jsonIndexConfig));
+ List<Map<String, String>> flattenedRecord;
+ try {
+ flattenedRecord = JsonUtils.flatten(jsonString, _jsonIndexConfig);
+ if (flattenedRecord == JsonUtils.SKIPPED_FLATTENED_RECORD) {
+ // The default SKIPPED_FLATTENED_RECORD was returned, this can only
happen if the original record could not be
+ // flattened, update the metric
+ MetricUtils.updateIndexingErrorMetric(_tableNameWithType,
JsonIndexType.INDEX_DISPLAY_NAME);
+ }
+ } catch (Exception e) {
+ if (_continueOnError) {
+ // Caught exception while trying to add, update metric and add a
default SKIPPED_FLATTENED_RECORD
+ // This check is needed in the case where
`_jsonIndexConfig.getSkipInvalidJson()` is false,
+ // but _continueOnError is true
+ MetricUtils.updateIndexingErrorMetric(_tableNameWithType,
JsonIndexType.INDEX_DISPLAY_NAME);
+ flattenedRecord = JsonUtils.SKIPPED_FLATTENED_RECORD;
+ } else {
+ throw e;
+ }
+ }
+ addFlattenedRecords(flattenedRecord);
+ }
+
+ @Override
+ public void add(Map value)
+ throws IOException {
+ String valueToAdd;
+ try {
+ // TODO: Avoid this ser/de from map -> string -> json node
+ valueToAdd = JsonUtils.objectToString(value);
+ } catch (JsonProcessingException e) {
+ if (_jsonIndexConfig.getSkipInvalidJson() || _continueOnError) {
+ // Caught exception while trying to add, update metric and add a
default SKIPPED_FLATTENED_RECORD
+ MetricUtils.updateIndexingErrorMetric(_tableNameWithType,
JsonIndexType.INDEX_DISPLAY_NAME);
+ addFlattenedRecords(JsonUtils.SKIPPED_FLATTENED_RECORD);
+ return;
+ } else {
+ throw e;
+ }
+ }
+ add(valueToAdd);
}
/**
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OffHeapJsonIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OffHeapJsonIndexCreator.java
index 30c5d2b63b3..7b99072124d 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OffHeapJsonIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OffHeapJsonIndexCreator.java
@@ -70,9 +70,10 @@ public class OffHeapJsonIndexCreator extends
BaseJsonIndexCreator {
private int _numPostingListsInLastChunk;
private int _numPostingLists;
- public OffHeapJsonIndexCreator(File indexDir, String columnName,
JsonIndexConfig jsonIndexConfig)
+ public OffHeapJsonIndexCreator(File indexDir, String columnName, String
tableNameWithType, boolean continueOnError,
+ JsonIndexConfig jsonIndexConfig)
throws IOException {
- super(indexDir, columnName, jsonIndexConfig);
+ super(indexDir, columnName, tableNameWithType, continueOnError,
jsonIndexConfig);
_postingListFile = new File(_tempDir, POSTING_LIST_FILE_NAME);
_postingListOutputStream = new DataOutputStream(new
BufferedOutputStream(new FileOutputStream(_postingListFile)));
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OnHeapJsonIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OnHeapJsonIndexCreator.java
index bb45728c3f4..f09b8b717c5 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OnHeapJsonIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OnHeapJsonIndexCreator.java
@@ -39,9 +39,10 @@ import static java.nio.charset.StandardCharsets.UTF_8;
*/
public class OnHeapJsonIndexCreator extends BaseJsonIndexCreator {
- public OnHeapJsonIndexCreator(File indexDir, String columnName,
JsonIndexConfig jsonIndexConfig)
+ public OnHeapJsonIndexCreator(File indexDir, String columnName, String
tableNameWithType, boolean continueOnError,
+ JsonIndexConfig jsonIndexConfig)
throws IOException {
- super(indexDir, columnName, jsonIndexConfig);
+ super(indexDir, columnName, tableNameWithType, continueOnError,
jsonIndexConfig);
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java
index c490a79db10..c410003d1b0 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java
@@ -18,11 +18,14 @@
*/
package org.apache.pinot.segment.local.segment.creator.impl.inv.text;
+import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.apache.lucene.util.fst.FST;
+import org.apache.pinot.segment.local.segment.index.fst.FstIndexType;
+import org.apache.pinot.segment.local.utils.MetricUtils;
import org.apache.pinot.segment.local.utils.fst.FSTBuilder;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
@@ -40,6 +43,9 @@ import org.slf4j.LoggerFactory;
public class LuceneFSTIndexCreator implements FSTIndexCreator {
private static final Logger LOGGER =
LoggerFactory.getLogger(LuceneFSTIndexCreator.class);
private final File _fstIndexFile;
+ private final String _columnName;
+ private final String _tableNameWithType;
+ private final boolean _continueOnError;
private final FSTBuilder _fstBuilder;
Integer _dictId;
@@ -50,36 +56,69 @@ public class LuceneFSTIndexCreator implements
FSTIndexCreator {
*
* @param indexDir Index directory
* @param columnName Column name for which index is being created
+ * @param tableNameWithType table name with type
+ * @param continueOnError if true, don't throw exception on add() failures
* @param sortedEntries Sorted entries of the unique values of the column.
* @throws IOException
*/
- public LuceneFSTIndexCreator(File indexDir, String columnName, String[]
sortedEntries)
+ public LuceneFSTIndexCreator(File indexDir, String columnName, String
tableNameWithType, boolean continueOnError,
+ String[] sortedEntries)
throws IOException {
+ this(indexDir, columnName, tableNameWithType, continueOnError,
sortedEntries, new FSTBuilder());
+ }
+
+ @VisibleForTesting
+ public LuceneFSTIndexCreator(File indexDir, String columnName, String
tableNameWithType, boolean continueOnError,
+ String[] sortedEntries, FSTBuilder fstBuilder)
+ throws IOException {
+ _columnName = columnName;
+ _tableNameWithType = tableNameWithType;
+ _continueOnError = continueOnError;
_fstIndexFile = new File(indexDir, columnName +
V1Constants.Indexes.LUCENE_V912_FST_INDEX_FILE_EXTENSION);
- _fstBuilder = new FSTBuilder();
+ _fstBuilder = fstBuilder;
_dictId = 0;
if (sortedEntries != null) {
for (_dictId = 0; _dictId < sortedEntries.length; _dictId++) {
- _fstBuilder.addEntry(sortedEntries[_dictId], _dictId);
+ try {
+ _fstBuilder.addEntry(sortedEntries[_dictId], _dictId);
+ } catch (Exception ex) {
+ if (_continueOnError) {
+ // Caught exception while trying to add, update metric and skip
the document
+ MetricUtils.updateIndexingErrorMetric(_tableNameWithType,
FstIndexType.INDEX_DISPLAY_NAME);
+ } else {
+ LOGGER.error("Caught exception while trying to add to FST index
for table: {}, column: {}",
+ tableNameWithType, columnName, ex);
+ throw ex;
+ }
+ }
}
}
}
public LuceneFSTIndexCreator(IndexCreationContext context)
throws IOException {
- this(context.getIndexDir(), context.getFieldSpec().getName(), (String[])
context.getSortedUniqueElementsArray());
+ this(context.getIndexDir(), context.getFieldSpec().getName(),
context.getTableNameWithType(),
+ context.isContinueOnError(), (String[])
context.getSortedUniqueElementsArray());
}
// Expects dictionary entries in sorted order.
@Override
- public void add(String document) {
+ public void add(String document)
+ throws IOException {
try {
_fstBuilder.addEntry(document, _dictId);
- _dictId++;
- } catch (IOException ex) {
- throw new RuntimeException("Unable to load the schema file", ex);
+ } catch (Exception ex) {
+ if (_continueOnError) {
+ // Caught exception while trying to add, update metric and skip the
document
+ MetricUtils.updateIndexingErrorMetric(_tableNameWithType,
FstIndexType.INDEX_DISPLAY_NAME);
+ } else {
+ LOGGER.error("Caught exception while trying to add to FST index for
table: {}, column: {}",
+ _tableNameWithType, _columnName, ex);
+ throw ex;
+ }
}
+ _dictId++;
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/NativeTextIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/NativeTextIndexCreator.java
index 7ef4d252143..dab54fb6408 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/NativeTextIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/NativeTextIndexCreator.java
@@ -35,6 +35,8 @@ import
org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import
org.apache.pinot.segment.local.segment.creator.impl.inv.BitmapInvertedIndexWriter;
import
org.apache.pinot.segment.local.segment.index.text.AbstractTextIndexCreator;
import
org.apache.pinot.segment.local.segment.index.text.CaseAwareStandardAnalyzer;
+import org.apache.pinot.segment.local.segment.index.text.TextIndexType;
+import org.apache.pinot.segment.local.utils.MetricUtils;
import org.apache.pinot.segment.local.utils.nativefst.FST;
import org.apache.pinot.segment.local.utils.nativefst.FSTHeader;
import org.apache.pinot.segment.local.utils.nativefst.builder.FSTBuilder;
@@ -42,11 +44,14 @@ import org.apache.pinot.segment.spi.V1Constants;
import org.roaringbitmap.Container;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.RoaringBitmapWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static java.nio.charset.StandardCharsets.UTF_8;
public class NativeTextIndexCreator extends AbstractTextIndexCreator {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(NativeTextIndexCreator.class);
private static final String TEMP_DIR_SUFFIX = ".nativetext.idx.tmp";
private static final String FST_FILE_NAME = "native.fst";
private static final String INVERTED_INDEX_FILE_NAME = "inverted.index.buf";
@@ -62,6 +67,8 @@ public class NativeTextIndexCreator extends
AbstractTextIndexCreator {
public static final int VERSION = 1;
private final String _columnName;
+ private final String _tableNameWithType;
+ private final boolean _continueOnError;
private final FSTBuilder _fstBuilder;
private final File _indexFile;
private final File _tempDir;
@@ -74,9 +81,11 @@ public class NativeTextIndexCreator extends
AbstractTextIndexCreator {
private int _fstDataSize;
private int _numBitMaps;
- public NativeTextIndexCreator(String column, File indexDir)
+ public NativeTextIndexCreator(String column, String tableNameWithType,
boolean continueOnError, File indexDir)
throws IOException {
_columnName = column;
+ _tableNameWithType = tableNameWithType;
+ _continueOnError = continueOnError;
_fstBuilder = new FSTBuilder();
_indexFile = new File(indexDir, column +
V1Constants.Indexes.NATIVE_TEXT_INDEX_FILE_EXTENSION);
_tempDir = new File(indexDir, column + TEMP_DIR_SUFFIX);
@@ -92,14 +101,36 @@ public class NativeTextIndexCreator extends
AbstractTextIndexCreator {
@Override
public void add(String document) {
- addHelper(document);
+ try {
+ addHelper(document);
+ } catch (RuntimeException e) {
+ if (_continueOnError) {
+ // Caught exception while trying to add, update metric and skip the
document
+ MetricUtils.updateIndexingErrorMetric(_tableNameWithType,
TextIndexType.INDEX_DISPLAY_NAME);
+ } else {
+ LOGGER.error("Caught exception while trying to add to native text
index for table: {}, column: {}",
+ _tableNameWithType, _columnName, e);
+ throw e;
+ }
+ }
_nextDocId++;
}
@Override
public void add(String[] documents, int length) {
- for (int i = 0; i < length; i++) {
- addHelper(documents[i]);
+ try {
+ for (int i = 0; i < length; i++) {
+ addHelper(documents[i]);
+ }
+ } catch (RuntimeException e) {
+ if (_continueOnError) {
+ // Caught exception while trying to add, update metric and skip the
document
+ MetricUtils.updateIndexingErrorMetric(_tableNameWithType,
TextIndexType.INDEX_DISPLAY_NAME);
+ } else {
+ LOGGER.error("Caught exception while trying to add to native text
index for table: {}, column: {}",
+ _tableNameWithType, _columnName, e);
+ throw e;
+ }
}
_nextDocId++;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java
index 5d0996ef746..194ae655f58 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java
@@ -109,9 +109,9 @@ public class H3IndexType extends
AbstractIndexType<H3IndexConfig, H3IndexReader,
H3IndexResolution resolution =
Objects.requireNonNull(indexConfig).getResolution();
return context.isOnHeap()
? new OnHeapH3IndexCreator(context.getIndexDir(),
context.getFieldSpec().getName(),
- context.getTableNameWithType(), resolution)
+ context.getTableNameWithType(), context.isContinueOnError(),
resolution)
: new OffHeapH3IndexCreator(context.getIndexDir(),
context.getFieldSpec().getName(),
- context.getTableNameWithType(), resolution);
+ context.getTableNameWithType(), context.isContinueOnError(),
resolution);
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java
index 4943e7d0c3f..17ded67a0e0 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java
@@ -115,8 +115,9 @@ public class JsonIndexType extends
AbstractIndexType<JsonIndexConfig, JsonIndexR
Preconditions.checkState(storedType == DataType.STRING || storedType ==
DataType.MAP,
"Json index is currently only supported on STRING columns");
return context.isOnHeap() ? new
OnHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
- indexConfig)
- : new OffHeapJsonIndexCreator(context.getIndexDir(),
context.getFieldSpec().getName(), indexConfig);
+ context.getTableNameWithType(), context.isContinueOnError(),
indexConfig)
+ : new OffHeapJsonIndexCreator(context.getIndexDir(),
context.getFieldSpec().getName(),
+ context.getTableNameWithType(), context.isContinueOnError(),
indexConfig);
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
index dfe8616bd05..3c36ae2ac63 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
@@ -116,7 +116,8 @@ public class TextIndexType extends
AbstractIndexType<TextIndexConfig, TextIndexR
Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType()
== FieldSpec.DataType.STRING,
"Text index is currently only supported on STRING type columns");
if (indexConfig.getFstType() == FSTType.NATIVE) {
- return new NativeTextIndexCreator(context.getFieldSpec().getName(),
context.getIndexDir());
+ return new NativeTextIndexCreator(context.getFieldSpec().getName(),
context.getTableNameWithType(),
+ context.isContinueOnError(), context.getIndexDir());
} else {
return new LuceneTextIndexCreator(context, indexConfig);
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/MetricUtils.java
similarity index 54%
copy from
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java
copy to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/MetricUtils.java
index 218b4f3961e..29695833b86 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/MetricUtils.java
@@ -16,34 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.segment.spi.index.creator;
+package org.apache.pinot.segment.local.utils;
-import java.io.IOException;
-import javax.annotation.Nullable;
-import org.apache.pinot.segment.spi.index.IndexCreator;
+import java.util.Locale;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
-public interface FSTIndexCreator extends IndexCreator {
+/**
+ * Utils for metrics
+ */
+public class MetricUtils {
- @Override
- default void add(Object value, int dictId)
- throws IOException {
- // FST indexes should do nothing when called for each row
- }
- @Override
- default void add(Object[] values, @Nullable int[] dictIds)
- throws IOException {
- // FST indexes should do nothing when called for each row
+ private MetricUtils() {
}
- /**
- * Adds the next document.
- */
- void add(String document);
-
- /**
- * Adds a set of documents to the index
- */
- void add(String[] document, int length);
+ public static void updateIndexingErrorMetric(String tableNameWithType,
String indexDisplayName) {
+ String metricKeyName =
+ tableNameWithType + "-" + indexDisplayName.toUpperCase(Locale.US) +
"-indexingError";
+ ServerMetrics.get().addMeteredTableValue(metricKeyName,
ServerMeter.INDEXING_FAILURES, 1);
+ }
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/H3IndexTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/H3IndexTest.java
index 7493d43db6f..7e1bee3bf57 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/H3IndexTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/H3IndexTest.java
@@ -92,9 +92,9 @@ public class H3IndexTest implements
PinotBuffersAfterMethodCheckRule {
try (MutableH3Index mutableH3Index = new
MutableH3Index(h3IndexResolution)) {
try (GeoSpatialIndexCreator onHeapCreator = new
OnHeapH3IndexCreator(TEMP_DIR, onHeapColumnName,
- "myTable_OFFLINE", h3IndexResolution);
+ "myTable_OFFLINE", false, h3IndexResolution);
GeoSpatialIndexCreator offHeapCreator = new
OffHeapH3IndexCreator(TEMP_DIR, offHeapColumnName,
- "myTable_OFFLINE", h3IndexResolution)) {
+ "myTable_OFFLINE", false, h3IndexResolution)) {
int docId = 0;
while (expectedCardinalities.size() < numUniqueH3Ids) {
double longitude = RANDOM.nextDouble() * 360 - 180;
@@ -134,7 +134,7 @@ public class H3IndexTest implements
PinotBuffersAfterMethodCheckRule {
int res = 5;
H3IndexResolution resolution = new
H3IndexResolution(Collections.singletonList(res));
- try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR,
columnName, "myTable_OFFLINE",
+ try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR,
columnName, "myTable_OFFLINE", true,
resolution)) {
Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new
Coordinate(10, 20));
creator.add(point);
@@ -160,7 +160,7 @@ public class H3IndexTest implements
PinotBuffersAfterMethodCheckRule {
int res = 5;
H3IndexResolution resolution = new
H3IndexResolution(Collections.singletonList(res));
- try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR,
columnName, "myTable_OFFLINE",
+ try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR,
columnName, "myTable_OFFLINE", true,
resolution)) {
Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new
Coordinate(10, 20));
creator.add(point);
@@ -186,7 +186,7 @@ public class H3IndexTest implements
PinotBuffersAfterMethodCheckRule {
int res = 5;
H3IndexResolution resolution = new
H3IndexResolution(Collections.singletonList(res));
- try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR,
columnName, "myTable_OFFLINE",
+ try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR,
columnName, "myTable_OFFLINE", true,
resolution)) {
Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new
Coordinate(10, 42));
creator.add(point);
@@ -208,6 +208,60 @@ public class H3IndexTest implements
PinotBuffersAfterMethodCheckRule {
}
}
+ @Test
+ public void testSkipInvalidGeometryContinueOnErrorFalse()
+ throws Exception {
+ String columnName = "skipInvalid";
+ int res = 5;
+ H3IndexResolution resolution = new
H3IndexResolution(Collections.singletonList(res));
+
+ try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR,
columnName, "myTable_OFFLINE", false,
+ resolution)) {
+ Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new
Coordinate(10, 20));
+ creator.add(point);
+
+ // With continueOnError=false, invalid serialized bytes must throw rather than be skipped
+ Assert.assertThrows(IllegalStateException.class, () -> creator.add(new
byte[]{1, 2, 3}, -1));
+ }
+ }
+
+ @Test
+ public void testSkipNullGeometryContinueOnErrorFalse()
+ throws Exception {
+ String columnName = "skipNull";
+ int res = 5;
+ H3IndexResolution resolution = new
H3IndexResolution(Collections.singletonList(res));
+
+ try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR,
columnName, "myTable_OFFLINE", false,
+ resolution)) {
+ Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new
Coordinate(10, 20));
+ creator.add(point);
+
+ // With continueOnError=false, an explicit null geometry must throw rather than be skipped
+ Assert.assertThrows(IllegalStateException.class, () ->
creator.add(null));
+ }
+ }
+
+ @Test
+ public void testSkipNonPointGeometryContinueOnErrorFalse()
+ throws Exception {
+ String columnName = "skipInvalidGeometryType";
+ int res = 5;
+ H3IndexResolution resolution = new
H3IndexResolution(Collections.singletonList(res));
+
+ try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR,
columnName, "myTable_OFFLINE", false,
+ resolution)) {
+ Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new
Coordinate(10, 42));
+ creator.add(point);
+
+ // With continueOnError=false, a non-point geometry (MultiPoint) must throw rather than be skipped
+ Point[] points = new Point[1];
+ points[0] = point;
+ MultiPoint multiPoint =
GeometryUtils.GEOMETRY_FACTORY.createMultiPoint(points);
+ Assert.assertThrows(IllegalStateException.class, () ->
creator.add(multiPoint));
+ }
+ }
+
public static class ConfTest extends AbstractSerdeIndexContract {
protected void assertEquals(H3IndexConfig expected) {
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
index 8f9b477ae20..b576a443942 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
@@ -397,9 +397,24 @@ public class JsonIndexTest implements
PinotBuffersAfterMethodCheckRule {
*/
private void createIndex(boolean createOnHeap, JsonIndexConfig
jsonIndexConfig, String[] records)
throws IOException {
+ createIndex(createOnHeap, jsonIndexConfig, records, false);
+ }
+
+ /**
+ * Creates a JSON index with the given config and adds the given records
+ * @param createOnHeap Whether to create an on-heap index
+ * @param jsonIndexConfig the JSON index config
+ * @param records the records to be added to the index
+ * @param continueOnError whether continueOnError should be enabled or
disabled
+ * @throws IOException on error
+ */
+ private void createIndex(boolean createOnHeap, JsonIndexConfig
jsonIndexConfig, String[] records, boolean continueOnError)
+ throws IOException {
try (JsonIndexCreator indexCreator = createOnHeap
- ? new OnHeapJsonIndexCreator(INDEX_DIR, ON_HEAP_COLUMN_NAME,
jsonIndexConfig)
- : new OffHeapJsonIndexCreator(INDEX_DIR, OFF_HEAP_COLUMN_NAME,
jsonIndexConfig)) {
+ ? new OnHeapJsonIndexCreator(INDEX_DIR, ON_HEAP_COLUMN_NAME,
"myTable_OFFLINE", continueOnError,
+ jsonIndexConfig)
+ : new OffHeapJsonIndexCreator(INDEX_DIR, OFF_HEAP_COLUMN_NAME,
"myTable_OFFLINE", continueOnError,
+ jsonIndexConfig)) {
for (String record : records) {
indexCreator.add(record);
}
@@ -453,7 +468,8 @@ public class JsonIndexTest implements
PinotBuffersAfterMethodCheckRule {
};
String colName = "col";
- try (JsonIndexCreator offHeapCreator = new
OffHeapJsonIndexCreator(INDEX_DIR, colName, getIndexConfig());
+ try (JsonIndexCreator offHeapCreator = new
OffHeapJsonIndexCreator(INDEX_DIR, colName, "myTable_OFFLINE",
+ false, getIndexConfig());
MutableJsonIndexImpl mutableIndex = new
MutableJsonIndexImpl(getIndexConfig(), "table__0__1", "col")) {
for (String record : records) {
offHeapCreator.add(record);
@@ -522,7 +538,8 @@ public class JsonIndexTest implements
PinotBuffersAfterMethodCheckRule {
// @formatter: on
String colName = "col";
- try (JsonIndexCreator offHeapCreator = new
OffHeapJsonIndexCreator(INDEX_DIR, colName, getIndexConfig());
+ try (JsonIndexCreator offHeapCreator = new
OffHeapJsonIndexCreator(INDEX_DIR, colName, "myTable_OFFLINE",
+ false, getIndexConfig());
MutableJsonIndexImpl mutableIndex = new
MutableJsonIndexImpl(getIndexConfig(), "table__0__1", "col")) {
for (String record : records) {
offHeapCreator.add(record);
@@ -817,7 +834,7 @@ public class JsonIndexTest implements
PinotBuffersAfterMethodCheckRule {
}
@Test
- public void testSkipInvalidJsonEnable() throws Exception {
+ public void testSkipInvalidJsonEnableContinueOnErrorFalse() throws Exception
{
JsonIndexConfig jsonIndexConfig = getIndexConfig();
jsonIndexConfig.setSkipInvalidJson(true);
// the braces don't match and cannot be parsed
@@ -849,8 +866,68 @@ public class JsonIndexTest implements
PinotBuffersAfterMethodCheckRule {
}
}
+ @Test
+ public void testSkipInvalidJsonEnableContinueOnErrorTrue() throws Exception {
+ JsonIndexConfig jsonIndexConfig = getIndexConfig();
+ jsonIndexConfig.setSkipInvalidJson(true);
+ // the braces don't match and cannot be parsed
+ String[] records = {"{\"key1\":\"va\""};
+
+ createIndex(true, jsonIndexConfig, records, true);
+ File onHeapIndexFile = new File(INDEX_DIR, ON_HEAP_COLUMN_NAME +
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+ assertTrue(onHeapIndexFile.exists());
+
+ createIndex(false, jsonIndexConfig, records, true);
+ File offHeapIndexFile = new File(INDEX_DIR, OFF_HEAP_COLUMN_NAME +
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+ assertTrue(offHeapIndexFile.exists());
+
+ try (PinotDataBuffer onHeapBuffer =
PinotDataBuffer.mapReadOnlyBigEndianFile(onHeapIndexFile);
+ PinotDataBuffer offHeapBuffer =
PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile);
+ JsonIndexReader onHeapReader = new
ImmutableJsonIndexReader(onHeapBuffer, records.length);
+ JsonIndexReader offHeapReader = new
ImmutableJsonIndexReader(offHeapBuffer, records.length);
+ MutableJsonIndexImpl mutableJsonIndex = new
MutableJsonIndexImpl(jsonIndexConfig, "table__0__1", "col")) {
+ for (String record : records) {
+ mutableJsonIndex.add(record);
+ }
+ Map<String, RoaringBitmap> onHeapRes = getMatchingDocsMap(onHeapReader,
"$");
+ Map<String, RoaringBitmap> offHeapRes =
getMatchingDocsMap(offHeapReader, "$");
+ Map<String, RoaringBitmap> mutableRes =
mutableJsonIndex.getMatchingFlattenedDocsMap("$", null);
+ Object expectedRes =
Collections.singletonMap(JsonUtils.SKIPPED_VALUE_REPLACEMENT,
RoaringBitmap.bitmapOf(0));
+ assertEquals(onHeapRes, expectedRes);
+ assertEquals(offHeapRes, expectedRes);
+ assertEquals(mutableRes, expectedRes);
+ }
+ }
+
+ @Test
+ public void testSkipInvalidJsonDisabledContinueOnErrorTrue() throws
Exception {
+ // by default, skipInvalidJson is disabled
+ JsonIndexConfig jsonIndexConfig = getIndexConfig();
+ // the braces don't match and cannot be parsed
+ String[] records = {"{\"key1\":\"va\""};
+
+ createIndex(true, jsonIndexConfig, records, true);
+ File onHeapIndexFile = new File(INDEX_DIR, ON_HEAP_COLUMN_NAME +
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+ assertTrue(onHeapIndexFile.exists());
+
+ createIndex(false, jsonIndexConfig, records, true);
+ File offHeapIndexFile = new File(INDEX_DIR, OFF_HEAP_COLUMN_NAME +
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+ assertTrue(offHeapIndexFile.exists());
+
+ try (PinotDataBuffer onHeapBuffer =
PinotDataBuffer.mapReadOnlyBigEndianFile(onHeapIndexFile);
+ PinotDataBuffer offHeapBuffer =
PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile);
+ JsonIndexReader onHeapReader = new
ImmutableJsonIndexReader(onHeapBuffer, records.length);
+ JsonIndexReader offHeapReader = new
ImmutableJsonIndexReader(offHeapBuffer, records.length)) {
+ Map<String, RoaringBitmap> onHeapRes = getMatchingDocsMap(onHeapReader,
"$");
+ Map<String, RoaringBitmap> offHeapRes =
getMatchingDocsMap(offHeapReader, "$");
+ Object expectedRes =
Collections.singletonMap(JsonUtils.SKIPPED_VALUE_REPLACEMENT,
RoaringBitmap.bitmapOf(0));
+ assertEquals(onHeapRes, expectedRes);
+ assertEquals(offHeapRes, expectedRes);
+ }
+ }
+
@Test(expectedExceptions = JsonProcessingException.class)
- public void testSkipInvalidJsonDisabled() throws Exception {
+ public void testSkipInvalidJsonDisabledContinueOnErrorFalse() throws
Exception {
// by default, skipInvalidJson is disabled
JsonIndexConfig jsonIndexConfig = getIndexConfig();
// the braces don't match and cannot be parsed
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/HnswVectorIndexCreatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/HnswVectorIndexCreatorTest.java
new file mode 100644
index 00000000000..e1ed2625716
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/HnswVectorIndexCreatorTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import
org.apache.pinot.segment.local.segment.creator.impl.vector.HnswVectorIndexCreator;
+import
org.apache.pinot.segment.local.segment.index.readers.vector.HnswVectorIndexReader;
+import org.apache.pinot.segment.spi.index.creator.VectorIndexConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class HnswVectorIndexCreatorTest {
+ private static final File INDEX_DIR =
+ new File(FileUtils.getTempDirectory(),
HnswVectorIndexCreatorTest.class.toString());
+ private VectorIndexConfig _config;
+
+ @BeforeMethod
+ public void setUp()
+ throws IOException {
+ FileUtils.forceMkdir(INDEX_DIR);
+
+ Map<String, String> properties = new HashMap<>();
+
+ properties.put("vectorIndexType", "HNSW");
+ properties.put("vectorDimension", "1536");
+
+ _config = new VectorIndexConfig(properties);
+ try (HnswVectorIndexCreator creator = new HnswVectorIndexCreator("foo",
INDEX_DIR, _config)) {
+ float[] values1 = new float[] {5.0F, 42.0F, 54.33333F, 42.24F,
1001.045F};
+ creator.add(values1);
+ float[] values2 = new float[] {42.0F, 23423.0F, 42431.32532F,
6785676.3242F, 42.3F};
+ creator.add(values2);
+ float[] values3 = new float[] {1.0F, 2.0F, 3.0F, 4.0F, 5.0F};
+ creator.add(values3);
+ float[] values4 = new float[] {42.678F, 23423423.0F, 42431.32523432F,
6723485.3242F, 42342.3F};
+ creator.add(values4);
+ creator.seal();
+ }
+ }
+
+ @AfterMethod
+ public void tearDown()
+ throws IOException {
+ FileUtils.deleteDirectory(INDEX_DIR);
+ }
+
+ @Test
+ public void testIndexWriterReaderWithTop3()
+ throws IOException {
+ // Use VectorIndex reader to validate that reads work
+ try (HnswVectorIndexReader reader = new HnswVectorIndexReader("foo",
INDEX_DIR, 4, _config)) {
+ int[] matchedDocIds = reader.getDocIds(new float[]{5.0F, 42.0F,
54.33333F, 42.24F, 3413.4F}, 3).toArray();
+ // Expect to get 3 matching docIds since topK = 3 is used
+ Assert.assertEquals(matchedDocIds.length, 3);
+ Assert.assertEquals(matchedDocIds[0], 0);
+ Assert.assertEquals(matchedDocIds[1], 2);
+ Assert.assertTrue(matchedDocIds[2] == 1 || matchedDocIds[2] == 3); // third hit must be one of the remaining docs
+ }
+ }
+
+ @Test
+ public void testIndexWriterReaderWithTop1()
+ throws IOException {
+ // Use VectorIndex reader to validate that reads work
+ try (HnswVectorIndexReader reader = new HnswVectorIndexReader("foo",
INDEX_DIR, 4, _config)) {
+ int[] matchedDocIds = reader.getDocIds(new float[]{1.0F, 2.0F, 3.0F,
4.0F, 5.0F}, 1).toArray();
+ // Expect to get 1 matching docId since topK = 1 is used
+ Assert.assertEquals(matchedDocIds.length, 1);
+ Assert.assertEquals(matchedDocIds[0], 2);
+ }
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneFSTIndexCreatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneFSTIndexCreatorTest.java
index 0e88cc73e9f..870983e97ab 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneFSTIndexCreatorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneFSTIndexCreatorTest.java
@@ -25,15 +25,18 @@ import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.PinotBuffersAfterMethodCheckRule;
import
org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
import
org.apache.pinot.segment.local.segment.index.readers.LuceneFSTIndexReader;
+import org.apache.pinot.segment.local.utils.fst.FSTBuilder;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static
org.apache.pinot.segment.spi.V1Constants.Indexes.LUCENE_V912_FST_INDEX_FILE_EXTENSION;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
public class LuceneFSTIndexCreatorTest implements
PinotBuffersAfterMethodCheckRule {
@@ -59,9 +62,8 @@ public class LuceneFSTIndexCreatorTest implements
PinotBuffersAfterMethodCheckRu
uniqueValues[1] = "hello-world123";
uniqueValues[2] = "still";
- FieldSpec fieldSpec = new DimensionFieldSpec("testFSTColumn",
FieldSpec.DataType.STRING, true);
- LuceneFSTIndexCreator creator = new LuceneFSTIndexCreator(
- INDEX_DIR, "testFSTColumn", uniqueValues);
+ LuceneFSTIndexCreator creator = new LuceneFSTIndexCreator(INDEX_DIR,
"testFSTColumn", "myTable_OFFLINE",
+ false, uniqueValues);
creator.seal();
File fstFile = new File(INDEX_DIR, "testFSTColumn" +
LUCENE_V912_FST_INDEX_FILE_EXTENSION);
try (PinotDataBuffer pinotDataBuffer =
@@ -69,12 +71,64 @@ public class LuceneFSTIndexCreatorTest implements
PinotBuffersAfterMethodCheckRu
LuceneFSTIndexReader reader = new
LuceneFSTIndexReader(pinotDataBuffer)) {
int[] matchedDictIds = reader.getDictIds("hello.*").toArray();
- Assert.assertEquals(2, matchedDictIds.length);
- Assert.assertEquals(0, matchedDictIds[0]);
- Assert.assertEquals(1, matchedDictIds[1]);
+ Assert.assertEquals(matchedDictIds.length, 2);
+ Assert.assertEquals(matchedDictIds[0], 0);
+ Assert.assertEquals(matchedDictIds[1], 1);
matchedDictIds = reader.getDictIds(".*llo").toArray();
- Assert.assertEquals(0, matchedDictIds.length);
+ Assert.assertEquals(matchedDictIds.length, 0);
+
+ matchedDictIds = reader.getDictIds("st.*").toArray();
+ Assert.assertEquals(matchedDictIds.length, 1);
+ Assert.assertEquals(matchedDictIds[0], 2);
+ }
+ }
+
+ @Test
+ public void testIndexWriterReaderWithAddExceptionsContinueOnErrorTrue()
+ throws IOException {
+ String[] uniqueValues = new String[3];
+ uniqueValues[0] = "hello-world";
+ uniqueValues[1] = "hello-world123";
+ uniqueValues[2] = "still";
+
+ FSTBuilder fstBuilder = Mockito.spy(new FSTBuilder());
+ // For the word "still" throw an exception so it is not indexed
+ doThrow(IOException.class).when(fstBuilder).addEntry(eq("still"),
anyInt());
+ LuceneFSTIndexCreator creator = new LuceneFSTIndexCreator(INDEX_DIR,
"testFSTColumn", "myTable_OFFLINE",
+ true, uniqueValues, fstBuilder);
+ creator.seal();
+ File fstFile = new File(INDEX_DIR, "testFSTColumn" +
LUCENE_V912_FST_INDEX_FILE_EXTENSION);
+ try (PinotDataBuffer pinotDataBuffer =
+ PinotDataBuffer.mapFile(fstFile, true, 0, fstFile.length(),
ByteOrder.BIG_ENDIAN, "fstIndexFile");
+ LuceneFSTIndexReader reader = new
LuceneFSTIndexReader(pinotDataBuffer)) {
+
+ int[] matchedDictIds = reader.getDictIds("hello.*").toArray();
+ Assert.assertEquals(matchedDictIds.length, 2);
+ Assert.assertEquals(matchedDictIds[0], 0);
+ Assert.assertEquals(matchedDictIds[1], 1);
+
+ matchedDictIds = reader.getDictIds(".*llo").toArray();
+ Assert.assertEquals(matchedDictIds.length, 0);
+
+ // Validate that nothing matches st.*
+ matchedDictIds = reader.getDictIds("st.*").toArray();
+ Assert.assertEquals(matchedDictIds.length, 0);
}
}
+
+ @Test
+ public void testIndexWriterReaderWithAddExceptionsContinueOnErrorFalse()
+ throws IOException {
+ String[] uniqueValues = new String[3];
+ uniqueValues[0] = "hello-world";
+ uniqueValues[1] = "hello-world123";
+ uniqueValues[2] = "still";
+
+ FSTBuilder fstBuilder = Mockito.spy(new FSTBuilder());
+ // For "still" throw an exception; with continueOnError=false the creator constructor must propagate it
+ doThrow(IOException.class).when(fstBuilder).addEntry(eq("still"),
anyInt());
+ Assert.assertThrows(IOException.class, () -> new
LuceneFSTIndexCreator(INDEX_DIR, "testFSTColumn",
+ "myTable_OFFLINE", false, uniqueValues, fstBuilder));
+ }
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneTextIndexCreatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneTextIndexCreatorTest.java
new file mode 100644
index 00000000000..c8278188be6
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneTextIndexCreatorTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import org.apache.commons.io.FileUtils;
+import
org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
+import
org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexReader;
+import org.apache.pinot.segment.spi.index.TextIndexConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class LuceneTextIndexCreatorTest {
+ private static final File INDEX_DIR =
+ new File(FileUtils.getTempDirectory(),
LuceneTextIndexCreatorTest.class.toString());
+
+ @BeforeMethod
+ public void setUp()
+ throws IOException {
+ FileUtils.forceMkdir(INDEX_DIR);
+
+ TextIndexConfig config = new TextIndexConfig(false, null, null, false,
false, null, null, true, 500, null, null,
+ null, null, false, false, 0, false, null);
+ try (LuceneTextIndexCreator creator = new LuceneTextIndexCreator("foo",
INDEX_DIR, true, false, null, null,
+ config)) {
+ creator.add("{\"clean\":\"this\"}");
+ creator.add("{\"retain\":\"this\"}");
+ creator.add("{\"keep\":\"this\"}");
+ creator.add("{\"hold\":\"this\"}");
+ creator.add("{\"clean\":\"that\"}");
+ creator.seal();
+ }
+ }
+
+ @AfterMethod
+ public void tearDown()
+ throws IOException {
+ FileUtils.deleteDirectory(INDEX_DIR);
+ }
+
+ @Test
+ public void testIndexWriterReaderMatchClean()
+ throws IOException {
+ // Use TextIndex reader to validate that reads work
+ try (LuceneTextIndexReader reader = new LuceneTextIndexReader("foo",
INDEX_DIR, 5, new HashMap<>())) {
+ int[] matchedDocIds = reader.getDocIds("clean").toArray();
+ Assert.assertEquals(matchedDocIds.length, 2);
+ Assert.assertEquals(matchedDocIds[0], 0);
+ Assert.assertEquals(matchedDocIds[1], 4);
+ }
+ }
+
+ @Test
+ public void testIndexWriterReaderMatchHold()
+ throws IOException {
+ // Use TextIndex reader to validate that reads work
+ try (LuceneTextIndexReader reader = new LuceneTextIndexReader("foo",
INDEX_DIR, 5, new HashMap<>())) {
+ int[] matchedDocIds = reader.getDocIds("hold").toArray();
+ Assert.assertEquals(matchedDocIds.length, 1);
+ Assert.assertEquals(matchedDocIds[0], 3);
+ }
+ }
+
+ @Test
+ public void testIndexWriterReaderMatchRetain()
+ throws IOException {
+ // Use TextIndex reader to validate that reads work
+ try (LuceneTextIndexReader reader = new LuceneTextIndexReader("foo",
INDEX_DIR, 5, new HashMap<>())) {
+ int[] matchedDocIds = reader.getDocIds("retain").toArray();
+ Assert.assertEquals(matchedDocIds.length, 1);
+ Assert.assertEquals(matchedDocIds[0], 1);
+ }
+ }
+
+ @Test
+ public void testIndexWriterReaderMatchWithOrClause()
+ throws IOException {
+ // Use TextIndex reader to validate that reads work
+ try (LuceneTextIndexReader reader = new LuceneTextIndexReader("foo",
INDEX_DIR, 5, new HashMap<>())) {
+ int[] matchedDocIds = reader.getDocIds("retain|keep").toArray();
+ Assert.assertEquals(matchedDocIds.length, 2);
+ Assert.assertEquals(matchedDocIds[0], 1);
+ Assert.assertEquals(matchedDocIds[1], 2);
+ }
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/NativeTextIndexCreatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/NativeTextIndexCreatorTest.java
index 34189413aa2..2729c42ef4a 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/NativeTextIndexCreatorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/NativeTextIndexCreatorTest.java
@@ -24,13 +24,15 @@ import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.PinotBuffersAfterMethodCheckRule;
import
org.apache.pinot.segment.local.segment.creator.impl.text.NativeTextIndexCreator;
import
org.apache.pinot.segment.local.segment.index.readers.text.NativeTextIndexReader;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static
org.apache.pinot.segment.spi.V1Constants.Indexes.NATIVE_TEXT_INDEX_FILE_EXTENSION;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
@@ -58,8 +60,8 @@ public class NativeTextIndexCreatorTest implements
PinotBuffersAfterMethodCheckR
uniqueValues[2] = "still";
uniqueValues[3] = "zoobar";
- FieldSpec fieldSpec = new DimensionFieldSpec("testFSTColumn",
FieldSpec.DataType.STRING, true);
- try (NativeTextIndexCreator creator = new
NativeTextIndexCreator("testFSTColumn", INDEX_DIR)) {
+ try (NativeTextIndexCreator creator = new
NativeTextIndexCreator("testFSTColumn", "myTable_OFFLINE", false,
+ INDEX_DIR)) {
for (int i = 0; i < 4; i++) {
creator.add(uniqueValues[i]);
}
@@ -93,4 +95,81 @@ public class NativeTextIndexCreatorTest implements
PinotBuffersAfterMethodCheckR
}
}
}
+
+ @Test
+ public void testIndexWriterReaderWithAddExceptionsWithContinueOnErrorTrue()
+ throws IOException {
+ String[] uniqueValues = new String[4];
+ uniqueValues[0] = "still";
+ uniqueValues[1] = "zoobar";
+ uniqueValues[2] = "hello-world";
+ uniqueValues[3] = "hello-world123";
+
+ try (NativeTextIndexCreator creator = spy(new
NativeTextIndexCreator("testFSTColumn", "myTable_OFFLINE",
+ true, INDEX_DIR))) {
+ // Add a couple of words so they show up in the index
+ for (int i = 0; i < 2; i++) {
+ creator.add(uniqueValues[i]);
+ }
+
+ // Throw exception for the remaining words
+ doThrow(RuntimeException.class).when(creator).analyze(anyString());
+ for (int i = 2; i < 4; i++) {
+ creator.add(uniqueValues[i]);
+ }
+
+ creator.seal();
+ }
+
+ File fstFile = new File(INDEX_DIR, "testFSTColumn" +
NATIVE_TEXT_INDEX_FILE_EXTENSION);
+ try (NativeTextIndexReader reader = new
NativeTextIndexReader("testFSTColumn", fstFile.getParentFile())) {
+ try {
+ int[] matchedDocIds = reader.getDocIds("hello.*").toArray();
+ assertEquals(matchedDocIds.length, 0);
+
+ matchedDocIds = reader.getDocIds(".*llo").toArray();
+ assertEquals(matchedDocIds.length, 0);
+
+ matchedDocIds = reader.getDocIds("wor.*").toArray();
+ assertEquals(matchedDocIds.length, 0);
+
+ matchedDocIds = reader.getDocIds("zoo.*").toArray();
+ assertEquals(matchedDocIds.length, 1);
+ assertEquals(matchedDocIds[0], 1);
+
+ matchedDocIds = reader.getDocIds(".*il.*").toArray();
+ assertEquals(matchedDocIds.length, 1);
+ assertEquals(matchedDocIds[0], 0);
+
+ matchedDocIds = reader.getDocIds(".*").toArray();
+ assertEquals(matchedDocIds.length, 2);
+ assertEquals(matchedDocIds[0], 0);
+ assertEquals(matchedDocIds[1], 1);
+ } finally {
+ reader.closeInTest();
+ }
+ }
+ }
+
+ @Test
+ public void testIndexWriterReaderWithAddExceptionsWithContinueOnErrorFalse()
+ throws IOException {
+ String[] uniqueValues = new String[4];
+ uniqueValues[0] = "still";
+ uniqueValues[1] = "zoobar";
+ uniqueValues[2] = "hello-world";
+ uniqueValues[3] = "hello-world123";
+
+ try (NativeTextIndexCreator creator = spy(new
NativeTextIndexCreator("testFSTColumn", "myTable_OFFLINE",
+ false, INDEX_DIR))) {
+ // Add a couple of words so they show up in the index
+ for (int i = 0; i < 2; i++) {
+ creator.add(uniqueValues[i]);
+ }
+
+ // Throw exception for the remaining words
+ doThrow(RuntimeException.class).when(creator).analyze(anyString());
+ Assert.assertThrows(RuntimeException.class, () ->
creator.add(uniqueValues[2]));
+ }
+ }
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
index e1349741bfd..f41133629b5 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
@@ -178,7 +178,7 @@ public class FilePerIndexDirectoryTest implements
PinotBuffersAfterMethodCheckRu
throws IOException {
// See https://github.com/apache/pinot/issues/11529
try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR,
_segmentMetadata, ReadMode.mmap);
- NativeTextIndexCreator fooCreator = new NativeTextIndexCreator("foo",
TEMP_DIR)) {
+ NativeTextIndexCreator fooCreator = new NativeTextIndexCreator("foo",
"myTable_OFFLINE", false, TEMP_DIR)) {
fooCreator.add("{\"clean\":\"this\"}");
fooCreator.seal();
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java
index 218b4f3961e..f2174082342 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java
@@ -40,7 +40,8 @@ public interface FSTIndexCreator extends IndexCreator {
/**
* Adds the next document.
*/
- void add(String document);
+ void add(String document)
+ throws IOException;
/**
* Adds a set of documents to the index
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java
index 62be6acc478..7fef8e04827 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.index.IndexCreator;
-import org.apache.pinot.spi.utils.JsonUtils;
/**
@@ -36,7 +35,7 @@ public interface JsonIndexCreator extends IndexCreator {
default void add(Object value, int dictId)
throws IOException {
if (value instanceof Map) {
- add(JsonUtils.objectToString(value));
+ add((Map) value);
} else {
add((String) value);
}
@@ -52,6 +51,12 @@ public interface JsonIndexCreator extends IndexCreator {
void add(String jsonString)
throws IOException;
+ /**
+ * Adds the next json value for Map type
+ */
+ void add(Map jsonMap)
+ throws IOException;
+
/**
* Seals the index and flushes it to disk.
*/
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
index ff2309968fa..83b7d351b89 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
@@ -79,7 +79,7 @@ public class JsonUtils {
public static final String ARRAY_INDEX_KEY = ".$index";
public static final String SKIPPED_VALUE_REPLACEMENT = "$SKIPPED$";
public static final int MAX_COMBINATIONS = 100_000;
- private static final List<Map<String, String>> SKIPPED_FLATTENED_RECORD =
+ public static final List<Map<String, String>> SKIPPED_FLATTENED_RECORD =
Collections.singletonList(Collections.singletonMap(VALUE_KEY,
SKIPPED_VALUE_REPLACEMENT));
// For querying
@@ -736,14 +736,14 @@ public class JsonUtils {
JsonNode jsonNode;
try {
jsonNode = JsonUtils.stringToJsonNode(jsonString);
- } catch (JsonProcessingException e) {
+ return JsonUtils.flatten(jsonNode, jsonIndexConfig);
+ } catch (Exception e) {
if (jsonIndexConfig.getSkipInvalidJson()) {
return SKIPPED_FLATTENED_RECORD;
} else {
throw e;
}
}
- return JsonUtils.flatten(jsonNode, jsonIndexConfig);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]