[ https://issues.apache.org/jira/browse/HIVE-24775?focusedWorklogId=555234&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555234 ]
ASF GitHub Bot logged work on HIVE-24775:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Feb/21 16:38
            Start Date: 20/Feb/21 16:38
    Worklog Time Spent: 10m
      Work Description: kasakrisz commented on a change in pull request #1995:
URL: https://github.com/apache/hive/pull/1995#discussion_r579678432

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveAggregateIncrementalRewritingRule.java
##########
@@ -99,31 +100,61 @@ public void onMatch(RelOptRuleCall call) {
     final RexBuilder rexBuilder = agg.getCluster().getRexBuilder();
     // 1) First branch is query, second branch is MV
-    final RelNode joinLeftInput = union.getInput(1);
+    RelNode joinLeftInput = union.getInput(1);
     final RelNode joinRightInput = union.getInput(0);
-    // 2) Build conditions for join and filter and start adding
+
+    // 2) Introduce a Project on top of MV scan having all columns from the view plus a boolean literal which indicates
+    // whether the row with the key values coming from the joinRightInput exists in the view:
+    // - true means exist
+    // - null means not exists
+    // Project also needed to encapsulate the view scan by a subquery -> this is required by
+    // CalcitePlanner.fixUpASTAggregateIncrementalRebuild
+    List<RexNode> mvCols = new ArrayList<>(joinLeftInput.getRowType().getFieldCount());
+    for (int i = 0; i < joinLeftInput.getRowType().getFieldCount(); ++i) {
+      mvCols.add(rexBuilder.makeInputRef(
+              joinLeftInput.getRowType().getFieldList().get(i).getType(), i));
+    }
+    mvCols.add(rexBuilder.makeLiteral(true));
+
+    joinLeftInput = call.builder()
+            .push(joinLeftInput)
+            .project(mvCols)
+            .build();
+
+    // 3) Build conditions for join and start adding
     // expressions for project operator
     List<RexNode> projExprs = new ArrayList<>();
     List<RexNode> joinConjs = new ArrayList<>();
-    List<RexNode> filterConjs = new ArrayList<>();
     int groupCount = agg.getGroupCount();
     int totalCount = agg.getGroupCount() + agg.getAggCallList().size();
-    for (int leftPos = 0, rightPos = totalCount;
+    for (int leftPos = 0, rightPos = totalCount + 1;
          leftPos < groupCount; leftPos++, rightPos++) {
       RexNode leftRef = rexBuilder.makeInputRef(
           joinLeftInput.getRowType().getFieldList().get(leftPos).getType(), leftPos);
       RexNode rightRef = rexBuilder.makeInputRef(
           joinRightInput.getRowType().getFieldList().get(leftPos).getType(), rightPos);
       projExprs.add(rightRef);
-      joinConjs.add(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
-          ImmutableList.of(leftRef, rightRef)));
-      filterConjs.add(rexBuilder.makeCall(SqlStdOperatorTable.IS_NULL,
-          ImmutableList.of(leftRef)));
+
+      RexNode nsEqExpr = rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_DISTINCT_FROM,
+              ImmutableList.of(leftRef, rightRef));
+      joinConjs.add(nsEqExpr);
     }
-    // 3) Add the expressions that correspond to the aggregation
+
+    // 4) Create join node
+    RexNode joinCond = RexUtil.composeConjunction(rexBuilder, joinConjs);
+    RelNode join = call.builder()
+            .push(joinLeftInput)
+            .push(joinRightInput)
+            .join(JoinRelType.RIGHT, joinCond)
+            .build();
+
+    int flagIndex = joinLeftInput.getRowType().getFieldCount() - 1;
+    RexNode flagNode = rexBuilder.makeInputRef(
+            join.getRowType().getFieldList().get(flagIndex).getType(), flagIndex);
+
+    // 5) Add the expressions that correspond to the aggregation
     // functions
-    RexNode caseFilterCond = RexUtil.composeConjunction(rexBuilder, filterConjs);
-    for (int i = 0, leftPos = groupCount, rightPos = totalCount + groupCount;
+    for (int i = 0, leftPos = groupCount, rightPos = totalCount + 1 + groupCount;
          leftPos < totalCount; i++, leftPos++, rightPos++) {
       // case when mv2.deptno IS NULL AND mv2.deptname IS NULL then s else source.s + mv2.s end

Review comment:
   Unfortunately relying on the flag only is not enough.
   Aggregate functions eliminate null values:
   ```
   insert into t1(a,b) values (1, NULL), (1, 10);
   select a, sum(b) from t1 group by a
   ```
   results in
   ```
   1, 10
   ```
   However,
   ```
   select 10 + NULL
   ```
   is `NULL`.

   The code comment is no longer valid. I will update it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id:     (was: 555234)
    Time Spent: 1h 20m  (was: 1h 10m)

> Incorrect null handling when rebuilding Materialized view incrementally
> -----------------------------------------------------------------------
>
>                 Key: HIVE-24775
>                 URL: https://issues.apache.org/jira/browse/HIVE-24775
>             Project: Hive
>          Issue Type: Bug
>            Reporter: Krisztian Kasa
>            Assignee: Krisztian Kasa
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> {code}
> CREATE TABLE t1 (a int, b varchar(256), c decimal(10,2), d int) STORED AS orc
> TBLPROPERTIES ('transactional'='true');
> INSERT INTO t1 VALUES
>  (NULL, 'null_value', 100.77, 7),
>  (1, 'calvin', 978.76, 3),
>  (1, 'charlie', 9.8, 1);
> CREATE MATERIALIZED VIEW mat1 TBLPROPERTIES ('transactional'='true') AS
>   SELECT a, b, sum(d)
>   FROM t1
>   WHERE c > 10.0
>   GROUP BY a, b;
> INSERT INTO t1 VALUES
>  (NULL, 'null_value', 100.88, 8),
>  (1, 'charlie', 15.8, 1);
> ALTER MATERIALIZED VIEW mat1 REBUILD;
> SELECT * FROM mat1
> ORDER BY a, b;
> {code}
> View contains:
> {code}
> 1     calvin      3
> 1     charlie     1
> NULL  null_value  8
> NULL  null_value  7
> {code}
> but it should contain:
> {code}
> 1     calvin      3
> 1     charlie     1
> NULL  null_value  15
> {code}
> Rows whose aggregate key columns contain NULL values are not aggregated because
> the incremental materialized view rebuild plan is altered by
> [applyPreJoinOrderingTransforms|https://github.com/apache/hive/blob/76732ad27e139fbdef25b820a07cf35934771083/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java#L1975]:
> an IS NOT NULL filter is added for each of these columns on top of the view scan
> when joining with the branch that pulls the rows inserted after the last rebuild:
> {code}
> HiveProject($f0=[$3], $f1=[$4], $f2=[CASE(AND(IS NULL($0), IS NULL($1)), $5, +($5, $2))])
>   HiveFilter(condition=[OR(AND(IS NULL($0), IS NULL($1)), AND(=($0, $3), =($1, $4)))])
>     HiveJoin(condition=[AND(=($0, $3), =($1, $4))], joinType=[right], algorithm=[none], cost=[not available])
>       HiveProject(a=[$0], b=[$1], _c2=[$2])
>         HiveFilter(condition=[AND(IS NOT NULL($0), IS NOT NULL($1))])
>           HiveTableScan(table=[[default, mat1]], table:alias=[default.mat1])
>       HiveProject(a=[$0], b=[$1], $f2=[$2])
>         HiveAggregate(group=[{0, 1}], agg#0=[sum($3)])
>           HiveFilter(condition=[AND(<(1, $6.writeid), >($2, 10))])
>             HiveTableScan(table=[[default, t1]], table:alias=[t1])
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
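
The null-handling point raised in the review comment above can be illustrated with a small, self-contained Java sketch. It is not the code from the pull request: the class `NullSafeMergeSketch` and the helpers `sqlPlus`, `isNotDistinctFrom` and `mergeSum` are hypothetical names introduced only for illustration, and the merge rule shown is one plausible reading of "relying on the flag only is not enough", assuming the stored SUM itself may be NULL.

```java
import java.util.Objects;

public class NullSafeMergeSketch {

    // SQL-style addition: the result is NULL if either operand is NULL,
    // which is why "select 10 + NULL" yields NULL.
    static Long sqlPlus(Long x, Long y) {
        return (x == null || y == null) ? null : Long.valueOf(x + y);
    }

    // Null-safe key comparison, analogous to IS NOT DISTINCT FROM:
    // two NULL keys are treated as equal, unlike with plain "=".
    static boolean isNotDistinctFrom(Object left, Object right) {
        return Objects.equals(left, right);
    }

    // Hypothetical merge of a SUM stored in the view with a SUM computed over
    // newly inserted rows. rowExistsInView stands in for the boolean flag
    // column (true = key found in the view, null = not found).
    static Long mergeSum(Boolean rowExistsInView, Long viewSum, Long deltaSum) {
        if (rowExistsInView == null) {
            return deltaSum;   // new group: take the delta as is
        }
        if (viewSum == null) {
            return deltaSum;   // stored SUM was NULL: avoid NULL + x = NULL
        }
        if (deltaSum == null) {
            return viewSum;    // delta SUM was NULL: keep the stored value
        }
        return viewSum + deltaSum;
    }

    public static void main(String[] args) {
        System.out.println(sqlPlus(10L, null));            // null
        System.out.println(isNotDistinctFrom(null, null)); // true
        System.out.println(mergeSum(true, 7L, 8L));        // 15
        System.out.println(mergeSum(true, null, 10L));     // 10
        System.out.println(mergeSum(null, null, 8L));      // 8
    }
}
```

With the values from the issue description, the (NULL, 'null_value') group has a stored sum of 7 and a delta of 8, so a null-safe key match followed by this merge gives 15 rather than two separate rows.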