You need to specify .mode("append") (i.e. SaveMode.Append) on the DataFrameWriter if you want to append to existing data; the default save mode is ErrorIfExists, which is why the second write to /tmp/data fails.
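
For example, reusing the snippets from the thread below (a minimal sketch; it assumes the spark-avro package and a Spark 1.x SQLContext, as in the original code):

import com.databricks.spark.avro._
import org.apache.spark.sql.{SQLContext, SaveMode}

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// First batch creates /tmp/data and its year=... partition directories.
val df = Seq((2012, "Batman")).toDF("year", "title")
df.write.partitionBy("year").avro("/tmp/data")

// Later batches must opt into appending; the default mode (ErrorIfExists)
// is what raises "path ... already exists".
val df2 = Seq((2013, "Batman")).toDF("year", "title")
df2.write.mode(SaveMode.Append).partitionBy("year").avro("/tmp/data")

With append mode, new partition directories (e.g. year=2013) should land next to the existing ones, and data for an already-existing partition is written as additional files instead of failing.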
On Tue, Dec 22, 2015 at 6:48 AM, Yash Sharma <yash...@gmail.com> wrote:

> Well, you are right. Having a quick glance at the source [1], I see that the
> path creation does not consider the partitions.
>
> It tries to create the path before looking for the partition columns.
>
> Not sure what would be the best way to incorporate it. Probably you can
> file a JIRA and experienced contributors can share their thoughts.
>
> 1. https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
> (line 131)
>
> - Thanks, via mobile, excuse brevity.
>
> On Dec 22, 2015 7:48 PM, "Jan Holmberg" <jan.holmb...@perigeum.fi> wrote:
>
>> In my example the directories were distinct.
>>
>> So if I would like to have two distinct directories, e.g.
>>
>> /tmp/data/year=2012
>> /tmp/data/year=2013
>>
>> it does not work with:
>>
>> val df = Seq((2012, "Batman")).toDF("year", "title")
>> df.write.partitionBy("year").avro("/tmp/data")
>>
>> val df2 = Seq((2013, "Batman")).toDF("year", "title")
>> df2.write.partitionBy("year").avro("/tmp/data")
>>
>> As you can see, it complains about the target directory (/tmp/data) and
>> not about the partitioning keys:
>>
>> org.apache.spark.sql.AnalysisException: path hdfs://nameservice1/tmp/data already exists.;
>>   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:76)
>>   at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>>   at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>>   at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>>
>> On 22 Dec 2015, at 15:44, Yash Sharma <yash...@gmail.com> wrote:
>>
>> Well, this will indeed hit the error if the next run has the same year and
>> month values, and writing would not be possible.
>>
>> You can try working around it by introducing a run count in the partitioning
>> or in the output path, something like:
>>
>> /tmp/data/year/month/01
>> /tmp/data/year/month/02
>>
>> or:
>>
>> /tmp/data/01/year/month
>> /tmp/data/02/year/month
>>
>> This is a workaround; I am sure other, better approaches will follow.
>>
>> - Thanks, via mobile, excuse brevity.
>>
>> On Dec 22, 2015 7:01 PM, "Jan Holmberg" <jan.holmb...@perigeum.fi> wrote:
>>
>>> Hi Yash,
>>>
>>> the error is caused by the fact that the first run creates the base
>>> directory, i.e. /tmp/data, and the second batch stumbles on the existing
>>> base directory. I understand that the existing base directory is a
>>> challenge, but I do not understand how to make this work in a streaming
>>> scenario, where each batch would have to create a new distinct directory.
>>>
>>> Granularity has no impact. No matter how the data is partitioned, the
>>> second 'batch' always fails on the existing base dir.
>>>
>>> scala> df2.write.partitionBy("year").avro("/tmp/data")
>>> org.apache.spark.sql.AnalysisException: path hdfs://nameservice1/tmp/data already exists.;
>>>   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:76)
>>>   at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>>>   at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>>>   at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
>>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>>>   at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
>>>   at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
>>>   at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
>>>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
>>>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
>>>   at com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
>>>   at com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
>>>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
>>>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
>>>
>>> On 22 Dec 2015, at 14:06, Yash Sharma <yash...@gmail.com> wrote:
>>>
>>> Hi Jan,
>>>
>>> Is the error because a past run of the job has already written to the
>>> location?
>>>
>>> In that case you can add more granularity with a 'time' component along
>>> with year and month. That should give you a distinct path for every run.
>>>
>>> Let us know if it helps or if I missed anything.
>>>
>>> Good luck.
>>>
>>> - Thanks, via mobile, excuse brevity.
>>>
>>> On Dec 22, 2015 2:31 PM, "Jan Holmberg" <jan.holmb...@perigeum.fi> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm stuck writing partitioned data to HDFS; the example below ends up
>>>> with an 'already exists' error.
>>>>
>>>> I'm wondering how to handle the streaming use case.
>>>>
>>>> What is the intended way to write streaming data to HDFS? What am I
>>>> missing?
>>>>
>>>> cheers,
>>>> -jan
>>>>
>>>> import com.databricks.spark.avro._
>>>> import org.apache.spark.sql.SQLContext
>>>>
>>>> val sqlContext = new SQLContext(sc)
>>>> import sqlContext.implicits._
>>>>
>>>> val df = Seq(
>>>>   (2012, 8, "Batman", 9.8),
>>>>   (2012, 8, "Hero", 8.7),
>>>>   (2012, 7, "Robot", 5.5),
>>>>   (2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating")
>>>>
>>>> df.write.partitionBy("year", "month").avro("/tmp/data")
>>>>
>>>> val df2 = Seq(
>>>>   (2012, 10, "Batman", 9.8),
>>>>   (2012, 10, "Hero", 8.7),
>>>>   (2012, 9, "Robot", 5.5),
>>>>   (2011, 9, "Git", 2.0)).toDF("year", "month", "title", "rating")
>>>>
>>>> df2.write.partitionBy("year", "month").avro("/tmp/data")
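
For completeness, a rough sketch of the per-run path workaround Yash suggests above (the writeBatch helper and runNumber parameter are illustrative, not from the thread):

import com.databricks.spark.avro._
import org.apache.spark.sql.DataFrame

// Illustrative helper: give every batch its own sub-directory so the
// target path never exists before the write.
def writeBatch(df: DataFrame, runNumber: Int): Unit = {
  val target = f"/tmp/data/$runNumber%02d"   // e.g. /tmp/data/01, /tmp/data/02
  df.write.partitionBy("year", "month").avro(target)
}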