Hello All,

I have encountered a bug when executing TPC-H queries with the skewed join optimization enabled.

In particular, if the skewed join optimization is enabled but not triggered (i.e., the number of rows with the same key is less than "hive.skewjoin.key"), all subsequent jobs of the query are mistakenly filtered out at runtime (for instance, only Stage-6 and Stage-22 of the attached plan are executed). The corresponding query using only common joins executes correctly. I see similar behaviour for multiple TPC-H queries.

If anyone can comment on this issue or give me any pointers on what could be going wrong, I would really appreciate it. I can also provide the queries and guidance on reproducing the error if anyone from the development team is interested.

Thanks a lot!
Adrian
ABSTRACT SYNTAX TREE:
(TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME customer10) c)
(TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME orders10)
o) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME
lineitem10) l) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF
(TOK_TABNAME supplier10) s) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN
(TOK_TABREF (TOK_TABNAME nation10) n) (TOK_TABREF (TOK_TABNAME region10) r)
(and (= (. (TOK_TABLE_OR_COL n) n_regionkey) (. (TOK_TABLE_OR_COL r)
r_regionkey)) (= (. (TOK_TABLE_OR_COL r) r_name) 'ASIA')))) (TOK_INSERT
(TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR
(TOK_TABLE_OR_COL n_name)) (TOK_SELEXPR (TOK_TABLE_OR_COL n_nationkey))))) n1)
(= (. (TOK_TABLE_OR_COL s) s_nationkey) (. (TOK_TABLE_OR_COL n1)
n_nationkey)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE))
(TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL n_name)) (TOK_SELEXPR
(TOK_TABLE_OR_COL s_suppkey)) (TOK_SELEXPR (TOK_TABLE_OR_COL s_nationkey)))))
s1) (= (. (TOK_TABLE_OR_COL l) l_suppkey) (. (TOK_TABLE_OR_COL s1)
s_suppkey)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT
(TOK_SELEXPR (TOK_TABLE_OR_COL n_name)) (TOK_SELEXPR (TOK_TABLE_OR_COL
l_extendedprice)) (TOK_SELEXPR (TOK_TABLE_OR_COL l_discount)) (TOK_SELEXPR
(TOK_TABLE_OR_COL l_orderkey)) (TOK_SELEXPR (TOK_TABLE_OR_COL s_nationkey)))))
l1) (and (and (= (. (TOK_TABLE_OR_COL l1) l_orderkey) (. (TOK_TABLE_OR_COL o)
o_orderkey)) (>= (. (TOK_TABLE_OR_COL o) o_orderdate) '1994-01-01')) (< (.
(TOK_TABLE_OR_COL o) o_orderdate) '1995-01-01')))) (TOK_INSERT (TOK_DESTINATION
(TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL n_name))
(TOK_SELEXPR (TOK_TABLE_OR_COL l_extendedprice)) (TOK_SELEXPR (TOK_TABLE_OR_COL
l_discount)) (TOK_SELEXPR (TOK_TABLE_OR_COL s_nationkey)) (TOK_SELEXPR
(TOK_TABLE_OR_COL o_custkey))))) o1) (and (= (. (TOK_TABLE_OR_COL c)
c_nationkey) (. (TOK_TABLE_OR_COL o1) s_nationkey)) (= (. (TOK_TABLE_OR_COL c)
c_custkey) (. (TOK_TABLE_OR_COL o1) o_custkey))))) (TOK_INSERT (TOK_DESTINATION
(TOK_TAB (TOK_TABNAME q5_local_supplier_volume))) (TOK_SELECT (TOK_SELEXPR
(TOK_TABLE_OR_COL n_name)) (TOK_SELEXPR (TOK_FUNCTION sum (* (TOK_TABLE_OR_COL
l_extendedprice) (- 1 (TOK_TABLE_OR_COL l_discount)))) revenue)) (TOK_GROUPBY
(TOK_TABLE_OR_COL n_name)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEDESC
(TOK_TABLE_OR_COL revenue)))))
STAGE DEPENDENCIES:
Stage-6 is a root stage
Stage-22 depends on stages: Stage-6 , consists of Stage-27
Stage-27
Stage-21 depends on stages: Stage-27
Stage-7 depends on stages: Stage-6, Stage-21
Stage-20 depends on stages: Stage-7 , consists of Stage-26
Stage-26
Stage-19 depends on stages: Stage-26
Stage-8 depends on stages: Stage-7, Stage-19
Stage-18 depends on stages: Stage-8 , consists of Stage-25
Stage-25
Stage-17 depends on stages: Stage-25
Stage-1 depends on stages: Stage-8, Stage-17
Stage-16 depends on stages: Stage-1 , consists of Stage-24
Stage-24
Stage-15 depends on stages: Stage-24
Stage-2 depends on stages: Stage-1, Stage-15
Stage-14 depends on stages: Stage-2 , consists of Stage-23
Stage-23
Stage-13 depends on stages: Stage-23
Stage-3 depends on stages: Stage-2, Stage-13
Stage-4 depends on stages: Stage-3
Stage-0 depends on stages: Stage-4
Stage-5 depends on stages: Stage-0
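A possible reading of the symptom against this listing, offered only as a hypothesis: Stage-7 lists Stage-21 as a parent, and Stage-21 can only run if the conditional Stage-22 selects its branch (Stage-27). The toy Python model below is not Hive's driver; `parse`, `simulate`, `chosen`, and `propagate_skips` are hypothetical names. It parses the first nine entries of the listing and shows that if the conditional resolves to no tasks (no key was skewed) and a stage waits for every parent to have actually run, execution stops after Stage-6 and Stage-22, which matches what was observed.

```python
# Toy model of stage scheduling; NOT Hive's Driver or ConditionalTask code.
DEPENDENCIES = """\
Stage-6 is a root stage
Stage-22 depends on stages: Stage-6 , consists of Stage-27
Stage-27
Stage-21 depends on stages: Stage-27
Stage-7 depends on stages: Stage-6, Stage-21
Stage-20 depends on stages: Stage-7 , consists of Stage-26
Stage-26
Stage-19 depends on stages: Stage-26
Stage-8 depends on stages: Stage-7, Stage-19
"""


def parse(text):
    """Return (parents, branches) from EXPLAIN-style dependency lines.

    parents[stage] is the set of stages it depends on; branches[cond] lists
    the candidate tasks of a conditional stage ("consists of ...").
    """
    parents, branches = {}, {}
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        head, _, rest = line.partition(" depends on stages:")
        stage = head.split()[0]
        deps, _, consists = rest.partition(", consists of")
        parents[stage] = {d.strip() for d in deps.split(",") if d.strip()}
        if consists.strip():
            branches[stage] = [t.strip() for t in consists.split(",")]
    return parents, branches


def simulate(parents, branches, chosen, propagate_skips=False):
    """Return stages in the order they run.

    chosen[cond] lists the branches a conditional stage selects at runtime;
    unselected branches are skipped. By default a stage runs only once every
    parent has run. With propagate_skips=True, a stage whose parents were all
    skipped is skipped too, and a skipped parent otherwise counts as done.
    """
    parents = {s: set(p) for s, p in parents.items()}
    for cond, tasks in branches.items():
        for task in tasks:
            parents.setdefault(task, set()).add(cond)  # branch runs after its conditional
    ran, skipped = [], set()
    progress = True
    while progress:
        progress = False
        for stage, deps in parents.items():
            if stage in ran or stage in skipped:
                continue
            if propagate_skips and deps and deps <= skipped:
                skipped.add(stage)
                progress = True
            elif all(p in ran or (propagate_skips and p in skipped) for p in deps):
                ran.append(stage)
                progress = True
                for task in branches.get(stage, []):
                    if task not in chosen.get(stage, []):
                        skipped.add(task)
    return ran


parents, branches = parse(DEPENDENCIES)
print(simulate(parents, branches, chosen={}))
# ['Stage-6', 'Stage-22']
print(simulate(parents, branches, chosen={}, propagate_skips=True))
# ['Stage-6', 'Stage-22', 'Stage-7', 'Stage-20', 'Stage-8']
```

With `propagate_skips=True`, an unselected skew-join branch counts as satisfied for any stage that also has a parent that ran, so the common-join path continues. The model only reproduces the observed stopping point; how Hive's conditional resolver actually hands control to downstream stages is a question for the developers.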
STAGE PLANS:
Stage: Stage-6
Map Reduce
Alias -> Map Operator Tree:
o1:l1:s1:n1:n
TableScan
alias: n
Reduce Output Operator
key expressions:
expr: n_regionkey
type: int
sort order: +
Map-reduce partition columns:
expr: n_regionkey
type: int
tag: 0
value expressions:
expr: n_nationkey
type: int
expr: n_name
type: string
o1:l1:s1:n1:r
TableScan
alias: r
Filter Operator
predicate:
expr: (r_name = 'ASIA')
type: boolean
Reduce Output Operator
key expressions:
expr: r_regionkey
type: int
sort order: +
Map-reduce partition columns:
expr: r_regionkey
type: int
tag: 1
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {VALUE._col0} {VALUE._col1}
1
handleSkewJoin: true
outputColumnNames: _col0, _col1
Select Operator
expressions:
expr: _col1
type: string
expr: _col0
type: int
outputColumnNames: _col0, _col1
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
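Stage-6 is a reduce-side join with `handleSkewJoin: true`. The simplified Python model below is not Hive's `JoinOperator` (`reduce_side_join` and `skew_threshold` are hypothetical names), but it sketches the idea: rows arrive tagged 0 or 1, are grouped by join key, and any key with more than `skew_threshold` rows on one side is set aside for a follow-up map join instead of being joined in this reducer. `skew_threshold` stands in for `hive.skewjoin.key`; the exact boundary comparison is an assumption. The sample rows are a few illustrative nation/region rows.

```python
# Simplified model of a reduce-side inner join "0 to 1" with skew detection;
# NOT Hive's JoinOperator or SkewJoinHandler.
from collections import defaultdict


def reduce_side_join(rows, skew_threshold):
    """Join (key, tag, value) rows emitted with tag 0 or tag 1.

    A key with more than skew_threshold rows for either tag is treated as
    skewed (assumed stand-in for hive.skewjoin.key) and left for a follow-up
    map join instead of being joined here.
    Returns (joined, skewed_keys).
    """
    groups = defaultdict(lambda: {0: [], 1: []})
    for key, tag, value in rows:
        groups[key][tag].append(value)  # the shuffle groups rows by join key

    joined, skewed = [], set()
    for key, by_tag in groups.items():
        if any(len(values) > skew_threshold for values in by_tag.values()):
            skewed.add(key)
            continue
        for left in by_tag[0]:  # inner join: every tag-0 row with every tag-1 row
            for right in by_tag[1]:
                joined.append((key, left, right))
    return joined, skewed


# Illustrative rows shaped like Stage-6: nation rows keyed by n_regionkey
# (tag 0, values n_nationkey, n_name) and the 'ASIA' region row keyed by
# r_regionkey (tag 1, no value columns).
rows = [
    (2, 0, (8, "INDIA")),
    (2, 0, (9, "INDONESIA")),
    (0, 0, (0, "ALGERIA")),
    (2, 1, ()),
]
print(reduce_side_join(rows, skew_threshold=100000))
print(reduce_side_join(rows, skew_threshold=1))
```

With the large threshold, `skewed` comes back empty: the skew-join follow-up has nothing to do, which is the "enabled but not triggered" case described in the message.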
Stage: Stage-22
Conditional Operator
Stage: Stage-27
Map Reduce Local Work
Alias -> Map Local Tables:
1
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
1
HashTable Sink Operator
condition expressions:
0 {0_VALUE_0} {0_VALUE_1}
1
handleSkewJoin: false
keys:
0 [Column[joinkey0]]
1 [Column[joinkey0]]
Position of Big Table: 0
Stage: Stage-21
Map Reduce
Alias -> Map Operator Tree:
0
Map Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {0_VALUE_0} {0_VALUE_1}
1
handleSkewJoin: false
keys:
0 [Column[joinkey0]]
1 [Column[joinkey0]]
outputColumnNames: _col0, _col1
Position of Big Table: 0
Select Operator
expressions:
expr: _col1
type: string
expr: _col0
type: int
outputColumnNames: _col0, _col1
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Local Work:
Map Reduce Local Work
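Stage-27 and Stage-21 appear to form the follow-up map join for skewed keys: Stage-27 loads alias 1 into a hash table (HashTable Sink Operator), and Stage-21 streams alias 0, the big table at position 0, through a Map Join Operator. A minimal Python sketch of that build/probe pattern follows; the function names are hypothetical and this is not Hive's implementation.

```python
# Build/probe pattern of a map join; NOT Hive's HashTableSinkOperator or
# MapJoinOperator.
from collections import defaultdict


def build_hash_table(small_rows):
    """Local work (like Stage-27): load the small side, alias 1, keyed by join key."""
    table = defaultdict(list)
    for key, value in small_rows:
        table[key].append(value)
    return dict(table)


def map_join(big_rows, table):
    """Map task (like Stage-21): stream the big table (position 0) and probe."""
    for key, value in big_rows:
        for other in table.get(key, []):  # inner join: unmatched rows are dropped
            yield key, value, other


table = build_hash_table([(2, ())])
print(list(map_join([(2, (8, "INDIA")), (0, (0, "ALGERIA"))], table)))
```

Only the small side has to fit in memory; the big side is read once, which is why the plan marks position 0 as the big table.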
Stage: Stage-7
Map Reduce
Alias -> Map Operator Tree:
$INTNAME
Reduce Output Operator
key expressions:
expr: _col1
type: int
sort order: +
Map-reduce partition columns:
expr: _col1
type: int
tag: 1
value expressions:
expr: _col0
type: string
o1:l1:s1:s
TableScan
alias: s
Reduce Output Operator
key expressions:
expr: s_nationkey
type: int
sort order: +
Map-reduce partition columns:
expr: s_nationkey
type: int
tag: 0
value expressions:
expr: s_suppkey
type: int
expr: s_nationkey
type: int
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {VALUE._col0} {VALUE._col3}
1 {VALUE._col0}
handleSkewJoin: true
outputColumnNames: _col0, _col3, _col9
Select Operator
expressions:
expr: _col9
type: string
expr: _col0
type: int
expr: _col3
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-20
Conditional Operator
Stage: Stage-26
Map Reduce Local Work
Alias -> Map Local Tables:
1
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
1
HashTable Sink Operator
condition expressions:
0 {0_VALUE_0} {0_VALUE_1}
1 {1_VALUE_0}
handleSkewJoin: false
keys:
0 [Column[joinkey0]]
1 [Column[joinkey0]]
Position of Big Table: 0
Stage: Stage-19
Map Reduce
Alias -> Map Operator Tree:
0
Map Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {0_VALUE_0} {0_VALUE_1}
1 {1_VALUE_0}
handleSkewJoin: false
keys:
0 [Column[joinkey0]]
1 [Column[joinkey0]]
outputColumnNames: _col0, _col3, _col9
Position of Big Table: 0
Select Operator
expressions:
expr: _col9
type: string
expr: _col0
type: int
expr: _col3
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Local Work:
Map Reduce Local Work
Stage: Stage-8
Map Reduce
Alias -> Map Operator Tree:
$INTNAME
Reduce Output Operator
key expressions:
expr: _col1
type: int
sort order: +
Map-reduce partition columns:
expr: _col1
type: int
tag: 1
value expressions:
expr: _col0
type: string
expr: _col2
type: int
o1:l1:l
TableScan
alias: l
Reduce Output Operator
key expressions:
expr: l_suppkey
type: int
sort order: +
Map-reduce partition columns:
expr: l_suppkey
type: int
tag: 0
value expressions:
expr: l_orderkey
type: int
expr: l_extendedprice
type: double
expr: l_discount
type: double
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {VALUE._col0} {VALUE._col5} {VALUE._col6}
1 {VALUE._col0} {VALUE._col2}
handleSkewJoin: true
outputColumnNames: _col0, _col5, _col6, _col18, _col20
Select Operator
expressions:
expr: _col18
type: string
expr: _col5
type: double
expr: _col6
type: double
expr: _col0
type: int
expr: _col20
type: int
outputColumnNames: _col0, _col1, _col2, _col3, _col4
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-18
Conditional Operator
Stage: Stage-25
Map Reduce Local Work
Alias -> Map Local Tables:
1
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
1
HashTable Sink Operator
condition expressions:
0 {0_VALUE_0} {0_VALUE_1} {0_VALUE_2}
1 {1_VALUE_0} {1_VALUE_1}
handleSkewJoin: false
keys:
0 [Column[joinkey0]]
1 [Column[joinkey0]]
Position of Big Table: 0
Stage: Stage-17
Map Reduce
Alias -> Map Operator Tree:
0
Map Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {0_VALUE_0} {0_VALUE_1} {0_VALUE_2}
1 {1_VALUE_0} {1_VALUE_1}
handleSkewJoin: false
keys:
0 [Column[joinkey0]]
1 [Column[joinkey0]]
outputColumnNames: _col0, _col5, _col6, _col18, _col20
Position of Big Table: 0
Select Operator
expressions:
expr: _col18
type: string
expr: _col5
type: double
expr: _col6
type: double
expr: _col0
type: int
expr: _col20
type: int
outputColumnNames: _col0, _col1, _col2, _col3, _col4
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Local Work:
Map Reduce Local Work
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
$INTNAME
Reduce Output Operator
key expressions:
expr: _col3
type: int
sort order: +
Map-reduce partition columns:
expr: _col3
type: int
tag: 1
value expressions:
expr: _col0
type: string
expr: _col1
type: double
expr: _col2
type: double
expr: _col4
type: int
o1:o
TableScan
alias: o
Filter Operator
predicate:
expr: ((o_orderdate >= '1994-01-01') and (o_orderdate < '1995-01-01'))
type: boolean
Reduce Output Operator
key expressions:
expr: o_orderkey
type: int
sort order: +
Map-reduce partition columns:
expr: o_orderkey
type: int
tag: 0
value expressions:
expr: o_custkey
type: int
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {VALUE._col1}
1 {VALUE._col0} {VALUE._col1} {VALUE._col2} {VALUE._col4}
handleSkewJoin: true
outputColumnNames: _col1, _col11, _col12, _col13, _col15
Select Operator
expressions:
expr: _col11
type: string
expr: _col12
type: double
expr: _col13
type: double
expr: _col15
type: int
expr: _col1
type: int
outputColumnNames: _col0, _col1, _col2, _col3, _col4
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-16
Conditional Operator
Stage: Stage-24
Map Reduce Local Work
Alias -> Map Local Tables:
1
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
1
HashTable Sink Operator
condition expressions:
0 {0_VALUE_0}
1 {1_VALUE_0} {1_VALUE_1} {1_VALUE_2} {1_VALUE_3}
handleSkewJoin: false
keys:
0 [Column[joinkey0]]
1 [Column[joinkey0]]
Position of Big Table: 0
Stage: Stage-15
Map Reduce
Alias -> Map Operator Tree:
0
Map Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {0_VALUE_0}
1 {1_VALUE_0} {1_VALUE_1} {1_VALUE_2} {1_VALUE_3}
handleSkewJoin: false
keys:
0 [Column[joinkey0]]
1 [Column[joinkey0]]
outputColumnNames: _col1, _col11, _col12, _col13, _col15
Position of Big Table: 0
Select Operator
expressions:
expr: _col11
type: string
expr: _col12
type: double
expr: _col13
type: double
expr: _col15
type: int
expr: _col1
type: int
outputColumnNames: _col0, _col1, _col2, _col3, _col4
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Local Work:
Map Reduce Local Work
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
$INTNAME
Reduce Output Operator
key expressions:
expr: _col3
type: int
expr: _col4
type: int
sort order: ++
Map-reduce partition columns:
expr: _col3
type: int
expr: _col4
type: int
tag: 1
value expressions:
expr: _col0
type: string
expr: _col1
type: double
expr: _col2
type: double
c
TableScan
alias: c
Reduce Output Operator
key expressions:
expr: c_nationkey
type: int
expr: c_custkey
type: int
sort order: ++
Map-reduce partition columns:
expr: c_nationkey
type: int
expr: c_custkey
type: int
tag: 0
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0
1 {VALUE._col0} {VALUE._col1} {VALUE._col2}
handleSkewJoin: true
outputColumnNames: _col10, _col11, _col12
Select Operator
expressions:
expr: _col10
type: string
expr: _col11
type: double
expr: _col12
type: double
outputColumnNames: _col10, _col11, _col12
Group By Operator
aggregations:
expr: sum((_col11 * (1 - _col12)))
bucketGroup: false
keys:
expr: _col10
type: string
mode: hash
outputColumnNames: _col0, _col1
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-14
Conditional Operator
Stage: Stage-23
Map Reduce Local Work
Alias -> Map Local Tables:
1
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
1
HashTable Sink Operator
condition expressions:
0
1 {1_VALUE_0} {1_VALUE_1} {1_VALUE_2}
handleSkewJoin: false
keys:
0 [Column[joinkey0], Column[joinkey1]]
1 [Column[joinkey0], Column[joinkey1]]
Position of Big Table: 0
Stage: Stage-13
Map Reduce
Alias -> Map Operator Tree:
0
Map Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0
1 {1_VALUE_0} {1_VALUE_1} {1_VALUE_2}
handleSkewJoin: false
keys:
0 [Column[joinkey0], Column[joinkey1]]
1 [Column[joinkey0], Column[joinkey1]]
outputColumnNames: _col10, _col11, _col12
Position of Big Table: 0
Select Operator
expressions:
expr: _col10
type: string
expr: _col11
type: double
expr: _col12
type: double
outputColumnNames: _col10, _col11, _col12
Group By Operator
aggregations:
expr: sum((_col11 * (1 - _col12)))
bucketGroup: false
keys:
expr: _col10
type: string
mode: hash
outputColumnNames: _col0, _col1
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Local Work:
Map Reduce Local Work
Stage: Stage-3
Map Reduce
Alias -> Map Operator Tree:
hdfs://diascld11:9000/tmp/hive-sysadmin/hive_2013-11-14_21-21-55_101_1064641855181823674/-mr-10003
Reduce Output Operator
key expressions:
expr: _col0
type: string
sort order: +
Map-reduce partition columns:
expr: _col0
type: string
tag: -1
value expressions:
expr: _col1
type: double
Reduce Operator Tree:
Group By Operator
aggregations:
expr: sum(VALUE._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: string
mode: mergepartial
outputColumnNames: _col0, _col1
Select Operator
expressions:
expr: _col0
type: string
expr: _col1
type: double
outputColumnNames: _col0, _col1
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-4
Map Reduce
Alias -> Map Operator Tree:
hdfs://diascld11:9000/tmp/hive-sysadmin/hive_2013-11-14_21-21-55_101_1064641855181823674/-mr-10004
Reduce Output Operator
key expressions:
expr: _col1
type: double
sort order: -
tag: -1
value expressions:
expr: _col0
type: string
expr: _col1
type: double
Reduce Operator Tree:
Extract
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.q5_local_supplier_volume
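Stage-2 to Stage-4 compute the final result in three steps: a `mode: hash` Group By Operator produces partial sums of `l_extendedprice * (1 - l_discount)` per `n_name`, Stage-3 merges them (`mode: mergepartial`), and Stage-4 sorts by revenue descending (`sort order: -`). The Python sketch below mirrors those steps on made-up rows; the function names are hypothetical and the code is not Hive's `GroupByOperator`.

```python
# Two-phase aggregation followed by a descending sort; a sketch of the plan's
# shape, NOT Hive's GroupByOperator.
from collections import defaultdict


def partial_aggregate(rows):
    """'mode: hash' step: per-task partial sums of price * (1 - discount) per n_name."""
    partial = defaultdict(float)
    for n_name, extendedprice, discount in rows:
        partial[n_name] += extendedprice * (1 - discount)
    return dict(partial)


def merge_partials(partials):
    """'mode: mergepartial' step (Stage-3): add up partial sums per n_name."""
    merged = defaultdict(float)
    for partial in partials:
        for n_name, revenue in partial.items():
            merged[n_name] += revenue
    return dict(merged)


def order_by_revenue_desc(merged):
    """Stage-4: sort on revenue, descending ('sort order: -')."""
    return sorted(merged.items(), key=lambda kv: kv[1], reverse=True)


# Made-up (n_name, l_extendedprice, l_discount) rows seen by two tasks.
task1 = [("INDIA", 100.0, 0.5), ("JAPAN", 200.0, 0.25)]
task2 = [("INDIA", 40.0, 0.5)]
merged = merge_partials([partial_aggregate(task1), partial_aggregate(task2)])
print(order_by_revenue_desc(merged))
# [('JAPAN', 150.0), ('INDIA', 70.0)]
```

Splitting the sum this way lets each task emit one row per nation instead of one row per line item; only the small merged result reaches the single sorting reducer.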
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.q5_local_supplier_volume
Stage: Stage-5
Stats-Aggr Operator