[ 
https://issues.apache.org/jira/browse/CALCITE-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16759377#comment-16759377
 ] 

Volodymyr Vysotskyi commented on CALCITE-2223:
----------------------------------------------

Yes, it is correct that an ordinary cartesian join cannot be planned in Drill, 
but this limitation is relaxed: a cartesian join is allowed when one of 
the join inputs returns a single value (or nothing). 
For this query, the {{SINGLE_VALUE}} aggregate function is applied to 
the result of the subquery in the filter condition. Later, the joins are 
somehow reordered, and the resulting plan is:
{noformat}
00-00    Screen : rowType = RecordType(ANY last_name, ANY n_name): rowcount = 
1.0, cumulative cost = {1455.1 rows, 191524.1 cpu, 474685.0 io, 0.0 network, 
440.00000000000006 memory}, id = 1165
00-01      Project(last_name=[$0], n_name=[$2]) : rowType = RecordType(ANY 
last_name, ANY n_name): rowcount = 1.0, cumulative cost = {1455.0 rows, 
191524.0 cpu, 474685.0 io, 0.0 network, 440.00000000000006 memory}, id = 1164
00-02        Project(last_name=[$0], n_nationkey=[$2], n_name=[$3], $f0=[$1]) : 
rowType = RecordType(ANY last_name, ANY n_nationkey, ANY n_name, ANY $f0): 
rowcount = 1.0, cumulative cost = {1454.0 rows, 191522.0 cpu, 474685.0 io, 0.0 
network, 440.00000000000006 memory}, id = 1163
00-03          SelectionVectorRemover : rowType = RecordType(ANY last_name, ANY 
$f0, ANY n_nationkey, ANY n_name): rowcount = 1.0, cumulative cost = {1453.0 
rows, 191518.0 cpu, 474685.0 io, 0.0 network, 440.00000000000006 memory}, id = 
1162
00-04            Limit(fetch=[1]) : rowType = RecordType(ANY last_name, ANY 
$f0, ANY n_nationkey, ANY n_name): rowcount = 1.0, cumulative cost = {1452.0 
rows, 191517.0 cpu, 474685.0 io, 0.0 network, 440.00000000000006 memory}, id = 
1161
00-05              Limit(fetch=[1]) : rowType = RecordType(ANY last_name, ANY 
$f0, ANY n_nationkey, ANY n_name): rowcount = 1.0, cumulative cost = {1451.0 
rows, 191513.0 cpu, 474685.0 io, 0.0 network, 440.00000000000006 memory}, id = 
1160
00-06                HashJoin(condition=[=($2, $1)], joinType=[inner], 
semi-join: =[false]) : rowType = RecordType(ANY last_name, ANY $f0, ANY 
n_nationkey, ANY n_name): rowcount = 463.0, cumulative cost = {1450.0 rows, 
191509.0 cpu, 474685.0 io, 0.0 network, 440.00000000000006 memory}, id = 1159
00-08                  NestedLoopJoin(condition=[true], joinType=[inner]) : 
rowType = RecordType(ANY last_name, ANY $f0): rowcount = 463.0, cumulative cost 
= {937.0 rows, 185703.0 cpu, 474635.0 io, 0.0 network, 0.0 memory}, id = 1157
00-10                    Scan(table=[[cp, employee.json]], 
groupscan=[EasyGroupScan [selectionRoot=classpath:/employee.json, numFiles=1, 
columns=[`last_name`], files=[classpath:/employee.json]]]) : rowType = 
RecordType(ANY last_name): rowcount = 463.0, cumulative cost = {463.0 rows, 
463.0 cpu, 474630.0 io, 0.0 network, 0.0 memory}, id = 1153
00-09                    StreamAgg(group=[{}], agg#0=[SINGLE_VALUE($0)]) : 
rowType = RecordType(ANY $f0): rowcount = 1.0, cumulative cost = {11.0 rows, 
40.0 cpu, 5.0 io, 0.0 network, 0.0 memory}, id = 1156
00-11                      Filter(condition=[=($0, 1)]) : rowType = 
RecordType(ANY r_regionkey): rowcount = 1.0, cumulative cost = {10.0 rows, 28.0 
cpu, 5.0 io, 0.0 network, 0.0 memory}, id = 1155
00-12                        Scan(table=[[cp, tpch/region.parquet]], 
groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath 
[path=classpath:/tpch/region.parquet]], 
selectionRoot=classpath:/tpch/region.parquet, numFiles=1, numRowGroups=1, 
usedMetadataFile=false, columns=[`r_regionkey`]]]) : rowType = RecordType(ANY 
r_regionkey): rowcount = 5.0, cumulative cost = {5.0 rows, 5.0 cpu, 5.0 io, 0.0 
network, 0.0 memory}, id = 1154
00-07                  Scan(table=[[cp, tpch/nation.parquet]], 
groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath 
[path=classpath:/tpch/nation.parquet]], 
selectionRoot=classpath:/tpch/nation.parquet, numFiles=1, numRowGroups=1, 
usedMetadataFile=false, columns=[`n_nationkey`, `n_name`]]]) : rowType = 
RecordType(ANY n_nationkey, ANY n_name): rowcount = 25.0, cumulative cost = 
{25.0 rows, 50.0 cpu, 50.0 io, 0.0 network, 0.0 memory}, id = 1158
{noformat}
So one of the inputs of {{NestedLoopJoin}} is {{StreamAgg(group=[{}], 
agg#0=[SINGLE_VALUE($0)])}}, which returns a single row.

Regarding the check for a single value, one of the conditions is an 
[aggregate with an empty 
groupset|https://github.com/apache/drill/blob/f6c63bf5dbc7bcd14b202249d013cd974a96a68a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java#L275].
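
To illustrate the idea behind that check, here is a minimal sketch on a toy plan model. It is not the code from {{JoinUtils}}: the {{Node}} class and the method names {{returnsAtMostOneRow}} and {{cartesianJoinAllowed}} are hypothetical, and looking through {{Project}} and {{Filter}} is an assumption of the sketch rather than a statement about what Drill does.

```java
import java.util.Arrays;
import java.util.List;

// Toy plan model for illustration only; these are NOT Drill or Calcite classes.
class ScalarInputSketch {

  /** Minimal stand-in for a relational operator in a plan tree. */
  static final class Node {
    final String kind;        // "Scan", "Filter", "Project" or "Aggregate"
    final int groupKeyCount;  // number of grouping keys; used only by "Aggregate"
    final List<Node> inputs;

    Node(String kind, int groupKeyCount, Node... inputs) {
      this.kind = kind;
      this.groupKeyCount = groupKeyCount;
      this.inputs = Arrays.asList(inputs);
    }
  }

  /** Returns true if the node is known to emit at most one row. */
  static boolean returnsAtMostOneRow(Node node) {
    switch (node.kind) {
      case "Aggregate":
        // No grouping keys: the whole input collapses into one output row.
        return node.groupKeyCount == 0;
      case "Project":
      case "Filter":
        // Assumption of this sketch: these never add rows, so look through them.
        return returnsAtMostOneRow(node.inputs.get(0));
      default:
        return false; // e.g. a plain Scan gives no such guarantee
    }
  }

  /** The relaxation: a join without a condition is allowed if either input is scalar. */
  static boolean cartesianJoinAllowed(Node left, Node right) {
    return returnsAtMostOneRow(left) || returnsAtMostOneRow(right);
  }

  public static void main(String[] args) {
    Node employee = new Node("Scan", 0);
    Node region = new Node("Filter", 0, new Node("Scan", 0));
    Node singleValue = new Node("Aggregate", 0, region);

    System.out.println(cartesianJoinAllowed(employee, singleValue)); // true
    System.out.println(cartesianJoinAllowed(employee, region));      // false
  }
}
```

In the plan above, the {{StreamAgg}} with {{group=[{}]}} plays the role of {{singleValue}}, which is why the {{NestedLoopJoin}} with {{condition=[true]}} could be planned.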

> ProjectMergeRule is infinitely matched when is applied after 
> ProjectReduceExpressionsRule
> -----------------------------------------------------------------------------------------
>
>                 Key: CALCITE-2223
>                 URL: https://issues.apache.org/jira/browse/CALCITE-2223
>             Project: Calcite
>          Issue Type: Bug
>            Reporter: Volodymyr Vysotskyi
>            Assignee: Julian Hyde
>            Priority: Critical
>         Attachments: 
> TestLimitWithExchanges_testPushLimitPastUnionExchange.png, heap_overview.png, 
> provenance_contents.png
>
>
> For queries like this:
> {code:sql}
> select t1.f from (select cast(f as int) f, f from (select cast(f as int) f 
> from (values('1')) t(f))) as t1
> {code}
> OOM is thrown when {{ProjectMergeRule}} is applied before applying 
> {{ProjectReduceExpressionsRule}} in VolcanoPlanner.
>  A simple test to reproduce this issue (in {{RelOptRulesTest}}):
> {code:java}
>   @Test public void testOomProjectMergeRule() {
>     RelBuilder relBuilder = 
> RelBuilder.create(RelBuilderTest.config().build());
>     RelNode relNode = relBuilder
>         .values(new String[]{"f"}, "1")
>         .project(
>             relBuilder.alias(
>                 relBuilder.cast(relBuilder.field(0), SqlTypeName.INTEGER),
>                 "f"))
>         .project(
>             relBuilder.alias(
>                 relBuilder.cast(relBuilder.field(0), SqlTypeName.INTEGER),
>                 "f0"),
>             relBuilder.alias(relBuilder.field(0), "f"))
>         .project(
>             relBuilder.alias(relBuilder.field(0), "f"))
>         .build();
>     RelOptPlanner planner = relNode.getCluster().getPlanner();
>     RuleSet ruleSet =
>         RuleSets.ofList(
>             ReduceExpressionsRule.PROJECT_INSTANCE,
>             new ProjectMergeRuleWithLongerName(),
>             EnumerableRules.ENUMERABLE_PROJECT_RULE,
>             EnumerableRules.ENUMERABLE_VALUES_RULE);
>     Program program = Programs.of(ruleSet);
>     RelTraitSet toTraits =
>         relNode.getCluster().traitSet()
>             .replace(0, EnumerableConvention.INSTANCE);
>     RelNode output = program.run(planner, relNode, toTraits,
>         ImmutableList.<RelOptMaterialization>of(), 
> ImmutableList.<RelOptLattice>of());
>     // check for output
>   }
>   /**
>    * ProjectMergeRule inheritor which has
>    * class name greater than ProjectReduceExpressionsRule class name 
> (String.compareTo()).
>    *
>    * It is needed for RuleQueue.popMatch() method
>    * to apply this rule before ProjectReduceExpressionsRule.
>    */
>   private static class ProjectMergeRuleWithLongerName extends 
> ProjectMergeRule {
>     public ProjectMergeRuleWithLongerName() {
>       super(true, RelFactories.LOGICAL_BUILDER);
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
