This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 399f033ec3 [Multi-stage] Remove PhysicalPlanContext and clean up
executor logic (#11439)
399f033ec3 is described below
commit 399f033ec3917df2bc478b5904406a95e0bc7258
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Aug 25 17:24:47 2023 -0700
[Multi-stage] Remove PhysicalPlanContext and clean up executor logic
(#11439)
---
.../MultiStageBrokerRequestHandler.java | 8 +-
.../apache/pinot/query/runtime/QueryRunner.java | 114 ++++++++-------------
.../pinot/query/runtime/operator/OpChain.java | 35 +++----
.../runtime/plan/OpChainExecutionContext.java | 26 +++--
.../query/runtime/plan/PhysicalPlanContext.java | 95 -----------------
.../query/runtime/plan/PhysicalPlanVisitor.java | 90 +++++++---------
.../plan/pipeline/PipelineBreakerExecutor.java | 47 ++++-----
.../plan/server/ServerPlanRequestContext.java | 13 +--
.../plan/server/ServerPlanRequestUtils.java | 58 +++++------
.../plan/server/ServerPlanRequestVisitor.java | 2 +-
.../query/service/dispatch/QueryDispatcher.java | 16 +--
.../pinot/query/service/server/QueryServer.java | 13 ++-
.../pinot/query/runtime/QueryRunnerTest.java | 4 +-
.../pinot/query/runtime/QueryRunnerTestBase.java | 4 +-
.../executor/OpChainSchedulerServiceTest.java | 11 +-
.../runtime/operator/LiteralValueOperatorTest.java | 4 -
.../operator/MailboxReceiveOperatorTest.java | 4 +-
.../runtime/operator/MailboxSendOperatorTest.java | 4 +-
.../pinot/query/runtime/operator/OpChainTest.java | 35 ++++---
.../query/runtime/operator/OperatorTestUtil.java | 17 ++-
.../operator/SortedMailboxReceiveOperatorTest.java | 5 +-
.../plan/pipeline/PipelineBreakerExecutorTest.java | 29 +++---
.../service/dispatch/QueryDispatcherTest.java | 4 +-
23 files changed, 247 insertions(+), 391 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 3a65691073..2080bd4a64 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -166,8 +166,8 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
return new
BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR,
errorMessage));
}
- boolean traceEnabled = Boolean.parseBoolean(
-
sqlNodeAndOptions.getOptions().getOrDefault(CommonConstants.Broker.Request.TRACE,
"false"));
+ Map<String, String> queryOptions = sqlNodeAndOptions.getOptions();
+ boolean traceEnabled =
Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));
ResultTable queryResults;
Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
@@ -177,8 +177,8 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
long executionStartTimeNs = System.nanoTime();
try {
- queryResults = _queryDispatcher.submitAndReduce(requestContext,
dispatchableSubPlan, queryTimeoutMs,
- sqlNodeAndOptions.getOptions(), stageIdStatsMap, traceEnabled);
+ queryResults = _queryDispatcher.submitAndReduce(requestContext,
dispatchableSubPlan, queryTimeoutMs, queryOptions,
+ stageIdStatsMap);
} catch (Throwable t) {
String consolidatedMessage =
ExceptionUtils.consolidateExceptionMessages(t);
LOGGER.error("Caught exception executing request {}: {}, {}", requestId,
query, consolidatedMessage);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index f48f5561fb..021af52f80 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -20,15 +20,12 @@ package org.apache.pinot.query.runtime;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -39,7 +36,6 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.query.mailbox.MailboxIdUtils;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
-import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.MailboxMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
@@ -50,14 +46,12 @@ import
org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
@@ -72,17 +66,13 @@ public class QueryRunner {
private static final Logger LOGGER =
LoggerFactory.getLogger(QueryRunner.class);
private static final String PINOT_V1_SERVER_QUERY_CONFIG_PREFIX =
"pinot.server.query.executor";
- // This is a temporary before merging the 2 type of executor.
- private ServerQueryExecutorV1Impl _serverExecutor;
private HelixManager _helixManager;
- private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
- private MailboxService _mailboxService;
- private String _hostname;
- private int _port;
+ private ServerMetrics _serverMetrics;
private ExecutorService _opChainExecutor;
-
private OpChainSchedulerService _scheduler;
+ private MailboxService _mailboxService;
+ private ServerQueryExecutorV1Impl _leafQueryExecutor;
// Group-by settings
@Nullable
@@ -102,12 +92,14 @@ public class QueryRunner {
*/
public void init(PinotConfiguration config, InstanceDataManager
instanceDataManager, HelixManager helixManager,
ServerMetrics serverMetrics) {
+ _helixManager = helixManager;
+ _serverMetrics = serverMetrics;
+
String instanceName =
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
- _hostname =
instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ?
instanceName.substring(
+ String hostname =
instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ?
instanceName.substring(
CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
- _port =
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
+ int port =
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT);
- _helixManager = helixManager;
// TODO: Consider using separate config for intermediate stage and leaf
stage
String numGroupsLimitStr =
config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT);
@@ -121,29 +113,28 @@ public class QueryRunner {
String joinOverflowModeStr =
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE);
_joinOverflowMode = joinOverflowModeStr != null ?
JoinOverFlowMode.valueOf(joinOverflowModeStr) : null;
+ //TODO: make this configurable
+ _opChainExecutor =
+ ExecutorServiceUtils.create(config, "pinot.query.runner.opchain",
"op_chain_worker_on_" + port + "_port");
+ _scheduler = new OpChainSchedulerService(getOpChainExecutorService());
+ _mailboxService = new MailboxService(hostname, port, config);
try {
- //TODO: make this configurable
- _opChainExecutor = ExecutorServiceUtils.create(config,
"pinot.query.runner.opchain",
- "op_chain_worker_on_" + _port + "_port");
- _scheduler = new OpChainSchedulerService(getOpChainExecutorService());
- _mailboxService = new MailboxService(_hostname, _port, config);
- _serverExecutor = new ServerQueryExecutorV1Impl();
- _serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX),
instanceDataManager, serverMetrics);
+ _leafQueryExecutor = new ServerQueryExecutorV1Impl();
+
_leafQueryExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX),
instanceDataManager, serverMetrics);
} catch (Exception e) {
throw new RuntimeException(e);
}
+
+ LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}",
hostname, port);
}
- public void start()
- throws TimeoutException {
- _helixPropertyStore = _helixManager.getHelixPropertyStore();
+ public void start() {
_mailboxService.start();
- _serverExecutor.start();
+ _leafQueryExecutor.start();
}
- public void shutDown()
- throws TimeoutException {
- _serverExecutor.shutDown();
+ public void shutDown() {
+ _leafQueryExecutor.shutDown();
_mailboxService.shutdown();
ExecutorServiceUtils.close(_opChainExecutor);
}
@@ -156,17 +147,15 @@ public class QueryRunner {
*/
public void processQuery(DistributedStagePlan distributedStagePlan,
Map<String, String> requestMetadata) {
long requestId =
Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
- long timeoutMs =
Long.parseLong(requestMetadata.get(QueryOptionKey.TIMEOUT_MS));
- boolean isTraceEnabled =
-
Boolean.parseBoolean(requestMetadata.getOrDefault(CommonConstants.Broker.Request.TRACE,
"false"));
+ long timeoutMs =
Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
long deadlineMs = System.currentTimeMillis() + timeoutMs;
setStageCustomProperties(distributedStagePlan.getStageMetadata().getCustomProperties(),
requestMetadata);
// run pre-stage execution for all pipeline breakers
PipelineBreakerResult pipelineBreakerResult =
- PipelineBreakerExecutor.executePipelineBreakers(_scheduler,
_mailboxService, distributedStagePlan, deadlineMs,
- requestId, isTraceEnabled);
+ PipelineBreakerExecutor.executePipelineBreakers(_scheduler,
_mailboxService, distributedStagePlan,
+ requestMetadata, requestId, deadlineMs);
// Send error block to all the receivers if pipeline breaker fails
if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock()
!= null) {
@@ -193,13 +182,15 @@ public class QueryRunner {
}
// run OpChain
+ OpChainExecutionContext executionContext =
+ new OpChainExecutionContext(_mailboxService, requestId,
distributedStagePlan.getStageId(),
+ distributedStagePlan.getServer(), deadlineMs, requestMetadata,
distributedStagePlan.getStageMetadata(),
+ pipelineBreakerResult);
OpChain opChain;
if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
- opChain = compileLeafStage(requestId, distributedStagePlan,
requestMetadata, pipelineBreakerResult, deadlineMs,
- isTraceEnabled);
+ opChain = compileLeafStage(executionContext, distributedStagePlan);
} else {
- opChain = compileIntermediateStage(requestId, distributedStagePlan,
requestMetadata, pipelineBreakerResult,
- deadlineMs, isTraceEnabled);
+ opChain =
PhysicalPlanVisitor.walkPlanNode(distributedStagePlan.getStageRoot(),
executionContext);
}
_scheduler.register(opChain);
}
@@ -248,51 +239,36 @@ public class QueryRunner {
return _opChainExecutor;
}
- private OpChain compileIntermediateStage(long requestId,
DistributedStagePlan distributedStagePlan,
- Map<String, String> requestMetadataMap, PipelineBreakerResult
pipelineBreakerResult, long deadlineMs,
- boolean isTraceEnabled) {
- PlanNode stageRoot = distributedStagePlan.getStageRoot();
- OpChainExecutionContext opChainContext = new
OpChainExecutionContext(_mailboxService, requestId,
- stageRoot.getPlanFragmentId(), distributedStagePlan.getServer(),
deadlineMs,
- distributedStagePlan.getStageMetadata(), pipelineBreakerResult,
isTraceEnabled);
- return PhysicalPlanVisitor.walkPlanNode(stageRoot,
- new PhysicalPlanContext(opChainContext, pipelineBreakerResult));
- }
-
- private OpChain compileLeafStage(long requestId, DistributedStagePlan
distributedStagePlan,
- Map<String, String> requestMetadataMap, PipelineBreakerResult
pipelineBreakerResult, long deadlineMs,
- boolean isTraceEnabled) {
- OpChainExecutionContext opChainContext = new
OpChainExecutionContext(_mailboxService, requestId,
- distributedStagePlan.getStageId(), distributedStagePlan.getServer(),
deadlineMs,
- distributedStagePlan.getStageMetadata(), pipelineBreakerResult,
isTraceEnabled);
- PhysicalPlanContext planContext = new PhysicalPlanContext(opChainContext,
pipelineBreakerResult);
- List<ServerPlanRequestContext> serverPlanRequestContexts =
ServerPlanRequestUtils.constructServerQueryRequests(
- planContext, distributedStagePlan, requestMetadataMap,
_helixPropertyStore);
+ private OpChain compileLeafStage(OpChainExecutionContext executionContext,
+ DistributedStagePlan distributedStagePlan) {
+ List<ServerPlanRequestContext> serverPlanRequestContexts =
+ ServerPlanRequestUtils.constructServerQueryRequests(executionContext,
distributedStagePlan,
+ _helixManager.getHelixPropertyStore());
List<ServerQueryRequest> serverQueryRequests = new
ArrayList<>(serverPlanRequestContexts.size());
+ long queryArrivalTimeMs = System.currentTimeMillis();
for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) {
- serverQueryRequests.add(new
ServerQueryRequest(requestContext.getInstanceRequest(),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
System.currentTimeMillis()));
+ serverQueryRequests.add(
+ new ServerQueryRequest(requestContext.getInstanceRequest(),
_serverMetrics, queryArrivalTimeMs));
}
MailboxSendNode sendNode = (MailboxSendNode)
distributedStagePlan.getStageRoot();
- OpChainExecutionContext opChainExecutionContext = new
OpChainExecutionContext(planContext);
MultiStageOperator leafStageOperator =
- new LeafStageTransferableBlockOperator(opChainExecutionContext,
this::processServerQueryRequest,
- serverQueryRequests, sendNode.getDataSchema());
+ new LeafStageTransferableBlockOperator(executionContext,
this::processServerQueryRequest, serverQueryRequests,
+ sendNode.getDataSchema());
MailboxSendOperator mailboxSendOperator =
- new MailboxSendOperator(opChainExecutionContext, leafStageOperator,
sendNode.getDistributionType(),
+ new MailboxSendOperator(executionContext, leafStageOperator,
sendNode.getDistributionType(),
sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(),
sendNode.getCollationDirections(),
sendNode.isSortOnSender(), sendNode.getReceiverStageId());
- return new OpChain(opChainExecutionContext, mailboxSendOperator,
Collections.emptyList());
+ return new OpChain(executionContext, mailboxSendOperator);
}
private InstanceResponseBlock processServerQueryRequest(ServerQueryRequest
request) {
InstanceResponseBlock result;
try {
- result = _serverExecutor.execute(request, getOpChainExecutorService());
+ result = _leafQueryExecutor.execute(request,
getOpChainExecutorService());
} catch (Exception e) {
InstanceResponseBlock errorResponse = new InstanceResponseBlock();
-
errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE,
- e.getMessage() + QueryException.getTruncatedStackTrace(e));
+ errorResponse.getExceptions()
+ .put(QueryException.QUERY_EXECUTION_ERROR_CODE, e.getMessage() +
QueryException.getTruncatedStackTrace(e));
result = errorResponse;
}
return result;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
index d5414439ed..360bef6324 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.query.runtime.operator;
-import java.util.List;
import java.util.function.Consumer;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -34,31 +33,21 @@ import org.slf4j.LoggerFactory;
public class OpChain implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(OpChain.class);
- private final MultiStageOperator _root;
- private final List<String> _receivingMailboxIds;
private final OpChainId _id;
private final OpChainStats _stats;
- private final Consumer<OpChainId> _opChainFinishCallback;
+ private final MultiStageOperator _root;
+ private final Consumer<OpChainId> _finishCallback;
- public OpChain(OpChainExecutionContext context, MultiStageOperator root,
List<String> receivingMailboxIds) {
- this(context, root, receivingMailboxIds, (id) -> { });
+ public OpChain(OpChainExecutionContext context, MultiStageOperator root) {
+ this(context, root, (id) -> {
+ });
}
- public OpChain(OpChainExecutionContext context, MultiStageOperator root,
List<String> receivingMailboxIds,
- Consumer<OpChainId> opChainFinishCallback) {
- _root = root;
- _receivingMailboxIds = receivingMailboxIds;
+ public OpChain(OpChainExecutionContext context, MultiStageOperator root,
Consumer<OpChainId> finishCallback) {
_id = context.getId();
_stats = context.getStats();
- _opChainFinishCallback = opChainFinishCallback;
- }
-
- public Operator<TransferableBlock> getRoot() {
- return _root;
- }
-
- public List<String> getReceivingMailboxIds() {
- return _receivingMailboxIds;
+ _root = root;
+ _finishCallback = finishCallback;
}
public OpChainId getId() {
@@ -70,6 +59,10 @@ public class OpChain implements AutoCloseable {
return _stats;
}
+ public Operator<TransferableBlock> getRoot() {
+ return _root;
+ }
+
@Override
public String toString() {
return "OpChain{" + _id + "}";
@@ -86,7 +79,7 @@ public class OpChain implements AutoCloseable {
try {
_root.close();
} finally {
- _opChainFinishCallback.accept(getId());
+ _finishCallback.accept(getId());
LOGGER.trace("OpChain callback called");
}
}
@@ -102,7 +95,7 @@ public class OpChain implements AutoCloseable {
try {
_root.cancel(e);
} finally {
- _opChainFinishCallback.accept(getId());
+ _finishCallback.accept(getId());
LOGGER.trace("OpChain callback called");
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 9963c91db1..cb4326e8b9 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -18,11 +18,13 @@
*/
package org.apache.pinot.query.runtime.plan;
+import java.util.Map;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.operator.OpChainId;
import org.apache.pinot.query.runtime.operator.OpChainStats;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
+import org.apache.pinot.spi.utils.CommonConstants;
/**
@@ -36,32 +38,30 @@ public class OpChainExecutionContext {
private final int _stageId;
private final VirtualServerAddress _server;
private final long _deadlineMs;
+ private final Map<String, String> _requestMetadata;
private final StageMetadata _stageMetadata;
private final OpChainId _id;
private final OpChainStats _stats;
+ private final PipelineBreakerResult _pipelineBreakerResult;
private final boolean _traceEnabled;
public OpChainExecutionContext(MailboxService mailboxService, long
requestId, int stageId,
- VirtualServerAddress server, long deadlineMs, StageMetadata
stageMetadata,
- PipelineBreakerResult pipelineBreakerResult, boolean traceEnabled) {
+ VirtualServerAddress server, long deadlineMs, Map<String, String>
requestMetadata, StageMetadata stageMetadata,
+ PipelineBreakerResult pipelineBreakerResult) {
_mailboxService = mailboxService;
_requestId = requestId;
_stageId = stageId;
_server = server;
_deadlineMs = deadlineMs;
+ _requestMetadata = requestMetadata;
_stageMetadata = stageMetadata;
_id = new OpChainId(requestId, server.workerId(), stageId);
_stats = new OpChainStats(_id.toString());
+ _pipelineBreakerResult = pipelineBreakerResult;
if (pipelineBreakerResult != null &&
pipelineBreakerResult.getOpChainStats() != null) {
_stats.getOperatorStatsMap().putAll(pipelineBreakerResult.getOpChainStats().getOperatorStatsMap());
}
- _traceEnabled = traceEnabled;
- }
-
- public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext) {
- this(physicalPlanContext.getMailboxService(),
physicalPlanContext.getRequestId(), physicalPlanContext.getStageId(),
- physicalPlanContext.getServer(), physicalPlanContext.getDeadlineMs(),
physicalPlanContext.getStageMetadata(),
- physicalPlanContext.getPipelineBreakerResult(),
physicalPlanContext.isTraceEnabled());
+ _traceEnabled =
Boolean.parseBoolean(requestMetadata.get(CommonConstants.Broker.Request.TRACE));
}
public MailboxService getMailboxService() {
@@ -84,6 +84,10 @@ public class OpChainExecutionContext {
return _deadlineMs;
}
+ public Map<String, String> getRequestMetadata() {
+ return _requestMetadata;
+ }
+
public StageMetadata getStageMetadata() {
return _stageMetadata;
}
@@ -96,6 +100,10 @@ public class OpChainExecutionContext {
return _stats;
}
+ public PipelineBreakerResult getPipelineBreakerResult() {
+ return _pipelineBreakerResult;
+ }
+
public boolean isTraceEnabled() {
return _traceEnabled;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
deleted file mode 100644
index 00b7ac40e7..0000000000
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.plan;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.VirtualServerAddress;
-import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
-
-
-public class PhysicalPlanContext {
- protected final MailboxService _mailboxService;
- protected final long _requestId;
- protected final int _stageId;
- private final long _deadlineMs;
- protected final VirtualServerAddress _server;
- protected final StageMetadata _stageMetadata;
- protected final PipelineBreakerResult _pipelineBreakerResult;
- protected final List<String> _receivingMailboxIds = new ArrayList<>();
- private final OpChainExecutionContext _opChainExecutionContext;
- private final boolean _traceEnabled;
-
- public PhysicalPlanContext(OpChainExecutionContext opChainContext,
PipelineBreakerResult pipelineBreakerResult) {
- _mailboxService = opChainContext.getMailboxService();
- _requestId = opChainContext.getRequestId();
- _stageId = opChainContext.getStageId();
- _deadlineMs = opChainContext.getDeadlineMs();
- _server = opChainContext.getServer();
- _stageMetadata = opChainContext.getStageMetadata();
- _pipelineBreakerResult = pipelineBreakerResult;
- _traceEnabled = opChainContext.isTraceEnabled();
- _opChainExecutionContext = opChainContext;
- }
-
- public long getRequestId() {
- return _requestId;
- }
-
- public int getStageId() {
- return _stageId;
- }
-
- public long getDeadlineMs() {
- return _deadlineMs;
- }
-
- public VirtualServerAddress getServer() {
- return _server;
- }
-
- public StageMetadata getStageMetadata() {
- return _stageMetadata;
- }
-
- public PipelineBreakerResult getPipelineBreakerResult() {
- return _pipelineBreakerResult;
- }
-
- public MailboxService getMailboxService() {
- return _mailboxService;
- }
-
- public void addReceivingMailboxIds(List<String> receivingMailboxIds) {
- _receivingMailboxIds.addAll(receivingMailboxIds);
- }
-
- public List<String> getReceivingMailboxIds() {
- return _receivingMailboxIds;
- }
-
- public OpChainExecutionContext getOpChainExecutionContext() {
- return _opChainExecutionContext;
- }
-
- public boolean isTraceEnabled() {
- return _traceEnabled;
- }
-}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 2340545437..144cf86e27 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -57,64 +57,56 @@ import
org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
* this works only for the intermediate stage nodes, leaf stage nodes are
expected to compile into
* v1 operators at this point in time.
*
- * <p>This class should be used statically via {@link #walkPlanNode(PlanNode,
PhysicalPlanContext)}
+ * <p>This class should be used statically via {@link #walkPlanNode(PlanNode,
OpChainExecutionContext)}
*/
-public class PhysicalPlanVisitor implements
PlanNodeVisitor<MultiStageOperator, PhysicalPlanContext> {
+public class PhysicalPlanVisitor implements
PlanNodeVisitor<MultiStageOperator, OpChainExecutionContext> {
private static final PhysicalPlanVisitor INSTANCE = new
PhysicalPlanVisitor();
- public static OpChain walkPlanNode(PlanNode node, PhysicalPlanContext
context) {
+ public static OpChain walkPlanNode(PlanNode node, OpChainExecutionContext
context) {
MultiStageOperator root = node.visit(INSTANCE, context);
- return new OpChain(context.getOpChainExecutionContext(), root,
context.getReceivingMailboxIds());
+ return new OpChain(context, root);
}
@Override
- public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node,
PhysicalPlanContext context) {
+ public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node,
OpChainExecutionContext context) {
if (node.isSortOnReceiver()) {
- SortedMailboxReceiveOperator sortedMailboxReceiveOperator =
- new
SortedMailboxReceiveOperator(context.getOpChainExecutionContext(),
node.getDistributionType(),
- node.getDataSchema(), node.getCollationKeys(),
node.getCollationDirections(),
- node.getCollationNullDirections(), node.isSortOnSender(),
node.getSenderStageId());
-
context.addReceivingMailboxIds(sortedMailboxReceiveOperator.getMailboxIds());
- return sortedMailboxReceiveOperator;
+ return new SortedMailboxReceiveOperator(context,
node.getDistributionType(), node.getDataSchema(),
+ node.getCollationKeys(), node.getCollationDirections(),
node.getCollationNullDirections(),
+ node.isSortOnSender(), node.getSenderStageId());
} else {
- MailboxReceiveOperator mailboxReceiveOperator =
- new MailboxReceiveOperator(context.getOpChainExecutionContext(),
node.getDistributionType(),
- node.getSenderStageId());
- context.addReceivingMailboxIds(mailboxReceiveOperator.getMailboxIds());
- return mailboxReceiveOperator;
+ return new MailboxReceiveOperator(context, node.getDistributionType(),
node.getSenderStageId());
}
}
@Override
- public MultiStageOperator visitMailboxSend(MailboxSendNode node,
PhysicalPlanContext context) {
+ public MultiStageOperator visitMailboxSend(MailboxSendNode node,
OpChainExecutionContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this,
context);
- return new MailboxSendOperator(context.getOpChainExecutionContext(),
nextOperator, node.getDistributionType(),
- node.getPartitionKeySelector(), node.getCollationKeys(),
node.getCollationDirections(), node.isSortOnSender(),
- node.getReceiverStageId());
+ return new MailboxSendOperator(context, nextOperator,
node.getDistributionType(), node.getPartitionKeySelector(),
+ node.getCollationKeys(), node.getCollationDirections(),
node.isSortOnSender(), node.getReceiverStageId());
}
@Override
- public MultiStageOperator visitAggregate(AggregateNode node,
PhysicalPlanContext context) {
+ public MultiStageOperator visitAggregate(AggregateNode node,
OpChainExecutionContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this,
context);
DataSchema inputSchema = node.getInputs().get(0).getDataSchema();
DataSchema resultSchema = node.getDataSchema();
- return new AggregateOperator(context.getOpChainExecutionContext(),
nextOperator, resultSchema, inputSchema,
- node.getAggCalls(), node.getGroupSet(), node.getAggType(),
node.getFilterArgIndices(), node.getNodeHint());
+ return new AggregateOperator(context, nextOperator, resultSchema,
inputSchema, node.getAggCalls(),
+ node.getGroupSet(), node.getAggType(), node.getFilterArgIndices(),
node.getNodeHint());
}
@Override
- public MultiStageOperator visitWindow(WindowNode node, PhysicalPlanContext
context) {
+ public MultiStageOperator visitWindow(WindowNode node,
OpChainExecutionContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this,
context);
- return new WindowAggregateOperator(context.getOpChainExecutionContext(),
nextOperator, node.getGroupSet(),
- node.getOrderSet(), node.getOrderSetDirection(),
node.getOrderSetNullDirection(), node.getAggCalls(),
- node.getLowerBound(), node.getUpperBound(), node.getWindowFrameType(),
node.getConstants(),
- node.getDataSchema(), node.getInputs().get(0).getDataSchema());
+ return new WindowAggregateOperator(context, nextOperator,
node.getGroupSet(), node.getOrderSet(),
+ node.getOrderSetDirection(), node.getOrderSetNullDirection(),
node.getAggCalls(), node.getLowerBound(),
+ node.getUpperBound(), node.getWindowFrameType(), node.getConstants(),
node.getDataSchema(),
+ node.getInputs().get(0).getDataSchema());
}
@Override
- public MultiStageOperator visitSetOp(SetOpNode setOpNode,
PhysicalPlanContext context) {
+ public MultiStageOperator visitSetOp(SetOpNode setOpNode,
OpChainExecutionContext context) {
List<MultiStageOperator> inputs = new ArrayList<>();
for (PlanNode input : setOpNode.getInputs()) {
MultiStageOperator visited = input.visit(this, context);
@@ -122,66 +114,60 @@ public class PhysicalPlanVisitor implements
PlanNodeVisitor<MultiStageOperator,
}
switch (setOpNode.getSetOpType()) {
case UNION:
- return new UnionOperator(context.getOpChainExecutionContext(), inputs,
- setOpNode.getInputs().get(0).getDataSchema());
+ return new UnionOperator(context, inputs,
setOpNode.getInputs().get(0).getDataSchema());
case INTERSECT:
- return new IntersectOperator(context.getOpChainExecutionContext(),
inputs,
- setOpNode.getInputs().get(0).getDataSchema());
+ return new IntersectOperator(context, inputs,
setOpNode.getInputs().get(0).getDataSchema());
case MINUS:
- return new MinusOperator(context.getOpChainExecutionContext(), inputs,
- setOpNode.getInputs().get(0).getDataSchema());
+ return new MinusOperator(context, inputs,
setOpNode.getInputs().get(0).getDataSchema());
default:
throw new IllegalStateException();
}
}
@Override
- public MultiStageOperator visitExchange(ExchangeNode exchangeNode,
PhysicalPlanContext context) {
+ public MultiStageOperator visitExchange(ExchangeNode exchangeNode,
OpChainExecutionContext context) {
throw new UnsupportedOperationException("ExchangeNode should not be
visited");
}
@Override
- public MultiStageOperator visitFilter(FilterNode node, PhysicalPlanContext
context) {
+ public MultiStageOperator visitFilter(FilterNode node,
OpChainExecutionContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this,
context);
- return new FilterOperator(context.getOpChainExecutionContext(),
nextOperator, node.getDataSchema(),
- node.getCondition());
+ return new FilterOperator(context, nextOperator, node.getDataSchema(),
node.getCondition());
}
@Override
- public MultiStageOperator visitJoin(JoinNode node, PhysicalPlanContext
context) {
+ public MultiStageOperator visitJoin(JoinNode node, OpChainExecutionContext
context) {
PlanNode left = node.getInputs().get(0);
PlanNode right = node.getInputs().get(1);
MultiStageOperator leftOperator = left.visit(this, context);
MultiStageOperator rightOperator = right.visit(this, context);
- return new HashJoinOperator(context.getOpChainExecutionContext(),
leftOperator, rightOperator, left.getDataSchema(),
- node);
+ return new HashJoinOperator(context, leftOperator, rightOperator,
left.getDataSchema(), node);
}
@Override
- public MultiStageOperator visitProject(ProjectNode node, PhysicalPlanContext
context) {
+ public MultiStageOperator visitProject(ProjectNode node,
OpChainExecutionContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this,
context);
- return new TransformOperator(context.getOpChainExecutionContext(),
nextOperator, node.getDataSchema(),
- node.getProjects(), node.getInputs().get(0).getDataSchema());
+ return new TransformOperator(context, nextOperator, node.getDataSchema(),
node.getProjects(),
+ node.getInputs().get(0).getDataSchema());
}
@Override
- public MultiStageOperator visitSort(SortNode node, PhysicalPlanContext
context) {
+ public MultiStageOperator visitSort(SortNode node, OpChainExecutionContext
context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this,
context);
boolean isInputSorted = nextOperator instanceof
SortedMailboxReceiveOperator;
- return new SortOperator(context.getOpChainExecutionContext(),
nextOperator, node.getCollationKeys(),
- node.getCollationDirections(), node.getCollationNullDirections(),
node.getFetch(), node.getOffset(),
- node.getDataSchema(), isInputSorted);
+ return new SortOperator(context, nextOperator, node.getCollationKeys(),
node.getCollationDirections(),
+ node.getCollationNullDirections(), node.getFetch(), node.getOffset(),
node.getDataSchema(), isInputSorted);
}
@Override
- public MultiStageOperator visitTableScan(TableScanNode node,
PhysicalPlanContext context) {
+ public MultiStageOperator visitTableScan(TableScanNode node,
OpChainExecutionContext context) {
throw new UnsupportedOperationException("Stage node of type TableScanNode
is not supported!");
}
@Override
- public MultiStageOperator visitValue(ValueNode node, PhysicalPlanContext
context) {
- return new LiteralValueOperator(context.getOpChainExecutionContext(),
node.getDataSchema(), node.getLiteralRows());
+ public MultiStageOperator visitValue(ValueNode node, OpChainExecutionContext
context) {
+ return new LiteralValueOperator(context, node.getDataSchema(),
node.getLiteralRows());
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index 69663f5d33..8972a89e18 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -35,7 +35,6 @@ import
org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,31 +56,29 @@ public class PipelineBreakerExecutor {
* @param scheduler scheduler service to run the pipeline breaker main
thread.
* @param mailboxService mailbox service to attach the {@link
MailboxReceiveNode} against.
* @param distributedStagePlan the distributed stage plan to run pipeline
breaker on.
- * @param deadlineMs execution deadline
+ * @param requestMetadata request metadata, including query options
* @param requestId request ID
- * @param isTraceEnabled whether to enable trace.
+ * @param deadlineMs execution deadline
* @return pipeline breaker result;
* - If exception occurs, exception block will be wrapped in {@link
TransferableBlock} and assigned to each PB node.
* - Normal stats will be attached to each PB node and downstream
execution should return with stats attached.
*/
@Nullable
public static PipelineBreakerResult
executePipelineBreakers(OpChainSchedulerService scheduler,
- MailboxService mailboxService, DistributedStagePlan
distributedStagePlan, long deadlineMs, long requestId,
- boolean isTraceEnabled) {
+ MailboxService mailboxService, DistributedStagePlan
distributedStagePlan, Map<String, String> requestMetadata,
+ long requestId, long deadlineMs) {
PipelineBreakerContext pipelineBreakerContext = new
PipelineBreakerContext();
PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(),
pipelineBreakerContext);
if (!pipelineBreakerContext.getPipelineBreakerMap().isEmpty()) {
try {
- PlanNode stageRoot = distributedStagePlan.getStageRoot();
// TODO: This PlanRequestContext needs to indicate it is a pre-stage
opChain and only listens to pre-stage
// OpChain receive-mail callbacks.
// see also: MailboxIdUtils TODOs, de-couple mailbox id from query
information
- OpChainExecutionContext opChainContext =
- new OpChainExecutionContext(mailboxService, requestId,
stageRoot.getPlanFragmentId(),
- distributedStagePlan.getServer(), deadlineMs,
distributedStagePlan.getStageMetadata(), null,
- isTraceEnabled);
- PhysicalPlanContext physicalPlanContext = new
PhysicalPlanContext(opChainContext, null);
- return PipelineBreakerExecutor.execute(scheduler,
pipelineBreakerContext, physicalPlanContext);
+ OpChainExecutionContext opChainExecutionContext =
+ new OpChainExecutionContext(mailboxService, requestId,
distributedStagePlan.getStageId(),
+ distributedStagePlan.getServer(), deadlineMs, requestMetadata,
distributedStagePlan.getStageMetadata(),
+ null);
+ return execute(scheduler, pipelineBreakerContext,
opChainExecutionContext);
} catch (Exception e) {
LOGGER.error("Caught exception executing pipeline breaker for request:
{}, stage: {}", requestId,
distributedStagePlan.getStageId(), e);
@@ -93,36 +90,36 @@ public class PipelineBreakerExecutor {
}
}
- private static PipelineBreakerResult execute(OpChainSchedulerService
scheduler, PipelineBreakerContext context,
- PhysicalPlanContext physicalPlanContext)
+ private static PipelineBreakerResult execute(OpChainSchedulerService
scheduler,
+ PipelineBreakerContext pipelineBreakerContext, OpChainExecutionContext
opChainExecutionContext)
throws Exception {
Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap = new
HashMap<>();
- for (Map.Entry<Integer, PlanNode> e :
context.getPipelineBreakerMap().entrySet()) {
+ for (Map.Entry<Integer, PlanNode> e :
pipelineBreakerContext.getPipelineBreakerMap().entrySet()) {
int key = e.getKey();
PlanNode planNode = e.getValue();
if (!(planNode instanceof MailboxReceiveNode)) {
throw new UnsupportedOperationException("Only MailboxReceiveNode is
supported to run as pipeline breaker now");
}
- OpChain tempOpChain = PhysicalPlanVisitor.walkPlanNode(planNode,
physicalPlanContext);
- pipelineWorkerMap.put(key, tempOpChain.getRoot());
+ OpChain opChain = PhysicalPlanVisitor.walkPlanNode(planNode,
opChainExecutionContext);
+ pipelineWorkerMap.put(key, opChain.getRoot());
}
- return runMailboxReceivePipelineBreaker(scheduler, context,
pipelineWorkerMap, physicalPlanContext);
+ return runMailboxReceivePipelineBreaker(scheduler, pipelineBreakerContext,
pipelineWorkerMap,
+ opChainExecutionContext);
}
private static PipelineBreakerResult
runMailboxReceivePipelineBreaker(OpChainSchedulerService scheduler,
- PipelineBreakerContext context, Map<Integer,
Operator<TransferableBlock>> pipelineWorkerMap,
- PhysicalPlanContext physicalPlanContext)
+ PipelineBreakerContext pipelineBreakerContext, Map<Integer,
Operator<TransferableBlock>> pipelineWorkerMap,
+ OpChainExecutionContext opChainExecutionContext)
throws Exception {
PipelineBreakerOperator pipelineBreakerOperator =
- new
PipelineBreakerOperator(physicalPlanContext.getOpChainExecutionContext(),
pipelineWorkerMap);
+ new PipelineBreakerOperator(opChainExecutionContext,
pipelineWorkerMap);
CountDownLatch latch = new CountDownLatch(1);
OpChain pipelineBreakerOpChain =
- new OpChain(physicalPlanContext.getOpChainExecutionContext(),
pipelineBreakerOperator,
- physicalPlanContext.getReceivingMailboxIds(), (id) ->
latch.countDown());
+ new OpChain(opChainExecutionContext, pipelineBreakerOperator, (id) ->
latch.countDown());
scheduler.register(pipelineBreakerOpChain);
- long timeoutMs = physicalPlanContext.getDeadlineMs() -
System.currentTimeMillis();
+ long timeoutMs = opChainExecutionContext.getDeadlineMs() -
System.currentTimeMillis();
if (latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
- return new PipelineBreakerResult(context.getNodeIdMap(),
pipelineBreakerOperator.getResultMap(),
+ return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(),
pipelineBreakerOperator.getResultMap(),
pipelineBreakerOperator.getErrorBlock(),
pipelineBreakerOpChain.getStats());
} else {
throw new TimeoutException(
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index fb1c3af0c1..bc78678fa3 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -20,7 +20,7 @@ package org.apache.pinot.query.runtime.plan.server;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.config.table.TableType;
@@ -29,20 +29,21 @@ import org.apache.pinot.spi.config.table.TableType;
* {@link PinotQuery} to execute on server.
*/
public class ServerPlanRequestContext {
- private final PhysicalPlanContext _planContext;
+ private final OpChainExecutionContext _executionContext;
private final TableType _tableType;
private PinotQuery _pinotQuery;
private InstanceRequest _instanceRequest;
- public ServerPlanRequestContext(PhysicalPlanContext planContext, PinotQuery
pinotQuery, TableType tableType) {
- _planContext = planContext;
+ public ServerPlanRequestContext(OpChainExecutionContext executionContext,
PinotQuery pinotQuery,
+ TableType tableType) {
+ _executionContext = executionContext;
_pinotQuery = pinotQuery;
_tableType = tableType;
}
- public PhysicalPlanContext getPlanContext() {
- return _planContext;
+ public OpChainExecutionContext getExecutionContext() {
+ return _executionContext;
}
public TableType getTableType() {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index 7f9e756acc..466b29af14 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -43,7 +43,7 @@ import
org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
-import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -61,6 +61,9 @@ import org.slf4j.LoggerFactory;
public class ServerPlanRequestUtils {
+ private ServerPlanRequestUtils() {
+ }
+
private static final int DEFAULT_LEAF_NODE_LIMIT = Integer.MAX_VALUE;
private static final Logger LOGGER =
LoggerFactory.getLogger(ServerPlanRequestUtils.class);
private static final List<String> QUERY_REWRITERS_CLASS_NAMES =
@@ -70,22 +73,16 @@ public class ServerPlanRequestUtils {
new
ArrayList<>(QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES));
private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer();
- private ServerPlanRequestUtils() {
- // do not instantiate.
- }
-
/**
* Entry point to construct a {@link ServerPlanRequestContext} for executing
leaf-stage runner.
*
- * @param planContext physical plan context of the stage.
+ * @param executionContext execution context of the stage.
* @param distributedStagePlan distributed stage plan of the stage.
- * @param requestMetadataMap metadata map
* @param helixPropertyStore helix property store used to fetch table config
and schema for leaf-stage execution.
* @return a list of server plan request context to be run
*/
- public static List<ServerPlanRequestContext>
constructServerQueryRequests(PhysicalPlanContext planContext,
- DistributedStagePlan distributedStagePlan, Map<String, String>
requestMetadataMap,
- ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
+ public static List<ServerPlanRequestContext>
constructServerQueryRequests(OpChainExecutionContext executionContext,
+ DistributedStagePlan distributedStagePlan,
ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
WorkerMetadata workerMetadata =
distributedStagePlan.getCurrentWorkerMetadata();
String rawTableName = StageMetadata.getTableName(stageMetadata);
@@ -101,15 +98,15 @@ public class ServerPlanRequestUtils {
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
- requests.add(ServerPlanRequestUtils.build(planContext,
distributedStagePlan, requestMetadataMap, tableConfig,
- schema, StageMetadata.getTimeBoundary(stageMetadata),
TableType.OFFLINE, tableEntry.getValue()));
+ requests.add(ServerPlanRequestUtils.build(executionContext,
distributedStagePlan, tableConfig, schema,
+ StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE,
tableEntry.getValue()));
} else if (TableType.REALTIME.name().equals(tableType)) {
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(helixPropertyStore,
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
- requests.add(ServerPlanRequestUtils.build(planContext,
distributedStagePlan, requestMetadataMap, tableConfig,
- schema, StageMetadata.getTimeBoundary(stageMetadata),
TableType.REALTIME, tableEntry.getValue()));
+ requests.add(ServerPlanRequestUtils.build(executionContext,
distributedStagePlan, tableConfig, schema,
+ StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME,
tableEntry.getValue()));
} else {
throw new IllegalArgumentException("Unsupported table type key: " +
tableType);
}
@@ -117,18 +114,15 @@ public class ServerPlanRequestUtils {
return requests;
}
- private static ServerPlanRequestContext build(PhysicalPlanContext
planContext, DistributedStagePlan stagePlan,
- Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema
schema, TimeBoundaryInfo timeBoundaryInfo,
+ private static ServerPlanRequestContext build(OpChainExecutionContext
executionContext,
+ DistributedStagePlan stagePlan, TableConfig tableConfig, Schema schema,
TimeBoundaryInfo timeBoundaryInfo,
TableType tableType, List<String> segmentList) {
// Before-visit: construct the ServerPlanRequestContext baseline
// Making a unique requestId for leaf stages otherwise it causes problem
on stats/metrics/tracing.
- long requestId =
-
(Long.parseLong(requestMetadataMap.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID))
<< 16) + (
- (long) stagePlan.getStageId() << 8) + (tableType ==
TableType.REALTIME ? 1 : 0);
- long timeoutMs =
Long.parseLong(requestMetadataMap.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
- boolean traceEnabled =
Boolean.parseBoolean(requestMetadataMap.get(CommonConstants.Broker.Request.TRACE));
+ long requestId = (executionContext.getRequestId() << 16) + ((long)
stagePlan.getStageId() << 8) + (
+ tableType == TableType.REALTIME ? 1 : 0);
PinotQuery pinotQuery = new PinotQuery();
- Integer leafNodeLimit =
QueryOptionsUtils.getMultiStageLeafLimit(requestMetadataMap);
+ Integer leafNodeLimit =
QueryOptionsUtils.getMultiStageLeafLimit(executionContext.getRequestMetadata());
if (leafNodeLimit != null) {
pinotQuery.setLimit(leafNodeLimit);
} else {
@@ -136,7 +130,7 @@ public class ServerPlanRequestUtils {
}
LOGGER.debug("QueryID" + requestId + " leafNodeLimit:" + leafNodeLimit);
pinotQuery.setExplain(false);
- ServerPlanRequestContext serverContext = new
ServerPlanRequestContext(planContext, pinotQuery, tableType);
+ ServerPlanRequestContext serverContext = new
ServerPlanRequestContext(executionContext, pinotQuery, tableType);
// visit the plan and create query physical plan.
ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(),
serverContext);
@@ -152,7 +146,7 @@ public class ServerPlanRequestUtils {
QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema);
// 2. set pinot query options according to requestMetadataMap
- updateQueryOptions(pinotQuery, requestMetadataMap, timeoutMs,
traceEnabled);
+ updateQueryOptions(pinotQuery, executionContext);
// 3. wrapped around in broker request
BrokerRequest brokerRequest = new BrokerRequest();
@@ -168,7 +162,7 @@ public class ServerPlanRequestUtils {
InstanceRequest instanceRequest = new InstanceRequest();
instanceRequest.setRequestId(requestId);
instanceRequest.setBrokerId("unknown");
-
instanceRequest.setEnableTrace(Boolean.parseBoolean(requestMetadataMap.get(CommonConstants.Broker.Request.TRACE)));
+ instanceRequest.setEnableTrace(executionContext.isTraceEnabled());
instanceRequest.setSearchSegments(segmentList);
instanceRequest.setQuery(brokerRequest);
@@ -179,16 +173,10 @@ public class ServerPlanRequestUtils {
/**
* Helper method to update query options.
*/
- private static void updateQueryOptions(PinotQuery pinotQuery, Map<String,
String> requestMetadataMap, long timeoutMs,
- boolean traceEnabled) {
- Map<String, String> queryOptions = new HashMap<>();
- // put default timeout and trace options
- queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
String.valueOf(timeoutMs));
- if (traceEnabled) {
- queryOptions.put(CommonConstants.Broker.Request.TRACE, "true");
- }
- // overwrite with requestMetadataMap to carry query options from request:
- queryOptions.putAll(requestMetadataMap);
+ private static void updateQueryOptions(PinotQuery pinotQuery,
OpChainExecutionContext executionContext) {
+ Map<String, String> queryOptions = new
HashMap<>(executionContext.getRequestMetadata());
+ queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
+ Long.toString(executionContext.getDeadlineMs() -
System.currentTimeMillis()));
pinotQuery.setQueryOptions(queryOptions);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
index 5e3873fbcd..be7db3ea63 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
@@ -110,7 +110,7 @@ public class ServerPlanRequestVisitor implements
PlanNodeVisitor<Void, ServerPla
staticSide = node.getInputs().get(1);
}
staticSide.visit(this, context);
- PipelineBreakerResult pipelineBreakerResult =
context.getPlanContext().getPipelineBreakerResult();
+ PipelineBreakerResult pipelineBreakerResult =
context.getExecutionContext().getPipelineBreakerResult();
int resultMapId = pipelineBreakerResult.getNodeIdMap().get(dynamicSide);
List<TransferableBlock> transferableBlocks =
pipelineBreakerResult.getResultMap().getOrDefault(
resultMapId, Collections.emptyList());
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 2561779c2b..bf1993af3d 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -87,15 +87,14 @@ public class QueryDispatcher {
}
public ResultTable submitAndReduce(RequestContext context,
DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
- Map<String, String> queryOptions, Map<Integer, ExecutionStatsAggregator>
executionStatsAggregator,
- boolean traceEnabled)
+ Map<String, String> queryOptions, Map<Integer, ExecutionStatsAggregator>
executionStatsAggregator)
throws Exception {
long requestId = context.getRequestId();
try {
submit(requestId, dispatchableSubPlan, timeoutMs, queryOptions);
long reduceStartTimeNs = System.nanoTime();
ResultTable resultTable =
- runReducer(requestId, dispatchableSubPlan, timeoutMs,
executionStatsAggregator, traceEnabled,
+ runReducer(requestId, dispatchableSubPlan, timeoutMs, queryOptions,
executionStatsAggregator,
_mailboxService);
context.setReduceTimeNanos(System.nanoTime() - reduceStartTimeNs);
return resultTable;
@@ -184,7 +183,8 @@ public class QueryDispatcher {
@VisibleForTesting
public static ResultTable runReducer(long requestId, DispatchableSubPlan
dispatchableSubPlan, long timeoutMs,
- Map<Integer, ExecutionStatsAggregator> statsAggregatorMap, boolean
traceEnabled, MailboxService mailboxService) {
+ Map<String, String> queryOptions, @Nullable Map<Integer,
ExecutionStatsAggregator> statsAggregatorMap,
+ MailboxService mailboxService) {
// NOTE: Reduce stage is always stage 0
DispatchablePlanFragment dispatchablePlanFragment =
dispatchableSubPlan.getQueryStageList().get(0);
PlanFragment planFragment = dispatchablePlanFragment.getPlanFragment();
@@ -199,8 +199,8 @@ public class QueryDispatcher {
.addCustomProperties(dispatchablePlanFragment.getCustomProperties()).build();
OpChainExecutionContext opChainExecutionContext =
new OpChainExecutionContext(mailboxService, requestId,
planFragment.getFragmentId(),
- workerMetadataList.get(0).getVirtualServerAddress(),
System.currentTimeMillis() + timeoutMs, stageMetadata,
- null, traceEnabled);
+ workerMetadataList.get(0).getVirtualServerAddress(),
System.currentTimeMillis() + timeoutMs, queryOptions,
+ stageMetadata, null);
MailboxReceiveOperator receiveOperator =
new MailboxReceiveOperator(opChainExecutionContext,
receiveNode.getDistributionType(),
receiveNode.getSenderStageId());
@@ -210,9 +210,9 @@ public class QueryDispatcher {
return resultTable;
}
- private static void collectStats(DispatchableSubPlan dispatchableSubPlan,
@Nullable OpChainStats opChainStats,
+ private static void collectStats(DispatchableSubPlan dispatchableSubPlan,
OpChainStats opChainStats,
@Nullable Map<Integer, ExecutionStatsAggregator>
executionStatsAggregatorMap) {
- if (executionStatsAggregatorMap != null && opChainStats != null) {
+ if (executionStatsAggregatorMap != null) {
LOGGER.info("Extracting broker query execution stats, Runtime: {}ms",
opChainStats.getExecutionTime());
for (Map.Entry<String, OperatorStats> entry :
opChainStats.getOperatorStatsMap().entrySet()) {
OperatorStats operatorStats = entry.getValue();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index 893e0afc5e..3b1d18655d 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -98,10 +98,10 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
public void submit(Worker.QueryRequest request,
StreamObserver<Worker.QueryResponse> responseObserver) {
// Deserialize the request
List<DistributedStagePlan> distributedStagePlans;
- Map<String, String> requestMetadataMap;
- requestMetadataMap = request.getMetadataMap();
- long requestId =
Long.parseLong(requestMetadataMap.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
- long timeoutMs =
Long.parseLong(requestMetadataMap.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
+ Map<String, String> requestMetadata;
+ requestMetadata = request.getMetadataMap();
+ long requestId =
Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
+ long timeoutMs =
Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
long deadlineMs = System.currentTimeMillis() + timeoutMs;
// 1. Deserialized request
try {
@@ -114,7 +114,7 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
// 2. Submit distributed stage plans
SubmissionService submissionService = new
SubmissionService(_querySubmissionExecutorService);
distributedStagePlans.forEach(distributedStagePlan ->
submissionService.submit(() -> {
- _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
+ _queryRunner.processQuery(distributedStagePlan, requestMetadata);
}));
// 3. await response successful or any failure which cancels all other
tasks.
try {
@@ -123,8 +123,7 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
LOGGER.error("error occurred during stage submission for {}:\n{}",
requestId, t);
responseObserver.onNext(Worker.QueryResponse.newBuilder()
.putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR,
- QueryException.getTruncatedStackTrace(t))
- .build());
+ QueryException.getTruncatedStackTrace(t)).build());
responseObserver.onCompleted();
return;
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 7b27883210..afe5aa34ab 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -208,7 +209,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
processDistributedStagePlans(dispatchableSubPlan, stageId,
requestMetadataMap);
}
try {
- QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs,
null, false, _mailboxService);
+ QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs,
Collections.emptyMap(), null,
+ _mailboxService);
Assert.fail("Should have thrown exception!");
} catch (RuntimeException e) {
// NOTE: The actual message is (usually) something like:
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index f784f4b586..5112cf4caf 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -137,8 +137,8 @@ public abstract class QueryRunnerTestBase extends
QueryTestSet {
}
}
ResultTable resultTable =
- QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs,
executionStatsAggregatorMap, true,
- _mailboxService);
+ QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs,
Collections.emptyMap(),
+ executionStatsAggregatorMap, _mailboxService);
return resultTable.getRows();
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index 38ab949875..e79f46e671 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.query.runtime.executor;
-import com.google.common.collect.ImmutableList;
+import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -68,8 +68,9 @@ public class OpChainSchedulerServiceTest {
private OpChain getChain(MultiStageOperator operator) {
VirtualServerAddress address = new VirtualServerAddress("localhost", 1234,
1);
- OpChainExecutionContext context = new OpChainExecutionContext(null, 123L,
1, address, 0, null, null, true);
- return new OpChain(context, operator, ImmutableList.of());
+ OpChainExecutionContext context =
+ new OpChainExecutionContext(null, 123L, 1, address, Long.MAX_VALUE,
Collections.emptyMap(), null, null);
+ return new OpChain(context, operator);
}
@Test
@@ -131,8 +132,8 @@ public class OpChainSchedulerServiceTest {
OpChainSchedulerService schedulerService = new
OpChainSchedulerService(_executor);
CountDownLatch latch = new CountDownLatch(1);
- Mockito.when(_operatorA.nextBlock()).thenReturn(
- TransferableBlockUtils.getErrorTransferableBlock(new
RuntimeException("foo")));
+ Mockito.when(_operatorA.nextBlock())
+ .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new
RuntimeException("foo")));
Mockito.doAnswer(inv -> {
latch.countDown();
return null;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
index 83dea7c82d..4913527d29 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
@@ -25,7 +25,6 @@ import
org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.mockito.Mock;
import org.mockito.Mockito;
@@ -40,9 +39,6 @@ public class LiteralValueOperatorTest {
private AutoCloseable _mocks;
- @Mock
- private PhysicalPlanContext _context;
-
@Mock
private VirtualServerAddress _serverAddress;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index 76be227fcd..e9c8661ff9 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -105,12 +105,12 @@ public class MailboxReceiveOperatorTest {
@Test(expectedExceptions = IllegalStateException.class,
expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
public void shouldThrowRangeDistributionNotSupported() {
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS,
Long.MAX_VALUE, null);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS,
Long.MAX_VALUE, _stageMetadata1);
//noinspection resource
new MailboxReceiveOperator(context,
RelDistribution.Type.RANGE_DISTRIBUTED, 1);
}
- @Test(enabled = true)
+ @Test
public void shouldTimeout()
throws InterruptedException {
when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 85893cb6b5..c01447d7cd 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -189,8 +189,8 @@ public class MailboxSendOperatorTest {
StageMetadata stageMetadata = new
StageMetadata.Builder().setWorkerMetadataList(
Collections.singletonList(new
WorkerMetadata.Builder().setVirtualServerAddress(_server).build())).build();
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID,
_server, Long.MAX_VALUE, stageMetadata, null,
- false);
+ new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID,
_server, Long.MAX_VALUE,
+ Collections.emptyMap(), stageMetadata, null);
return new MailboxSendOperator(context, _sourceOperator, _exchange, null,
null, false);
}
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index 3bd295fe4c..cab25f9127 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -50,6 +50,7 @@ import
org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.AfterMethod;
@@ -132,7 +133,7 @@ public class OpChainTest {
Thread.sleep(100);
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
});
- OpChain opChain = new OpChain(OperatorTestUtil.getDefaultContext(),
_sourceOperator, new ArrayList<>());
+ OpChain opChain = new OpChain(OperatorTestUtil.getDefaultContext(),
_sourceOperator);
opChain.getStats().executing();
opChain.getRoot().nextBlock();
opChain.getStats().queued();
@@ -142,7 +143,7 @@ public class OpChainTest {
Thread.sleep(20);
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
});
- opChain = new OpChain(OperatorTestUtil.getDefaultContext(),
_sourceOperator, new ArrayList<>());
+ opChain = new OpChain(OperatorTestUtil.getDefaultContext(),
_sourceOperator);
opChain.getStats().executing();
opChain.getRoot().nextBlock();
opChain.getStats().queued();
@@ -155,7 +156,7 @@ public class OpChainTest {
OpChainExecutionContext context = OperatorTestUtil.getDefaultContext();
DummyMultiStageOperator dummyMultiStageOperator = new
DummyMultiStageOperator(context);
- OpChain opChain = new OpChain(context, dummyMultiStageOperator, new
ArrayList<>());
+ OpChain opChain = new OpChain(context, dummyMultiStageOperator);
opChain.getStats().executing();
opChain.getRoot().nextBlock();
opChain.getStats().queued();
@@ -168,8 +169,8 @@ public class OpChainTest {
opChain.getStats().getOperatorStatsMap().get(dummyMultiStageOperator.getOperatorId()).getExecutionStats();
long time =
Long.parseLong(executionStats.get(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName()));
- assertTrue(time >= 1000 && time <= 2000, "Expected " +
DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS
- + " to be in [1000, 2000] but found " + time);
+ assertTrue(time >= 1000 && time <= 2000,
+ "Expected " + DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS + " to
be in [1000, 2000] but found " + time);
}
@Test
@@ -177,7 +178,7 @@ public class OpChainTest {
OpChainExecutionContext context =
OperatorTestUtil.getDefaultContextWithTracingDisabled();
DummyMultiStageOperator dummyMultiStageOperator = new
DummyMultiStageOperator(context);
- OpChain opChain = new OpChain(context, dummyMultiStageOperator, new
ArrayList<>());
+ OpChain opChain = new OpChain(context, dummyMultiStageOperator);
opChain.getStats().executing();
opChain.getRoot().nextBlock();
opChain.getStats().queued();
@@ -192,13 +193,14 @@ public class OpChainTest {
int receivedStageId = 2;
int senderStageId = 1;
- OpChainExecutionContext context = new
OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress,
- System.currentTimeMillis() + 1000, _receivingStageMetadata, null,
true);
+ OpChainExecutionContext context =
+ new OpChainExecutionContext(_mailboxService1, 1, senderStageId,
_serverAddress, Long.MAX_VALUE,
+ Collections.singletonMap(CommonConstants.Broker.Request.TRACE,
"true"), _receivingStageMetadata, null);
Stack<MultiStageOperator> operators =
getFullOpchain(receivedStageId, senderStageId, context,
dummyOperatorWaitTime);
- OpChain opChain = new OpChain(context, operators.peek(), new
ArrayList<>());
+ OpChain opChain = new OpChain(context, operators.peek());
opChain.getStats().executing();
while (!opChain.getRoot().nextBlock().isEndOfStreamBlock()) {
// Drain the opchain
@@ -206,8 +208,8 @@ public class OpChainTest {
opChain.getStats().queued();
OpChainExecutionContext secondStageContext =
- new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1,
_serverAddress,
- System.currentTimeMillis() + 1000, _receivingStageMetadata, null,
true);
+ new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1,
_serverAddress, Long.MAX_VALUE,
+ Collections.singletonMap(CommonConstants.Broker.Request.TRACE,
"true"), _receivingStageMetadata, null);
MailboxReceiveOperator secondStageReceiveOp =
new MailboxReceiveOperator(secondStageContext,
RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId + 1);
@@ -231,20 +233,21 @@ public class OpChainTest {
int receivedStageId = 2;
int senderStageId = 1;
- OpChainExecutionContext context = new
OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress,
- System.currentTimeMillis() + 1000, _receivingStageMetadata, null,
false);
+ OpChainExecutionContext context =
+ new OpChainExecutionContext(_mailboxService1, 1, senderStageId,
_serverAddress, Long.MAX_VALUE,
+ Collections.emptyMap(), _receivingStageMetadata, null);
Stack<MultiStageOperator> operators =
getFullOpchain(receivedStageId, senderStageId, context,
dummyOperatorWaitTime);
- OpChain opChain = new OpChain(context, operators.peek(), new
ArrayList<>());
+ OpChain opChain = new OpChain(context, operators.peek());
opChain.getStats().executing();
opChain.getRoot().nextBlock();
opChain.getStats().queued();
OpChainExecutionContext secondStageContext =
- new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1,
_serverAddress,
- System.currentTimeMillis() + 1000, _receivingStageMetadata, null,
false);
+ new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1,
_serverAddress, Long.MAX_VALUE,
+ Collections.emptyMap(), _receivingStageMetadata, null);
MailboxReceiveOperator secondStageReceiveOp =
new MailboxReceiveOperator(secondStageContext,
RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index 55878658f7..5f139e5545 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.operator;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.datablock.DataBlock;
@@ -31,6 +32,7 @@ import
org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory;
+import org.apache.pinot.spi.utils.CommonConstants;
public class OperatorTestUtil {
@@ -75,22 +77,19 @@ public class OperatorTestUtil {
public static OpChainExecutionContext getOpChainContext(MailboxService
mailboxService,
VirtualServerAddress receiverAddress, long deadlineMs, StageMetadata
stageMetadata) {
- return new OpChainExecutionContext(mailboxService, 0, 0, receiverAddress,
deadlineMs, stageMetadata, null, false);
+ return new OpChainExecutionContext(mailboxService, 0, 0, receiverAddress,
deadlineMs, Collections.emptyMap(),
+ stageMetadata, null);
}
public static OpChainExecutionContext getDefaultContext() {
VirtualServerAddress virtualServerAddress = new
VirtualServerAddress("mock", 80, 0);
- return new OpChainExecutionContext(null, 1, 2, virtualServerAddress,
Long.MAX_VALUE, null, null, true);
+ return new OpChainExecutionContext(null, 1, 2, virtualServerAddress,
Long.MAX_VALUE,
+ Collections.singletonMap(CommonConstants.Broker.Request.TRACE,
"true"), null, null);
}
public static OpChainExecutionContext getDefaultContextWithTracingDisabled()
{
VirtualServerAddress virtualServerAddress = new
VirtualServerAddress("mock", 80, 0);
- return new OpChainExecutionContext(null, 1, 2, virtualServerAddress,
Long.MAX_VALUE, null, null, false);
- }
-
- public static OpChainExecutionContext getContext(long requestId, int stageId,
- VirtualServerAddress virtualServerAddress) {
- return new OpChainExecutionContext(null, requestId, stageId,
virtualServerAddress, Long.MAX_VALUE, null, null,
- true);
+ return new OpChainExecutionContext(null, 1, 2, virtualServerAddress,
Long.MAX_VALUE, Collections.emptyMap(), null,
+ null);
}
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 38940e0892..0c6e60561d 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -115,7 +115,7 @@ public class SortedMailboxReceiveOperatorTest {
@Test(expectedExceptions = IllegalStateException.class,
expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
public void shouldThrowRangeDistributionNotSupported() {
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS,
Long.MAX_VALUE, null);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS,
Long.MAX_VALUE, _stageMetadata1);
//noinspection resource
new SortedMailboxReceiveOperator(context,
RelDistribution.Type.RANGE_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS,
COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
@@ -125,8 +125,7 @@ public class SortedMailboxReceiveOperatorTest {
public void shouldThrowOnEmptyCollationKey() {
when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1);
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS,
System.currentTimeMillis() + 10L,
- _stageMetadata1);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS,
Long.MAX_VALUE, _stageMetadata1);
//noinspection resource
new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON,
DATA_SCHEMA, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), false, 1);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index b5ba82e48d..e7fa0e7db2 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@@ -131,7 +132,7 @@ public class PipelineBreakerExecutorTest {
PipelineBreakerResult pipelineBreakerResult =
PipelineBreakerExecutor.executePipelineBreakers(_scheduler,
_mailboxService, distributedStagePlan,
- System.currentTimeMillis() + 10_000L, 0, false);
+ Collections.emptyMap(), 0, Long.MAX_VALUE);
// then
// should have single PB result, receive 2 data blocks, EOS block
shouldn't be included
@@ -172,7 +173,7 @@ public class PipelineBreakerExecutorTest {
PipelineBreakerResult pipelineBreakerResult =
PipelineBreakerExecutor.executePipelineBreakers(_scheduler,
_mailboxService, distributedStagePlan,
- System.currentTimeMillis() + 10_000L, 0, false);
+ Collections.emptyMap(), 0, Long.MAX_VALUE);
// then
// should have two PB result, receive 2 data blocks, one each, EOS block
shouldn't be included
@@ -200,7 +201,7 @@ public class PipelineBreakerExecutorTest {
// when
PipelineBreakerResult pipelineBreakerResult =
PipelineBreakerExecutor.executePipelineBreakers(_scheduler,
_mailboxService, distributedStagePlan,
- System.currentTimeMillis() + 10_000L, 0, false);
+ Collections.emptyMap(), 0, Long.MAX_VALUE);
// then
// should return empty block list
@@ -215,23 +216,23 @@ public class PipelineBreakerExecutorTest {
@Test
public void shouldReturnErrorBlocksFailureWhenPBTimeout() {
- MailboxReceiveNode incorrectlyConfiguredMailboxNode =
+ MailboxReceiveNode mailboxReceiveNode =
new MailboxReceiveNode(0, DATA_SCHEMA, 1,
RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
DistributedStagePlan distributedStagePlan =
- new DistributedStagePlan(0, RECEIVER_ADDRESS,
incorrectlyConfiguredMailboxNode, _stageMetadata1);
+ new DistributedStagePlan(0, RECEIVER_ADDRESS, mailboxReceiveNode,
_stageMetadata1);
// when
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
- Object[] row1 = new Object[]{1, 1};
- Object[] row2 = new Object[]{2, 3};
- when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA,
row1),
- OperatorTestUtil.block(DATA_SCHEMA, row2),
- TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ CountDownLatch latch = new CountDownLatch(1);
+ when(_mailbox1.poll()).thenAnswer(invocation -> {
+ latch.await();
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ });
PipelineBreakerResult pipelineBreakerResult =
PipelineBreakerExecutor.executePipelineBreakers(_scheduler,
_mailboxService, distributedStagePlan,
- System.currentTimeMillis() - 10_000L, 0, false);
+ Collections.emptyMap(), 0, System.currentTimeMillis() + 100);
// then
// should contain only failure error blocks
@@ -239,6 +240,8 @@ public class PipelineBreakerExecutorTest {
TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock();
Assert.assertNotNull(errorBlock);
Assert.assertTrue(errorBlock.isErrorBlock());
+
+ latch.countDown();
}
@Test
@@ -268,7 +271,7 @@ public class PipelineBreakerExecutorTest {
PipelineBreakerResult pipelineBreakerResult =
PipelineBreakerExecutor.executePipelineBreakers(_scheduler,
_mailboxService, distributedStagePlan,
- System.currentTimeMillis() + 10_000L, 0, false);
+ Collections.emptyMap(), 0, Long.MAX_VALUE);
// then
// should pass when one PB returns result, the other returns empty.
@@ -307,7 +310,7 @@ public class PipelineBreakerExecutorTest {
PipelineBreakerResult pipelineBreakerResult =
PipelineBreakerExecutor.executePipelineBreakers(_scheduler,
_mailboxService, distributedStagePlan,
- System.currentTimeMillis() + 10_000L, 0, false);
+ Collections.emptyMap(), 0, Long.MAX_VALUE);
// then
// should fail even if one of the 2 PB doesn't contain error block from
sender.
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index 417f5b5e53..08505e6e33 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -125,7 +125,7 @@ public class QueryDispatcherTest extends QueryTestSet {
context.setRequestId(requestId);
DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
try {
- _queryDispatcher.submitAndReduce(context, dispatchableSubPlan, 10_000L,
Collections.emptyMap(), null, false);
+ _queryDispatcher.submitAndReduce(context, dispatchableSubPlan, 10_000L,
Collections.emptyMap(), null);
Assert.fail("Method call above should have failed");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Error dispatching query"));
@@ -149,7 +149,7 @@ public class QueryDispatcherTest extends QueryTestSet {
DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
try {
// will throw b/c mailboxService is mocked
- _queryDispatcher.submitAndReduce(context, dispatchableSubPlan, 10_000L,
Collections.emptyMap(), null, false);
+ _queryDispatcher.submitAndReduce(context, dispatchableSubPlan, 10_000L,
Collections.emptyMap(), null);
Assert.fail("Method call above should have failed");
} catch (NullPointerException e) {
// Expected
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]