This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e0d0aeb3d4 [multistage] Make join operator more resilient (#11401)
e0d0aeb3d4 is described below
commit e0d0aeb3d4d47e4e63589f7f96e194403f839075
Author: Xiang Fu <[email protected]>
AuthorDate: Tue Aug 22 16:57:59 2023 -0700
[multistage] Make join operator more resilient (#11401)
* Support hash join right table protection
* early terminate right table operator
---
.../pinot/common/exception/QueryException.java | 4 +
.../common/utils/config/QueryOptionsUtils.java | 12 ++-
.../apache/calcite/rel/hint/PinotHintOptions.java | 10 ++
.../planner/logical/RelToPlanNodeConverter.java | 2 +-
.../query/planner/plannode/AbstractPlanNode.java | 1 +
.../pinot/query/planner/plannode/JoinNode.java | 10 +-
.../apache/pinot/query/runtime/QueryRunner.java | 37 +++++++
.../query/runtime/operator/HashJoinOperator.java | 109 +++++++++++++++++--
.../apache/pinot/query/service/QueryConfig.java | 6 ++
.../runtime/operator/HashJoinOperatorTest.java | 116 ++++++++++++++++++---
.../plan/pipeline/PipelineBreakerExecutorTest.java | 11 +-
.../apache/pinot/spi/utils/CommonConstants.java | 17 +--
12 files changed, 297 insertions(+), 38 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index a6a1a2bfa4..f5aa77b83b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -61,6 +61,7 @@ public class QueryException {
public static final int SERVER_TABLE_MISSING_ERROR_CODE = 230;
public static final int SERVER_SEGMENT_MISSING_ERROR_CODE = 235;
public static final int QUERY_SCHEDULING_TIMEOUT_ERROR_CODE = 240;
+ public static final int SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE = 245;
public static final int EXECUTION_TIMEOUT_ERROR_CODE = 250;
public static final int DATA_TABLE_SERIALIZATION_ERROR_CODE = 260;
public static final int BROKER_GATHER_ERROR_CODE = 300;
@@ -105,6 +106,8 @@ public class QueryException {
new ProcessingException(SERVER_SEGMENT_MISSING_ERROR_CODE);
public static final ProcessingException QUERY_SCHEDULING_TIMEOUT_ERROR =
new ProcessingException(QUERY_SCHEDULING_TIMEOUT_ERROR_CODE);
+ public static final ProcessingException SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR
=
+ new ProcessingException(SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
public static final ProcessingException EXECUTION_TIMEOUT_ERROR =
new ProcessingException(EXECUTION_TIMEOUT_ERROR_CODE);
public static final ProcessingException DATA_TABLE_SERIALIZATION_ERROR =
@@ -147,6 +150,7 @@ public class QueryException {
SERVER_TABLE_MISSING_ERROR.setMessage("ServerTableMissing");
SERVER_SEGMENT_MISSING_ERROR.setMessage("ServerSegmentMissing");
QUERY_SCHEDULING_TIMEOUT_ERROR.setMessage("QuerySchedulingTimeoutError");
+
SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR.setMessage("ServerResourceLimitExceededError");
EXECUTION_TIMEOUT_ERROR.setMessage("ExecutionTimeoutError");
DATA_TABLE_DESERIALIZATION_ERROR.setMessage("DataTableSerializationError");
BROKER_GATHER_ERROR.setMessage("BrokerGatherError");
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 5b239fde33..36e6baf5bc 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -36,7 +36,6 @@ public class QueryOptionsUtils {
private QueryOptionsUtils() {
}
-
private static final Map<String, String> CONFIG_RESOLVER;
private static final RuntimeException CLASS_LOAD_ERROR;
@@ -189,4 +188,15 @@ public class QueryOptionsUtils {
public static boolean shouldDropResults(Map<String, String> queryOptions) {
return
Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.DROP_RESULTS));
}
+
+ @Nullable
+ public static Integer getMaxRowsInJoin(Map<String, String> queryOptions) {
+ String maxRowsInJoin = queryOptions.get(QueryOptionKey.MAX_ROWS_IN_JOIN);
+ return maxRowsInJoin != null ? Integer.parseInt(maxRowsInJoin) : null;
+ }
+
+ @Nullable
+ public static String getJoinOverflowMode(Map<String, String> queryOptions) {
+ return queryOptions.get(QueryOptionKey.JOIN_OVERFLOW_MODE);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
index 9ffeaf8f8c..7abe9a6f06 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
@@ -65,6 +65,16 @@ public class PinotHintOptions {
public static class JoinHintOptions {
public static final String JOIN_STRATEGY = "join_strategy";
+ /**
+ * Max rows allowed to build the right table hash collection.
+ */
+ public static final String MAX_ROWS_IN_JOIN = "max_rows_in_join";
+ /**
+ * Mode when join overflow happens, supported values: THROW or BREAK.
+ * THROW(default): Break right table build process, and throw exception, no JOIN with left table performed.
+ * BREAK: Break right table build process, continue to perform JOIN operation, results might be partial.
+ */
+ public static final String JOIN_OVERFLOW_MODE = "join_overflow_mode";
}
public static class TableHintOptions {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index b0b7545677..4ff7014b49 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -186,7 +186,7 @@ public final class RelToPlanNodeConverter {
List<RexExpression> joinClause =
joinInfo.nonEquiConditions.stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
return new JoinNode(currentStageId, toDataSchema(node.getRowType()),
toDataSchema(node.getLeft().getRowType()),
- toDataSchema(node.getRight().getRowType()), joinType, joinKeys,
joinClause);
+ toDataSchema(node.getRight().getRowType()), joinType, joinKeys,
joinClause, node.getHints());
}
private static DataSchema toDataSchema(RelDataType rowType) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java
index c4349097e4..f3ca4e705e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java
@@ -89,6 +89,7 @@ public abstract class AbstractPlanNode implements PlanNode, ProtoSerializable {
public static class NodeHint {
@ProtoProperties
public Map<String, Map<String, String>> _hintOptions;
+
public NodeHint() {
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
index 6d089c6239..fe55facf01 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.planner.plannode;
import java.util.Arrays;
import java.util.List;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.hint.RelHint;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
@@ -36,6 +37,8 @@ public class JoinNode extends AbstractPlanNode {
@ProtoProperties
private List<RexExpression> _joinClause;
@ProtoProperties
+ private NodeHint _joinHints;
+ @ProtoProperties
private List<String> _leftColumnNames;
@ProtoProperties
private List<String> _rightColumnNames;
@@ -45,13 +48,14 @@ public class JoinNode extends AbstractPlanNode {
}
public JoinNode(int planFragmentId, DataSchema dataSchema, DataSchema
leftSchema, DataSchema rightSchema,
- JoinRelType joinRelType, JoinKeys joinKeys, List<RexExpression>
joinClause) {
+ JoinRelType joinRelType, JoinKeys joinKeys, List<RexExpression>
joinClause, List<RelHint> joinHints) {
super(planFragmentId, dataSchema);
_leftColumnNames = Arrays.asList(leftSchema.getColumnNames());
_rightColumnNames = Arrays.asList(rightSchema.getColumnNames());
_joinRelType = joinRelType;
_joinKeys = joinKeys;
_joinClause = joinClause;
+ _joinHints = new NodeHint(joinHints);
}
public JoinRelType getJoinRelType() {
@@ -66,6 +70,10 @@ public class JoinNode extends AbstractPlanNode {
return _joinClause;
}
+ public NodeHint getJoinHints() {
+ return _joinHints;
+ }
+
public List<String> getLeftColumnNames() {
return _leftColumnNames;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 1b33c8bab3..9e109431df 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -25,11 +25,13 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
@@ -78,6 +80,12 @@ public class QueryRunner {
private OpChainSchedulerService _scheduler;
+ // Join Overflow configs
+ @Nullable
+ private Integer _maxRowsInJoin;
+ @Nullable
+ private String _joinOverflowMode;
+
/**
* Initializes the query executor.
* <p>Should be called only once and before calling any other method.
@@ -89,6 +97,11 @@ public class QueryRunner {
CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
_port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT,
QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
_helixManager = helixManager;
+ // Set Join Overflow configs
+ _joinOverflowMode =
config.getProperty(QueryConfig.KEY_OF_JOIN_OVERFLOW_MODE);
+ _maxRowsInJoin = config.containsKey(QueryConfig.KEY_OF_MAX_ROWS_IN_JOIN) ?
Integer.parseInt(
+ config.getProperty(QueryConfig.KEY_OF_MAX_ROWS_IN_JOIN)) : null;
+
try {
//TODO: make this configurable
_opChainExecutor = ExecutorServiceUtils.create(config,
"pinot.query.runner.opchain",
@@ -133,6 +146,9 @@ public class QueryRunner {
PipelineBreakerResult pipelineBreakerResult =
PipelineBreakerExecutor.executePipelineBreakers(_scheduler,
_mailboxService, distributedStagePlan, deadlineMs, requestId,
isTraceEnabled);
+ // Set Join Overflow configs to StageMetadata from request
+ setJoinOverflowConfigs(distributedStagePlan, requestMetadataMap);
+
// run OpChain
if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
try {
@@ -157,6 +173,27 @@ public class QueryRunner {
}
}
+ private void setJoinOverflowConfigs(DistributedStagePlan
distributedStagePlan,
+ Map<String, String> requestMetadataMap) {
+ String joinOverflowMode =
QueryOptionsUtils.getJoinOverflowMode(requestMetadataMap);
+ if (joinOverflowMode != null) {
+ distributedStagePlan.getStageMetadata().getCustomProperties()
+
.put(CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE,
joinOverflowMode);
+ } else if (_joinOverflowMode != null) {
+ distributedStagePlan.getStageMetadata().getCustomProperties()
+
.put(CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE,
_joinOverflowMode);
+ }
+
+ Integer maxRowsInJoin =
QueryOptionsUtils.getMaxRowsInJoin(requestMetadataMap);
+ if (maxRowsInJoin != null) {
+ distributedStagePlan.getStageMetadata().getCustomProperties()
+ .put(CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN,
String.valueOf(maxRowsInJoin));
+ } else if (_maxRowsInJoin != null) {
+ distributedStagePlan.getStageMetadata().getCustomProperties()
+ .put(CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN,
String.valueOf(_maxRowsInJoin));
+ }
+ }
+
public void cancel(long requestId) {
_scheduler.cancel(requestId);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 7f68116586..c12a67ecf1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -29,17 +29,23 @@ import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
import org.apache.pinot.query.runtime.operator.utils.TypeUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.spi.utils.CommonConstants;
/**
@@ -55,9 +61,12 @@ import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
* The output is in the format of [left_row, right_row]
*/
// TODO: Move inequi out of hashjoin. (https://github.com/apache/pinot/issues/9728)
+// TODO: Support memory size based resource limit.
public class HashJoinOperator extends MultiStageOperator {
private static final String EXPLAIN_NAME = "HASH_JOIN";
private static final int INITIAL_HEURISTIC_SIZE = 16;
+ private static final int DEFAULT_MAX_ROWS_IN_JOIN = 1024 * 1024; // 2^20,
around 1MM rows
+ private static final JoinOverFlowMode DEFAULT_JOIN_OVERFLOW_MODE =
JoinOverFlowMode.THROW;
private static final Set<JoinRelType> SUPPORTED_JOIN_TYPES = ImmutableSet.of(
JoinRelType.INNER, JoinRelType.LEFT, JoinRelType.RIGHT,
JoinRelType.FULL, JoinRelType.SEMI, JoinRelType.ANTI);
@@ -83,14 +92,32 @@ public class HashJoinOperator extends MultiStageOperator {
// TODO: Remove this special handling by fixing data block EOS abstraction or operator's invariant.
private boolean _isTerminated;
private TransferableBlock _upstreamErrorBlock;
- private KeySelector<Object[], Object[]> _leftKeySelector;
- private KeySelector<Object[], Object[]> _rightKeySelector;
+ private final KeySelector<Object[], Object[]> _leftKeySelector;
+ private final KeySelector<Object[], Object[]> _rightKeySelector;
+
+ // Below are specific parameters to protect the hash table from growing too large.
+ // Once the hash table reaches the limit, we will throw exception or break the right table build process.
+ /**
+ * Max rows allowed to build the right table hash collection.
+ */
+ private final int _maxRowsInHashTable;
+ /**
+ * Mode when join overflow happens, supported values: THROW or BREAK.
+ * THROW(default): Break right table build process, and throw exception, no JOIN with left table performed.
+ * BREAK: Break right table build process, continue to perform JOIN operation, results might be partial.
+ */
+ private final JoinOverFlowMode _joinOverflowMode;
+
+ private int _currentRowsInHashTable = 0;
+ private ProcessingException _resourceLimitExceededException = null;
public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator
leftTableOperator,
MultiStageOperator rightTableOperator, DataSchema leftSchema, JoinNode
node) {
super(context);
Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(node.getJoinRelType()),
"Join type: " + node.getJoinRelType() + " is not supported!");
+ _maxRowsInHashTable = getMaxRowInJoin(context.getStageMetadata(),
node.getJoinHints());
+ _joinOverflowMode = getJoinOverflowMode(context.getStageMetadata(),
node.getJoinHints());
_joinType = node.getJoinRelType();
_leftKeySelector = node.getJoinKeys().getLeftJoinKeySelector();
_rightKeySelector = node.getJoinKeys().getRightJoinKeySelector();
@@ -119,6 +146,38 @@ public class HashJoinOperator extends MultiStageOperator {
_upstreamErrorBlock = null;
}
+ private JoinOverFlowMode getJoinOverflowMode(StageMetadata stageMetadata,
AbstractPlanNode.NodeHint joinHints) {
+ if (joinHints != null && joinHints._hintOptions != null &&
joinHints._hintOptions
+ .containsKey(PinotHintOptions.JOIN_HINT_OPTIONS) &&
joinHints._hintOptions.get(
+ PinotHintOptions.JOIN_HINT_OPTIONS)
+ .containsKey(PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE)) {
+ return
JoinOverFlowMode.valueOf(joinHints._hintOptions.get(PinotHintOptions.JOIN_HINT_OPTIONS)
+ .get(PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE));
+ }
+ if (stageMetadata != null && stageMetadata.getCustomProperties() != null
&& stageMetadata.getCustomProperties()
+
.containsKey(CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE))
{
+ return JoinOverFlowMode.valueOf(
+
stageMetadata.getCustomProperties().get(CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE));
+ }
+ return DEFAULT_JOIN_OVERFLOW_MODE;
+ }
+
+ private int getMaxRowInJoin(StageMetadata stageMetadata,
AbstractPlanNode.NodeHint joinHints) {
+ if (joinHints != null && joinHints._hintOptions != null &&
joinHints._hintOptions
+ .containsKey(PinotHintOptions.JOIN_HINT_OPTIONS) &&
joinHints._hintOptions.get(
+ PinotHintOptions.JOIN_HINT_OPTIONS)
+ .containsKey(PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN)) {
+ return
Integer.parseInt(joinHints._hintOptions.get(PinotHintOptions.JOIN_HINT_OPTIONS)
+ .get(PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN));
+ }
+ if (stageMetadata != null && stageMetadata.getCustomProperties() != null
&& stageMetadata.getCustomProperties()
+
.containsKey(CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN)) {
+ return Integer.parseInt(
+
stageMetadata.getCustomProperties().get(CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN));
+ }
+ return DEFAULT_MAX_ROWS_IN_JOIN;
+ }
+
// TODO: Separate left and right table operator.
@Override
public List<MultiStageOperator> getChildOperators() {
@@ -135,7 +194,7 @@ public class HashJoinOperator extends MultiStageOperator {
protected TransferableBlock getNextBlock() {
try {
if (_isTerminated) {
- return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ return
setPartialResultExceptionToBlock(TransferableBlockUtils.getEndOfStreamTransferableBlock());
}
if (!_isHashTableBuilt) {
// Build JOIN hash table
@@ -146,26 +205,48 @@ public class HashJoinOperator extends MultiStageOperator {
}
TransferableBlock leftBlock = _leftTableOperator.nextBlock();
// JOIN each left block with the right block.
- return buildJoinedDataBlock(leftBlock);
+ return setPartialResultExceptionToBlock(buildJoinedDataBlock(leftBlock));
} catch (Exception e) {
return TransferableBlockUtils.getErrorTransferableBlock(e);
}
}
- private void buildBroadcastHashTable() {
+ private void buildBroadcastHashTable()
+ throws ProcessingException {
TransferableBlock rightBlock = _rightTableOperator.nextBlock();
while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
List<Object[]> container = rightBlock.getContainer();
+ // Row based overflow check.
+ if (container.size() + _currentRowsInHashTable > _maxRowsInHashTable) {
+ _resourceLimitExceededException =
+ new
ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
+ _resourceLimitExceededException.setMessage(
+ "Cannot build in memory hash table for join operator, reach number
of rows limit: "
+ + _maxRowsInHashTable);
+ if (_joinOverflowMode == JoinOverFlowMode.THROW) {
+ throw _resourceLimitExceededException;
+ } else {
+ // Just fill up the buffer.
+ int remainingRows = _maxRowsInHashTable - _currentRowsInHashTable;
+ container = container.subList(0, remainingRows);
+ }
+ }
// put all the rows into corresponding hash collections keyed by the key
selector function.
for (Object[] row : container) {
ArrayList<Object[]> hashCollection =
_broadcastRightTable.computeIfAbsent(
new Key(_rightKeySelector.getKey(row)), k -> new
ArrayList<>(INITIAL_HEURISTIC_SIZE));
int size = hashCollection.size();
- if ((size & size - 1) == 0 && size < Integer.MAX_VALUE / 2) { // is
power of 2
- hashCollection.ensureCapacity(size << 1);
+ if ((size & size - 1) == 0 && size < _maxRowsInHashTable && size <
Integer.MAX_VALUE / 2) { // is power of 2
+ hashCollection.ensureCapacity(Math.min(size << 1,
_maxRowsInHashTable));
}
hashCollection.add(row);
}
+ _currentRowsInHashTable += container.size();
+ if (_currentRowsInHashTable == _maxRowsInHashTable) {
+ // Early terminate right table operator.
+ _rightTableOperator.close();
+ break;
+ }
rightBlock = _rightTableOperator.nextBlock();
}
if (rightBlock.isErrorBlock()) {
@@ -175,8 +256,7 @@ public class HashJoinOperator extends MultiStageOperator {
}
}
- private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock)
- throws Exception {
+ private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock) {
if (leftBlock.isErrorBlock()) {
_upstreamErrorBlock = leftBlock;
return _upstreamErrorBlock;
@@ -228,6 +308,13 @@ public class HashJoinOperator extends MultiStageOperator {
return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
}
+ private TransferableBlock setPartialResultExceptionToBlock(TransferableBlock
block) {
+ if (_resourceLimitExceededException != null) {
+ block.getDataBlock().addException(_resourceLimitExceededException);
+ }
+ return block;
+ }
+
private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock)
{
List<Object[]> container = leftBlock.getContainer();
List<Object[]> rows = new ArrayList<>(container.size());
@@ -321,4 +408,8 @@ public class HashJoinOperator extends MultiStageOperator {
private boolean needUnmatchedLeftRows() {
return _joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL;
}
+
+ enum JoinOverFlowMode {
+ THROW, BREAK
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
index ac18ad9138..925f544ee6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
@@ -58,4 +58,10 @@ public class QueryConfig {
*/
public static final String KEY_OF_SERVER_RESPONSE_STATUS_ERROR = "ERROR";
public static final String KEY_OF_SERVER_RESPONSE_STATUS_OK = "OK";
+
+ /**
+ * Configuration for join overflow.
+ */
+ public static final String KEY_OF_JOIN_OVERFLOW_MODE =
"pinot.query.join.overflow.mode";
+ public static final String KEY_OF_MAX_ROWS_IN_JOIN =
"pinot.query.join.max.rows";
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index cc4661ca21..730ccf9f67 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -19,10 +19,15 @@
package org.apache.pinot.query.runtime.operator;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.hint.PinotHintOptions;
+import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.sql.SqlKind;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.exception.QueryException;
@@ -73,6 +78,12 @@ public class HashJoinOperatorTest {
return new JoinNode.JoinKeys(leftSelect, rightSelect);
}
+ private static List<RelHint> getJoinHints(Map<String, String> hintsMap) {
+ RelHint.Builder relHintBuilder =
RelHint.builder(PinotHintOptions.JOIN_HINT_OPTIONS);
+ hintsMap.forEach(relHintBuilder::hintOption);
+ return ImmutableList.of(relHintBuilder.build());
+ }
+
@Test
public void shouldHandleHashJoinKeyCollisionInnerJoin() {
DataSchema leftSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
@@ -94,7 +105,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
+ getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses,
Collections.emptyList());
HashJoinOperator joinOnString =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
@@ -129,7 +140,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
Collections.emptyList());
HashJoinOperator joinOnInt =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = joinOnInt.nextBlock();
@@ -161,7 +172,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
+ getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses,
Collections.emptyList());
HashJoinOperator joinOnInt =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = joinOnInt.nextBlock();
@@ -200,7 +211,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.LEFT,
- getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
+ getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses,
Collections.emptyList());
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
@@ -232,7 +243,7 @@ public class HashJoinOperatorTest {
});
List<RexExpression> joinClauses = new ArrayList<>();
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
Collections.emptyList());
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
@@ -261,7 +272,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.LEFT,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
Collections.emptyList());
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
@@ -294,7 +305,7 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
Collections.emptyList());
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
@@ -330,7 +341,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
+ getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses,
Collections.emptyList());
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = join.nextBlock();
@@ -366,7 +377,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
+ getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses,
Collections.emptyList());
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = join.nextBlock();
@@ -398,7 +409,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.RIGHT,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
Collections.emptyList());
HashJoinOperator joinOnNum =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = joinOnNum.nextBlock();
@@ -439,7 +450,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.SEMI,
- getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
+ getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses,
Collections.emptyList());
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = join.nextBlock();
@@ -473,7 +484,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.FULL,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
Collections.emptyList());
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = join.nextBlock();
@@ -517,7 +528,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.ANTI,
- getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
+ getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses,
Collections.emptyList());
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = join.nextBlock();
@@ -550,7 +561,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
Collections.emptyList());
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
@@ -581,7 +592,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
Collections.emptyList());
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
@@ -590,5 +601,80 @@ public class HashJoinOperatorTest {
Assert.assertTrue(result.getDataBlock().getExceptions().get(QueryException.UNKNOWN_ERROR_CODE)
.contains("testInnerJoinLeftError"));
}
+
+ @Test
+ public void shouldPropagateJoinLimitError() {
+ DataSchema leftSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"},
new Object[]{2, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_rightOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new
Object[]{2, "BB"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ List<RexExpression> joinClauses = new ArrayList<>();
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_co2", "string_col2"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ Map<String, String> hintsMap = ImmutableMap.of(
+ PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE, "THROW",
+ PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "1"
+ );
+ JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
getJoinHints(hintsMap));
+ HashJoinOperator join =
+ new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
+
+ TransferableBlock result = join.nextBlock();
+ Assert.assertTrue(result.isErrorBlock());
+ Assert.assertTrue(
+
result.getDataBlock().getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
+ .contains("reach number of rows limit"));
+ }
+
+ @Test
+ public void shouldHandleJoinWithPartialResultsWhenHitDataRowsLimit() {
+ DataSchema leftSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"},
new Object[]{2, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_rightOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new
Object[]{2, "BB"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ List<RexExpression> joinClauses = new ArrayList<>();
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_co2", "string_col2"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ Map<String, String> hintsMap = ImmutableMap.of(
+ PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE, "BREAK",
+ PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "1"
+ );
+ JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
getJoinHints(hintsMap));
+ HashJoinOperator join =
+ new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
+
+ TransferableBlock result = join.nextBlock();
+ Assert.assertFalse(result.isErrorBlock());
+ Assert.assertEquals(result.getNumRows(), 1);
+ Assert.assertTrue(
+
result.getDataBlock().getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
+ .contains("reach number of rows limit"));
+ }
}
// TODO: Add more inequi join tests.
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index 5c8a132954..7f33dc45a4 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.plan.pipeline;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -91,7 +92,6 @@ public class PipelineBreakerExecutorTest {
ImmutableList.of(_server), ImmutableMap.of()))
.build()).collect(Collectors.toList())).build();
-
@AfterClass
public void tearDownClass() {
ExecutorServiceUtils.close(_executor);
@@ -152,7 +152,8 @@ public class PipelineBreakerExecutorTest {
MailboxReceiveNode mailboxReceiveNode2 =
new MailboxReceiveNode(0, DATA_SCHEMA, 2,
RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
- JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA,
JoinRelType.INNER, null, null);
+ JoinNode joinNode =
+ new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA,
JoinRelType.INNER, null, null, Collections.emptyList());
joinNode.addInput(mailboxReceiveNode1);
joinNode.addInput(mailboxReceiveNode2);
DistributedStagePlan distributedStagePlan =
@@ -247,7 +248,8 @@ public class PipelineBreakerExecutorTest {
MailboxReceiveNode incorrectlyConfiguredMailboxNode =
new MailboxReceiveNode(0, DATA_SCHEMA, 3,
RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
- JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA,
JoinRelType.INNER, null, null);
+ JoinNode joinNode =
+ new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA,
JoinRelType.INNER, null, null, Collections.emptyList());
joinNode.addInput(mailboxReceiveNode1);
joinNode.addInput(incorrectlyConfiguredMailboxNode);
DistributedStagePlan distributedStagePlan =
@@ -285,7 +287,8 @@ public class PipelineBreakerExecutorTest {
MailboxReceiveNode incorrectlyConfiguredMailboxNode =
new MailboxReceiveNode(0, DATA_SCHEMA, 2,
RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
- JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA,
JoinRelType.INNER, null, null);
+ JoinNode joinNode =
+ new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA,
JoinRelType.INNER, null, null, Collections.emptyList());
joinNode.addInput(mailboxReceiveNode1);
joinNode.addInput(incorrectlyConfiguredMailboxNode);
DistributedStagePlan distributedStagePlan =
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 775ff1fc58..7371510ee5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -96,7 +96,6 @@ public class CommonConstants {
// https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;
-
public static final int DEFAULT_TUPLE_SKETCH_LGK = 16;
// Whether to rewrite DistinctCount to DistinctCountBitmap
@@ -250,19 +249,19 @@ public class CommonConstants {
// By default, Jersey uses the default unbounded thread pool to process
queries.
// By enabling it, BrokerManagedAsyncExecutorProvider will be used to
create a bounded thread pool.
public static final String
CONFIG_OF_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR =
- "pinot.broker.enable.bounded.jersey.threadpool.executor";
+ "pinot.broker.enable.bounded.jersey.threadpool.executor";
public static final boolean
DEFAULT_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR = false;
// Default capacities for the bounded thread pool
public static final String
CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE =
- "pinot.broker.jersey.threadpool.executor.max.pool.size";
+ "pinot.broker.jersey.threadpool.executor.max.pool.size";
public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE =
- Runtime.getRuntime().availableProcessors() * 2;
+ Runtime.getRuntime().availableProcessors() * 2;
public static final String
CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE =
- "pinot.broker.jersey.threadpool.executor.core.pool.size";
+ "pinot.broker.jersey.threadpool.executor.core.pool.size";
public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE =
- Runtime.getRuntime().availableProcessors() * 2;
+ Runtime.getRuntime().availableProcessors() * 2;
public static final String CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE
=
- "pinot.broker.jersey.threadpool.executor.queue.size";
+ "pinot.broker.jersey.threadpool.executor.queue.size";
public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE =
Integer.MAX_VALUE;
// used for SQL GROUP BY during broker reduce
@@ -350,6 +349,10 @@ public class CommonConstants {
public static final String DROP_RESULTS = "dropResults";
+ // Handle JOIN Overflow
+ public static final String MAX_ROWS_IN_JOIN = "maxRowsInJoin";
+ public static final String JOIN_OVERFLOW_MODE = "joinOverflowMode";
+
// TODO: Remove these keys (only apply to PQL) after releasing 0.11.0
@Deprecated
public static final String PRESERVE_TYPE = "preserveType";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]