This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 767cb9b8a1 ASOF JOIN (#15630)
767cb9b8a1 is described below
commit 767cb9b8a1fe7386e00f11705a3430b538bd274d
Author: Yash Mayya <[email protected]>
AuthorDate: Wed May 28 05:42:46 2025 +0100
ASOF JOIN (#15630)
---
pinot-common/src/main/proto/plan.proto | 4 +
.../rel/rules/PinotJoinExchangeNodeInsertRule.java | 11 +-
.../planner/logical/PlanNodeToRelConverter.java | 8 +-
.../planner/logical/RelToPlanNodeConverter.java | 49 +++
.../pinot/query/planner/plannode/JoinNode.java | 26 +-
.../query/planner/serde/PlanNodeDeserializer.java | 10 +-
.../query/planner/serde/PlanNodeSerializer.java | 17 +-
.../query/runtime/operator/AsofJoinOperator.java | 171 +++++++++
.../query/runtime/operator/BaseJoinOperator.java | 46 ++-
.../query/runtime/operator/HashJoinOperator.java | 46 +--
.../runtime/operator/NonEquiJoinOperator.java | 44 +--
.../query/runtime/plan/PlanNodeToOpChain.java | 25 +-
.../src/test/resources/queries/AsOfJoin.json | 418 +++++++++++++++++++++
13 files changed, 776 insertions(+), 99 deletions(-)
diff --git a/pinot-common/src/main/proto/plan.proto
b/pinot-common/src/main/proto/plan.proto
index 5e3d733e45..4e4fbb1684 100644
--- a/pinot-common/src/main/proto/plan.proto
+++ b/pinot-common/src/main/proto/plan.proto
@@ -84,11 +84,14 @@ enum JoinType {
FULL = 3;
SEMI = 4;
ANTI = 5;
+ ASOF = 6;
+ LEFT_ASOF = 7;
}
enum JoinStrategy {
HASH = 0;
LOOKUP = 1;
+ AS_OF = 2;
}
message JoinNode {
@@ -97,6 +100,7 @@ message JoinNode {
repeated int32 rightKeys = 3;
repeated Expression nonEquiConditions = 4;
JoinStrategy joinStrategy = 5;
+ Expression matchCondition = 6;
}
enum ExchangeType {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
index 5ecbedb8a5..6ce1c83767 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.logical.LogicalAsofJoin;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
@@ -97,8 +98,14 @@ public class PinotJoinExchangeNodeInsertRule extends
RelOptRule {
}
// TODO: Consider creating different JOIN Rel for each join strategy
- call.transformTo(join.copy(join.getTraitSet(), join.getCondition(),
newLeft, newRight, join.getJoinType(),
- join.isSemiJoinDone()));
+ if (join instanceof LogicalAsofJoin) {
+ // Note that we don't use the MATCH_CONDITION in an ASOF JOIN to determine the distribution, only the join keys
+ // in the ON clause of the ASOF JOIN.
+ call.transformTo(((LogicalAsofJoin) join).copy(join.getTraitSet(),
List.of(newLeft, newRight)));
+ } else {
+ call.transformTo(join.copy(join.getTraitSet(), join.getCondition(),
newLeft, newRight, join.getJoinType(),
+ join.isSemiJoinDone()));
+ }
}
private static PinotLogicalExchange
createExchangeForLookupJoin(PinotHintOptions.DistributionType distributionType,
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java
index f19e9c4e6a..9ead0c3c62 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java
@@ -30,6 +30,7 @@ import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.logical.LogicalIntersect;
@@ -154,7 +155,12 @@ public final class PlanNodeToRelConverter {
conditions.add(RexExpressionUtils.toRexNode(_builder,
nonEquiCondition));
}
- _builder.join(node.getJoinType(), conditions);
+ if (node.getJoinType() == JoinRelType.ASOF || node.getJoinType() ==
JoinRelType.LEFT_ASOF) {
+ RexNode matchCondition = RexExpressionUtils.toRexNode(_builder,
node.getMatchCondition());
+ _builder.asofJoin(node.getJoinType(), _builder.and(conditions),
matchCondition);
+ } else {
+ _builder.join(node.getJoinType(), conditions);
+ }
} catch (RuntimeException e) {
LOGGER.warn("Failed to convert join node: {}", node, e);
_builder.push(new PinotExplainedRelNode(_builder.getCluster(),
"UnknownJoin", Collections.emptyMap(),
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index b0db847d5a..c9526fa488 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -37,6 +37,7 @@ import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalAsofJoin;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
@@ -123,6 +124,13 @@ public final class RelToPlanNodeConverter {
_joinFound = true;
}
result = convertLogicalJoin((LogicalJoin) node);
+ } else if (node instanceof LogicalAsofJoin) {
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.JOIN_COUNT, 1);
+ if (!_joinFound) {
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_WITH_JOINS,
1);
+ _joinFound = true;
+ }
+ result = convertLogicalAsofJoin((LogicalAsofJoin) node);
} else if (node instanceof LogicalWindow) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.WINDOW_COUNT, 1);
if (!_windowFunctionFound) {
@@ -358,6 +366,47 @@ public final class RelToPlanNodeConverter {
joinStrategy);
}
+ private JoinNode convertLogicalAsofJoin(LogicalAsofJoin join) {
+ JoinInfo joinInfo = join.analyzeCondition();
+ DataSchema dataSchema = toDataSchema(join.getRowType());
+ List<PlanNode> inputs = convertInputs(join.getInputs());
+ JoinRelType joinType = join.getJoinType();
+
+ // Basic validations
+ Preconditions.checkState(inputs.size() == 2, "Join should have exactly 2
inputs, got: %s", inputs.size());
+ Preconditions.checkState(joinInfo.nonEquiConditions.isEmpty(),
+ "Non-equi conditions are not supported for ASOF join, got: %s",
joinInfo.nonEquiConditions);
+ Preconditions.checkState(joinType == JoinRelType.ASOF || joinType ==
JoinRelType.LEFT_ASOF,
+ "Join type should be ASOF or LEFT_ASOF, got: %s", joinType);
+
+ PlanNode left = inputs.get(0);
+ PlanNode right = inputs.get(1);
+ int numLeftColumns = left.getDataSchema().size();
+ int numResultColumns = dataSchema.size();
+ int numRightColumns = right.getDataSchema().size();
+ Preconditions.checkState(numLeftColumns + numRightColumns ==
numResultColumns,
+ "Invalid number of columns for join type: %s, left: %s, right: %s,
result: %s", joinType, numLeftColumns,
+ numRightColumns, numResultColumns);
+
+ RexExpression matchCondition =
RexExpressionUtils.fromRexNode(join.getMatchCondition());
+ Preconditions.checkState(matchCondition != null, "ASOF_JOIN must have a
match condition");
+ Preconditions.checkState(matchCondition instanceof
RexExpression.FunctionCall,
+ "ASOF JOIN only supports function call match condition, got: %s",
matchCondition);
+
+ List<RexExpression> matchKeys = ((RexExpression.FunctionCall)
matchCondition).getFunctionOperands();
+ // TODO: Add support for MATCH_CONDITION containing two columns of different types. In that case, there would be
+ // a CAST RexExpression.FunctionCall on top of the RexExpression.InputRef, and the physical ASOF join operator
+ // can't currently handle that.
+ Preconditions.checkState(
+ matchKeys.size() == 2 && matchKeys.get(0) instanceof
RexExpression.InputRef
+ && matchKeys.get(1) instanceof RexExpression.InputRef,
+ "ASOF_JOIN only supports match conditions with a comparison between
two columns of the same type");
+
+ return new JoinNode(DEFAULT_STAGE_ID, dataSchema,
NodeHint.fromRelHints(join.getHints()), inputs, joinType,
+ joinInfo.leftKeys, joinInfo.rightKeys,
RexExpressionUtils.fromRexNodes(joinInfo.nonEquiConditions),
+ JoinNode.JoinStrategy.ASOF,
RexExpressionUtils.fromRexNode(join.getMatchCondition()));
+ }
+
private List<PlanNode> convertInputs(List<RelNode> inputs) {
// NOTE: Inputs can be modified in place. Do not create immutable List here.
int numInputs = inputs.size();
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
index c07392c298..83fc50d37f 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.planner.plannode;
import java.util.List;
import java.util.Objects;
+import javax.annotation.Nullable;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
@@ -31,16 +32,25 @@ public class JoinNode extends BasePlanNode {
private final List<Integer> _rightKeys;
private final List<RexExpression> _nonEquiConditions;
private final JoinStrategy _joinStrategy;
+ @Nullable
+ private final RexExpression _matchCondition;
public JoinNode(int stageId, DataSchema dataSchema, NodeHint nodeHint,
List<PlanNode> inputs, JoinRelType joinType,
List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression>
nonEquiConditions,
JoinStrategy joinStrategy) {
+ this(stageId, dataSchema, nodeHint, inputs, joinType, leftKeys, rightKeys,
nonEquiConditions, joinStrategy, null);
+ }
+
+ public JoinNode(int stageId, DataSchema dataSchema, NodeHint nodeHint,
List<PlanNode> inputs, JoinRelType joinType,
+ List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression>
nonEquiConditions,
+ JoinStrategy joinStrategy, RexExpression matchCondition) {
super(stageId, dataSchema, nodeHint, inputs);
_joinType = joinType;
_leftKeys = leftKeys;
_rightKeys = rightKeys;
_nonEquiConditions = nonEquiConditions;
_joinStrategy = joinStrategy;
+ _matchCondition = matchCondition;
}
public JoinRelType getJoinType() {
@@ -63,9 +73,14 @@ public class JoinNode extends BasePlanNode {
return _joinStrategy;
}
+ @Nullable
+ public RexExpression getMatchCondition() {
+ return _matchCondition;
+ }
+
@Override
public String explain() {
- return "JOIN";
+ return _joinStrategy == JoinStrategy.ASOF ? "ASOF JOIN" : "JOIN";
}
@Override
@@ -76,7 +91,7 @@ public class JoinNode extends BasePlanNode {
@Override
public PlanNode withInputs(List<PlanNode> inputs) {
return new JoinNode(_stageId, _dataSchema, _nodeHint, inputs, _joinType,
_leftKeys, _rightKeys, _nonEquiConditions,
- _joinStrategy);
+ _joinStrategy, _matchCondition);
}
@Override
@@ -93,15 +108,16 @@ public class JoinNode extends BasePlanNode {
JoinNode joinNode = (JoinNode) o;
return _joinType == joinNode._joinType && Objects.equals(_leftKeys,
joinNode._leftKeys) && Objects.equals(
_rightKeys, joinNode._rightKeys) && Objects.equals(_nonEquiConditions,
joinNode._nonEquiConditions)
- && _joinStrategy == joinNode._joinStrategy;
+ && _joinStrategy == joinNode._joinStrategy &&
Objects.equals(_matchCondition, joinNode._matchCondition);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), _joinType, _leftKeys, _rightKeys,
_nonEquiConditions, _joinStrategy);
+ return Objects.hash(super.hashCode(), _joinType, _leftKeys, _rightKeys,
_nonEquiConditions, _joinStrategy,
+ _matchCondition);
}
public enum JoinStrategy {
- HASH, LOOKUP
+ HASH, LOOKUP, ASOF
}
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java
index 7ea9d0d16b..9cf5cd8000 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java
@@ -102,7 +102,9 @@ public class PlanNodeDeserializer {
return new JoinNode(protoNode.getStageId(), extractDataSchema(protoNode),
extractNodeHint(protoNode),
extractInputs(protoNode),
convertJoinType(protoJoinNode.getJoinType()), protoJoinNode.getLeftKeysList(),
protoJoinNode.getRightKeysList(),
convertExpressions(protoJoinNode.getNonEquiConditionsList()),
- convertJoinStrategy(protoJoinNode.getJoinStrategy()));
+ convertJoinStrategy(protoJoinNode.getJoinStrategy()),
+ protoJoinNode.hasMatchCondition() ?
ProtoExpressionToRexExpression.convertExpression(
+ protoJoinNode.getMatchCondition()) : null);
}
private static MailboxReceiveNode
deserializeMailboxReceiveNode(Plan.PlanNode protoNode) {
@@ -284,6 +286,10 @@ public class PlanNodeDeserializer {
return JoinRelType.SEMI;
case ANTI:
return JoinRelType.ANTI;
+ case ASOF:
+ return JoinRelType.ASOF;
+ case LEFT_ASOF:
+ return JoinRelType.LEFT_ASOF;
default:
throw new IllegalStateException("Unsupported JoinType: " + joinType);
}
@@ -295,6 +301,8 @@ public class PlanNodeDeserializer {
return JoinNode.JoinStrategy.HASH;
case LOOKUP:
return JoinNode.JoinStrategy.LOOKUP;
+ case AS_OF:
+ return JoinNode.JoinStrategy.ASOF;
default:
throw new IllegalStateException("Unsupported JoinStrategy: " +
joinStrategy);
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java
index bea6042d02..359b3895ee 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java
@@ -116,14 +116,17 @@ public class PlanNodeSerializer {
@Override
public Void visitJoin(JoinNode node, Plan.PlanNode.Builder builder) {
- Plan.JoinNode joinNode = Plan.JoinNode.newBuilder()
+ Plan.JoinNode.Builder joinNode = Plan.JoinNode.newBuilder()
.setJoinType(convertJoinType(node.getJoinType()))
.addAllLeftKeys(node.getLeftKeys())
.addAllRightKeys(node.getRightKeys())
.addAllNonEquiConditions(convertExpressions(node.getNonEquiConditions()))
- .setJoinStrategy(convertJoinStrategy(node.getJoinStrategy()))
- .build();
- builder.setJoinNode(joinNode);
+ .setJoinStrategy(convertJoinStrategy(node.getJoinStrategy()));
+
+ if (node.getMatchCondition() != null) {
+
joinNode.setMatchCondition(RexExpressionToProtoExpression.convertExpression(node.getMatchCondition()));
+ }
+ builder.setJoinNode(joinNode.build());
return null;
}
@@ -289,6 +292,10 @@ public class PlanNodeSerializer {
return Plan.JoinType.SEMI;
case ANTI:
return Plan.JoinType.ANTI;
+ case ASOF:
+ return Plan.JoinType.ASOF;
+ case LEFT_ASOF:
+ return Plan.JoinType.LEFT_ASOF;
default:
throw new IllegalStateException("Unsupported JoinRelType: " +
joinType);
}
@@ -300,6 +307,8 @@ public class PlanNodeSerializer {
return Plan.JoinStrategy.HASH;
case LOOKUP:
return Plan.JoinStrategy.LOOKUP;
+ case ASOF:
+ return Plan.JoinStrategy.AS_OF;
default:
throw new IllegalStateException("Unsupported JoinStrategy: " +
joinStrategy);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
new file mode 100644
index 0000000000..5c98a9ce29
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.runtime.blocks.MseBlock;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+public class AsofJoinOperator extends BaseJoinOperator {
+ private static final String EXPLAIN_NAME = "ASOF_JOIN";
+
+ // The right table is a map from the hash key (columns in the ON join condition) to a sorted map of match key
+ // (column in the MATCH_CONDITION) to rows.
+ private final Map<Object, NavigableMap<Comparable<?>, Object[]>> _rightTable;
+ private final KeySelector<?> _leftKeySelector;
+ private final KeySelector<?> _rightKeySelector;
+ private final MatchConditionType _matchConditionType;
+ private final int _leftMatchKeyIndex;
+ private final int _rightMatchKeyIndex;
+
+ public AsofJoinOperator(OpChainExecutionContext context, MultiStageOperator
leftInput, DataSchema leftSchema,
+ MultiStageOperator rightInput, JoinNode node) {
+ super(context, leftInput, leftSchema, rightInput, node);
+ _rightTable = new HashMap<>();
+ _leftKeySelector = KeySelectorFactory.getKeySelector(node.getLeftKeys());
+ _rightKeySelector = KeySelectorFactory.getKeySelector(node.getRightKeys());
+
+ RexExpression matchCondition = node.getMatchCondition();
+ try {
+ _matchConditionType =
+ MatchConditionType.valueOf(((RexExpression.FunctionCall)
matchCondition).getFunctionName().toUpperCase());
+ } catch (IllegalArgumentException e) {
+ throw new UnsupportedOperationException("Unsupported match condition: "
+ matchCondition);
+ }
+
+ List<RexExpression> matchKeys = ((RexExpression.FunctionCall)
matchCondition).getFunctionOperands();
+ _leftMatchKeyIndex = ((RexExpression.InputRef)
matchKeys.get(0)).getIndex();
+ _rightMatchKeyIndex = ((RexExpression.InputRef)
matchKeys.get(1)).getIndex() - leftSchema.size();
+ }
+
+ @Override
+ protected void addRowsToRightTable(List<Object[]> rows) {
+ for (Object[] row : rows) {
+ Comparable<?> matchKey = (Comparable<?>) row[_rightMatchKeyIndex];
+ if (matchKey == null) {
+ // Skip rows with null match keys because they cannot be matched with any left rows
+ continue;
+ }
+ Object hashKey = _rightKeySelector.getKey(row);
+ // Results need not be deterministic if there are "ties" based on the match key in an ASOF JOIN, so it's okay to
+ // only keep the last row with the same hash key and match key.
+ _rightTable.computeIfAbsent(hashKey, k -> new TreeMap<>()).put(matchKey,
row);
+ }
+ }
+
+ @Override
+ protected void finishBuildingRightTable() {
+ // no-op
+ }
+
+ @Override
+ protected List<Object[]> buildJoinedRows(MseBlock.Data leftBlock) {
+ List<Object[]> rows = new ArrayList<>();
+ for (Object[] leftRow : leftBlock.asRowHeap().getRows()) {
+ Comparable<?> matchKey = (Comparable<?>) leftRow[_leftMatchKeyIndex];
+ if (matchKey == null) {
+ // Rows with null match keys cannot be matched with any right rows
+ if (needUnmatchedLeftRows()) {
+ rows.add(joinRow(leftRow, null));
+ }
+ continue;
+ }
+ Object hashKey = _leftKeySelector.getKey(leftRow);
+ NavigableMap<Comparable<?>, Object[]> rightRows =
_rightTable.get(hashKey);
+ if (rightRows == null) {
+ if (needUnmatchedLeftRows()) {
+ rows.add(joinRow(leftRow, null));
+ }
+ } else {
+ Object[] rightRow = closestMatch(matchKey, rightRows);
+ if (rightRow == null) {
+ if (needUnmatchedLeftRows()) {
+ rows.add(joinRow(leftRow, null));
+ }
+ } else {
+ rows.add(joinRow(leftRow, rightRow));
+ }
+ }
+ }
+ return rows;
+ }
+
+ @Nullable
+ private Object[] closestMatch(Comparable<?> matchKey,
NavigableMap<Comparable<?>, Object[]> rightRows) {
+ switch (_matchConditionType) {
+ case GREATER_THAN: {
+ // Find the closest right row that is less than the left row (compared by their match keys from the match
+ // condition).
+ Map.Entry<Comparable<?>, Object[]> closestMatch =
rightRows.lowerEntry(matchKey);
+ return closestMatch == null ? null : closestMatch.getValue();
+ }
+ case GREATER_THAN_OR_EQUAL: {
+ // Find the closest right row that is less than or equal to the left row (compared by their match keys from
+ // the match condition).
+ Map.Entry<Comparable<?>, Object[]> closestMatch =
rightRows.floorEntry(matchKey);
+ return closestMatch == null ? null : closestMatch.getValue();
+ }
+ case LESS_THAN: {
+ // Find the closest right row that is greater than the left row (compared by their match keys from the match
+ // condition).
+ Map.Entry<Comparable<?>, Object[]> closestMatch =
rightRows.higherEntry(matchKey);
+ return closestMatch == null ? null : closestMatch.getValue();
+ }
+ case LESS_THAN_OR_EQUAL: {
+ // Find the closest right row that is greater than or equal to the left row (compared by their match keys from
+ // the match condition).
+ Map.Entry<Comparable<?>, Object[]> closestMatch =
rightRows.ceilingEntry(matchKey);
+ return closestMatch == null ? null : closestMatch.getValue();
+ }
+ default:
+ throw new IllegalArgumentException("Unsupported match condition type:
" + _matchConditionType);
+ }
+ }
+
+ @Override
+ protected List<Object[]> buildNonMatchRightRows() {
+ // There's only ASOF JOIN and LEFT ASOF JOIN; RIGHT ASOF JOIN is not a thing
+ throw new UnsupportedOperationException("ASOF JOIN does not support
unmatched right rows");
+ }
+
+ @Nullable
+ @Override
+ public String toExplainString() {
+ return EXPLAIN_NAME;
+ }
+
+ private enum MatchConditionType {
+ GREATER_THAN,
+ GREATER_THAN_OR_EQUAL,
+ LESS_THAN,
+ LESS_THAN_OR_EQUAL
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
index 8fee3d0e9e..5040bae620 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
@@ -172,7 +172,49 @@ public abstract class BaseJoinOperator extends
MultiStageOperator {
return mseBlock;
}
- protected abstract void buildRightTable();
+ protected void buildRightTable() {
+ LOGGER.trace("Building right table for join operator");
+ long startTime = System.currentTimeMillis();
+ int numRows = 0;
+ MseBlock rightBlock = _rightInput.nextBlock();
+ while (rightBlock.isData()) {
+ List<Object[]> rows = ((MseBlock.Data) rightBlock).asRowHeap().getRows();
+ // Row based overflow check.
+ if (rows.size() + numRows > _maxRowsInJoin) {
+ if (_joinOverflowMode == JoinOverFlowMode.THROW) {
+ throwForJoinRowLimitExceeded(
+ "Cannot build in memory hash table for join operator, reached
number of rows limit: " + _maxRowsInJoin);
+ } else {
+ // Just fill up the buffer.
+ int remainingRows = _maxRowsInJoin - numRows;
+ rows = rows.subList(0, remainingRows);
+ _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
+ // setting only the rightTableOperator to be early terminated and awaits EOS block next.
+ _rightInput.earlyTerminate();
+ }
+ }
+
+ addRowsToRightTable(rows);
+ numRows += rows.size();
+ sampleAndCheckInterruption();
+ rightBlock = _rightInput.nextBlock();
+ }
+
+ MseBlock.Eos eosBlock = (MseBlock.Eos) rightBlock;
+ if (eosBlock.isError()) {
+ _eos = eosBlock;
+ } else {
+ _isRightTableBuilt = true;
+ finishBuildingRightTable();
+ }
+
+ _statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS,
System.currentTimeMillis() - startTime);
+ LOGGER.trace("Finished building right table for join operator");
+ }
+
+ protected abstract void addRowsToRightTable(List<Object[]> rows);
+
+ protected abstract void finishBuildingRightTable();
protected MseBlock buildJoinedDataBlock() {
LOGGER.trace("Building joined data block for join operator");
@@ -240,7 +282,7 @@ public abstract class BaseJoinOperator extends
MultiStageOperator {
}
protected boolean needUnmatchedLeftRows() {
- return _joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL;
+ return _joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL ||
_joinType == JoinRelType.LEFT_ASOF;
}
protected void earlyTerminateLeftInput() {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 5d4294546c..0cb1032367 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -36,7 +36,6 @@ import
org.apache.pinot.query.runtime.operator.join.LongLookupTable;
import org.apache.pinot.query.runtime.operator.join.LookupTable;
import org.apache.pinot.query.runtime.operator.join.ObjectLookupTable;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
/**
@@ -93,44 +92,15 @@ public class HashJoinOperator extends BaseJoinOperator {
}
@Override
- protected void buildRightTable() {
- LOGGER.trace("Building hash table for join operator");
- long startTime = System.currentTimeMillis();
- int numRows = 0;
- MseBlock rightBlock = _rightInput.nextBlock();
- while (rightBlock.isData()) {
- MseBlock.Data dataBlock = (MseBlock.Data) rightBlock;
- List<Object[]> rows = dataBlock.asRowHeap().getRows();
- // Row based overflow check.
- if (rows.size() + numRows > _maxRowsInJoin) {
- if (_joinOverflowMode == JoinOverFlowMode.THROW) {
- throwForJoinRowLimitExceeded(
- "Cannot build in memory hash table for join operator, reached
number of rows limit: " + _maxRowsInJoin);
- } else {
- // Just fill up the buffer.
- int remainingRows = _maxRowsInJoin - numRows;
- rows = rows.subList(0, remainingRows);
- _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
- // setting only the rightTableOperator to be early terminated and awaits EOS block next.
- _rightInput.earlyTerminate();
- }
- }
- for (Object[] row : rows) {
- _rightTable.addRow(_rightKeySelector.getKey(row), row);
- }
- numRows += rows.size();
- sampleAndCheckInterruption();
- rightBlock = _rightInput.nextBlock();
+ protected void addRowsToRightTable(List<Object[]> rows) {
+ for (Object[] row : rows) {
+ _rightTable.addRow(_rightKeySelector.getKey(row), row);
}
- MseBlock.Eos eosBlock = (MseBlock.Eos) rightBlock;
- if (eosBlock.isError()) {
- _eos = eosBlock;
- } else {
- _rightTable.finish();
- _isRightTableBuilt = true;
- }
- _statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS,
System.currentTimeMillis() - startTime);
- LOGGER.trace("Finished building hash table for join operator");
+ }
+
+ @Override
+ protected void finishBuildingRightTable() {
+ _rightTable.finish();
}
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
index 9ffcad83c1..34c3e99287 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
@@ -27,7 +27,6 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
/**
@@ -57,42 +56,15 @@ public class NonEquiJoinOperator extends BaseJoinOperator {
}
@Override
- protected void buildRightTable() {
- LOGGER.trace("Building right table for join operator");
- long startTime = System.currentTimeMillis();
- MseBlock rightBlock = _rightInput.nextBlock();
- while (rightBlock.isData()) {
- List<Object[]> rows = ((MseBlock.Data) rightBlock).asRowHeap().getRows();
- int numRowsInRightTable = _rightTable.size();
- // Row based overflow check.
- if (rows.size() + numRowsInRightTable > _maxRowsInJoin) {
- if (_joinOverflowMode == JoinOverFlowMode.THROW) {
- throwForJoinRowLimitExceeded(
- "Cannot build in memory right table for join operator, reached
number of rows limit: " + _maxRowsInJoin);
- } else {
- // Just fill up the buffer.
- int remainingRows = _maxRowsInJoin - numRowsInRightTable;
- rows = rows.subList(0, remainingRows);
- _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
- // setting only the rightTableOperator to be early terminated and awaits EOS block next.
- _rightInput.earlyTerminate();
- }
- }
- _rightTable.addAll(rows);
- sampleAndCheckInterruption();
- rightBlock = _rightInput.nextBlock();
- }
- MseBlock.Eos eosBlock = (MseBlock.Eos) rightBlock;
- if (eosBlock.isError()) {
- _eos = eosBlock;
- } else {
- _isRightTableBuilt = true;
- if (needUnmatchedRightRows()) {
- _matchedRightRows = new BitSet(_rightTable.size());
- }
+ protected void addRowsToRightTable(List<Object[]> rows) {
+ _rightTable.addAll(rows);
+ }
+
+ @Override
+ protected void finishBuildingRightTable() {
+ if (needUnmatchedRightRows()) {
+ _matchedRightRows = new BitSet(_rightTable.size());
}
- _statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS,
System.currentTimeMillis() - startTime);
- LOGGER.trace("Finished building right table for join operator");
}
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
index 5dd8be81d3..c08cb89359 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
@@ -37,6 +37,7 @@ import org.apache.pinot.query.planner.plannode.TableScanNode;
import org.apache.pinot.query.planner.plannode.ValueNode;
import org.apache.pinot.query.planner.plannode.WindowNode;
import org.apache.pinot.query.runtime.operator.AggregateOperator;
+import org.apache.pinot.query.runtime.operator.AsofJoinOperator;
import org.apache.pinot.query.runtime.operator.FilterOperator;
import org.apache.pinot.query.runtime.operator.HashJoinOperator;
import org.apache.pinot.query.runtime.operator.IntersectAllOperator;
@@ -180,16 +181,20 @@ public class PlanNodeToOpChain {
PlanNode right = inputs.get(1);
MultiStageOperator rightOperator = visit(right, context);
JoinNode.JoinStrategy joinStrategy = node.getJoinStrategy();
- if (joinStrategy == JoinNode.JoinStrategy.HASH) {
- if (node.getLeftKeys().isEmpty()) {
- // TODO: Consider adding non-equi as a separate join strategy.
- return new NonEquiJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
- } else {
- return new HashJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
- }
- } else {
- assert joinStrategy == JoinNode.JoinStrategy.LOOKUP;
- return new LookupJoinOperator(context, leftOperator, rightOperator, node);
+ switch (joinStrategy) {
+ case HASH:
+ if (node.getLeftKeys().isEmpty()) {
+ // TODO: Consider adding non-equi as a separate join strategy.
+ return new NonEquiJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
+ } else {
+ return new HashJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
+ }
+ case LOOKUP:
+ return new LookupJoinOperator(context, leftOperator, rightOperator, node);
+ case ASOF:
+ return new AsofJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
+ default:
+ throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy);
}
}
diff --git a/pinot-query-runtime/src/test/resources/queries/AsOfJoin.json b/pinot-query-runtime/src/test/resources/queries/AsOfJoin.json
new file mode 100644
index 0000000000..a2d9686e44
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/AsOfJoin.json
@@ -0,0 +1,418 @@
+{
+ "as_of_join_queries": {
+ "tables": {
+ "t1": {
+ "schema": [
+ {"name": "key_col", "type": "STRING"},
+ {"name": "asof_col", "type": "INT"}
+ ],
+ "inputs": [
+ ["a", 1],
+ ["b", 2],
+ ["c", 3],
+ ["d", 4],
+ ["e", 5]
+ ]
+ },
+ "t2": {
+ "schema": [
+ {"name": "key_col", "type": "STRING"},
+ {"name": "asof_col", "type": "INT"}
+ ],
+ "inputs": [
+ ["b", 2],
+ ["a", 1],
+ ["c", 3],
+ ["a", 2],
+ ["c", 1],
+ ["b", 3],
+ ["d", 5]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "outputs": [
+ ["c", 3, "c", 1]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col >= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "outputs": [
+ ["a", 1, "a", 1],
+ ["b", 2, "b", 2],
+ ["c", 3, "c", 3]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col < {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "outputs": [
+ ["a", 1, "a", 2],
+ ["b", 2, "b", 3],
+ ["d", 4, "d", 5]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col <= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "outputs": [
+ ["a", 1, "a", 1],
+ ["b", 2, "b", 2],
+ ["c", 3, "c", 3],
+ ["d", 4, "d", 5]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "outputs": [
+ ["a", 1, null, null],
+ ["b", 2, null, null],
+ ["c", 3, "c", 1],
+ ["d", 4, null, null],
+ ["e", 5, null, null]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col >= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "outputs": [
+ ["a", 1, "a", 1],
+ ["b", 2, "b", 2],
+ ["c", 3, "c", 3],
+ ["d", 4, null, null],
+ ["e", 5, null, null]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col < {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "outputs": [
+ ["a", 1, "a", 2],
+ ["b", 2, "b", 3],
+ ["c", 3, null, null],
+ ["d", 4, "d", 5],
+ ["e", 5, null, null]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col <= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "outputs": [
+ ["a", 1, "a", 1],
+ ["b", 2, "b", 2],
+ ["c", 3, "c", 3],
+ ["d", 4, "d", 5],
+ ["e", 5, null, null]
+ ]
+ }
+ ]
+ },
+ "as_of_join_queries_without_hash_key_join": {
+ "tables": {
+ "t1": {
+ "schema": [
+ {"name": "key_col", "type": "STRING"},
+ {"name": "asof_col", "type": "INT"}
+ ],
+ "inputs": [
+ ["a", 1],
+ ["b", 2],
+ ["c", 3],
+ ["d", 4],
+ ["e", 5]
+ ]
+ },
+ "t2": {
+ "schema": [
+ {"name": "key_col", "type": "STRING"},
+ {"name": "asof_col", "type": "INT"}
+ ],
+ "inputs": [
+ ["b", 2],
+ ["a", 1],
+ ["c", 3],
+ ["a", 4],
+ ["c", 7],
+ ["b", 6],
+ ["d", 5]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON true",
+ "outputs": [
+ ["b", 2, "a", 1],
+ ["c", 3, "b", 2],
+ ["d", 4, "c", 3],
+ ["e", 5, "a", 4]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON true",
+ "outputs": [
+ ["a", 1, null, null],
+ ["b", 2, "a", 1],
+ ["c", 3, "b", 2],
+ ["d", 4, "c", 3],
+ ["e", 5, "a", 4]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col >= {t2}.asof_col) ON true",
+ "outputs": [
+ ["a", 1, "a", 1],
+ ["b", 2, "b", 2],
+ ["c", 3, "c", 3],
+ ["d", 4, "a", 4],
+ ["e", 5, "d", 5]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col < {t2}.asof_col) ON true",
+ "outputs": [
+ ["a", 1, "b", 2],
+ ["b", 2, "c", 3],
+ ["c", 3, "a", 4],
+ ["d", 4, "d", 5],
+ ["e", 5, "b", 6]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col <= {t2}.asof_col) ON true",
+ "outputs": [
+ ["a", 1, "a", 1],
+ ["b", 2, "b", 2],
+ ["c", 3, "c", 3],
+ ["d", 4, "a", 4],
+ ["e", 5, "d", 5]
+ ]
+ }
+ ]
+ },
+ "as_of_join_queries_with_nulls": {
+ "tables": {
+ "t1": {
+ "schema": [
+ {"name": "key_col", "type": "STRING"},
+ {"name": "asof_col", "type": "INT"}
+ ],
+ "inputs": [
+ ["a", 1],
+ ["a", 5],
+ ["b", 3],
+ ["c", null],
+ ["d", 4],
+ ["e", 7],
+ ["f", 10],
+ ["g", 12]
+ ]
+ },
+ "t2": {
+ "schema": [
+ {"name": "key_col", "type": "STRING"},
+ {"name": "asof_col", "type": "INT"}
+ ],
+ "inputs": [
+ ["a", 0],
+ ["a", 2],
+ ["a", null],
+ ["b", 2],
+ ["b", null],
+ ["c", 5],
+ ["d", 4],
+ ["d", 6],
+ ["f", null],
+ ["f", 11],
+ ["g", null],
+ ["h", 9]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "outputs": [
+ ["a", 1, "a", 0],
+ ["a", 5, "a", 2],
+ ["b", 3, "b", 2]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col >= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "outputs": [
+ ["a", 1, "a", 0],
+ ["a", 5, "a", 2],
+ ["b", 3, "b", 2],
+ ["d", 4, "d", 4]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col < {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "outputs": [
+ ["a", 1, "a", 2],
+ ["d", 4, "d", 6],
+ ["f", 10, "f", 11]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col <= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "outputs": [
+ ["a", 1, "a", 2],
+ ["d", 4, "d", 4],
+ ["f", 10, "f", 11]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "outputs": [
+ ["a", 1, "a", 0],
+ ["a", 5, "a", 2],
+ ["b", 3, "b", 2],
+ ["c", null, null, null],
+ ["d", 4, null, null],
+ ["e", 7, null, null],
+ ["f", 10, null, null],
+ ["g", 12, null, null]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col >= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "outputs": [
+ ["a", 1, "a", 0],
+ ["a", 5, "a", 2],
+ ["b", 3, "b", 2],
+ ["c", null, null, null],
+ ["d", 4, "d", 4],
+ ["e", 7, null, null],
+ ["f", 10, null, null],
+ ["g", 12, null, null]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col < {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "outputs": [
+ ["a", 1, "a", 2],
+ ["a", 5, null, null],
+ ["b", 3, null, null],
+ ["c", null, null, null],
+ ["d", 4, "d", 6],
+ ["e", 7, null, null],
+ ["f", 10, "f", 11],
+ ["g", 12, null, null]
+ ]
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col <= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "outputs": [
+ ["a", 1, "a", 2],
+ ["a", 5, null, null],
+ ["b", 3, null, null],
+ ["c", null, null, null],
+ ["d", 4, "d", 4],
+ ["e", 7, null, null],
+ ["f", 10, "f", 11],
+ ["g", 12, null, null]
+ ]
+ }
+ ]
+ },
+ "as_of_join_unsupported_scenarios": {
+ "tables": {
+ "t1": {
+ "schema": [
+ {"name": "key_col", "type": "STRING"},
+ {"name": "asof_col", "type": "INT"}
+ ],
+ "inputs": [
+ ["a", 1],
+ ["b", 2],
+ ["c", 3],
+ ["d", 4],
+ ["e", 5]
+ ]
+ },
+ "t2": {
+ "schema": [
+ {"name": "key_col", "type": "STRING"},
+ {"name": "asof_col", "type": "INT"}
+ ],
+ "inputs": [
+ ["b", 2],
+ ["a", 1],
+ ["c", 3],
+ ["a", 2],
+ ["c", 1],
+ ["b", 3],
+ ["d", 5]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col)",
+ "expectedException": ".*exception while parsing query.*",
+ "comment": "Calcite currently doesn't support ASOF JOINs without an ON clause. This isn't just a parser limitation, since the assumption is also built into the validator."
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col)",
+ "expectedException": ".*exception while parsing query.*",
+ "comment": "Calcite currently doesn't support ASOF JOINs without an ON clause. This isn't just a parser limitation, since the assumption is also built into the validator."
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col AND {t1}.asof_col > 0",
+ "expectedException": ".*ASOF JOIN condition must be a conjunction of equality comparisons.*"
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col != {t2}.key_col",
+ "expectedException": ".*ASOF JOIN condition must be a conjunction of equality comparisons.*"
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col OR {t1}.asof_col = {t2}.asof_col",
+ "expectedException": ".*ASOF JOIN condition must be a conjunction of equality comparisons.*"
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} ON {t1}.key_col = {t2}.key_col",
+ "expectedException": ".*exception while parsing query.*",
+ "comment": "MATCH_CONDITION is required for ASOF JOINs"
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} ON {t1}.key_col = {t2}.key_col",
+ "expectedException": ".*exception while parsing query.*",
+ "comment": "MATCH_CONDITION is required for ASOF JOINs"
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col != {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*",
+ "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)"
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col = {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+ "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*",
+ "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)"
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col OR {t1}.key_col > {t1}.key_col) ON {t1}.key_col = {t2}.key_col",
+ "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*",
+ "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)"
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col AND {t1}.key_col > {t1}.key_col) ON {t1}.key_col = {t2}.key_col",
+ "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*",
+ "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)"
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > 0) ON {t1}.key_col = {t2}.key_col",
+ "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*",
+ "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)"
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t1}.key_col) ON {t1}.key_col = {t2}.key_col",
+ "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*",
+ "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)"
+ },
+ {
+ "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.key_col) ON {t1}.key_col = {t2}.key_col",
+ "expectedException": ".*ASOF_JOIN only supports match conditions with a comparison between two columns of the same type.*",
+ "comment": "We currently don't support MATCH_CONDITION comparing columns of different types"
+ }
+ ]
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]