Hi all, I am using Spark 2.1 and I hit an exception when doing concurrent inserts on a table. Here is my scenario and some analysis.
The table is created as:

create table sample using csv options('path' '/tmp/f/')

When concurrent inserts are executed against it, we see an exception like the one below:

2017-12-29 13:41:11,117 | ERROR | main | Aborting job null. | org.apache.spark.internal.Logging$class.logError(Logging.scala:91)
org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): No lease on /tmp/f/_SUCCESS (inode 1032508): File does not exist. Holder DFSClient_NONMAPREDUCE_8638078_1 does not have any open files.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3466)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3562)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3525)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:917)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:573)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:973)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2260)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2256)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1778)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2254)
        at org.apache.hadoop.ipc.Client.call(Client.java:1524)
        at org.apache.hadoop.ipc.Client.call(Client.java:1460)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy14.complete(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:480)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:202)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
        at com.sun.proxy.$Proxy15.complete(Unknown Source)
        at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:887)
        at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:861)
        at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:822)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:336)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:128)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2$$anonfun$apply$mcV$sp$1.apply$mcV$sp(FileFormatWriter.scala:167)
        at org.apache.spark.util.Utils$.proxyOperate(Utils.scala:2706)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply$mcV$sp(FileFormatWriter.scala:166)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:144)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:59)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:57)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:75)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:125)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:125)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:600)
        at $line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:24)
        at $line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:29)
        at $line48.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:31)
        at $line48.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)
        at $line48.$read$$iw$$iw$$iw$$iw.<init>(<console>:35)
        at $line48.$read$$iw$$iw$$iw.<init>(<console>:37)
        at $line48.$read$$iw$$iw.<init>(<console>:39)
        at $line48.$read$$iw.<init>(<console>:41)
        at $line48.$read.<init>(<console>:43)
        at $line48.$read$.<init>(<console>:47)
        at $line48.$read$.<clinit>(<console>)
        at $line48.$eval$.$print$lzycompute(<console>:7)
        at $line48.$eval$.$print(<console>:6)
        at $line48.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
        at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
        at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
        at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
        at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
        at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
        at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
        at org.apache.spark.repl.Main$.doMain(Main.scala:69)
        at org.apache.spark.repl.Main$.main(Main.scala:52)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:761)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:190)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:215)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:129)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply$mcV$sp(FileFormatWriter.scala:179)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:144)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:59)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:57)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:75)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:125)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:125)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:600)
        ... 48 elided
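For reference, this is roughly how I drive it: the table is created once, and then the same insert is fired from two separate spark-shell applications at about the same time. A minimal sketch (the (i int) column and the inserted value are made up for this sketch; only the shared output path matters):

    // executed once, from any session; schema added here just for the sketch
    spark.sql("create table sample(i int) using csv options('path' '/tmp/f/')")

    // executed concurrently from two different spark-shell / spark-submit
    // applications: whichever job commits second fails while closing
    // /tmp/f/_SUCCESS with the "No lease" error shown above
    spark.sql("insert into sample values (1)")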
Basic analysis:

The _SUCCESS file is used by the MapReduce framework to mark successful jobs (controlled by mapreduce.fileoutputcommitter.marksuccessfuljobs, which defaults to true). It is written by org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#commitJob, invoked via org.apache.spark.sql.execution.datasources.FileFormatWriter#write:

    if (context.getConfiguration().getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
      Path markerPath = new Path(this.outputPath, "_SUCCESS");
      fs.create(markerPath).close();
    }

The _SUCCESS file is created by org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter under the output path specified via mapreduce.output.fileoutputformat.outputdir in the job configuration.

Taking the example of create table sample using csv options('path' '/tmp/f/'), the data files created on every insert land in /tmp/f/. mapreduce.output.fileoutputformat.outputdir is set by Spark in org.apache.spark.sql.execution.datasources.FileFormatWriter#write and is used by the committer for two purposes:
1. to create the part files with data under this folder (first in the _temporary folder, then moved to the output folder on commitJob)
2. to create the _SUCCESS file on job completion

If two applications try to insert into the same table concurrently, then when both jobs complete and try to commit, the _SUCCESS file becomes a race condition (in our example the close() call on /tmp/f/_SUCCESS failed). HDFS grants the lease to only one client at a time, so the other one fails.

As mentioned, the _SUCCESS file is created by MapReduce code, and Spark could turn it off by setting mapreduce.fileoutputcommitter.marksuccessfuljobs = false in org.apache.spark.internal.io.HadoopMapReduceCommitProtocol#setupJob. _SUCCESS is only used by frameworks like Oozie for file processing (refer https://books.google.co.in/books?id=HAY_CQAAQBAJ&pg=PA119&lpg=PA119&dq=Oozie+dependencies+on+_SUCCESS+file&source=bl&ots=RTr3hP0Cjj&sig=3B2yk24ebZt42SQo8O42eOX6OCI&hl=en&sa=X&ved=0ahUKEwjj053i5cDYAhVIr48KHUIGCZQQ6AEIYjAI#v=onepage&q=Oozie%20dependencies%20on%20_SUCCESS%20file&f=false), so setting mapreduce.fileoutputcommitter.marksuccessfuljobs = false in org.apache.spark.internal.io.HadoopMapReduceCommitProtocol#setupJob should be OK? Would it have any impact? I do not see _SUCCESS being used by Spark itself.

I am new to Spark, so please correct me if any of the analysis is wrong :)

Regards
Ajith
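PS: Until there is a conclusion on changing this inside HadoopMapReduceCommitProtocol#setupJob, the per-application workaround I am trying is to switch the marker off from my own jobs, since the flag is read from the job's Hadoop configuration. A sketch (assuming the usual spark.hadoop.* passthrough into the Hadoop configuration also covers this write path):

    // either at submit time:
    //   spark-shell --conf spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs=false
    // or programmatically, before running the insert:
    spark.sparkContext.hadoopConfiguration
      .set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    spark.sql("insert into sample values (1)")  // no /tmp/f/_SUCCESS marker should be written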