You need to say .mode("append") if you want to append to existing data.
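
A minimal sketch of what that looks like for the example quoted below, assuming a spark-shell session where `sc` is already defined and the spark-avro package is on the classpath. The default save mode fails when the target path already exists, which is the error in the quoted stack trace. The path and column names are taken from the quoted example.

```scala
// Assumes spark-shell (so `sc` exists) and spark-avro on the classpath.
import com.databricks.spark.avro._
import org.apache.spark.sql.{SaveMode, SQLContext}

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df = Seq((2012, "Batman")).toDF("year", "title")
val df2 = Seq((2013, "Batman")).toDF("year", "title")

// Append does not fail when /tmp/data already exists; new files are added
// under the matching partition directories (year=2012, year=2013).
df.write.mode(SaveMode.Append).partitionBy("year").avro("/tmp/data")

// The string form is equivalent to SaveMode.Append.
df2.write.mode("append").partitionBy("year").avro("/tmp/data")
```

Note that append mode adds rows without checking for duplicates, so writing the same batch twice stores its rows twice.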

On Tue, Dec 22, 2015 at 6:48 AM, Yash Sharma <yash...@gmail.com> wrote:

> Well, you are right. Glancing at the source [1], I see that the path
> creation does not consider the partitions.
>
> It tries to create the path before looking for the partition columns.
>
> I am not sure what the best way to incorporate this would be. You could
> file a JIRA so that experienced contributors can share their thoughts.
>
> [1]
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
> (line 131)
>
> - Thanks, via mobile,  excuse brevity.
> On Dec 22, 2015 7:48 PM, "Jan Holmberg" <jan.holmb...@perigeum.fi> wrote:
>
>> In my example the directories were distinct.
>>
>> So if I would like to have two distinct directories, e.g.
>>
>> /tmp/data/year=2012
>> /tmp/data/year=2013
>>
>> It does not work with
>> val df = Seq((2012, "Batman")).toDF("year","title")
>>
>> df.write.partitionBy("year").avro("/tmp/data")
>>
>> val df2 = Seq((2013, "Batman")).toDF("year","title")
>>
>> df2.write.partitionBy("year").avro("/tmp/data")
>>
>>
>> As you can see, it complains about the target directory (/tmp/data) and
>> not about the partitioning keys.
>>
>>
>> org.apache.spark.sql.AnalysisException: path hdfs://nameservice1/tmp/data
>> already exists.;
>> at
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:76)
>> at
>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>> at
>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>> at
>> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>>
>>
>>
>> On 22 Dec 2015, at 15:44, Yash Sharma <yash...@gmail.com> wrote:
>>
>> Well, this will indeed hit the error if the next run has the same year and
>> month, and writing would not be possible.
>>
>> You can try working around it by introducing a runCount in the partition
>> or in the output path.
>>
>> Something like-
>>
>> /tmp/data/year/month/01
>> /tmp/data/year/month/02
>>
>> Or,
>> /tmp/data/01/year/month
>> /tmp/data/02/year/month
>>
>> This is a workaround.
>>
>> I am sure other, better approaches will follow.
>>
>> - Thanks, via mobile,  excuse brevity.
>> On Dec 22, 2015 7:01 PM, "Jan Holmberg" <jan.holmb...@perigeum.fi> wrote:
>>
>>> Hi Yash,
>>>
>>> the error is caused by the fact that the first run creates the base
>>> directory, i.e. "/tmp/data", and the second batch stumbles on the existing
>>> base directory. I understand that the existing base directory is a
>>> challenge, but I do not understand how to make this work with a streaming
>>> example where each batch would create a new distinct directory.
>>>
>>> Granularity has no impact. No matter how the data is partitioned, the
>>> second 'batch' always fails on the existing base dir.
>>>
>>> scala> df2.write.partitionBy("year").avro("/tmp/data")
>>> org.apache.spark.sql.AnalysisException: path
>>> hdfs://nameservice1/tmp/data already exists.;
>>> at
>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:76)
>>> at
>>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>>> at
>>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>>> at
>>> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>>> at
>>> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
>>> at
>>> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
>>> at
>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
>>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
>>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
>>> at
>>> com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
>>> at
>>> com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
>>> at
>>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
>>> at
>>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
>>>
>>>
>>> On 22 Dec 2015, at 14:06, Yash Sharma <yash...@gmail.com> wrote:
>>>
>>> Hi Jan,
>>> Is the error because a past run of the job has already written to the
>>> location?
>>>
>>> In that case you can add more granularity with 'time' along with year
>>> and month. That should give you a distinct path for every run.
>>>
>>> Let us know if it helps or if I missed anything.
>>>
>>> Good luck
>>>
>>> - Thanks, via mobile,  excuse brevity.
>>> On Dec 22, 2015 2:31 PM, "Jan Holmberg" <jan.holmb...@perigeum.fi>
>>> wrote:
>>>
>>>> Hi,
>>>> I'm stuck with writing partitioned data to HDFS. The example below ends
>>>> up with an 'already exists' error.
>>>>
>>>> I'm wondering how to handle the streaming use case.
>>>>
>>>> What is the intended way to write streaming data to hdfs? What am I
>>>> missing?
>>>>
>>>> cheers,
>>>> -jan
>>>>
>>>>
>>>> import com.databricks.spark.avro._
>>>>
>>>> import org.apache.spark.sql.SQLContext
>>>>
>>>> val sqlContext = new SQLContext(sc)
>>>>
>>>> import sqlContext.implicits._
>>>>
>>>> val df = Seq(
>>>> (2012, 8, "Batman", 9.8),
>>>> (2012, 8, "Hero", 8.7),
>>>> (2012, 7, "Robot", 5.5),
>>>> (2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating")
>>>>
>>>> df.write.partitionBy("year", "month").avro("/tmp/data")
>>>>
>>>> val df2 = Seq(
>>>> (2012, 10, "Batman", 9.8),
>>>> (2012, 10, "Hero", 8.7),
>>>> (2012, 9, "Robot", 5.5),
>>>> (2011, 9, "Git", 2.0)).toDF("year", "month", "title", "rating")
>>>>
>>>> df2.write.partitionBy("year", "month").avro("/tmp/data")