This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8684e046c0 Skip invalid json string rather than throwing error during json indexing (#12238)
8684e046c0 is described below
commit 8684e046c0bce504224a3a6179be6b51117511ce
Author: Xuanyi Li <[email protected]>
AuthorDate: Thu Feb 8 15:20:21 2024 -0800
Skip invalid json string rather than throwing error during json indexing (#12238)
---
.../realtime/impl/json/MutableJsonIndexImpl.java | 3 +-
.../impl/inv/json/BaseJsonIndexCreator.java | 2 +-
.../segment/local/segment/index/JsonIndexTest.java | 47 ++++++++++++++++++++++
.../pinot/spi/config/table/JsonIndexConfig.java | 20 +++++++--
.../java/org/apache/pinot/spi/utils/JsonUtils.java | 19 ++++++++-
5 files changed, 84 insertions(+), 7 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
index 46e7798204..7e632a19b2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
@@ -78,8 +78,7 @@ public class MutableJsonIndexImpl implements MutableJsonIndex {
public void add(String jsonString)
throws IOException {
try {
- List<Map<String, String>> flattenedRecords =
- JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString),
_jsonIndexConfig);
+ List<Map<String, String>> flattenedRecords =
JsonUtils.flatten(jsonString, _jsonIndexConfig);
_writeLock.lock();
try {
addFlattenedRecords(flattenedRecords);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
index 264bc6044b..5c3409fe45 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
@@ -91,7 +91,7 @@ public abstract class BaseJsonIndexCreator implements JsonIndexCreator {
@Override
public void add(String jsonString)
throws IOException {
-
addFlattenedRecords(JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString),
_jsonIndexConfig));
+ addFlattenedRecords(JsonUtils.flatten(jsonString, _jsonIndexConfig));
}
/**
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
index 4d3cbf8cae..461f8eb93e 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
@@ -18,9 +18,11 @@
*/
package org.apache.pinot.segment.local.segment.index;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
@@ -69,6 +71,7 @@ public class JsonIndexTest {
FileUtils.deleteDirectory(INDEX_DIR);
}
+
@Test
public void testSmallIndex()
throws Exception {
@@ -371,6 +374,50 @@ public class JsonIndexTest {
}
}
+ @Test
+ public void testSkipInvalidJsonEnable() throws Exception {
+ JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+ jsonIndexConfig.setSkipInvalidJson(true);
+ // the braces don't match and cannot be parsed
+ String[] records = {"{\"key1\":\"va\""};
+
+ createIndex(true, jsonIndexConfig, records);
+ File onHeapIndexFile = new File(INDEX_DIR, ON_HEAP_COLUMN_NAME +
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+ Assert.assertTrue(onHeapIndexFile.exists());
+
+ createIndex(false, jsonIndexConfig, records);
+ File offHeapIndexFile = new File(INDEX_DIR, OFF_HEAP_COLUMN_NAME +
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+ Assert.assertTrue(offHeapIndexFile.exists());
+
+ try (PinotDataBuffer onHeapDataBuffer =
PinotDataBuffer.mapReadOnlyBigEndianFile(onHeapIndexFile);
+ PinotDataBuffer offHeapDataBuffer =
PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile);
+ JsonIndexReader onHeapIndexReader = new
ImmutableJsonIndexReader(onHeapDataBuffer, records.length);
+ JsonIndexReader offHeapIndexReader = new
ImmutableJsonIndexReader(offHeapDataBuffer, records.length);
+ MutableJsonIndexImpl mutableJsonIndex = new
MutableJsonIndexImpl(jsonIndexConfig)) {
+ for (String record : records) {
+ mutableJsonIndex.add(record);
+ }
+ Map<String, RoaringBitmap> onHeapRes =
onHeapIndexReader.getMatchingDocsMap("");
+ Map<String, RoaringBitmap> offHeapRes =
offHeapIndexReader.getMatchingDocsMap("");
+ Map<String, RoaringBitmap> mutableRes =
mutableJsonIndex.getMatchingDocsMap("");
+ Map<String, RoaringBitmap> expectedRes =
Collections.singletonMap(JsonUtils.SKIPPED_VALUE_REPLACEMENT,
+ RoaringBitmap.bitmapOf(0));
+ Assert.assertEquals(expectedRes, onHeapRes);
+ Assert.assertEquals(expectedRes, offHeapRes);
+ Assert.assertEquals(expectedRes, mutableRes);
+ }
+ }
+
+ @Test(expectedExceptions = JsonProcessingException.class)
+ public void testSkipInvalidJsonDisabled() throws Exception {
+ // by default, skipInvalidJson is disabled
+ JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+ // the braces don't match and cannot be parsed
+ String[] records = {"{\"key1\":\"va\""};
+
+ createIndex(true, jsonIndexConfig, records);
+ }
+
public static class ConfTest extends AbstractSerdeIndexContract {
@Test
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java
index cada2fe4a4..1a0964138e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java
@@ -41,6 +41,8 @@ import javax.annotation.Nullable;
* - excludeFields: Exclude the given fields, e.g. "b", "c", even if it is
under the included paths.
* - maxValueLength: Exclude field values which are longer than this length. A
value of "0" disables this filter.
* Excluded values will be replaced with
JsonUtils.SKIPPED_VALUE_REPLACEMENT.
+ * - skipInvalidJson: If the raw data is not a valid json string, then replace
with {"":SKIPPED_VALUE_REPLACEMENT}
+ * and continue indexing on following Json records.
*/
public class JsonIndexConfig extends IndexConfig {
public static final JsonIndexConfig DISABLED = new JsonIndexConfig(true);
@@ -52,6 +54,7 @@ public class JsonIndexConfig extends IndexConfig {
private Set<String> _excludePaths;
private Set<String> _excludeFields;
private int _maxValueLength = 0;
+ private boolean _skipInvalidJson = false;
public JsonIndexConfig() {
super(false);
@@ -68,7 +71,8 @@ public class JsonIndexConfig extends IndexConfig {
@JsonProperty("includePaths") @Nullable Set<String> includePaths,
@JsonProperty("excludePaths") @Nullable Set<String> excludePaths,
@JsonProperty("excludeFields") @Nullable Set<String> excludeFields,
- @JsonProperty("maxValueLength") int maxValueLength) {
+ @JsonProperty("maxValueLength") int maxValueLength,
+ @JsonProperty("skipInvalidJson") boolean skipInvalidJson) {
super(disabled);
_maxLevels = maxLevels;
_excludeArray = excludeArray;
@@ -77,6 +81,7 @@ public class JsonIndexConfig extends IndexConfig {
_excludePaths = excludePaths;
_excludeFields = excludeFields;
_maxValueLength = maxValueLength;
+ _skipInvalidJson = skipInvalidJson;
}
public int getMaxLevels() {
@@ -143,6 +148,14 @@ public class JsonIndexConfig extends IndexConfig {
_maxValueLength = maxValueLength;
}
+ public boolean getSkipInvalidJson() {
+ return _skipInvalidJson;
+ }
+
+ public void setSkipInvalidJson(boolean skipInvalidJson) {
+ _skipInvalidJson = skipInvalidJson;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -158,12 +171,13 @@ public class JsonIndexConfig extends IndexConfig {
return _maxLevels == config._maxLevels && _excludeArray ==
config._excludeArray
&& _disableCrossArrayUnnest == config._disableCrossArrayUnnest &&
Objects.equals(_includePaths,
config._includePaths) && Objects.equals(_excludePaths,
config._excludePaths) && Objects.equals(_excludeFields,
- config._excludeFields) && _maxValueLength == config._maxValueLength;
+ config._excludeFields) && _maxValueLength == config._maxValueLength
+ && _skipInvalidJson == config._skipInvalidJson;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), _maxLevels, _excludeArray,
_disableCrossArrayUnnest, _includePaths,
- _excludePaths, _excludeFields, _maxValueLength);
+ _excludePaths, _excludeFields, _maxValueLength, _skipInvalidJson);
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
index 8593b2f8cb..396c21ad74 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
@@ -76,6 +76,8 @@ public class JsonUtils {
public static final String ARRAY_INDEX_KEY = ".$index";
public static final String SKIPPED_VALUE_REPLACEMENT = "$SKIPPED$";
public static final int MAX_COMBINATIONS = 100_000;
+ private static final List<Map<String, String>> SKIPPED_FLATTENED_RECORD =
+ Collections.singletonList(Collections.singletonMap(VALUE_KEY,
SKIPPED_VALUE_REPLACEMENT));
// For querying
public static final String WILDCARD = "*";
@@ -356,7 +358,7 @@ public class JsonUtils {
* ]
* </pre>
*/
- public static List<Map<String, String>> flatten(JsonNode node,
JsonIndexConfig jsonIndexConfig) {
+ protected static List<Map<String, String>> flatten(JsonNode node,
JsonIndexConfig jsonIndexConfig) {
try {
return flatten(node, jsonIndexConfig, 0, "$", false);
} catch (OutOfMemoryError oom) {
@@ -719,4 +721,19 @@ public class JsonUtils {
}
}
}
+
+ public static List<Map<String, String>> flatten(String jsonString,
JsonIndexConfig jsonIndexConfig)
+ throws IOException {
+ JsonNode jsonNode;
+ try {
+ jsonNode = JsonUtils.stringToJsonNode(jsonString);
+ } catch (JsonProcessingException e) {
+ if (jsonIndexConfig.getSkipInvalidJson()) {
+ return SKIPPED_FLATTENED_RECORD;
+ } else {
+ throw e;
+ }
+ }
+ return JsonUtils.flatten(jsonNode, jsonIndexConfig);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]