[ https://issues.apache.org/jira/browse/HIVE-17225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16110187#comment-16110187 ]
Sahil Takiar commented on HIVE-17225: ------------------------------------- Here is another example of when this can happen. Say there are three tables: pt1, pt2 and r1. pt1 and pt2 are partitioned and r1 is not. In terms of data size pt1 < r1 < pt2. If map-joins are enabled, and all three tables are joined the following scenario may occur. pt1 is scanned and written to a hash table, r1 is scanned and written to a hash table. pt2 is treated as the big table in the map-join. The hashtables for pt1 and r1 are generated in the same Spark job. If DPP is enabled, then the scan for r1 will result in a pruning sink targeting the scan for pt1. This will cause the FNF exception show above, because the scans for r1 and pt1 run in parallel. > HoS DPP pruning sink ops can target parallel work objects > --------------------------------------------------------- > > Key: HIVE-17225 > URL: https://issues.apache.org/jira/browse/HIVE-17225 > Project: Hive > Issue Type: Sub-task > Components: Spark > Affects Versions: 3.0.0 > Reporter: Sahil Takiar > Assignee: Sahil Takiar > > Setup: > {code:sql} > SET hive.spark.dynamic.partition.pruning=true; > SET hive.strict.checks.cartesian.product=false; > SET hive.auto.convert.join=true; > CREATE TABLE partitioned_table1 (col int) PARTITIONED BY (part_col int); > CREATE TABLE regular_table1 (col1 int, col2 int); > CREATE TABLE regular_table2 (col1 int, col2 int); > ALTER TABLE partitioned_table1 ADD PARTITION (part_col = 1); > ALTER TABLE partitioned_table1 ADD PARTITION (part_col = 2); > ALTER TABLE partitioned_table1 ADD PARTITION (part_col = 3); > INSERT INTO table regular_table1 VALUES (0, 0), (1, 1), (2, 2); > INSERT INTO table regular_table2 VALUES (0, 0), (1, 1), (2, 2); > INSERT INTO TABLE partitioned_table1 PARTITION (part_col = 1) VALUES (1), > (2), (3); > INSERT INTO TABLE partitioned_table1 PARTITION (part_col = 2) VALUES (1), > (2), (3); > INSERT INTO TABLE partitioned_table1 PARTITION (part_col = 3) VALUES (1), > (2), (3); > SELECT * > FROM regular_table1, > regular_table2, > partitioned_table1 > WHERE partitioned_table1.part_col IN (SELECT regular_table1.col2 > FROM regular_table1 > WHERE regular_table1.col1 > 0) > AND partitioned_table1.part_col IN (SELECT regular_table2.col2 > FROM regular_table2 > WHERE regular_table2.col1 > 1); > {code} > Exception: > {code} > 2017-08-01T13:27:47,483 ERROR [b0d354a8-4cdb-4ba9-acec-27d14926aaf4 main] > ql.Driver: FAILED: Execution Error, return code 3 from > org.apache.hadoop.hive.ql.exec.spark.SparkTask. java.lang.RuntimeException: > org.apache.hadoop.hive.ql.metadata.HiveException: > java.io.FileNotFoundException: File > file:/Users/stakiar/Documents/idea/apache-hive/itests/qtest-spark/target/tmp/scratchdir/stakiar/b0d354a8-4cdb-4ba9-acec-27d14926aaf4/hive_2017-08-01_13-27-45_553_1088589686371686526-1/-mr-10004/3/5 > does not exist > at > org.apache.hadoop.hive.ql.io.HiveInputFormat.init(HiveInputFormat.java:408) > at > org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.getSplits(CombineHiveInputFormat.java:498) > at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) > at scala.Option.getOrElse(Option.scala:121) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) > at > org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) > at scala.Option.getOrElse(Option.scala:121) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) > at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82) > at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:285) > at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:82) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) > at scala.Option.getOrElse(Option.scala:121) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) > at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82) > at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:285) > at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:82) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) > at scala.Option.getOrElse(Option.scala:121) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) > at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82) > at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:285) > at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:82) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) > at scala.Option.getOrElse(Option.scala:121) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) > at > org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1.apply(AsyncRDDActions.scala:127) > at > org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1.apply(AsyncRDDActions.scala:125) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) > at > org.apache.spark.rdd.AsyncRDDActions.foreachAsync(AsyncRDDActions.scala:125) > at > org.apache.spark.api.java.JavaRDDLike$class.foreachAsync(JavaRDDLike.scala:731) > at > org.apache.spark.api.java.AbstractJavaRDDLike.foreachAsync(JavaRDDLike.scala:45) > at > org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient$JobStatusJob.call(RemoteHiveSparkClient.java:351) > at > org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:358) > at > org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:323) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: > java.io.FileNotFoundException: File > file:/Users/stakiar/Documents/idea/apache-hive/itests/qtest-spark/target/tmp/scratchdir/stakiar/b0d354a8-4cdb-4ba9-acec-27d14926aaf4/hive_2017-08-01_13-27-45_553_1088589686371686526-1/-mr-10004/3/5 > does not exist > at > org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.processFiles(SparkDynamicPartitionPruner.java:147) > at > org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.prune(SparkDynamicPartitionPruner.java:76) > at > org.apache.hadoop.hive.ql.io.HiveInputFormat.init(HiveInputFormat.java:406) > ... 62 more > Caused by: java.io.FileNotFoundException: File > file:/Users/stakiar/Documents/idea/apache-hive/itests/qtest-spark/target/tmp/scratchdir/stakiar/b0d354a8-4cdb-4ba9-acec-27d14926aaf4/hive_2017-08-01_13-27-45_553_1088589686371686526-1/-mr-10004/3/5 > does not exist > at > org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:431) > at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517) > at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557) > at > org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:674) > at > org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.processFiles(SparkDynamicPartitionPruner.java:119) > ... 64 more > {code} > The explain plan for the query is: > {code} > STAGE DEPENDENCIES: > Stage-2 is a root stage > Stage-1 depends on stages: Stage-2 > Stage-0 depends on stages: Stage-1 > STAGE PLANS: > Stage: Stage-2 > Spark > Edges: > Reducer 5 <- Map 4 (GROUP, 2) > DagName: stakiar_20170801191847_f8b1a6fa-a4b6-47b7-977e-95ffb63265f4:39 > Vertices: > Map 2 > Map Operator Tree: > TableScan > alias: regular_table2 > Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE > Column stats: NONE > Select Operator > expressions: col1 (type: int), col2 (type: int) > outputColumnNames: _col0, _col1 > Statistics: Num rows: 3 Data size: 9 Basic stats: > COMPLETE Column stats: NONE > Spark HashTable Sink Operator > keys: > 0 > 1 > 2 > Local Work: > Map Reduce Local Work > Map 3 > Map Operator Tree: > TableScan > alias: partitioned_table1 > Statistics: Num rows: 9 Data size: 9 Basic stats: COMPLETE > Column stats: NONE > Select Operator > expressions: col (type: int), part_col (type: int) > outputColumnNames: _col0, _col1 > Statistics: Num rows: 9 Data size: 9 Basic stats: > COMPLETE Column stats: NONE > Spark HashTable Sink Operator > keys: > 0 > 1 > 2 > Local Work: > Map Reduce Local Work > Map 4 > Map Operator Tree: > TableScan > alias: regular_table1 > Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE > Column stats: NONE > Filter Operator > predicate: ((col1 > 0) and col2 is not null) (type: > boolean) > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Select Operator > expressions: col2 (type: int) > outputColumnNames: col2 > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Group By Operator > keys: col2 (type: int) > mode: hash > outputColumnNames: _col0 > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Reduce Output Operator > key expressions: _col0 (type: int) > sort order: + > Map-reduce partition columns: _col0 (type: int) > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Map 6 > Map Operator Tree: > TableScan > alias: regular_table2 > Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE > Column stats: NONE > Filter Operator > predicate: ((col1 > 1) and col2 is not null) (type: > boolean) > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Select Operator > expressions: col2 (type: int) > outputColumnNames: _col0 > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Group By Operator > keys: _col0 (type: int) > mode: hash > outputColumnNames: _col0 > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Spark HashTable Sink Operator > keys: > 0 _col5 (type: int) > 1 _col0 (type: int) > 2 _col0 (type: int) > Select Operator > expressions: _col0 (type: int) > outputColumnNames: _col0 > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Group By Operator > keys: _col0 (type: int) > mode: hash > outputColumnNames: _col0 > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Spark Partition Pruning Sink Operator > partition key expr: part_col > Statistics: Num rows: 1 Data size: 3 Basic > stats: COMPLETE Column stats: NONE > target column name: part_col > target work: Map 3 > Local Work: > Map Reduce Local Work > Reducer 5 > Local Work: > Map Reduce Local Work > Reduce Operator Tree: > Group By Operator > keys: KEY._col0 (type: int) > mode: mergepartial > outputColumnNames: _col0 > Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE > Column stats: NONE > Spark HashTable Sink Operator > keys: > 0 _col5 (type: int) > 1 _col0 (type: int) > 2 _col0 (type: int) > Select Operator > expressions: _col0 (type: int) > outputColumnNames: _col0 > Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE > Column stats: NONE > Group By Operator > keys: _col0 (type: int) > mode: hash > outputColumnNames: _col0 > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Spark Partition Pruning Sink Operator > partition key expr: part_col > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > target column name: part_col > target work: Map 3 > Stage: Stage-1 > Spark > DagName: stakiar_20170801191847_f8b1a6fa-a4b6-47b7-977e-95ffb63265f4:38 > Vertices: > Map 1 > Map Operator Tree: > TableScan > alias: regular_table1 > Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE > Column stats: NONE > Select Operator > expressions: col1 (type: int), col2 (type: int) > outputColumnNames: _col0, _col1 > Statistics: Num rows: 3 Data size: 9 Basic stats: > COMPLETE Column stats: NONE > Map Join Operator > condition map: > Inner Join 0 to 1 > Inner Join 0 to 2 > keys: > 0 > 1 > 2 > outputColumnNames: _col0, _col1, _col2, _col3, _col4, > _col5 > input vertices: > 1 Map 2 > 2 Map 3 > Statistics: Num rows: 81 Data size: 648 Basic stats: > COMPLETE Column stats: NONE > Map Join Operator > condition map: > Inner Join 0 to 1 > Left Semi Join 0 to 2 > keys: > 0 _col5 (type: int) > 1 _col0 (type: int) > 2 _col0 (type: int) > outputColumnNames: _col0, _col1, _col2, _col3, _col4, > _col5 > input vertices: > 1 Reducer 5 > 2 Map 6 > Statistics: Num rows: 178 Data size: 1425 Basic > stats: COMPLETE Column stats: NONE > File Output Operator > compressed: false > Statistics: Num rows: 178 Data size: 1425 Basic > stats: COMPLETE Column stats: NONE > table: > input format: > org.apache.hadoop.mapred.SequenceFileInputFormat > output format: > org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat > serde: > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe > Local Work: > Map Reduce Local Work > Stage: Stage-0 > Fetch Operator > limit: -1 > Processor Tree: > ListSink > {code} > The explain plan with DPP disabled is: > {code} > STAGE DEPENDENCIES: > Stage-2 is a root stage > Stage-1 depends on stages: Stage-2 > Stage-0 depends on stages: Stage-1 > STAGE PLANS: > Stage: Stage-2 > Spark > Edges: > Reducer 5 <- Map 4 (GROUP, 2) > #### A masked pattern was here #### > Vertices: > Map 2 > Map Operator Tree: > TableScan > alias: regular_table2 > Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE > Column stats: NONE > Select Operator > expressions: col1 (type: int), col2 (type: int) > outputColumnNames: _col0, _col1 > Statistics: Num rows: 3 Data size: 9 Basic stats: > COMPLETE Column stats: NONE > Spark HashTable Sink Operator > keys: > 0 > 1 > 2 > Local Work: > Map Reduce Local Work > Map 3 > Map Operator Tree: > TableScan > alias: partitioned_table1 > Statistics: Num rows: 9 Data size: 9 Basic stats: COMPLETE > Column stats: NONE > Select Operator > expressions: col (type: int), part_col (type: int) > outputColumnNames: _col0, _col1 > Statistics: Num rows: 9 Data size: 9 Basic stats: > COMPLETE Column stats: NONE > Spark HashTable Sink Operator > keys: > 0 > 1 > 2 > Local Work: > Map Reduce Local Work > Map 4 > Map Operator Tree: > TableScan > alias: regular_table1 > Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE > Column stats: NONE > Filter Operator > predicate: ((col1 > 0) and col2 is not null) (type: > boolean) > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Select Operator > expressions: col2 (type: int) > outputColumnNames: col2 > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Group By Operator > keys: col2 (type: int) > mode: hash > outputColumnNames: _col0 > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Reduce Output Operator > key expressions: _col0 (type: int) > sort order: + > Map-reduce partition columns: _col0 (type: int) > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Map 6 > Map Operator Tree: > TableScan > alias: regular_table2 > Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE > Column stats: NONE > Filter Operator > predicate: ((col1 > 1) and col2 is not null) (type: > boolean) > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Select Operator > expressions: col2 (type: int) > outputColumnNames: _col0 > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Group By Operator > keys: _col0 (type: int) > mode: hash > outputColumnNames: _col0 > Statistics: Num rows: 1 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Spark HashTable Sink Operator > keys: > 0 _col5 (type: int) > 1 _col0 (type: int) > 2 _col0 (type: int) > Local Work: > Map Reduce Local Work > Reducer 5 > Local Work: > Map Reduce Local Work > Reduce Operator Tree: > Group By Operator > keys: KEY._col0 (type: int) > mode: mergepartial > outputColumnNames: _col0 > Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE > Column stats: NONE > Spark HashTable Sink Operator > keys: > 0 _col5 (type: int) > 1 _col0 (type: int) > 2 _col0 (type: int) > Stage: Stage-1 > Spark > #### A masked pattern was here #### > Vertices: > Map 1 > Map Operator Tree: > TableScan > alias: regular_table1 > Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE > Column stats: NONE > Select Operator > expressions: col1 (type: int), col2 (type: int) > outputColumnNames: _col0, _col1 > Statistics: Num rows: 3 Data size: 9 Basic stats: > COMPLETE Column stats: NONE > Map Join Operator > condition map: > Inner Join 0 to 1 > Inner Join 0 to 2 > keys: > 0 > 1 > 2 > outputColumnNames: _col0, _col1, _col2, _col3, _col4, > _col5 > input vertices: > 1 Map 2 > 2 Map 3 > Statistics: Num rows: 81 Data size: 648 Basic stats: > COMPLETE Column stats: NONE > Map Join Operator > condition map: > Inner Join 0 to 1 > Left Semi Join 0 to 2 > keys: > 0 _col5 (type: int) > 1 _col0 (type: int) > 2 _col0 (type: int) > outputColumnNames: _col0, _col1, _col2, _col3, _col4, > _col5 > input vertices: > 1 Reducer 5 > 2 Map 6 > Statistics: Num rows: 178 Data size: 1425 Basic > stats: COMPLETE Column stats: NONE > File Output Operator > compressed: false > Statistics: Num rows: 178 Data size: 1425 Basic > stats: COMPLETE Column stats: NONE > table: > input format: > org.apache.hadoop.mapred.SequenceFileInputFormat > output format: > org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat > serde: > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe > Local Work: > Map Reduce Local Work > Stage: Stage-0 > Fetch Operator > limit: -1 > Processor Tree: > ListSink > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)