Github user ljwagerfield commented on the pull request:
https://github.com/apache/spark/pull/8710#issuecomment-220395710
We're seeing this exception too, even though our operations run serially (at least on the surface it seems as if they do). If we execute a `df.save` operation inside a `Future` and wait for that `Future` to complete, every `df.save` we perform in a subsequent `Future` fails.

This happens specifically when we load Avro files from S3 and save them back to S3 as Parquet: the load works fine, but the save fails on the second attempt. Notably, if we instead generate the `DataFrame` from an in-memory list (so we're only saving to S3, not loading from it), the error goes away... I'm not sure how helpful that is.
We're using Java 1.8, Scala 2.10.5, with our Spark codebase at commit
https://github.com/apache/spark/commit/15de51c238a7340fa81cb0b80d029a05d97bfc5c.
Our exact reproduction steps are:
**1. Run a Spark Shell with appropriate dependencies**
```
./spark-shell --packages com.amazonaws:aws-java-sdk:1.10.75,org.apache.hadoop:hadoop-aws:2.7.2,com.databricks:spark-avro_2.10:2.0.1
```
**2. Run the following setup code within the shell**
```
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.sql._

// sqlContext must exist before its implicits can be imported
implicit val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", "...")
hadoopConf.set("fs.s3.awsSecretAccessKey", "...")

val df = sqlContext.read.format("com.databricks.spark.avro").load("s3://bucket/input.avro")

def doWrite(): Unit = {
  df.write.format("org.apache.spark.sql.parquet").mode(SaveMode.Overwrite).save("s3://bucket/output")
}
```
**3. Run the following _twice_, leaving time for the first execution to finish (so the operations are serialised)**
```
Future { doWrite(); println("SUCCEEDED") }.recover {
  case e: Throwable => println("FAILED: " + e.getMessage()); e.printStackTrace()
}
```
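Leaving a manual gap between runs is fragile; blocking on each `Future` with `Await.result` makes the serialisation explicit and deterministic. A minimal sketch of that pattern, with `doWrite` stubbed out (an assumption of ours, so the snippet runs without Spark):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Stub standing in for the Spark doWrite() above; the point is only the
// ordering: Await.result blocks until the first Future completes, so the
// second write cannot start until the first has fully finished.
def doWrite(label: String): String = s"wrote $label"

val first  = Await.result(Future { doWrite("run-1") }, 1.minute)
val second = Await.result(Future { doWrite("run-2") }, 1.minute)
```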
**Result:**
```
spark.sql.execution.id is already set
java.lang.IllegalArgumentException: spark.sql.execution.id is already set
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
  at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
  at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
  at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
  at $line38.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.doWrite(<console>:41)
  at $line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply$mcV$sp(<console>:43)
  at $line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:43)
  at $line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:43)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```
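For context, my guess (not verified against the Spark source) is that the execution id is kept in a thread-local whose value is inherited by threads created while a query is running, which would explain why `Future`s scheduled on a fork-join pool can see a stale id. A minimal JVM-only sketch of that inheritance behaviour, with no Spark involved:

```scala
// An InheritableThreadLocal copies its current value into every thread
// created while the value is set -- so a worker thread spawned during a
// running query would start life with the parent's execution id present.
val executionId = new InheritableThreadLocal[String]

def childSees(): String = {
  executionId.set("execution-id-1")   // parent thread sets the id
  var inherited: String = null
  val t = new Thread(new Runnable {
    def run(): Unit = { inherited = executionId.get() }
  })
  t.start()
  t.join()                            // join() makes `inherited` visible here
  inherited                           // child observed the parent's value
}
```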