This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 102a216366 Enhance the instrumentation for a corner case where the
query doesn't go through DocIdSetOp (#10729)
102a216366 is described below
commit 102a216366de72108429c4007ce634320854097a
Author: Jia Guo <[email protected]>
AuthorDate: Mon May 8 22:23:43 2023 -0700
Enhance the instrumentation for a corner case where the query doesn't go
through DocIdSetOp (#10729)
* Enhance the instrumentation for a corner case where the query doesn't go
through DocIdSetOp
* Enhance the instrumentation for a corner case where the query doesn't go
through DocIdSetOp
* Trigger Test
* Enhance the instrumentation for a corner case where the query doesn't go
through DocIdSetOp
* Enhance the instrumentation for a corner case where the query doesn't go
through DocIdSetOp
---
.../query/DictionaryBasedDistinctOperator.java | 28 ++++++++++++++--------
...fflineClusterMemBasedServerQueryKilingTest.java | 23 ++++++++++++++----
2 files changed, 37 insertions(+), 14 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java
index e8f1e015ca..edb5f8a109 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java
@@ -34,6 +34,7 @@ import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.trace.Tracing;
/**
@@ -98,23 +99,16 @@ public class DictionaryBasedDistinctOperator extends
BaseOperator<DistinctResult
records = new ArrayList<>(actualLimit);
_numDocsScanned = actualLimit;
-
- for (int i = 0; i < actualLimit; i++) {
- records.add(new Record(new Object[]{_dictionary.getInternal(i)}));
- }
+ iterateOnDictionary(dictLength, actualLimit, records);
} else {
if (_dictionary.isSorted()) {
records = new ArrayList<>(actualLimit);
if (_isAscending) {
_numDocsScanned = actualLimit;
- for (int i = 0; i < actualLimit; i++) {
- records.add(new Record(new Object[]{_dictionary.getInternal(i)}));
- }
+ iterateOnDictionary(dictLength, actualLimit, records);
} else {
_numDocsScanned = actualLimit;
- for (int i = dictLength - 1; i >= (dictLength - actualLimit); i--) {
- records.add(new Record(new Object[]{_dictionary.getInternal(i)}));
- }
+ iterateOnDictionaryDesc(dictLength, actualLimit, records);
}
} else {
// DictionaryBasedDistinctOperator cannot handle nulls.
@@ -134,6 +128,20 @@ public class DictionaryBasedDistinctOperator extends
BaseOperator<DistinctResult
return new DistinctTable(dataSchema, records, _nullHandlingEnabled);
}
+  /**
+   * Appends the first {@code actualLimit} dictionary entries (dictIds {@code 0} through {@code actualLimit - 1},
+   * in ascending dictId order) to {@code records}, one single-column {@link Record} per entry.
+   *
+   * <p>Before each entry is read, the running iteration count is handed to
+   * {@code Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically}, presumably so the thread
+   * accountant can sample usage and interrupt a runaway query on this path as well (helper not shown here).
+   *
+   * @param dictLength total number of dictionary entries; not read by this ascending variant (presumably kept so
+   *                   the signature mirrors {@link #iterateOnDictionaryDesc})
+   * @param actualLimit number of leading dictionary entries to emit
+   * @param records destination list that the generated records are appended to
+   */
+  private void iterateOnDictionary(int dictLength, int actualLimit, List<Record> records) {
+    int dictId = 0;
+    while (dictId < actualLimit) {
+      Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(dictId);
+      records.add(new Record(new Object[]{_dictionary.getInternal(dictId)}));
+      dictId++;
+    }
+  }
+
+  /**
+   * Appends the last {@code actualLimit} dictionary entries to {@code records} in descending dictId order, i.e.
+   * dictIds {@code dictLength - 1} down to {@code dictLength - actualLimit}, one single-column {@link Record} each.
+   * The caller uses this for sorted dictionaries when descending order is requested.
+   *
+   * <p>The interruption/sampling hook receives the 0-based number of entries emitted so far, not the dictId.
+   *
+   * @param dictLength total number of dictionary entries; the highest dictId read is {@code dictLength - 1}
+   * @param actualLimit number of trailing dictionary entries to emit
+   * @param records destination list that the generated records are appended to
+   */
+  private void iterateOnDictionaryDesc(int dictLength, int actualLimit, List<Record> records) {
+    for (int emitted = 0; emitted < actualLimit; emitted++) {
+      int dictId = dictLength - 1 - emitted;
+      Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(emitted);
+      records.add(new Record(new Object[]{_dictionary.getInternal(dictId)}));
+    }
+  }
+
@Override
public String toExplainString() {
return EXPLAIN_NAME;
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java
index bfcf96c962..8a07cc7c53 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java
@@ -68,10 +68,16 @@ public class OfflineClusterMemBasedServerQueryKilingTest
extends BaseClusterInte
public static final String BOOLEAN_DIM_SV1 = "booleanDimSV1";
private static final int NUM_BROKERS = 1;
private static final int NUM_SERVERS = 1;
+  // Rows written by the Avro data generator (numDocs in the generation loop); getCountStarResult() expects 3x this.
+private static final int NUM_DOCS = 3_000_000;
+
private static final String OOM_QUERY =
"SELECT PERCENTILETDigest(doubleDimSV1, 50) AS digest, intDimSV1 FROM
mytable GROUP BY intDimSV1"
+ " ORDER BY digest LIMIT 30000";
+  // Single-column GROUP BY with no aggregation and a 1,000,000-row LIMIT. The generator gives every row its own
+  // stringDimSV2 value, so the result is large. Presumably planned as a dictionary-based DISTINCT that bypasses
+  // DocIdSetOperator, which is the corner case this change instruments. TODO confirm against the query plan.
+ private static final String OOM_QUERY_2 =
+ "SELECT stringDimSV2 FROM mytable GROUP BY stringDimSV2"
+ + " ORDER BY stringDimSV2 LIMIT 1000000";
+
private static final String DIGEST_QUERY_1 =
"SELECT PERCENTILETDigest(doubleDimSV1, 50) AS digest FROM mytable";
private static final String COUNT_STAR_QUERY =
@@ -151,7 +157,7 @@ public class OfflineClusterMemBasedServerQueryKilingTest
extends BaseClusterInte
serverConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
+
CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, 0.0f);
serverConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
- +
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 0.60f);
+ +
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 0.25f);
serverConf.setProperty(
CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." +
CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
"org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
@@ -188,7 +194,7 @@ public class OfflineClusterMemBasedServerQueryKilingTest
extends BaseClusterInte
protected long getCountStarResult() {
- return 3_000_000;
+  // Tied to NUM_DOCS so it tracks the generated data size. The factor of 3 preserves the previous ratio
+  // (3_000_000 expected for 1_000_000 generated rows), presumably the number of copies of the generated data
+  // that get loaded. TODO confirm against the table/segment setup.
+ return NUM_DOCS * 3;
}
protected String getTimeColumnName() {
@@ -214,6 +220,15 @@ public class OfflineClusterMemBasedServerQueryKilingTest
extends BaseClusterInte
Assert.assertTrue(queryResponse.get("exceptions").toString().contains("got
killed because"));
}
+  /**
+   * Sends {@code OOM_QUERY_2} (a GROUP BY over the per-row-unique stringDimSV2 column with a 1,000,000-row LIMIT)
+   * and asserts that the server cancelled it: the response's {@code exceptions} must mention
+   * {@code QueryCancelledException} and contain "got killed because".
+   *
+   * <p>Presumably this query takes the dictionary-based DISTINCT path, which does not go through
+   * DocIdSetOperator, so it only gets killed if that path is instrumented.
+   */
+  @Test
+  public void testDigestOOM2()
+      throws Exception {
+    JsonNode queryResponse = postQuery(OOM_QUERY_2);
+    // Label the log line with this test's own name (it previously reused "testDigestOOM").
+    LOGGER.info("testDigestOOM2: {}", queryResponse);
+    String exceptions = queryResponse.get("exceptions").toString();
+    Assert.assertTrue(exceptions.contains("QueryCancelledException"));
+    Assert.assertTrue(exceptions.contains("got killed because"));
+  }
+
@Test
public void testDigestOOMMultipleQueries()
throws Exception {
@@ -284,14 +299,14 @@ public class OfflineClusterMemBasedServerQueryKilingTest
extends BaseClusterInte
try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
fileWriter.create(avroSchema, avroFile);
- int numDocs = 1_000_000;
+ int numDocs = NUM_DOCS;
int randBound = numDocs / 2;
Random random = new Random(0);
IntStream randomInt = random.ints(0, 100_000);
for (int docId = 0; docId < numDocs; docId++) {
GenericData.Record record = new GenericData.Record(avroSchema);
record.put(STRING_DIM_SV1, "test query killing");
- record.put(STRING_DIM_SV2, "test query killing");
+ record.put(STRING_DIM_SV2, "test query killing" + docId);
record.put(INT_DIM_SV1, random.nextInt(randBound));
record.put(LONG_DIM_SV1, random.nextLong());
record.put(DOUBLE_DIM_SV1, random.nextDouble());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]