[
https://issues.apache.org/jira/browse/CALCITE-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16759377#comment-16759377
]
Volodymyr Vysotskyi commented on CALCITE-2223:
----------------------------------------------
Yes, it is correct that a regular cartesian join cannot be planned in Drill, but
this limitation is relaxed for one case: a cartesian join is allowed when one of
the join inputs returns a single value (or nothing).
For this query, the {{SINGLE_VALUE}} aggregate function is applied to
the result of the subquery in the filter condition. Later, the joins are
reordered, and the resulting plan is:
{noformat}
00-00 Screen : rowType = RecordType(ANY last_name, ANY n_name): rowcount =
1.0, cumulative cost = {1455.1 rows, 191524.1 cpu, 474685.0 io, 0.0 network,
440.00000000000006 memory}, id = 1165
00-01 Project(last_name=[$0], n_name=[$2]) : rowType = RecordType(ANY
last_name, ANY n_name): rowcount = 1.0, cumulative cost = {1455.0 rows,
191524.0 cpu, 474685.0 io, 0.0 network, 440.00000000000006 memory}, id = 1164
00-02 Project(last_name=[$0], n_nationkey=[$2], n_name=[$3], $f0=[$1]) :
rowType = RecordType(ANY last_name, ANY n_nationkey, ANY n_name, ANY $f0):
rowcount = 1.0, cumulative cost = {1454.0 rows, 191522.0 cpu, 474685.0 io, 0.0
network, 440.00000000000006 memory}, id = 1163
00-03 SelectionVectorRemover : rowType = RecordType(ANY last_name, ANY
$f0, ANY n_nationkey, ANY n_name): rowcount = 1.0, cumulative cost = {1453.0
rows, 191518.0 cpu, 474685.0 io, 0.0 network, 440.00000000000006 memory}, id =
1162
00-04 Limit(fetch=[1]) : rowType = RecordType(ANY last_name, ANY
$f0, ANY n_nationkey, ANY n_name): rowcount = 1.0, cumulative cost = {1452.0
rows, 191517.0 cpu, 474685.0 io, 0.0 network, 440.00000000000006 memory}, id =
1161
00-05 Limit(fetch=[1]) : rowType = RecordType(ANY last_name, ANY
$f0, ANY n_nationkey, ANY n_name): rowcount = 1.0, cumulative cost = {1451.0
rows, 191513.0 cpu, 474685.0 io, 0.0 network, 440.00000000000006 memory}, id =
1160
00-06 HashJoin(condition=[=($2, $1)], joinType=[inner],
semi-join: =[false]) : rowType = RecordType(ANY last_name, ANY $f0, ANY
n_nationkey, ANY n_name): rowcount = 463.0, cumulative cost = {1450.0 rows,
191509.0 cpu, 474685.0 io, 0.0 network, 440.00000000000006 memory}, id = 1159
00-08 NestedLoopJoin(condition=[true], joinType=[inner]) :
rowType = RecordType(ANY last_name, ANY $f0): rowcount = 463.0, cumulative cost
= {937.0 rows, 185703.0 cpu, 474635.0 io, 0.0 network, 0.0 memory}, id = 1157
00-10 Scan(table=[[cp, employee.json]],
groupscan=[EasyGroupScan [selectionRoot=classpath:/employee.json, numFiles=1,
columns=[`last_name`], files=[classpath:/employee.json]]]) : rowType =
RecordType(ANY last_name): rowcount = 463.0, cumulative cost = {463.0 rows,
463.0 cpu, 474630.0 io, 0.0 network, 0.0 memory}, id = 1153
00-09 StreamAgg(group=[{}], agg#0=[SINGLE_VALUE($0)]) :
rowType = RecordType(ANY $f0): rowcount = 1.0, cumulative cost = {11.0 rows,
40.0 cpu, 5.0 io, 0.0 network, 0.0 memory}, id = 1156
00-11 Filter(condition=[=($0, 1)]) : rowType =
RecordType(ANY r_regionkey): rowcount = 1.0, cumulative cost = {10.0 rows, 28.0
cpu, 5.0 io, 0.0 network, 0.0 memory}, id = 1155
00-12 Scan(table=[[cp, tpch/region.parquet]],
groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath
[path=classpath:/tpch/region.parquet]],
selectionRoot=classpath:/tpch/region.parquet, numFiles=1, numRowGroups=1,
usedMetadataFile=false, columns=[`r_regionkey`]]]) : rowType = RecordType(ANY
r_regionkey): rowcount = 5.0, cumulative cost = {5.0 rows, 5.0 cpu, 5.0 io, 0.0
network, 0.0 memory}, id = 1154
00-07 Scan(table=[[cp, tpch/nation.parquet]],
groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath
[path=classpath:/tpch/nation.parquet]],
selectionRoot=classpath:/tpch/nation.parquet, numFiles=1, numRowGroups=1,
usedMetadataFile=false, columns=[`n_nationkey`, `n_name`]]]) : rowType =
RecordType(ANY n_nationkey, ANY n_name): rowcount = 25.0, cumulative cost =
{25.0 rows, 50.0 cpu, 50.0 io, 0.0 network, 0.0 memory}, id = 1158
{noformat}
So one of the inputs of {{NestedLoopJoin}} is
{{StreamAgg(group=[{}], agg#0=[SINGLE_VALUE($0)])}}, which returns a single row.
Regarding the check for a single value, one of the conditions is an
[aggregate with an empty group set|https://github.com/apache/drill/blob/f6c63bf5dbc7bcd14b202249d013cd974a96a68a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java#L275].
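To illustrate the idea, here is a minimal, self-contained sketch of such a check. It does not use Drill's or Calcite's classes: {{Node}}, {{isScalarInput}} and {{canPlanCartesianJoin}} are hypothetical names invented for this example, and only the "aggregate with an empty group set" condition is modelled, while the real check in {{JoinUtils}} has other conditions as well.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Simplified, hypothetical plan model; not Drill's or Calcite's RelNode API. */
public class ScalarInputSketch {

  /** Minimal stand-in for a relational operator with at most one input. */
  static class Node {
    final String kind;          // e.g. "Scan", "Filter", "Aggregate"
    final List<String> groupBy; // grouping columns; only meaningful for "Aggregate"
    final Node input;           // child operator, or null for a leaf

    Node(String kind, List<String> groupBy, Node input) {
      this.kind = kind;
      this.groupBy = groupBy;
      this.input = input;
    }
  }

  /**
   * Assumed rule: an aggregate without grouping columns is treated as
   * returning a single row, so it is safe as one side of a cartesian join.
   */
  static boolean isScalarInput(Node node) {
    return "Aggregate".equals(node.kind) && node.groupBy.isEmpty();
  }

  /** A join with condition "true" is accepted only if one side is scalar. */
  static boolean canPlanCartesianJoin(Node left, Node right) {
    return isScalarInput(left) || isScalarInput(right);
  }

  public static void main(String[] args) {
    // Mirrors the shape of the plan above: employee scan joined with
    // StreamAgg(group=[{}], SINGLE_VALUE) over a filtered region scan.
    Node employee = new Node("Scan", Collections.<String>emptyList(), null);
    Node region = new Node("Scan", Collections.<String>emptyList(), null);
    Node filter = new Node("Filter", Collections.<String>emptyList(), region);
    Node singleValue =
        new Node("Aggregate", Collections.<String>emptyList(), filter);

    // An aggregate that does group by a column may return many rows.
    Node grouped = new Node("Aggregate", Arrays.asList("n_nationkey"), region);

    System.out.println(canPlanCartesianJoin(employee, singleValue));
    System.out.println(canPlanCartesianJoin(employee, grouped));
  }
}
```

In this sketch the first call accepts the join because the right input is an aggregate with no grouping columns, and the second call rejects it because neither input is known to be scalar.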
> ProjectMergeRule is infinitely matched when is applied after
> ProjectReduceExpressionsRule
> -----------------------------------------------------------------------------------------
>
> Key: CALCITE-2223
> URL: https://issues.apache.org/jira/browse/CALCITE-2223
> Project: Calcite
> Issue Type: Bug
> Reporter: Volodymyr Vysotskyi
> Assignee: Julian Hyde
> Priority: Critical
> Attachments:
> TestLimitWithExchanges_testPushLimitPastUnionExchange.png, heap_overview.png,
> provenance_contents.png
>
>
> For queries like this:
> {code:sql}
> select t1.f from (select cast(f as int) f, f from (select cast(f as int) f
> from (values('1')) t(f))) as t1
> {code}
> OOM is thrown when {{ProjectMergeRule}} is applied before applying
> {{ProjectReduceExpressionsRule}} in VolcanoPlanner.
> A simple test to reproduce this issue (in {{RelOptRulesTest}}):
> {code:java}
> @Test public void testOomProjectMergeRule() {
>   RelBuilder relBuilder =
>       RelBuilder.create(RelBuilderTest.config().build());
>   RelNode relNode = relBuilder
>       .values(new String[]{"f"}, "1")
>       .project(
>           relBuilder.alias(
>               relBuilder.cast(relBuilder.field(0), SqlTypeName.INTEGER),
>               "f"))
>       .project(
>           relBuilder.alias(
>               relBuilder.cast(relBuilder.field(0), SqlTypeName.INTEGER),
>               "f0"),
>           relBuilder.alias(relBuilder.field(0), "f"))
>       .project(
>           relBuilder.alias(relBuilder.field(0), "f"))
>       .build();
>   RelOptPlanner planner = relNode.getCluster().getPlanner();
>   RuleSet ruleSet =
>       RuleSets.ofList(
>           ReduceExpressionsRule.PROJECT_INSTANCE,
>           new ProjectMergeRuleWithLongerName(),
>           EnumerableRules.ENUMERABLE_PROJECT_RULE,
>           EnumerableRules.ENUMERABLE_VALUES_RULE);
>   Program program = Programs.of(ruleSet);
>   RelTraitSet toTraits =
>       relNode.getCluster().traitSet()
>           .replace(0, EnumerableConvention.INSTANCE);
>   RelNode output = program.run(planner, relNode, toTraits,
>       ImmutableList.<RelOptMaterialization>of(),
>       ImmutableList.<RelOptLattice>of());
>   // check for output
> }
>
> /**
>  * ProjectMergeRule inheritor which has
>  * class name greater than ProjectReduceExpressionsRule class name
>  * (String.compareTo()).
>  *
>  * It is needed for RuleQueue.popMatch() method
>  * to apply this rule before ProjectReduceExpressionsRule.
>  */
> private static class ProjectMergeRuleWithLongerName extends ProjectMergeRule {
>   public ProjectMergeRuleWithLongerName() {
>     super(true, RelFactories.LOGICAL_BUILDER);
>   }
> }
> {code}