This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new be6dd7e092 add instrumentation to json index getMatchingFlattenedDocsMap() (#13164)
be6dd7e092 is described below
commit be6dd7e092435444c2d0366c12478653705b47e3
Author: Christopher Peck <[email protected]>
AuthorDate: Thu May 23 15:08:10 2024 -0700
add instrumentation to json index getMatchingFlattenedDocsMap() (#13164)
---
.../JsonExtractIndexTransformFunction.java | 56 ++++++-----
.../accounting/ResourceManagerAccountingTest.java | 110 +++++++++++++++++++++
.../realtime/impl/json/MutableJsonIndexImpl.java | 2 +
.../readers/json/ImmutableJsonIndexReader.java | 2 +
4 files changed, 147 insertions(+), 23 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractIndexTransformFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractIndexTransformFunction.java
index b499b7384c..ffc835e7cb 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractIndexTransformFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractIndexTransformFunction.java
@@ -48,6 +48,8 @@ public class JsonExtractIndexTransformFunction extends
BaseTransformFunction {
private JsonIndexReader _jsonIndexReader;
private Object _defaultValue;
private Map<String, RoaringBitmap> _valueToMatchingDocsMap;
+ private boolean _isSingleValue;
+ private String _filterJsonPath;
@Override
public String getName() {
@@ -91,12 +93,12 @@ public class JsonExtractIndexTransformFunction extends
BaseTransformFunction {
throw new IllegalArgumentException("Result type argument must be a
literal");
}
String resultsType = ((LiteralTransformFunction)
thirdArgument).getStringLiteral().toUpperCase();
- boolean isSingleValue = !resultsType.endsWith("_ARRAY");
- if (isSingleValue && _jsonPathString.contains("[*]")) {
+ _isSingleValue = !resultsType.endsWith("_ARRAY");
+ if (_isSingleValue && _jsonPathString.contains("[*]")) {
throw new IllegalArgumentException(
"[*] syntax in json path is unsupported for singleValue field
json_extract_index");
}
- DataType dataType = isSingleValue ? DataType.valueOf(resultsType)
+ DataType dataType = _isSingleValue ? DataType.valueOf(resultsType)
: DataType.valueOf(resultsType.substring(0, resultsType.length() - 6));
if (arguments.size() >= 4) {
@@ -105,7 +107,7 @@ public class JsonExtractIndexTransformFunction extends
BaseTransformFunction {
throw new IllegalArgumentException("Default value must be a literal");
}
- if (isSingleValue) {
+ if (_isSingleValue) {
_defaultValue = dataType.convert(((LiteralTransformFunction)
fourthArgument).getStringLiteral());
} else {
try {
@@ -124,21 +126,15 @@ public class JsonExtractIndexTransformFunction extends
BaseTransformFunction {
}
}
- String filterJsonPath = null;
if (arguments.size() == 5) {
TransformFunction fifthArgument = arguments.get(4);
if (!(fifthArgument instanceof LiteralTransformFunction)) {
throw new IllegalArgumentException("JSON path filter argument must be
a literal");
}
- filterJsonPath = ((LiteralTransformFunction)
fifthArgument).getStringLiteral();
+ _filterJsonPath = ((LiteralTransformFunction)
fifthArgument).getStringLiteral();
}
- _resultMetadata = new TransformResultMetadata(dataType, isSingleValue,
false);
- _valueToMatchingDocsMap =
_jsonIndexReader.getMatchingFlattenedDocsMap(_jsonPathString, filterJsonPath);
- if (isSingleValue) {
- // For single value result type, it's more efficient to use original
docIDs map
- _jsonIndexReader.convertFlattenedDocIdsToDocIds(_valueToMatchingDocsMap);
- }
+ _resultMetadata = new TransformResultMetadata(dataType, _isSingleValue,
false);
}
@Override
@@ -152,7 +148,7 @@ public class JsonExtractIndexTransformFunction extends
BaseTransformFunction {
int[] inputDocIds = valueBlock.getDocIds();
initIntValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
- _valueToMatchingDocsMap, false);
+ getValueToMatchingDocsMap(), false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[inputDocIds[i]];
if (value == null) {
@@ -174,7 +170,7 @@ public class JsonExtractIndexTransformFunction extends
BaseTransformFunction {
int[] inputDocIds = valueBlock.getDocIds();
initLongValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
- _valueToMatchingDocsMap, false);
+ getValueToMatchingDocsMap(), false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[i];
if (value == null) {
@@ -196,7 +192,7 @@ public class JsonExtractIndexTransformFunction extends
BaseTransformFunction {
int[] inputDocIds = valueBlock.getDocIds();
initFloatValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
- _valueToMatchingDocsMap, false);
+ getValueToMatchingDocsMap(), false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[i];
if (value == null) {
@@ -218,7 +214,7 @@ public class JsonExtractIndexTransformFunction extends
BaseTransformFunction {
int[] inputDocIds = valueBlock.getDocIds();
initDoubleValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
- _valueToMatchingDocsMap, false);
+ getValueToMatchingDocsMap(), false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[i];
if (value == null) {
@@ -240,7 +236,7 @@ public class JsonExtractIndexTransformFunction extends
BaseTransformFunction {
int[] inputDocIds = valueBlock.getDocIds();
initBigDecimalValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
- _valueToMatchingDocsMap, false);
+ getValueToMatchingDocsMap(), false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[i];
if (value == null) {
@@ -262,7 +258,7 @@ public class JsonExtractIndexTransformFunction extends
BaseTransformFunction {
int[] inputDocIds = valueBlock.getDocIds();
initStringValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
- _valueToMatchingDocsMap, false);
+ getValueToMatchingDocsMap(), false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[i];
if (value == null) {
@@ -283,7 +279,7 @@ public class JsonExtractIndexTransformFunction extends
BaseTransformFunction {
int numDocs = valueBlock.getNumDocs();
initIntValuesMV(numDocs);
String[][] valuesFromIndex =
_jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
- _valueToMatchingDocsMap);
+ getValueToMatchingDocsMap());
for (int i = 0; i < numDocs; i++) {
String[] value = valuesFromIndex[i];
@@ -311,7 +307,7 @@ public class JsonExtractIndexTransformFunction extends
BaseTransformFunction {
int numDocs = valueBlock.getNumDocs();
initLongValuesMV(numDocs);
String[][] valuesFromIndex =
_jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
- _valueToMatchingDocsMap);
+ getValueToMatchingDocsMap());
for (int i = 0; i < numDocs; i++) {
String[] value = valuesFromIndex[i];
if (value.length == 0) {
@@ -338,7 +334,7 @@ public class JsonExtractIndexTransformFunction extends
BaseTransformFunction {
int numDocs = valueBlock.getNumDocs();
initFloatValuesMV(numDocs);
String[][] valuesFromIndex =
_jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
- _valueToMatchingDocsMap);
+ getValueToMatchingDocsMap());
for (int i = 0; i < numDocs; i++) {
String[] value = valuesFromIndex[i];
if (value.length == 0) {
@@ -365,7 +361,7 @@ public class JsonExtractIndexTransformFunction extends
BaseTransformFunction {
int numDocs = valueBlock.getNumDocs();
initDoubleValuesMV(numDocs);
String[][] valuesFromIndex =
_jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
- _valueToMatchingDocsMap);
+ getValueToMatchingDocsMap());
for (int i = 0; i < numDocs; i++) {
String[] value = valuesFromIndex[i];
if (value.length == 0) {
@@ -392,7 +388,7 @@ public class JsonExtractIndexTransformFunction extends
BaseTransformFunction {
int numDocs = valueBlock.getNumDocs();
initStringValuesMV(numDocs);
String[][] valuesFromIndex =
_jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
- _valueToMatchingDocsMap);
+ getValueToMatchingDocsMap());
for (int i = 0; i < numDocs; i++) {
String[] value = valuesFromIndex[i];
if (value.length == 0) {
@@ -411,4 +407,18 @@ public class JsonExtractIndexTransformFunction extends
BaseTransformFunction {
}
return _stringValuesMV;
}
+
+ /**
+ * Lazily initialize _valueToMatchingDocsMap, so that map generation is
skipped when filtering excludes all values
+ */
+ private Map<String, RoaringBitmap> getValueToMatchingDocsMap() {
+ if (_valueToMatchingDocsMap == null) {
+ _valueToMatchingDocsMap =
_jsonIndexReader.getMatchingFlattenedDocsMap(_jsonPathString, _filterJsonPath);
+ if (_isSingleValue) {
+ // For single value result type, it's more efficient to use original
docIDs map
+
_jsonIndexReader.convertFlattenedDocIdsToDocIds(_valueToMatchingDocsMap);
+ }
+ }
+ return _valueToMatchingDocsMap;
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
index 469989843c..f8c0e4562d 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -18,17 +18,21 @@
*/
package org.apache.pinot.core.accounting;
+import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.commons.io.FileUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.pinot.common.datatable.DataTable;
@@ -47,8 +51,15 @@ import
org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.segment.local.realtime.impl.json.MutableJsonIndexImpl;
+import
org.apache.pinot.segment.local.segment.creator.impl.inv.json.OffHeapJsonIndexCreator;
+import
org.apache.pinot.segment.local.segment.index.readers.json.ImmutableJsonIndexReader;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.trace.Tracing;
@@ -368,6 +379,105 @@ public class ResourceManagerAccountingTest {
Assert.assertTrue(earlyTerminationOccurred.get());
}
+ /**
+ * Test instrumentation in getMatchingFlattenedDocsMap() from
+ * {@link org.apache.pinot.segment.spi.index.reader.JsonIndexReader}
+ *
+ * Since getMatchingFlattenedDocsMap() can collect a large map before
processing any blocks, it is required to
+ * check for OOM during map generation. This test generates a mutable and
immutable json index, and generates a map
+ * as would happen in json_extract_index execution.
+ *
+ * It is roughly equivalent to running json_extract_index(col, '$.key',
'STRING').
+ */
+ @Test
+ public void testJsonIndexExtractMapOOM()
+ throws Exception {
+ HashMap<String, Object> configs = new HashMap<>();
+ ServerMetrics.register(Mockito.mock(ServerMetrics.class));
+ ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+
LogManager.getLogger(PerQueryCPUMemResourceUsageAccountant.class).setLevel(Level.OFF);
+
LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.OFF);
+
configs.put(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO,
0.00f);
+
configs.put(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO,
0.00f);
+ configs.put(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
+ "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
+
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING,
true);
+
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING,
false);
+
configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY,
true);
+
configs.put(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO,
0.00f);
+
+ PinotConfiguration config = getConfig(2, 2, configs);
+ ResourceManager rm = getResourceManager(2, 2, 1, 1, configs);
+ // init accountant and start watcher task
+ Tracing.ThreadAccountantOps.initializeThreadAccountant(config,
"testJsonIndexExtractMapOOM");
+
+ Supplier<String> randomJsonValue = () -> {
+ Random random = new Random();
+ int length = random.nextInt(1000);
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < length; i++) {
+ sb.append((char) (random.nextInt(26) + 'a'));
+ }
+ return "{\"key\":\"" + sb + "\"}";
+ };
+
+ File indexDir = new File(FileUtils.getTempDirectory(),
"testJsonIndexExtractMapOOM");
+ FileUtils.forceMkdir(indexDir);
+ String colName = "col";
+ try (JsonIndexCreator offHeapIndexCreator = new
OffHeapJsonIndexCreator(indexDir, colName, new JsonIndexConfig());
+ MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl(new
JsonIndexConfig())) {
+ // build json indexes
+ for (int i = 0; i < 1000000; i++) {
+ String val = randomJsonValue.get();
+ offHeapIndexCreator.add(val);
+ mutableJsonIndex.add(val);
+ }
+ offHeapIndexCreator.seal();
+
+ CountDownLatch latch = new CountDownLatch(2);
+ AtomicBoolean mutableEarlyTerminationOccurred = new AtomicBoolean(false);
+
+ // test mutable json index .getMatchingFlattenedDocsMap()
+ rm.getQueryRunners().submit(() -> {
+ Tracing.ThreadAccountantOps.setupRunner("testJsonExtractIndexId1");
+ try {
+ mutableJsonIndex.getMatchingFlattenedDocsMap("key", null);
+ } catch (EarlyTerminationException e) {
+ mutableEarlyTerminationOccurred.set(true);
+ Tracing.ThreadAccountantOps.clear();
+ } finally {
+ latch.countDown();
+ }
+ });
+
+ // test immutable json index .getMatchingFlattenedDocsMap()
+ File indexFile = new File(indexDir, colName +
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+ AtomicBoolean immutableEarlyTerminationOccurred = new
AtomicBoolean(false);
+ rm.getQueryRunners().submit(() -> {
+ Tracing.ThreadAccountantOps.setupRunner("testJsonExtractIndexId2");
+ try {
+ try (PinotDataBuffer offHeapDataBuffer =
PinotDataBuffer.mapReadOnlyBigEndianFile(indexFile);
+ ImmutableJsonIndexReader offHeapIndexReader = new
ImmutableJsonIndexReader(offHeapDataBuffer, 1000000)) {
+ offHeapIndexReader.getMatchingFlattenedDocsMap("key", null);
+ } catch (IOException e) {
+ Assert.fail("failed .getMatchingFlattenedDocsMap for the immutable
json index");
+ }
+ } catch (EarlyTerminationException e) {
+ immutableEarlyTerminationOccurred.set(true);
+ Tracing.ThreadAccountantOps.clear();
+ } finally {
+ latch.countDown();
+ }
+ });
+
+ latch.await();
+ Assert.assertTrue(mutableEarlyTerminationOccurred.get(),
+ "Expected early termination reading the mutable index");
+ Assert.assertTrue(immutableEarlyTerminationOccurred.get(),
+ "Expected early termination reading the immutable index");
+ }
+ }
+
/**
* Test thread memory usage tracking and query killing in multi-thread
environment, add @Test to run.
*/
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
index 6e46120aae..23de292693 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
@@ -50,6 +50,7 @@ import
org.apache.pinot.segment.spi.index.mutable.MutableJsonIndex;
import org.apache.pinot.spi.config.table.JsonIndexConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.roaringbitmap.IntConsumer;
@@ -482,6 +483,7 @@ public class MutableJsonIndexImpl implements
MutableJsonIndex {
}
if (!flattenedDocIds.isEmpty()) {
valueToMatchingFlattenedDocIdsMap.put(entry.getKey().substring(jsonPathKey.length()
+ 1), flattenedDocIds);
+
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(valueToMatchingFlattenedDocIdsMap.size());
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
index 3500b11172..e94eee4170 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
@@ -48,6 +48,7 @@ import
org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.roaringbitmap.IntConsumer;
@@ -458,6 +459,7 @@ public class ImmutableJsonIndexReader implements
JsonIndexReader {
if (!docIds.isEmpty()) {
result.put(key.substring(jsonPathKey.length() + 1), docIds);
+
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(result.size());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]