Hi All,

I have a use case where we run multiple Spark SQL queries in parallel to
write data to the same table, but to different partitions, to speed things up.
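
Roughly what each application runs, concurrently, against its own partition
(the table, partition column and source names below are illustrative, not our
real ones):

// Application 1:
spark.sql(
  """INSERT OVERWRITE TABLE db.tb PARTITION (dt = '2021-03-18')
    |SELECT * FROM src WHERE dt = '2021-03-18'""".stripMargin)

// Application 2, running at the same time against a different partition:
spark.sql(
  """INSERT OVERWRITE TABLE db.tb PARTITION (dt = '2021-03-19')
    |SELECT * FROM src WHERE dt = '2021-03-19'""".stripMargin)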

But it looks like the different applications end up using the same _temporary
directory, for example:
/user/hive/db/tb/_temporary/0/task_20210319101649_0006_m_000000..


The application that finishes first may end up deleting that directory, and
the other application then fails with this exception:

java.io.FileNotFoundException: File hdfs://nameservice1/user/test/db/tb/_temporary/0 does not exist.
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:985)
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:121)
        at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1045)
        at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1042)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1052)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:473)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:475)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:392)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:364)
        at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:187)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)


Going through similar issues that have been reported, I do not see any
solution to this. The issue might be specific to the V1 datasource, but since
V2 has its own limitations we are stuck with V1.

I was thinking we could configure the temporary directory to be application
specific, but it does not look like we can (mapred.output.dir and
mapreduce.output.fileoutputformat.outputdir do not seem to be used). Why can't
the _temporary name include the application id so that applications do not
interfere with each other, or why can't the _temporary location be
configurable?
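
So far the only alternative I can come up with (not sure it is the recommended
approach) is to have each application write its partition data to an
application-specific staging path, so that each job gets its own _temporary,
and then attach that location to the table afterwards. A rough sketch, with
illustrative paths and names:

// Per application: write to a path derived from the application id so the
// jobs never share a _temporary directory (paths/names are illustrative).
val appId = spark.sparkContext.applicationId
val stagingPath = s"/user/hive/db/tb_staging/$appId/dt=2021-03-19"

spark.sql("SELECT * FROM src WHERE dt = '2021-03-19'")
  .write
  .mode("overwrite")
  .parquet(stagingPath)

// Then point the table partition at that location.
spark.sql(
  s"""ALTER TABLE db.tb ADD IF NOT EXISTS PARTITION (dt = '2021-03-19')
     |LOCATION '$stagingPath'""".stripMargin)

But that moves the partition data out of the table's own directory, which is
not ideal, hence the question about making _temporary application specific.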

Any suggestions?

Regards
Bimal
