Re: A DataFrame cache bug

2017-02-21 Thread gen tang
Hi Kazuaki Ishizaki, Thanks a lot for your help. It works. However, an even stranger bug appears, as follows: import org.apache.spark.sql.DataFrame import org.apache.spark.sql.SparkSession def f(path: String, spark: SparkSession): DataFrame = { val data = spark.read.option("mergeSchema", "true").p…
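[A hedged reconstruction of the snippet in this truncated message; the filter and the tail of the function are assumptions based on the earlier example in the thread, not the poster's exact code.]

  import org.apache.spark.sql.{DataFrame, SparkSession}

  def f(path: String, spark: SparkSession): DataFrame = {
    // "mergeSchema" reconciles differing Parquet schemas across the files under `path`
    val data = spark.read.option("mergeSchema", "true").parquet(path)
    val df = data.filter("id > 10")   // assumed body, mirroring f() from the first message
    df.cache()                        // the cached plan is tied to the files seen at this point
    df.count()
    df
  }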

Re: A DataFrame cache bug

2017-02-21 Thread Kazuaki Ishizaki
Hi, Thank you for pointing out the JIRA. I think that this JIRA suggests inserting "spark.catalog.refreshByPath(dir)". val dir = "/tmp/test" spark.range(100).write.mode("overwrite").parquet(dir) val df = spark.read.parquet(dir) df.count // output 100, which is correct f(df).count // output 89…
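[A runnable sketch of the suggested fix, pieced together from the snippets quoted in this thread; the message is cut off, so the steps after the second write are assumptions.]

  import org.apache.spark.sql.DataFrame

  def f(data: DataFrame): DataFrame = {          // f() as defined in the original report
    val df = data.filter("id > 10")
    df.cache(); df.count(); df
  }

  val dir = "/tmp/test"
  spark.range(100).write.mode("overwrite").parquet(dir)
  val df = spark.read.parquet(dir)
  df.count                                       // 100, as expected
  f(df).count                                    // 89 (ids 11..99 pass the filter)

  spark.range(1000).write.mode("overwrite").parquet(dir)
  spark.catalog.refreshByPath(dir)               // drop cached file listings/plans for this path
  spark.read.parquet(dir).count                  // reflects the rewritten data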

Re: A DataFrame cache bug

2017-02-21 Thread gen tang
Hi All, I may have found a related issue on JIRA: https://issues.apache.org/jira/browse/SPARK-15678 This issue is closed; maybe we should reopen it. Thanks Cheers Gen On Wed, Feb 22, 2017 at 1:57 PM, gen tang wrote: > Hi All, > > I found a strange bug related to reading data from an…

A DataFrame cache bug

2017-02-21 Thread gen tang
Hi All, I found a strange bug related to reading data from an updated path combined with the cache operation. Please consider the following code: import org.apache.spark.sql.DataFrame def f(data: DataFrame): DataFrame = { val df = data.filter("id>10") df.cache df.count df } f(spark.range(10…
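[A minimal sketch of the reported scenario, assembled from this message and the snippets quoted above; the original is truncated, so the overwrite/re-read steps and the second count are illustrative assumptions about the shape of the reproduction.]

  import org.apache.spark.sql.DataFrame

  def f(data: DataFrame): DataFrame = {
    val df = data.filter("id > 10")
    df.cache()          // cache the filtered view
    df.count()          // materialize it
    df
  }

  val dir = "/tmp/test"
  spark.range(100).write.mode("overwrite").parquet(dir)
  f(spark.read.parquet(dir)).count    // 89, as expected

  spark.range(1000).write.mode("overwrite").parquet(dir)
  f(spark.read.parquet(dir)).count    // can still return the stale 89 instead of 989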

Re: Output Committers for S3

2017-02-21 Thread Ryan Blue
Does S3Guard help with this? I thought it was like S3mper and could help detect eventual consistency problems, but wouldn't help with the committer problem. rb On Tue, Feb 21, 2017 at 12:39 PM, Matthew Schauer wrote: > Thanks for the repo, Ryan! I had heard that Netflix had a committer that > u…

Re: Output Committers for S3

2017-02-21 Thread Ryan Blue
On Tue, Feb 21, 2017 at 6:15 AM, Steve Loughran wrote: > On 21 Feb 2017, at 01:00, Ryan Blue wrote: > > You'd have to encode the task ID in the output file name to identify files > > to roll back in the event you need to revert a task, but if you have > > partitioned output, you have to do a lo…
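[An illustrative sketch, not the actual s3committer code, of the idea quoted here: embed the task attempt ID in each output file name so the files written by a reverted task attempt can later be found and deleted.]

  import org.apache.hadoop.mapreduce.TaskAttemptContext

  // Builds a name such as "part-00042-attempt_..._m_000042_0.parquet"; any object
  // whose key contains the attempt string can be matched and rolled back on abort.
  def outputFileName(context: TaskAttemptContext, extension: String): String = {
    val attempt = context.getTaskAttemptID
    f"part-${attempt.getTaskID.getId}%05d-$attempt$extension"
  }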

Re: Output Committers for S3

2017-02-21 Thread Matthew Schauer
Thanks for the repo, Ryan! I had heard that Netflix had a committer that used the local filesystem as a temporary store, but I wasn't able to find that anywhere until now. I implemented something similar that writes to HDFS and then copies to S3, but it doesn't use the multipart upload API, so I'…
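[A rough sketch of the "stage the file, upload at commit" approach being compared here; bucket, key, and paths are placeholders, and it assumes the AWS SDK for Java (v1) is on the classpath.]

  import com.amazonaws.services.s3.AmazonS3ClientBuilder
  import java.io.File

  val s3 = AmazonS3ClientBuilder.defaultClient()

  // 1. The task writes its output to a local/HDFS temporary file first.
  val staged = new File("/tmp/staging/part-00000.parquet")

  // 2. Only at task commit is the finished file pushed to S3. A plain putObject
  //    makes the object visible as soon as it completes; the multipart API
  //    discussed in this thread is what lets the upload start early while the
  //    object stays invisible until an explicit "complete" call.
  s3.putObject("my-bucket", "warehouse/table/part-00000.parquet", staged)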

Re: Output Committers for S3

2017-02-21 Thread Steve Loughran
On 21 Feb 2017, at 14:15, Steve Loughran <ste...@hortonworks.com> wrote: What your patch has made me realise is that I could also do a delayed-commit copy by reading in a file, doing a multipart put to its final destination, and again, postponing the final commit. This is something which…
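[A hedged sketch of that delayed-commit idea using the S3 multipart API (AWS SDK for Java v1; names, sizes, and the single-part shortcut are placeholders, not the committer's actual code): the parts are sent to the final destination, but the object only appears when the upload is completed at commit time.]

  import com.amazonaws.services.s3.AmazonS3ClientBuilder
  import com.amazonaws.services.s3.model._
  import java.io.File
  import scala.collection.JavaConverters._

  val s3 = AmazonS3ClientBuilder.defaultClient()
  val (bucket, key) = ("my-bucket", "warehouse/table/part-00000.parquet")
  val file = new File("/tmp/staging/part-00000.parquet")

  // 1. Initiate the upload; nothing becomes visible at `key` yet.
  val uploadId = s3.initiateMultipartUpload(
    new InitiateMultipartUploadRequest(bucket, key)).getUploadId

  // 2. Send the data as parts (a single part shown for brevity).
  val etag = s3.uploadPart(new UploadPartRequest()
    .withBucketName(bucket).withKey(key).withUploadId(uploadId)
    .withPartNumber(1).withFile(file).withPartSize(file.length())).getPartETag

  // 3. Persist (uploadId, etags) with the task; only at job commit is the upload
  //    completed, at which point the object appears atomically at `key`.
  s3.completeMultipartUpload(
    new CompleteMultipartUploadRequest(bucket, key, uploadId, List(etag).asJava))
  // On abort, abortMultipartUpload(...) discards the pending parts instead.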

Re: Output Committers for S3

2017-02-21 Thread Steve Loughran
On 21 Feb 2017, at 01:00, Ryan Blue <rb...@netflix.com.INVALID> wrote: We just wrote a couple new committers for S3 that we're beginning to roll out to our Spark users. I've uploaded a repo with it if you'd like to take a look: https://github.com/rdblue/s3committer The main problem w…

Re: Output Committers for S3

2017-02-21 Thread Steve Loughran
On 20 Feb 2017, at 18:14, Matthew Schauer <matthew.scha...@ibm.com> wrote: I'm using Spark 1.5.2 and trying to append a DataFrame to a partitioned Parquet directory in S3. It is known that the default `ParquetOutputCommitter` performs poorly in S3 because move is implemented as copy/delete…
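[For context, a minimal sketch of the kind of write being described; bucket, path, and partition column are placeholders, and `df` is assumed to be an existing DataFrame. Appending partitioned Parquet straight to S3 exercises the committer's rename-based commit on every job.]

  import org.apache.spark.sql.SaveMode

  df.write
    .mode(SaveMode.Append)
    .partitionBy("date")
    .parquet("s3a://my-bucket/warehouse/table")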