[ https://issues.apache.org/jira/browse/HIVE-14082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15346777#comment-15346777 ]
Sahil Takiar edited comment on HIVE-14082 at 7/18/16 8:50 PM:
--------------------------------------------------------------

After a lot of investigation, these two errors seem to be related. The TL;DR is that I think there is a bug in the {{ReduceSinkOperator}} class, but I am not 100% sure.

I started debugging Exception 1 first. It seems that {{GenericUDFOPEqualOrGreaterThan}} tries to compare a {{Text}} object with value {{value_1_3}} to a {{DateWritable}} object with value {{2015-06-22}}. This happens inside the reduce method and causes a {{ClassCastException}} that fails the job.

I compared the query plans for Query 1 vs. Query 3 (the output of {{EXPLAIN ...}}). Both plans require a single Map-Reduce job. The major difference is that Query 3 runs a Filter Operator in both the Map Task and the Reduce Task, while Query 1 only runs a Filter Operator in the Map Task. I suspect the Filter Operator in the Reduce Task of Query 3 is the source of the problem.

Here is the output of the {{EXPLAIN}} query:

{code}
STAGE DEPENDENCIES:
  Stage-2 is a root stage
  Stage-0 depends on stages: Stage-2
  Stage-3 depends on stages: Stage-0
  Stage-1 depends on stages: Stage-2
  Stage-4 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: multi_table_insert_source
            Statistics: Num rows: 5 Data size: 250 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: ((date_column >= 2013-06-21) or (date_column >= 2015-06-22)) (type: boolean)
              Statistics: Num rows: 2 Data size: 100 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: column_1 (type: string), column_2 (type: string), column_3 (type: string), date_column (type: date)
                outputColumnNames: column_1, column_2, column_3, date_column
                Statistics: Num rows: 2 Data size: 100 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: column_1 (type: string), column_2 (type: string), column_3 (type: string), date_column (type: date)
                  sort order: ++++
                  Map-reduce partition columns: column_1 (type: string), column_2 (type: string)
                  Statistics: Num rows: 2 Data size: 100 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Forward
          Statistics: Num rows: 2 Data size: 100 Basic stats: COMPLETE Column stats: NONE
          Filter Operator
            predicate: (KEY._col2:1._col0 >= 2015-06-22) (type: boolean)
            Statistics: Num rows: 1 Data size: 50 Basic stats: COMPLETE Column stats: NONE
            Group By Operator
              aggregations: count(), count(DISTINCT KEY._col2:0._col0), count(DISTINCT KEY._col2:1._col0, KEY._col2:1._col1)
              keys: KEY._col0 (type: string), KEY._col1 (type: string)
              mode: complete
              outputColumnNames: _col0, _col1, _col2, _col3, _col4
              Statistics: Num rows: 1 Data size: 50 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: _col0 (type: string), _col1 (type: string), UDFToInteger(_col2) (type: int), UDFToInteger(_col3) (type: int), UDFToInteger(_col4) (type: int)
                outputColumnNames: _col0, _col1, _col2, _col3, _col4
                Statistics: Num rows: 1 Data size: 50 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  Statistics: Num rows: 1 Data size: 50 Basic stats: COMPLETE Column stats: NONE
                  table:
                      input format: org.apache.hadoop.mapred.TextInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      name: multi_table_insert_bug.multi_table_insert_test
          Filter Operator
            predicate: (KEY._col2:1._col0 >= 2013-06-21) (type: boolean)
            Statistics: Num rows: 1 Data size: 50 Basic stats: COMPLETE Column stats: NONE
            Group By Operator
              aggregations: count(), count(DISTINCT KEY._col2:0._col0), count(DISTINCT KEY._col2:1._col0, KEY._col2:1._col1)
              keys: KEY._col0 (type: string), KEY._col1 (type: string)
              mode: complete
              outputColumnNames: _col0, _col1, _col2, _col3, _col4
              Statistics: Num rows: 1 Data size: 50 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: _col0 (type: string), _col1 (type: string), UDFToInteger(_col2) (type: int), UDFToInteger(_col3) (type: int), UDFToInteger(_col4) (type: int)
                outputColumnNames: _col0, _col1, _col2, _col3, _col4
                Statistics: Num rows: 1 Data size: 50 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  Statistics: Num rows: 1 Data size: 50 Basic stats: COMPLETE Column stats: NONE
                  table:
                      input format: org.apache.hadoop.mapred.TextInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      name: multi_table_insert_bug.multi_table_insert_test

  Stage: Stage-0
    Move Operator
      tables:
          partition:
            partition_column 365
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: multi_table_insert_bug.multi_table_insert_test

  Stage: Stage-3
    Stats-Aggr Operator

  Stage: Stage-1
    Move Operator
      tables:
          partition:
            partition_column 1096
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: multi_table_insert_bug.multi_table_insert_test

  Stage: Stage-4
    Stats-Aggr Operator
{code}

The {{ReduceSinkOperator}} class is emitting the following record:

*Record 1:*
{code}
{"key":{"_col0":"value_1_1","_col1":"value_1_2","_col2":{0:{"_col0":"value_1_3"}}},"value":null}
{code}

This record does not contain the {{date_column}} column, which causes the {{FilterOperator}} in the {{ExecReducer}} to fail.
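The cast failure itself is easy to reproduce in isolation. Below is a minimal sketch (my own illustration, not Hive source code) of the unchecked cast that {{WritableDateObjectInspector.getPrimitiveWritableObject(...)}} effectively performs when the reduce-side filter's date comparison receives the {{Text}} value from Record 1:

{code}
// Minimal sketch of the cast behind Exception 1. Record 1's key carries a
// Text ("value_1_3") where the reduce-side FilterOperator expects a date,
// so the date ObjectInspector's unchecked cast fails at runtime.
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.io.Text;

public class DateCastRepro {
  public static void main(String[] args) {
    Object keyField = new Text("value_1_3"); // the value Record 1 actually holds
    // WritableDateObjectInspector.getPrimitiveWritableObject(...) boils down
    // to a cast like this before GenericUDFOPEqualOrGreaterThan can compare:
    DateWritable date = (DateWritable) keyField; // throws ClassCastException
    System.out.println(date.get());
  }
}
{code}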
Once I determined that {{ReduceSinkOperator}} is emitting this record, I did my best to trace through its logic to figure out why it emits Record 1. The {{ReduceSinkOperator}} seems to emit a single record for each {{DISTINCT}} keyword in the query, but each of those records only has a subset of the original columns: for each {{DISTINCT}} clause it emits the columns in the {{DISTINCT}} clause itself, as well as the columns in the {{GROUP BY}}. For example, for {{COUNT(DISTINCT column_3)}} it emits {{column_1}}, {{column_2}}, and {{column_3}}, but not {{date_column}} (the records are reconstructed below). *I'm not sure the {{ReduceSinkOperator}} takes into account the situation where a {{FilterOperator}} can occur on the reduce side of a Hive query, over a column that is in neither the {{DISTINCT}} nor the {{GROUP BY}} clause.*

Exception 2 seems to be related to Exception 1. It is thrown in the {{OpProcFactory.ReduceSinkLineage.process(...)}} method, which does some type of processing on the {{ReduceSinkOperator}} class (the same class analyzed above for Exception 1), so my guess is that the two exceptions share a root cause.
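To make the emission pattern described above concrete, here is what I believe the two union-tagged key records look like for the first source row. The tag {{0}} record is Record 1 from the stack trace; the tag {{1}} record is my reconstruction from the plan's {{KEY._col2:1._col0}} and {{KEY._col2:1._col1}} references, not something captured from a log:

{code}
# Tag 0, COUNT(DISTINCT column_3): GROUP BY columns plus column_3 only
{"key":{"_col0":"value_1_1","_col1":"value_1_2","_col2":{0:{"_col0":"value_1_3"}}},"value":null}

# Tag 1, COUNT(DISTINCT date_column, column_3): GROUP BY columns plus date_column and column_3
{"key":{"_col0":"value_1_1","_col1":"value_1_2","_col2":{1:{"_col0":"2016-01-22","_col1":"value_1_3"}}},"value":null}
{code}

If this reconstruction is right, then when the reduce-side {{FilterOperator}} evaluates {{KEY._col2:1._col0 >= 2015-06-22}} against the tag {{0}} record, the union slot only holds the {{Text}} value {{value_1_3}}, which would be exactly the {{Text}}-to-{{DateWritable}} cast that blows up in Exception 1.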
> Multi-Insert Query Fails with GROUP BY, DISTINCT, and WHERE clauses
> -------------------------------------------------------------------
>
>                 Key: HIVE-14082
>                 URL: https://issues.apache.org/jira/browse/HIVE-14082
>             Project: Hive
>          Issue Type: Bug
>    Affects Versions: 1.1.0, 2.1.0
>            Reporter: Sahil Takiar
>            Assignee: Sahil Takiar
>
> The following multi-insert query fails in Hive. I've listed the query required to reproduce this failure, as well as a few similar queries that work properly.
>
> Setup queries:
> {code}
> DROP SCHEMA IF EXISTS multi_table_insert_bug CASCADE;
> CREATE SCHEMA multi_table_insert_bug;
> USE multi_table_insert_bug;
> DROP TABLE IF EXISTS multi_table_insert_source;
> DROP TABLE IF EXISTS multi_table_insert_test;
> CREATE TABLE multi_table_insert_source (
>   date_column DATE,
>   column_1 STRING,
>   column_2 STRING,
>   column_3 STRING,
>   column_4 STRING
> );
> CREATE TABLE multi_table_insert_test (
>   column_1 STRING,
>   column_2 STRING,
>   line_count INT,
>   distinct_count_by_1_column INT,
>   distinct_count_by_2_columns INT
> )
> PARTITIONED BY (partition_column INT);
> INSERT OVERWRITE TABLE multi_table_insert_source VALUES
>   ('2016-01-22', 'value_1_1', 'value_1_2', 'value_1_3', 'value_1_4'),
>   ('2016-01-22', 'value_2_1', 'value_2_2', 'value_2_3', 'value_2_4'),
>   ('2016-01-22', 'value_3_1', 'value_3_2', 'value_3_3', 'value_3_4'),
>   ('2016-01-22', 'value_4_1', 'value_4_2', 'value_4_3', 'value_4_4'),
>   ('2016-01-22', 'value_5_1', 'value_5_2', 'value_5_3', 'value_5_4');
> {code}
>
> The following queries run successfully:
>
> *Query 1:*
> {code}
> FROM multi_table_insert_source
> INSERT OVERWRITE TABLE multi_table_insert_test PARTITION (partition_column = 365)
> SELECT
>   column_1,
>   column_2,
>   COUNT(*) AS line_count,
>   COUNT(DISTINCT column_3) AS distinct_count_by_1_column,
>   COUNT(DISTINCT date_column, column_3) AS distinct_count_by_2_columns
> WHERE date_column >= DATE_SUB(FROM_UNIXTIME(UNIX_TIMESTAMP()), 365)
> GROUP BY
>   column_1,
>   column_2;
> {code}
>
> *Query 2:*
> {code}
> FROM multi_table_insert_source
> INSERT OVERWRITE TABLE multi_table_insert_test PARTITION (partition_column = 365)
> SELECT
>   column_1,
>   column_2,
>   COUNT(*) AS line_count,
>   COUNT(DISTINCT column_3) AS distinct_count_by_1_column,
>   COUNT(DISTINCT date_column, column_3) AS distinct_count_by_2_columns
> -- WHERE date_column >= DATE_SUB(FROM_UNIXTIME(UNIX_TIMESTAMP()), 365)
> GROUP BY
>   column_1,
>   column_2
> INSERT OVERWRITE TABLE multi_table_insert_test PARTITION (partition_column = 1096)
> SELECT
>   column_1,
>   column_2,
>   COUNT(*) AS line_count,
>   COUNT(DISTINCT column_3) AS distinct_count_by_1_column,
>   COUNT(DISTINCT date_column, column_3) AS distinct_count_by_2_columns
> -- WHERE date_column >= DATE_SUB(FROM_UNIXTIME(UNIX_TIMESTAMP()), 1096)
> GROUP BY
>   column_1,
>   column_2;
> {code}
>
> The following query fails with a {{ClassCastException}}:
>
> *Query 3:*
> {code}
> FROM multi_table_insert_source
> INSERT OVERWRITE TABLE multi_table_insert_test PARTITION (partition_column = 365)
> SELECT
>   column_1,
>   column_2,
>   COUNT(*) AS line_count,
>   COUNT(DISTINCT column_3) AS distinct_count_by_1_column,
>   COUNT(DISTINCT date_column, column_3) AS distinct_count_by_2_columns
> WHERE date_column >= DATE_SUB(FROM_UNIXTIME(UNIX_TIMESTAMP()), 365)
> GROUP BY
>   column_1,
>   column_2
> INSERT OVERWRITE TABLE multi_table_insert_test
> PARTITION (partition_column = 1096)
> SELECT
>   column_1,
>   column_2,
>   COUNT(*) AS line_count,
>   COUNT(DISTINCT column_3) AS distinct_count_by_1_column,
>   COUNT(DISTINCT date_column, column_3) AS distinct_count_by_2_columns
> WHERE date_column >= DATE_SUB(FROM_UNIXTIME(UNIX_TIMESTAMP()), 1096)
> GROUP BY
>   column_1,
>   column_2;
> {code}
>
> Here is the full stack trace of the exception:
>
> *Exception 1:*
> {code}
> java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"_col0":"value_1_1","_col1":"value_1_2","_col2":{0:{"_col0":"value_1_3"}}},"value":null}
>     at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:257)
>     at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:506)
>     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:447)
>     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:449)
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"_col0":"value_1_1","_col1":"value_1_2","_col2":{0:{"_col0":"value_1_3"}}},"value":null}
>     at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:245)
>     ... 3 more
> Caused by: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.hive.serde2.io.DateWritable
>     at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableDateObjectInspector.getPrimitiveWritableObject(WritableDateObjectInspector.java:38)
>     at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.compare(ObjectInspectorUtils.java:938)
>     at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.compare(ObjectInspectorUtils.java:818)
>     at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.compare(ObjectInspectorUtils.java:809)
>     at org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan.evaluate(GenericUDFOPEqualOrGreaterThan.java:141)
>     at org.apache.hadoop.hive.ql.exec.ExprNodeGenericFuncEvaluator._evaluate(ExprNodeGenericFuncEvaluator.java:186)
>     at org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator.evaluate(ExprNodeEvaluator.java:77)
>     at org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator.evaluate(ExprNodeEvaluator.java:65)
>     at org.apache.hadoop.hive.ql.exec.FilterOperator.process(FilterOperator.java:112)
>     at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:878)
>     at org.apache.hadoop.hive.ql.exec.ForwardOperator.process(ForwardOperator.java:38)
>     at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:236)
>     ... 3 more
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)