[ https://issues.apache.org/jira/browse/HIVE-7205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14084215#comment-14084215 ]
Yin Huai commented on HIVE-7205:
--------------------------------

Oh, I see. In the current patch, "isLastInput" takes special care of MuxOperator, so we will not generate wrong results. However, with this version, if my understanding is correct, we have to buffer rows from all tables in the reduce-side join operator for cases like the last query in correlationoptimizer15.q (the rightmost table will not be streamable, and we will have a higher memory footprint). I am not sure we want this behavior.

I think one thing we may want to investigate is the minimal change that just fixes the bug. I totally agree that we should improve the logic of startGroup()/endGroup()/flush(), but I guess we need to have a clear plan first.

[~ashutoshc] [~navis] I may not be able to come up with a patch soon. When is our next release?

> Wrong results when union all of grouping followed by group by with correlation optimization
> -------------------------------------------------------------------------------------------
>
>                 Key: HIVE-7205
>                 URL: https://issues.apache.org/jira/browse/HIVE-7205
>             Project: Hive
>          Issue Type: Bug
>    Affects Versions: 0.12.0, 0.13.0, 0.13.1
>            Reporter: dima machlin
>            Assignee: Navis
>            Priority: Critical
>         Attachments: HIVE-7205.1.patch.txt, HIVE-7205.2.patch.txt, HIVE-7205.3.patch.txt
>
>
> use case :
> table TBL (a string,b string) contains a single row : 'a','a'
> the following query :
> {code:sql}
> select b, sum(cc) from (
>         select b,count(1) as cc from TBL group by b
>         union all
>         select a as b,count(1) as cc from TBL group by a
> ) z
> group by b
> {code}
> returns
> a 1
> a 1
> when hive.optimize.correlation=true is set;
> if we change it to hive.optimize.correlation=false,
> it returns the correct result : a 2
>
> The plan with correlation optimization :
> {code:sql}
> ABSTRACT SYNTAX TREE:
> (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_UNION (TOK_QUERY (TOK_FROM > (TOK_TABREF (TOK_TABNAME DB TBL))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR > TOK_TMP_FILE))
(TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL b)) (TOK_SELEXPR > (TOK_FUNCTION count 1) cc)) (TOK_GROUPBY (TOK_TABLE_OR_COL b)))) (TOK_QUERY > (TOK_FROM (TOK_TABREF (TOK_TABNAME DB TBL))) (TOK_INSERT (TOK_DESTINATION > (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL a) b) > (TOK_SELEXPR (TOK_FUNCTION count 1) cc)) (TOK_GROUPBY (TOK_TABLE_OR_COL > a))))) z)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT > (TOK_SELEXPR (TOK_TABLE_OR_COL b)) (TOK_SELEXPR (TOK_FUNCTION sum > (TOK_TABLE_OR_COL cc)))) (TOK_GROUPBY (TOK_TABLE_OR_COL b)))) > STAGE DEPENDENCIES: > Stage-1 is a root stage > Stage-0 is a root stage > STAGE PLANS: > Stage: Stage-1 > Map Reduce > Alias -> Map Operator Tree: > null-subquery1:z-subquery1:TBL > TableScan > alias: TBL > Select Operator > expressions: > expr: b > type: string > outputColumnNames: b > Group By Operator > aggregations: > expr: count(1) > bucketGroup: false > keys: > expr: b > type: string > mode: hash > outputColumnNames: _col0, _col1 > Reduce Output Operator > key expressions: > expr: _col0 > type: string > sort order: + > Map-reduce partition columns: > expr: _col0 > type: string > tag: 0 > value expressions: > expr: _col1 > type: bigint > null-subquery2:z-subquery2:TBL > TableScan > alias: TBL > Select Operator > expressions: > expr: a > type: string > outputColumnNames: a > Group By Operator > aggregations: > expr: count(1) > bucketGroup: false > keys: > expr: a > type: string > mode: hash > outputColumnNames: _col0, _col1 > Reduce Output Operator > key expressions: > expr: _col0 > type: string > sort order: + > Map-reduce partition columns: > expr: _col0 > type: string > tag: 1 > value expressions: > expr: _col1 > type: bigint > Reduce Operator Tree: > Demux Operator > Group By Operator > aggregations: > expr: count(VALUE._col0) > bucketGroup: false > keys: > expr: KEY._col0 > type: string > mode: mergepartial > outputColumnNames: _col0, _col1 > Select Operator > expressions: > expr: _col0 > 
type: string > expr: _col1 > type: bigint > outputColumnNames: _col0, _col1 > Union > Select Operator > expressions: > expr: _col0 > type: string > expr: _col1 > type: bigint > outputColumnNames: _col0, _col1 > Mux Operator > Group By Operator > aggregations: > expr: sum(_col1) > bucketGroup: false > keys: > expr: _col0 > type: string > mode: complete > outputColumnNames: _col0, _col1 > Select Operator > expressions: > expr: _col0 > type: string > expr: _col1 > type: bigint > outputColumnNames: _col0, _col1 > File Output Operator > compressed: false > GlobalTableId: 0 > table: > input format: > org.apache.hadoop.mapred.TextInputFormat > output format: > org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat > Group By Operator > aggregations: > expr: count(VALUE._col0) > bucketGroup: false > keys: > expr: KEY._col0 > type: string > mode: mergepartial > outputColumnNames: _col0, _col1 > Select Operator > expressions: > expr: _col0 > type: string > expr: _col1 > type: bigint > outputColumnNames: _col0, _col1 > Union > Select Operator > expressions: > expr: _col0 > type: string > expr: _col1 > type: bigint > outputColumnNames: _col0, _col1 > Mux Operator > Group By Operator > aggregations: > expr: sum(_col1) > bucketGroup: false > keys: > expr: _col0 > type: string > mode: complete > outputColumnNames: _col0, _col1 > Select Operator > expressions: > expr: _col0 > type: string > expr: _col1 > type: bigint > outputColumnNames: _col0, _col1 > File Output Operator > compressed: false > GlobalTableId: 0 > table: > input format: > org.apache.hadoop.mapred.TextInputFormat > output format: > org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat > Stage: Stage-0 > Fetch Operator > limit: -1 > {code} > Plan without correlation optimization : > {code:sql} > ABSTRACT SYNTAX TREE: > (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_UNION (TOK_QUERY (TOK_FROM > (TOK_TABREF (TOK_TABNAME DB TBL))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR > TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR 
(TOK_TABLE_OR_COL b)) (TOK_SELEXPR > (TOK_FUNCTION count 1) cc)) (TOK_GROUPBY (TOK_TABLE_OR_COL b)))) (TOK_QUERY > (TOK_FROM (TOK_TABREF (TOK_TABNAME DB TBL))) (TOK_INSERT (TOK_DESTINATION > (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL a) b) > (TOK_SELEXPR (TOK_FUNCTION count 1) cc)) (TOK_GROUPBY (TOK_TABLE_OR_COL > a))))) z)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT > (TOK_SELEXPR (TOK_TABLE_OR_COL b)) (TOK_SELEXPR (TOK_FUNCTION sum > (TOK_TABLE_OR_COL cc)))) (TOK_GROUPBY (TOK_TABLE_OR_COL b)))) > STAGE DEPENDENCIES: > Stage-1 is a root stage > Stage-2 depends on stages: Stage-1, Stage-3 > Stage-3 is a root stage > Stage-0 is a root stage > STAGE PLANS: > Stage: Stage-1 > Map Reduce > Alias -> Map Operator Tree: > null-subquery2:z-subquery2:TBL > TableScan > alias: TBL > Select Operator > expressions: > expr: a > type: string > outputColumnNames: a > Group By Operator > aggregations: > expr: count(1) > bucketGroup: false > keys: > expr: a > type: string > mode: hash > outputColumnNames: _col0, _col1 > Reduce Output Operator > key expressions: > expr: _col0 > type: string > sort order: + > Map-reduce partition columns: > expr: _col0 > type: string > tag: -1 > value expressions: > expr: _col1 > type: bigint > Reduce Operator Tree: > Group By Operator > aggregations: > expr: count(VALUE._col0) > bucketGroup: false > keys: > expr: KEY._col0 > type: string > mode: mergepartial > outputColumnNames: _col0, _col1 > Select Operator > expressions: > expr: _col0 > type: string > expr: _col1 > type: bigint > outputColumnNames: _col0, _col1 > File Output Operator > compressed: false > GlobalTableId: 0 > table: > input format: > org.apache.hadoop.mapred.SequenceFileInputFormat > output format: > org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat > Stage: Stage-2 > Map Reduce > Alias -> Map Operator Tree: > > maprfs:/user/hadoop/tmp/hive/hive_2014-06-10_15-42-37_188_2403850033480056671-5/-mr-10002 > > TableScan > Union > 
Select Operator > expressions: > expr: _col0 > type: string > expr: _col1 > type: bigint > outputColumnNames: _col0, _col1 > Group By Operator > aggregations: > expr: sum(_col1) > bucketGroup: false > keys: > expr: _col0 > type: string > mode: hash > outputColumnNames: _col0, _col1 > Reduce Output Operator > key expressions: > expr: _col0 > type: string > sort order: + > Map-reduce partition columns: > expr: _col0 > type: string > tag: -1 > value expressions: > expr: _col1 > type: bigint > > maprfs:/user/hadoop/tmp/hive/hive_2014-06-10_15-42-37_188_2403850033480056671-5/-mr-10003 > > TableScan > Union > Select Operator > expressions: > expr: _col0 > type: string > expr: _col1 > type: bigint > outputColumnNames: _col0, _col1 > Group By Operator > aggregations: > expr: sum(_col1) > bucketGroup: false > keys: > expr: _col0 > type: string > mode: hash > outputColumnNames: _col0, _col1 > Reduce Output Operator > key expressions: > expr: _col0 > type: string > sort order: + > Map-reduce partition columns: > expr: _col0 > type: string > tag: -1 > value expressions: > expr: _col1 > type: bigint > Reduce Operator Tree: > Group By Operator > aggregations: > expr: sum(VALUE._col0) > bucketGroup: false > keys: > expr: KEY._col0 > type: string > mode: mergepartial > outputColumnNames: _col0, _col1 > Select Operator > expressions: > expr: _col0 > type: string > expr: _col1 > type: bigint > outputColumnNames: _col0, _col1 > File Output Operator > compressed: false > GlobalTableId: 0 > table: > input format: org.apache.hadoop.mapred.TextInputFormat > output format: > org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat > Stage: Stage-3 > Map Reduce > Alias -> Map Operator Tree: > null-subquery1:z-subquery1:TBL > TableScan > alias: TBL > Select Operator > expressions: > expr: b > type: string > outputColumnNames: b > Group By Operator > aggregations: > expr: count(1) > bucketGroup: false > keys: > expr: b > type: string > mode: hash > outputColumnNames: _col0, _col1 > Reduce 
Output Operator > key expressions: > expr: _col0 > type: string > sort order: + > Map-reduce partition columns: > expr: _col0 > type: string > tag: -1 > value expressions: > expr: _col1 > type: bigint > Reduce Operator Tree: > Group By Operator > aggregations: > expr: count(VALUE._col0) > bucketGroup: false > keys: > expr: KEY._col0 > type: string > mode: mergepartial > outputColumnNames: _col0, _col1 > Select Operator > expressions: > expr: _col0 > type: string > expr: _col1 > type: bigint > outputColumnNames: _col0, _col1 > File Output Operator > compressed: false > GlobalTableId: 0 > table: > input format: > org.apache.hadoop.mapred.SequenceFileInputFormat > output format: > org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat > Stage: Stage-0 > Fetch Operator > limit: -1 > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
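
As a sanity check on the expected semantics of the query in the issue description, here is a minimal sketch that runs the same SQL against an in-memory SQLite database. SQLite is only a stand-in here (an assumption for illustration, not Hive and not part of any patch), and the function name `expected_result` is hypothetical. It shows what the query should return for the single-row table, independent of any correlation optimization.

```python
import sqlite3


def expected_result():
    """Run the reporter's query on a one-row table using SQLite (not Hive)."""
    conn = sqlite3.connect(":memory:")
    # Same shape as the table in the report: TBL (a string, b string).
    conn.execute("CREATE TABLE TBL (a TEXT, b TEXT)")
    conn.execute("INSERT INTO TBL VALUES ('a', 'a')")
    rows = conn.execute(
        """
        SELECT b, SUM(cc) FROM (
            SELECT b, COUNT(1) AS cc FROM TBL GROUP BY b
            UNION ALL
            SELECT a AS b, COUNT(1) AS cc FROM TBL GROUP BY a
        ) z
        GROUP BY b
        """
    ).fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    # The outer GROUP BY must merge the two union branches into one group.
    print(expected_result())
```

Both union branches emit a row with key 'a' and count 1, so the outer group by should combine them into a single row with sum 2, matching the result Hive gives with hive.optimize.correlation=false.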