Hello All,
I have encountered a bug when executing TPC-H queries with the skewed join optimization enabled. If the optimization is enabled but not triggered (i.e., the number of rows with the same key is less than "hive.skewjoin.key"), all subsequent jobs of the query are mistakenly filtered out at runtime. For instance, only Stage-6 and Stage-22 of the attached plan are executed. The same query executes correctly when only common joins are used, and I observe similar behaviour for multiple TPC-H queries.

If anyone can comment on this issue or give me pointers on what could be going wrong, I would really appreciate it. I can also provide the queries and guidance on reproducing the error if anyone from the development team is interested.

Thanks a lot!
Adrian

ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME customer10) c) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME orders10) o) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME lineitem10) l) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME supplier10) s) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME nation10) n) (TOK_TABREF (TOK_TABNAME region10) r) (and (= (. (TOK_TABLE_OR_COL n) n_regionkey) (. (TOK_TABLE_OR_COL r) r_regionkey)) (= (. (TOK_TABLE_OR_COL r) r_name) 'ASIA')))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL n_name)) (TOK_SELEXPR (TOK_TABLE_OR_COL n_nationkey))))) n1) (= (. (TOK_TABLE_OR_COL s) s_nationkey) (. (TOK_TABLE_OR_COL n1) n_nationkey)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL n_name)) (TOK_SELEXPR (TOK_TABLE_OR_COL s_suppkey)) (TOK_SELEXPR (TOK_TABLE_OR_COL s_nationkey))))) s1) (= (. (TOK_TABLE_OR_COL l) l_suppkey) (. (TOK_TABLE_OR_COL s1) s_suppkey)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL n_name)) (TOK_SELEXPR (TOK_TABLE_OR_COL l_extendedprice)) (TOK_SELEXPR (TOK_TABLE_OR_COL l_discount)) (TOK_SELEXPR (TOK_TABLE_OR_COL l_orderkey)) (TOK_SELEXPR (TOK_TABLE_OR_COL s_nationkey))))) l1) (and (and (= (. (TOK_TABLE_OR_COL l1) l_orderkey) (. (TOK_TABLE_OR_COL o) o_orderkey)) (>= (. (TOK_TABLE_OR_COL o) o_orderdate) '1994-01-01')) (< (. (TOK_TABLE_OR_COL o) o_orderdate) '1995-01-01')))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL n_name)) (TOK_SELEXPR (TOK_TABLE_OR_COL l_extendedprice)) (TOK_SELEXPR (TOK_TABLE_OR_COL l_discount)) (TOK_SELEXPR (TOK_TABLE_OR_COL s_nationkey)) (TOK_SELEXPR (TOK_TABLE_OR_COL o_custkey))))) o1) (and (= (. (TOK_TABLE_OR_COL c) c_nationkey) (. (TOK_TABLE_OR_COL o1) s_nationkey)) (= (. (TOK_TABLE_OR_COL c) c_custkey) (. (TOK_TABLE_OR_COL o1) o_custkey))))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME q5_local_supplier_volume))) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL n_name)) (TOK_SELEXPR (TOK_FUNCTION sum (* (TOK_TABLE_OR_COL l_extendedprice) (- 1 (TOK_TABLE_OR_COL l_discount)))) revenue)) (TOK_GROUPBY (TOK_TABLE_OR_COL n_name)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEDESC (TOK_TABLE_OR_COL revenue)))))

STAGE DEPENDENCIES:
  Stage-6 is a root stage
  Stage-22 depends on stages: Stage-6 , consists of Stage-27
  Stage-27
  Stage-21 depends on stages: Stage-27
  Stage-7 depends on stages: Stage-6, Stage-21
  Stage-20 depends on stages: Stage-7 , consists of Stage-26
  Stage-26
  Stage-19 depends on stages: Stage-26
  Stage-8 depends on stages: Stage-7, Stage-19
  Stage-18 depends on stages: Stage-8 , consists of Stage-25
  Stage-25
  Stage-17 depends on stages: Stage-25
  Stage-1 depends on stages: Stage-8, Stage-17
  Stage-16 depends on stages: Stage-1 , consists of Stage-24
  Stage-24
  Stage-15 depends on stages: Stage-24
  Stage-2 depends on stages: Stage-1, Stage-15
  Stage-14 depends on stages: Stage-2 , consists of Stage-23
  Stage-23
  Stage-13 depends on stages: Stage-23
  Stage-3 depends on stages: Stage-2, Stage-13
  Stage-4 depends on stages: Stage-3
  Stage-0 depends on stages: Stage-4
  Stage-5 depends on stages: Stage-0

STAGE PLANS:
  Stage: Stage-6
    Map Reduce
      Alias -> Map Operator Tree:
        o1:l1:s1:n1:n
          TableScan
            alias: n
            Reduce Output Operator
              key expressions:
                    expr: n_regionkey
                    type: int
              sort order: +
              Map-reduce partition columns:
                    expr: n_regionkey
                    type: int
              tag: 0
              value expressions:
                    expr: n_nationkey
                    type: int
                    expr: n_name
                    type: string
        o1:l1:s1:n1:r
          TableScan
            alias: r
            Filter Operator
              predicate:
                  expr: (r_name = 'ASIA')
                  type: boolean
              Reduce Output Operator
                key expressions:
                      expr: r_regionkey
                      type: int
                sort order: +
                Map-reduce partition columns:
                      expr: r_regionkey
                      type: int
                tag: 1
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          condition expressions:
            0 {VALUE._col0} {VALUE._col1}
            1
          handleSkewJoin: true
          outputColumnNames: _col0, _col1
          Select Operator
            expressions:
                  expr: _col1
                  type: string
                  expr: _col0
                  type: int
            outputColumnNames: _col0, _col1
            File Output Operator
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-22
    Conditional Operator

  Stage: Stage-27
    Map Reduce Local Work
      Alias -> Map Local Tables:
        1
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        1
          HashTable Sink Operator
            condition expressions:
              0 {0_VALUE_0} {0_VALUE_1}
              1
            handleSkewJoin: false
            keys:
              0 [Column[joinkey0]]
              1 [Column[joinkey0]]
            Position of Big Table: 0

  Stage: Stage-21
    Map Reduce
      Alias -> Map Operator Tree:
        0
          Map Join Operator
            condition map:
                 Inner Join 0 to 1
            condition expressions:
              0 {0_VALUE_0} {0_VALUE_1}
              1
            handleSkewJoin: false
            keys:
              0 [Column[joinkey0]]
              1 [Column[joinkey0]]
            outputColumnNames: _col0, _col1
            Position of Big Table: 0
            Select Operator
              expressions:
                    expr: _col1
                    type: string
                    expr: _col0
                    type: int
              outputColumnNames: _col0, _col1
              File Output Operator
                compressed: false
                GlobalTableId: 0
                table:
                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
      Local Work:
        Map Reduce Local Work

  Stage: Stage-7
    Map Reduce
      Alias -> Map Operator Tree:
        $INTNAME
            Reduce Output Operator
              key expressions:
                    expr: _col1
                    type: int
              sort order: +
              Map-reduce partition columns:
                    expr: _col1
                    type: int
              tag: 1
              value expressions:
                    expr: _col0
                    type: string
        o1:l1:s1:s
          TableScan
            alias: s
            Reduce Output Operator
              key expressions:
                    expr: s_nationkey
                    type: int
              sort order: +
              Map-reduce partition columns:
                    expr: s_nationkey
                    type: int
              tag: 0
              value expressions:
                    expr: s_suppkey
                    type: int
                    expr: s_nationkey
                    type: int
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          condition expressions:
            0 {VALUE._col0} {VALUE._col3}
            1 {VALUE._col0}
          handleSkewJoin: true
          outputColumnNames: _col0, _col3, _col9
          Select Operator
            expressions:
                  expr: _col9
                  type: string
                  expr: _col0
                  type: int
                  expr: _col3
                  type: int
            outputColumnNames: _col0, _col1, _col2
            File Output Operator
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-20
    Conditional Operator

  Stage: Stage-26
    Map Reduce Local Work
      Alias -> Map Local Tables:
        1
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        1
          HashTable Sink Operator
            condition expressions:
              0 {0_VALUE_0} {0_VALUE_1}
              1 {1_VALUE_0}
            handleSkewJoin: false
            keys:
              0 [Column[joinkey0]]
              1 [Column[joinkey0]]
            Position of Big Table: 0

  Stage: Stage-19
    Map Reduce
      Alias -> Map Operator Tree:
        0
          Map Join Operator
            condition map:
                 Inner Join 0 to 1
            condition expressions:
              0 {0_VALUE_0} {0_VALUE_1}
              1 {1_VALUE_0}
            handleSkewJoin: false
            keys:
              0 [Column[joinkey0]]
              1 [Column[joinkey0]]
            outputColumnNames: _col0, _col3, _col9
            Position of Big Table: 0
            Select Operator
              expressions:
                    expr: _col9
                    type: string
                    expr: _col0
                    type: int
                    expr: _col3
                    type: int
              outputColumnNames: _col0, _col1, _col2
              File Output Operator
                compressed: false
                GlobalTableId: 0
                table:
                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
      Local Work:
        Map Reduce Local Work

  Stage: Stage-8
    Map Reduce
      Alias -> Map Operator Tree:
        $INTNAME
            Reduce Output Operator
              key expressions:
                    expr: _col1
                    type: int
              sort order: +
              Map-reduce partition columns:
                    expr: _col1
                    type: int
              tag: 1
              value expressions:
                    expr: _col0
                    type: string
                    expr: _col2
                    type: int
        o1:l1:l
          TableScan
            alias: l
            Reduce Output Operator
              key expressions:
                    expr: l_suppkey
                    type: int
              sort order: +
              Map-reduce partition columns:
                    expr: l_suppkey
                    type: int
              tag: 0
              value expressions:
                    expr: l_orderkey
                    type: int
                    expr: l_extendedprice
                    type: double
                    expr: l_discount
                    type: double
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          condition expressions:
            0 {VALUE._col0} {VALUE._col5} {VALUE._col6}
            1 {VALUE._col0} {VALUE._col2}
          handleSkewJoin: true
          outputColumnNames: _col0, _col5, _col6, _col18, _col20
          Select Operator
            expressions:
                  expr: _col18
                  type: string
                  expr: _col5
                  type: double
                  expr: _col6
                  type: double
                  expr: _col0
                  type: int
                  expr: _col20
                  type: int
            outputColumnNames: _col0, _col1, _col2, _col3, _col4
            File Output Operator
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-18
    Conditional Operator

  Stage: Stage-25
    Map Reduce Local Work
      Alias -> Map Local Tables:
        1
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        1
          HashTable Sink Operator
            condition expressions:
              0 {0_VALUE_0} {0_VALUE_1} {0_VALUE_2}
              1 {1_VALUE_0} {1_VALUE_1}
            handleSkewJoin: false
            keys:
              0 [Column[joinkey0]]
              1 [Column[joinkey0]]
            Position of Big Table: 0

  Stage: Stage-17
    Map Reduce
      Alias -> Map Operator Tree:
        0
          Map Join Operator
            condition map:
                 Inner Join 0 to 1
            condition expressions:
              0 {0_VALUE_0} {0_VALUE_1} {0_VALUE_2}
              1 {1_VALUE_0} {1_VALUE_1}
            handleSkewJoin: false
            keys:
              0 [Column[joinkey0]]
              1 [Column[joinkey0]]
            outputColumnNames: _col0, _col5, _col6, _col18, _col20
            Position of Big Table: 0
            Select Operator
              expressions:
                    expr: _col18
                    type: string
                    expr: _col5
                    type: double
                    expr: _col6
                    type: double
                    expr: _col0
                    type: int
                    expr: _col20
                    type: int
              outputColumnNames: _col0, _col1, _col2, _col3, _col4
              File Output Operator
                compressed: false
                GlobalTableId: 0
                table:
                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
      Local Work:
        Map Reduce Local Work

  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        $INTNAME
            Reduce Output Operator
              key expressions:
                    expr: _col3
                    type: int
              sort order: +
              Map-reduce partition columns:
                    expr: _col3
                    type: int
              tag: 1
              value expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: double
                    expr: _col2
                    type: double
                    expr: _col4
                    type: int
        o1:o
          TableScan
            alias: o
            Filter Operator
              predicate:
                  expr: ((o_orderdate >= '1994-01-01') and (o_orderdate < '1995-01-01'))
                  type: boolean
              Reduce Output Operator
                key expressions:
                      expr: o_orderkey
                      type: int
                sort order: +
                Map-reduce partition columns:
                      expr: o_orderkey
                      type: int
                tag: 0
                value expressions:
                      expr: o_custkey
                      type: int
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          condition expressions:
            0 {VALUE._col1}
            1 {VALUE._col0} {VALUE._col1} {VALUE._col2} {VALUE._col4}
          handleSkewJoin: true
          outputColumnNames: _col1, _col11, _col12, _col13, _col15
          Select Operator
            expressions:
                  expr: _col11
                  type: string
                  expr: _col12
                  type: double
                  expr: _col13
                  type: double
                  expr: _col15
                  type: int
                  expr: _col1
                  type: int
            outputColumnNames: _col0, _col1, _col2, _col3, _col4
            File Output Operator
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-16
    Conditional Operator

  Stage: Stage-24
    Map Reduce Local Work
      Alias -> Map Local Tables:
        1
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        1
          HashTable Sink Operator
            condition expressions:
              0 {0_VALUE_0}
              1 {1_VALUE_0} {1_VALUE_1} {1_VALUE_2} {1_VALUE_3}
            handleSkewJoin: false
            keys:
              0 [Column[joinkey0]]
              1 [Column[joinkey0]]
            Position of Big Table: 0

  Stage: Stage-15
    Map Reduce
      Alias -> Map Operator Tree:
        0
          Map Join Operator
            condition map:
                 Inner Join 0 to 1
            condition expressions:
              0 {0_VALUE_0}
              1 {1_VALUE_0} {1_VALUE_1} {1_VALUE_2} {1_VALUE_3}
            handleSkewJoin: false
            keys:
              0 [Column[joinkey0]]
              1 [Column[joinkey0]]
            outputColumnNames: _col1, _col11, _col12, _col13, _col15
            Position of Big Table: 0
            Select Operator
              expressions:
                    expr: _col11
                    type: string
                    expr: _col12
                    type: double
                    expr: _col13
                    type: double
                    expr: _col15
                    type: int
                    expr: _col1
                    type: int
              outputColumnNames: _col0, _col1, _col2, _col3, _col4
              File Output Operator
                compressed: false
                GlobalTableId: 0
                table:
                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
      Local Work:
        Map Reduce Local Work

  Stage: Stage-2
    Map Reduce
      Alias -> Map Operator Tree:
        $INTNAME
            Reduce Output Operator
              key expressions:
                    expr: _col3
                    type: int
                    expr: _col4
                    type: int
              sort order: ++
              Map-reduce partition columns:
                    expr: _col3
                    type: int
                    expr: _col4
                    type: int
              tag: 1
              value expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: double
                    expr: _col2
                    type: double
        c
          TableScan
            alias: c
            Reduce Output Operator
              key expressions:
                    expr: c_nationkey
                    type: int
                    expr: c_custkey
                    type: int
              sort order: ++
              Map-reduce partition columns:
                    expr: c_nationkey
                    type: int
                    expr: c_custkey
                    type: int
              tag: 0
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          condition expressions:
            0
            1 {VALUE._col0} {VALUE._col1} {VALUE._col2}
          handleSkewJoin: true
          outputColumnNames: _col10, _col11, _col12
          Select Operator
            expressions:
                  expr: _col10
                  type: string
                  expr: _col11
                  type: double
                  expr: _col12
                  type: double
            outputColumnNames: _col10, _col11, _col12
            Group By Operator
              aggregations:
                    expr: sum((_col11 * (1 - _col12)))
              bucketGroup: false
              keys:
                    expr: _col10
                    type: string
              mode: hash
              outputColumnNames: _col0, _col1
              File Output Operator
                compressed: false
                GlobalTableId: 0
                table:
                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-14
    Conditional Operator

  Stage: Stage-23
    Map Reduce Local Work
      Alias -> Map Local Tables:
        1
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        1
          HashTable Sink Operator
            condition expressions:
              0
              1 {1_VALUE_0} {1_VALUE_1} {1_VALUE_2}
            handleSkewJoin: false
            keys:
              0 [Column[joinkey0], Column[joinkey1]]
              1 [Column[joinkey0], Column[joinkey1]]
            Position of Big Table: 0

  Stage: Stage-13
    Map Reduce
      Alias -> Map Operator Tree:
        0
          Map Join Operator
            condition map:
                 Inner Join 0 to 1
            condition expressions:
              0
              1 {1_VALUE_0} {1_VALUE_1} {1_VALUE_2}
            handleSkewJoin: false
            keys:
              0 [Column[joinkey0], Column[joinkey1]]
              1 [Column[joinkey0], Column[joinkey1]]
            outputColumnNames: _col10, _col11, _col12
            Position of Big Table: 0
            Select Operator
              expressions:
                    expr: _col10
                    type: string
                    expr: _col11
                    type: double
                    expr: _col12
                    type: double
              outputColumnNames: _col10, _col11, _col12
              Group By Operator
                aggregations:
                      expr: sum((_col11 * (1 - _col12)))
                bucketGroup: false
                keys:
                      expr: _col10
                      type: string
                mode: hash
                outputColumnNames: _col0, _col1
                File Output Operator
                  compressed: false
                  GlobalTableId: 0
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
      Local Work:
        Map Reduce Local Work

  Stage: Stage-3
    Map Reduce
      Alias -> Map Operator Tree:
        hdfs://diascld11:9000/tmp/hive-sysadmin/hive_2013-11-14_21-21-55_101_1064641855181823674/-mr-10003
            Reduce Output Operator
              key expressions:
                    expr: _col0
                    type: string
              sort order: +
              Map-reduce partition columns:
                    expr: _col0
                    type: string
              tag: -1
              value expressions:
                    expr: _col1
                    type: double
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: sum(VALUE._col0)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: string
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: double
            outputColumnNames: _col0, _col1
            File Output Operator
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-4
    Map Reduce
      Alias -> Map Operator Tree:
        hdfs://diascld11:9000/tmp/hive-sysadmin/hive_2013-11-14_21-21-55_101_1064641855181823674/-mr-10004
            Reduce Output Operator
              key expressions:
                    expr: _col1
                    type: double
              sort order: -
              tag: -1
              value expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: double
      Reduce Operator Tree:
        Extract
          File Output Operator
            compressed: false
            GlobalTableId: 1
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                name: default.q5_local_supplier_volume

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: default.q5_local_supplier_volume

  Stage: Stage-5
    Stats-Aggr Operator