[ https://issues.apache.org/jira/browse/HIVE-17225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16110187#comment-16110187 ]
Sahil Takiar edited comment on HIVE-17225 at 8/2/17 3:31 AM: ------------------------------------------------------------- The query above fails because the {{Spark Partition Pruning Sink Operator}} in Map 3 has a target work of Map 1. This means that when Map 1 runs, it will looks for a tmp file on HDFS that contains all the partitions it should scan. The problem is that Map 1 and Map 3 will run in parallel, so Map 1 will fail with a FNF exception. Here is a brief explanation of the whats happening in the query above. There are three tables: pt1, r1 and r2. pt1 is partitioned, r1t and rt2 aren't. In terms of data size pt1 < rt1 = rt2. Map-joins are enabled. Since pt1 is the smallest table, it is scanned and written to a hash table. rt2 is also scanned and written to a hash table. rt1 is treated as the big table in the map-join. The hashtables for pt1 and rt2 are generated in the same Spark job. If DPP is enabled, then the scan for rt2 will result in a pruning sink targeting the scan for pt1. This will cause the FNF exception show above, because the scans for rt2 and pt1 run in parallel. was (Author: stakiar): Here is another example of when this can happen. Say there are three tables: pt1, pt2 and r1. pt1 and pt2 are partitioned and r1 is not. In terms of data size pt1 < r1 < pt2. If map-joins are enabled, and all three tables are joined the following scenario may occur. pt1 is scanned and written to a hash table, r1 is scanned and written to a hash table. pt2 is treated as the big table in the map-join. The hashtables for pt1 and r1 are generated in the same Spark job. If DPP is enabled, then the scan for r1 will result in a pruning sink targeting the scan for pt1. This will cause the FNF exception show above, because the scans for r1 and pt1 run in parallel. > HoS DPP pruning sink ops can target parallel work objects > --------------------------------------------------------- > > Key: HIVE-17225 > URL: https://issues.apache.org/jira/browse/HIVE-17225 > Project: Hive > Issue Type: Sub-task > Components: Spark > Affects Versions: 3.0.0 > Reporter: Sahil Takiar > Assignee: Sahil Takiar > > Setup: > {code:sql} > SET hive.spark.dynamic.partition.pruning=true; > SET hive.strict.checks.cartesian.product=false; > SET hive.auto.convert.join=true; > CREATE TABLE partitioned_table1 (col int) PARTITIONED BY (part_col int); > CREATE TABLE regular_table1 (col int); > CREATE TABLE regular_table2 (col int); > ALTER TABLE partitioned_table1 ADD PARTITION (part_col = 1); > ALTER TABLE partitioned_table1 ADD PARTITION (part_col = 2); > ALTER TABLE partitioned_table1 ADD PARTITION (part_col = 3); > INSERT INTO table regular_table1 VALUES (1), (2), (3), (4), (5), (6); > INSERT INTO table regular_table2 VALUES (1), (2), (3), (4), (5), (6); > INSERT INTO TABLE partitioned_table1 PARTITION (part_col = 1) VALUES (1); > INSERT INTO TABLE partitioned_table1 PARTITION (part_col = 2) VALUES (2); > INSERT INTO TABLE partitioned_table1 PARTITION (part_col = 3) VALUES (3); > SELECT * > FROM partitioned_table1, > regular_table1 rt1, > regular_table2 rt2 > WHERE rt1.col = partitioned_table1.part_col > AND rt2.col = partitioned_table1.part_col; > {code} > Exception: > {code} > 2017-08-01T13:27:47,483 ERROR [b0d354a8-4cdb-4ba9-acec-27d14926aaf4 main] > ql.Driver: FAILED: Execution Error, return code 3 from > org.apache.hadoop.hive.ql.exec.spark.SparkTask. java.lang.RuntimeException: > org.apache.hadoop.hive.ql.metadata.HiveException: > java.io.FileNotFoundException: File > file:/Users/stakiar/Documents/idea/apache-hive/itests/qtest-spark/target/tmp/scratchdir/stakiar/b0d354a8-4cdb-4ba9-acec-27d14926aaf4/hive_2017-08-01_13-27-45_553_1088589686371686526-1/-mr-10004/3/5 > does not exist > at > org.apache.hadoop.hive.ql.io.HiveInputFormat.init(HiveInputFormat.java:408) > at > org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.getSplits(CombineHiveInputFormat.java:498) > at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) > at scala.Option.getOrElse(Option.scala:121) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) > at > org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) > at scala.Option.getOrElse(Option.scala:121) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) > at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82) > at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:285) > at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:82) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) > at scala.Option.getOrElse(Option.scala:121) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) > at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82) > at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:285) > at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:82) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) > at scala.Option.getOrElse(Option.scala:121) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) > at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82) > at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:285) > at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:82) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) > at scala.Option.getOrElse(Option.scala:121) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) > at > org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1.apply(AsyncRDDActions.scala:127) > at > org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1.apply(AsyncRDDActions.scala:125) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) > at > org.apache.spark.rdd.AsyncRDDActions.foreachAsync(AsyncRDDActions.scala:125) > at > org.apache.spark.api.java.JavaRDDLike$class.foreachAsync(JavaRDDLike.scala:731) > at > org.apache.spark.api.java.AbstractJavaRDDLike.foreachAsync(JavaRDDLike.scala:45) > at > org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient$JobStatusJob.call(RemoteHiveSparkClient.java:351) > at > org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:358) > at > org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:323) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: > java.io.FileNotFoundException: File > file:/Users/stakiar/Documents/idea/apache-hive/itests/qtest-spark/target/tmp/scratchdir/stakiar/b0d354a8-4cdb-4ba9-acec-27d14926aaf4/hive_2017-08-01_13-27-45_553_1088589686371686526-1/-mr-10004/3/5 > does not exist > at > org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.processFiles(SparkDynamicPartitionPruner.java:147) > at > org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.prune(SparkDynamicPartitionPruner.java:76) > at > org.apache.hadoop.hive.ql.io.HiveInputFormat.init(HiveInputFormat.java:406) > ... 62 more > Caused by: java.io.FileNotFoundException: File > file:/Users/stakiar/Documents/idea/apache-hive/itests/qtest-spark/target/tmp/scratchdir/stakiar/b0d354a8-4cdb-4ba9-acec-27d14926aaf4/hive_2017-08-01_13-27-45_553_1088589686371686526-1/-mr-10004/3/5 > does not exist > at > org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:431) > at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517) > at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557) > at > org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:674) > at > org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.processFiles(SparkDynamicPartitionPruner.java:119) > ... 64 more > {code} > The explain plan for the query is: > {code} > STAGE DEPENDENCIES: > Stage-2 is a root stage > Stage-1 depends on stages: Stage-2 > Stage-0 depends on stages: Stage-1 > STAGE PLANS: > Stage: Stage-2 > Spark > DagName: stakiar_20170801202453_5376c59d-2eca-47ca-94bc-a3049f7fbd0a:39 > Vertices: > Map 1 > Map Operator Tree: > TableScan > alias: partitioned_table1 > Statistics: Num rows: 3 Data size: 3 Basic stats: COMPLETE > Column stats: NONE > Select Operator > expressions: col (type: int), part_col (type: int) > outputColumnNames: _col0, _col1 > Statistics: Num rows: 3 Data size: 3 Basic stats: > COMPLETE Column stats: NONE > Spark HashTable Sink Operator > keys: > 0 _col1 (type: int) > 1 _col0 (type: int) > 2 _col0 (type: int) > Local Work: > Map Reduce Local Work > Map 3 > Map Operator Tree: > TableScan > alias: rt2 > Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE > Column stats: NONE > Filter Operator > predicate: col is not null (type: boolean) > Statistics: Num rows: 6 Data size: 6 Basic stats: > COMPLETE Column stats: NONE > Select Operator > expressions: col (type: int) > outputColumnNames: _col0 > Statistics: Num rows: 6 Data size: 6 Basic stats: > COMPLETE Column stats: NONE > Spark HashTable Sink Operator > keys: > 0 _col1 (type: int) > 1 _col0 (type: int) > 2 _col0 (type: int) > Select Operator > expressions: _col0 (type: int) > outputColumnNames: _col0 > Statistics: Num rows: 6 Data size: 6 Basic stats: > COMPLETE Column stats: NONE > Group By Operator > keys: _col0 (type: int) > mode: hash > outputColumnNames: _col0 > Statistics: Num rows: 6 Data size: 6 Basic stats: > COMPLETE Column stats: NONE > Spark Partition Pruning Sink Operator > partition key expr: part_col > Statistics: Num rows: 6 Data size: 6 Basic stats: > COMPLETE Column stats: NONE > target column name: part_col > target work: Map 1 > Local Work: > Map Reduce Local Work > Stage: Stage-1 > Spark > DagName: stakiar_20170801202453_5376c59d-2eca-47ca-94bc-a3049f7fbd0a:38 > Vertices: > Map 2 > Map Operator Tree: > TableScan > alias: rt1 > Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE > Column stats: NONE > Filter Operator > predicate: col is not null (type: boolean) > Statistics: Num rows: 6 Data size: 6 Basic stats: > COMPLETE Column stats: NONE > Select Operator > expressions: col (type: int) > outputColumnNames: _col0 > Statistics: Num rows: 6 Data size: 6 Basic stats: > COMPLETE Column stats: NONE > Map Join Operator > condition map: > Inner Join 0 to 1 > Inner Join 0 to 2 > keys: > 0 _col1 (type: int) > 1 _col0 (type: int) > 2 _col0 (type: int) > outputColumnNames: _col0, _col1, _col2, _col3 > input vertices: > 0 Map 1 > 2 Map 3 > Statistics: Num rows: 13 Data size: 13 Basic stats: > COMPLETE Column stats: NONE > File Output Operator > compressed: false > Statistics: Num rows: 13 Data size: 13 Basic stats: > COMPLETE Column stats: NONE > table: > input format: > org.apache.hadoop.mapred.SequenceFileInputFormat > output format: > org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat > serde: > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe > Local Work: > Map Reduce Local Work > Stage: Stage-0 > Fetch Operator > limit: -1 > Processor Tree: > ListSink > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)