This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b97ac02f34 [multistage] scheduler logic fixes (#10728)
b97ac02f34 is described below
commit b97ac02f34e095bb22698e66e3b483d57c89a057
Author: Rong Rong <[email protected]>
AuthorDate: Sun May 7 22:01:17 2023 -0700
[multistage] scheduler logic fixes (#10728)
* fixing query server logic
* fix lint
---------
Co-authored-by: Rong Rong <[email protected]>
---
.../pinot/query/mailbox/GrpcSendingMailbox.java | 4 +-
.../query/mailbox/InMemorySendingMailbox.java | 4 +-
.../apache/pinot/query/runtime/QueryRunner.java | 99 +++++++++++-----------
.../apache/pinot/query/service/QueryServer.java | 22 ++---
.../query/service/dispatch/QueryDispatcher.java | 3 +-
.../runtime/queries/ResourceBasedQueriesTest.java | 37 +++++---
.../src/test/resources/queries/BasicQuery.json | 4 +-
.../test/resources/queries/MetadataTestQuery.json | 66 +++++++++++++++
8 files changed, 159 insertions(+), 80 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index 972716bfcc..ebde0f232d 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -80,8 +80,8 @@ public class GrpcSendingMailbox implements SendingMailbox {
}
try {
// NOTE: DO NOT use onError() because it will terminate the stream,
and receiver might not get the callback
- _contentObserver.onNext(toMailboxContent(
- TransferableBlockUtils.getErrorTransferableBlock(new
RuntimeException("Cancelled by sender", t))));
+
_contentObserver.onNext(toMailboxContent(TransferableBlockUtils.getErrorTransferableBlock(
+ new RuntimeException("Cancelled by sender with exception: " +
t.getMessage(), t))));
_contentObserver.onCompleted();
} catch (Exception e) {
// Exception can be thrown if the stream is already closed, so we
simply ignore it
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index e62d86515f..fb96d62043 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -60,7 +60,7 @@ public class InMemorySendingMailbox implements SendingMailbox
{
if (_receivingMailbox == null) {
_receivingMailbox = _mailboxService.getReceivingMailbox(_id);
}
- _receivingMailbox.setErrorBlock(
- TransferableBlockUtils.getErrorTransferableBlock(new
RuntimeException("Cancelled by sender", t)));
+
_receivingMailbox.setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
+ new RuntimeException("Cancelled by sender with exception: " +
t.getMessage(), t)));
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 6f1e0618c7..ced674d269 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -198,50 +198,19 @@ public class QueryRunner {
private void runLeafStage(DistributedStagePlan distributedStagePlan,
Map<String, String> requestMetadataMap,
long timeoutMs, long deadlineMs, long requestId) {
- // TODO: make server query request return via mailbox, this is a hack to
gather the non-streaming data table
- // and package it here for return. But we should really use a
MailboxSendOperator directly put into the
- // server executor.
- MailboxSendOperator mailboxSendOperator = null;
- try {
- boolean isTraceEnabled =
-
Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE,
"false"));
- long leafStageStartMillis = System.currentTimeMillis();
- List<ServerPlanRequestContext> serverQueryRequests =
- constructServerQueryRequests(distributedStagePlan,
requestMetadataMap, _helixPropertyStore, _mailboxService,
- deadlineMs);
-
- // send the data table via mailbox in one-off fashion (e.g. no
block-level split, one data table/partition key)
- List<InstanceResponseBlock> serverQueryResults = new
ArrayList<>(serverQueryRequests.size());
- for (ServerPlanRequestContext requestContext : serverQueryRequests) {
- ServerQueryRequest request = new
ServerQueryRequest(requestContext.getInstanceRequest(),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
System.currentTimeMillis());
- serverQueryResults.add(processServerQuery(request,
getQueryWorkerLeafExecutorService()));
- }
- LOGGER.debug(
- "RequestId:" + requestId + " StageId:" +
distributedStagePlan.getStageId() + " Leaf stage v1 processing time:"
- + (System.currentTimeMillis() - leafStageStartMillis) + " ms");
- MailboxSendNode sendNode = (MailboxSendNode)
distributedStagePlan.getStageRoot();
- OpChainExecutionContext opChainExecutionContext =
- new OpChainExecutionContext(_mailboxService, requestId,
sendNode.getPlanFragmentId(),
- distributedStagePlan.getServer(), timeoutMs, deadlineMs,
distributedStagePlan.getStageMetadata(),
- isTraceEnabled);
- MultiStageOperator leafStageOperator =
- new LeafStageTransferableBlockOperator(opChainExecutionContext,
serverQueryResults, sendNode.getDataSchema());
- mailboxSendOperator =
- new MailboxSendOperator(opChainExecutionContext, leafStageOperator,
sendNode.getExchangeType(),
- sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(),
sendNode.getCollationDirections(),
- sendNode.isSortOnSender(), sendNode.getReceiverStageId());
- int blockCounter = 0;
- while
(!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
- LOGGER.debug("Acquired transferable block: {}", blockCounter++);
- }
- mailboxSendOperator.close();
- } catch (Exception e) {
- LOGGER.error(String.format("Error running leafStage for requestId=%s",
requestId), e);
- if (mailboxSendOperator != null) {
- mailboxSendOperator.cancel(e);
- }
+ boolean isTraceEnabled =
+
Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE,
"false"));
+ List<ServerPlanRequestContext> serverPlanRequestContexts =
+ constructServerQueryRequests(distributedStagePlan, requestMetadataMap,
_helixPropertyStore, _mailboxService,
+ deadlineMs);
+ List<ServerQueryRequest> serverQueryRequests = new
ArrayList<>(serverPlanRequestContexts.size());
+ for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) {
+ serverQueryRequests.add(new
ServerQueryRequest(requestContext.getInstanceRequest(),
+ new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
System.currentTimeMillis()));
}
+ getQueryRunnerExecutorService().submit(() -> {
+ processServerQuery(requestId, timeoutMs, deadlineMs, isTraceEnabled,
distributedStagePlan, serverQueryRequests);
+ });
}
private static List<ServerPlanRequestContext>
constructServerQueryRequests(DistributedStagePlan distributedStagePlan,
@@ -280,15 +249,45 @@ public class QueryRunner {
return requests;
}
- private InstanceResponseBlock processServerQuery(ServerQueryRequest
serverQueryRequest,
- ExecutorService executorService) {
+ private void processServerQuery(long requestId, long timeoutMs, long
deadlineMs, boolean isTraceEnabled,
+ DistributedStagePlan distributedStagePlan, List<ServerQueryRequest>
serverQueryRequests) {
+ MailboxSendOperator mailboxSendOperator = null;
try {
- return _serverExecutor.execute(serverQueryRequest, executorService);
+ // send the data table via mailbox in one-off fashion (e.g. no
block-level split, one data table/partition key)
+ List<InstanceResponseBlock> serverQueryResults = new
ArrayList<>(serverQueryRequests.size());
+ for (ServerQueryRequest request : serverQueryRequests) {
+ InstanceResponseBlock result;
+ try {
+ result = _serverExecutor.execute(request,
getQueryWorkerLeafExecutorService());
+ } catch (Exception e) {
+ InstanceResponseBlock errorResponse = new InstanceResponseBlock();
+
errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE,
+ e.getMessage() + QueryException.getTruncatedStackTrace(e));
+ result = errorResponse;
+ }
+ serverQueryResults.add(result);
+ }
+ MailboxSendNode sendNode = (MailboxSendNode)
distributedStagePlan.getStageRoot();
+ OpChainExecutionContext opChainExecutionContext =
+ new OpChainExecutionContext(_mailboxService, requestId,
sendNode.getPlanFragmentId(),
+ distributedStagePlan.getServer(), timeoutMs, deadlineMs,
distributedStagePlan.getStageMetadata(),
+ isTraceEnabled);
+ MultiStageOperator leafStageOperator =
+ new LeafStageTransferableBlockOperator(opChainExecutionContext,
serverQueryResults, sendNode.getDataSchema());
+ mailboxSendOperator =
+ new MailboxSendOperator(opChainExecutionContext, leafStageOperator,
sendNode.getExchangeType(),
+ sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(),
sendNode.getCollationDirections(),
+ sendNode.isSortOnSender(), sendNode.getReceiverStageId());
+ int blockCounter = 0;
+ while
(!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
+ LOGGER.debug("Acquired transferable block: {}", blockCounter++);
+ }
+ mailboxSendOperator.close();
} catch (Exception e) {
- InstanceResponseBlock errorResponse = new InstanceResponseBlock();
- errorResponse.getExceptions()
- .put(QueryException.QUERY_EXECUTION_ERROR_CODE, e.getMessage() +
QueryException.getTruncatedStackTrace(e));
- return errorResponse;
+ LOGGER.error(String.format("Error running leafStage for requestId=%s",
requestId), e);
+ if (mailboxSendOperator != null) {
+ mailboxSendOperator.cancel(e);
+ }
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
index 06a10e0aa3..8e7061bff8 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
@@ -24,8 +24,8 @@ import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
@@ -49,12 +49,10 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
private final int _port;
private Server _server = null;
private final QueryRunner _queryRunner;
- private final ExecutorService _executorService;
public QueryServer(int port, QueryRunner queryRunner) {
_port = port;
_queryRunner = queryRunner;
- _executorService = queryRunner.getQueryRunnerExecutorService();
}
public void start() {
@@ -98,13 +96,17 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
return;
}
- // TODO: break this into parsing and execution, so that responseObserver
can return upon compilation complete.
- // compilation complete indicates dispatch successful.
- _executorService.submit(() ->
_queryRunner.processQuery(distributedStagePlan, requestMetadataMap));
-
- responseObserver.onNext(Worker.QueryResponse.newBuilder()
- .putMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_OK,
"").build());
- responseObserver.onCompleted();
+ try {
+ _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
+ responseObserver.onNext(Worker.QueryResponse.newBuilder()
+ .putMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_OK,
"").build());
+ responseObserver.onCompleted();
+ } catch (Throwable t) {
+ responseObserver.onNext(Worker.QueryResponse.newBuilder()
+ .putMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR,
QueryException.getTruncatedStackTrace(t))
+ .build());
+ responseObserver.onCompleted();
+ }
}
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 5454031a9d..c4c48abf59 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -170,7 +170,8 @@ public class QueryDispatcher {
if
(response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) {
throw new RuntimeException(
String.format("Unable to execute query plan at stage %s on
server %s: ERROR: %s", resp.getStageId(),
- resp.getVirtualServer(), response));
+ resp.getVirtualServer(),
+
response.getMetadataOrDefault(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR,
"null")));
}
successfulDispatchCalls++;
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 686d644445..6104d9e9ed 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -54,6 +54,7 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.BooleanUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -92,6 +93,7 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
// table will be registered on both servers.
Map<String, Schema> schemaMap = new HashMap<>();
for (Map.Entry<String, QueryTestCase.Table> tableEntry :
testCase._tables.entrySet()) {
+ boolean allowEmptySegment =
!BooleanUtils.toBoolean(extractExtraProps(testCase._extraProps,
"noEmptySegment"));
String tableName = testCaseName + "_" + tableEntry.getKey();
// Testing only OFFLINE table b/c Hybrid table test is a special case
to test separately.
String tableNameWithType =
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
@@ -110,10 +112,14 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
for (GenericRow row : genericRows) {
if (row == SEGMENT_BREAKER_ROW) {
- factory1.addSegment(tableNameWithType, rows1);
- factory2.addSegment(tableNameWithType, rows2);
- rows1 = new ArrayList<>();
- rows2 = new ArrayList<>();
+ if (allowEmptySegment || rows1.size() > 0) {
+ factory1.addSegment(tableNameWithType, rows1);
+ rows1 = new ArrayList<>();
+ }
+ if (allowEmptySegment || rows2.size() > 0) {
+ factory2.addSegment(tableNameWithType, rows2);
+ rows2 = new ArrayList<>();
+ }
} else {
long partition = 0;
if (partitionColumns == null) {
@@ -130,8 +136,12 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
}
}
}
- factory1.addSegment(tableNameWithType, rows1);
- factory2.addSegment(tableNameWithType, rows2);
+ if (allowEmptySegment || rows1.size() > 0) {
+ factory1.addSegment(tableNameWithType, rows1);
+ }
+ if (allowEmptySegment || rows2.size() > 0) {
+ factory2.addSegment(tableNameWithType, rows2);
+ }
}
boolean anyHaveOutput = testCase._queries.stream().anyMatch(q ->
q._outputs != null);
@@ -348,8 +358,7 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
Map<String, QueryTestCase> testCaseMap = getTestCases();
List<Object[]> providerContent = new ArrayList<>();
Set<String> validTestCases = new HashSet<>();
- validTestCases.add("basic_test");
- validTestCases.add("framework_test");
+ validTestCases.add("metadata_test");
for (Map.Entry<String, QueryTestCase> testCaseEntry :
testCaseMap.entrySet()) {
String testCaseName = testCaseEntry.getKey();
@@ -377,10 +386,7 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
if (queryCase._expectedNumSegments != null) {
segmentCount = queryCase._expectedNumSegments;
} else {
- for (String tableName : testCaseEntry.getValue()._tables.keySet())
{
- segmentCount +=
- _tableToSegmentMap.getOrDefault(testCaseName + "_" +
tableName + "_OFFLINE", new HashSet<>()).size();
- }
+ throw new RuntimeException("Unable to test metadata without
expected num segments configuration!");
}
Object[] testEntry = new Object[]{testCaseName, sql, h2Sql,
queryCase._expectedException, segmentCount};
@@ -469,4 +475,11 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
}
return testCaseMap;
}
+
+ private static Object extractExtraProps(Map<String, Object> extraProps,
String propKey) {
+ if (extraProps == null) {
+ return null;
+ }
+ return extraProps.getOrDefault(propKey, null);
+ }
}
diff --git a/pinot-query-runtime/src/test/resources/queries/BasicQuery.json
b/pinot-query-runtime/src/test/resources/queries/BasicQuery.json
index 21521e36c1..5bcecfe622 100644
--- a/pinot-query-runtime/src/test/resources/queries/BasicQuery.json
+++ b/pinot-query-runtime/src/test/resources/queries/BasicQuery.json
@@ -60,9 +60,7 @@
},
{
"description": "Correlated subquery test",
- "sql": "SELECT {tbl1}.col1 FROM {tbl1} WHERE {tbl1}.col2 > (SELECT 0.5
* SUM({tbl2}.col3) FROM {tbl2} WHERE {tbl1}.col2 = {tbl1}.col2 AND {tbl1}.col1
= {tbl2}.col1)",
- "expectedNumSegments": 8,
- "comment": "This correlated subquery test is decorrelated to 2 JOINs,
so one table is scanned twice, hence expected to scan 8 segments"
+ "sql": "SELECT {tbl1}.col1 FROM {tbl1} WHERE {tbl1}.col2 > (SELECT 0.5
* SUM({tbl2}.col3) FROM {tbl2} WHERE {tbl1}.col2 = {tbl1}.col2 AND {tbl1}.col1
= {tbl2}.col1)"
}
],
"extraProps": {
diff --git
a/pinot-query-runtime/src/test/resources/queries/MetadataTestQuery.json
b/pinot-query-runtime/src/test/resources/queries/MetadataTestQuery.json
new file mode 100644
index 0000000000..fdccd9cee8
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/MetadataTestQuery.json
@@ -0,0 +1,66 @@
+{
+ "metadata_test": {
+ "comments": "metadata_test is used to test Tracing and Stats metadata
returns",
+ "limitation": "due to current limitation, metadata test requires all data
to be located on a single partition, see:
https://github.com/apache/pinot/issues/10399",
+ "tables": {
+ "tbl1" : {
+ "schema": [
+ {"name": "col1", "type": "STRING"},
+ {"name": "col2", "type": "INT"}
+ ],
+ "inputs": [
+ ["foo", 1],
+ ["bar", 2]
+ ],
+ "partitionColumns": [ ]
+ },
+ "tbl2" : {
+ "schema": [
+ {"name": "col1", "type": "STRING"},
+ {"name": "col2", "type": "INT"},
+ {"name": "col3", "type": "DOUBLE"},
+ {"name": "partitionCol1", "type": "INT"},
+ {"name": "partitionCol2", "type": "STRING"}
+ ],
+ "inputs": [
+ ["foo", 1, 3.1416, 1, "charlie"],
+ ["foo", 3, 3.1416, 1, "charlie"],
+ ["bar", 2, 2.7183, 1, "charlie"],
+ ["------"],
+ ["bar", 4, 2.7183, 1, "charlie"]
+ ],
+ "partitionColumns": [
+ "partitionCol1", "partitionCol2"
+ ]
+ }
+ },
+ "queries": [
+ {
+ "sql": "SELECT * FROM {tbl1}",
+ "comment": "only 1 segment for tbl1",
+ "expectedNumSegments": 1
+ },
+ {
+ "sql": "SELECT col1, COUNT(*) FROM {tbl2} GROUP BY col1",
+ "comment": "2 segments for tbl2",
+ "expectedNumSegments": 2
+ },
+ {
+ "sql": "SELECT {tbl1}.col1, {tbl1}.col2, {tbl2}.col3 FROM {tbl1} JOIN
{tbl2} ON {tbl1}.col1 = {tbl2}.col1",
+ "expectedNumSegments": 3
+ },
+ {
+ "sql": "SELECT {tbl1}.col1, {tbl1}.col2, COUNT(*) FROM {tbl1} JOIN
{tbl2} ON {tbl1}.col1 = {tbl2}.col1 GROUP BY {tbl1}.col1, {tbl1}.col2",
+ "expectedNumSegments": 3
+ },
+ {
+ "sql": "SELECT {tbl1}.col1 FROM {tbl1} WHERE {tbl1}.col2 > (SELECT 0.5
* SUM({tbl2}.col3) FROM {tbl2} WHERE {tbl1}.col2 = {tbl1}.col2 AND {tbl1}.col1
= {tbl2}.col1)",
+        "comment": "This correlated subquery test is decorrelated to 2 JOINs,
so one table is scanned twice; with noEmptySegment set, tbl2 only produces 2
segments, hence expected to scan 4 segments",
+ "expectedNumSegments": 4
+ }
+ ],
+ "extraProps": {
+ "noEmptySegment": "true"
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]