I just took a quick look for this. It seems not parquet-specific problem but for datasources implimenting FileFormat.
In 1.6, it seems apparently partitions are made per file but in 2.0 partition can hold multiple files. So, in your case files are miltiple but partitions are fewer, meaning each partition is not sorted although each part-file is sorted. It seems this PR https://github.com/apache/spark/pull/12095 is related. I could not test before/after this PR because I don't have the access to my computer for now (it's my phone) but I am sure the PR is related. Maybe we need an option to enable/disable this? I appreciate if any gives some feedback. Thanks! On 11 Aug 2016 3:23 p.m., "Jason Moore" <jason.mo...@quantium.com.au> wrote: > Hi, > > > > It seems that something changed between Spark 1.6.2 and 2.0.0 that I > wasn’t expecting. > > > > If I have a DataFrame with records sorted within each partition, and I > write it to parquet and read back from the parquet, previously the records > would be iterated through in the same order they were written (assuming no > shuffle has taken place). But this doesn’t seem to be the case anymore. > Below is the code to reproduce in a spark-shell. > > > > Was this change expected? > > > > Thanks, > > Jason. > > > > > > import org.apache.spark.sql._ > > def isSorted[T](self: DataFrame, mapping: Row => T)(implicit ordering: > Ordering[T]) = { > > import self.sqlContext.implicits._ > > import ordering._ > > self > > .mapPartitions(rows => { > > val isSorted = rows > > .map(mapping) > > .sliding(2) // all adjacent pairs > > .forall { > > case x :: y :: Nil => x <= y > > case x :: Nil => true > > case Nil => true > > } > > > > Iterator(isSorted) > > }) > > .reduce(_ && _) > > } > > > > // in Spark 2.0.0 > > spark.range(100000).toDF("id").registerTempTable("input") > > spark.sql("SELECT id FROM input DISTRIBUTE BY id SORT BY > id").write.mode("overwrite").parquet("input.parquet") > > isSorted(spark.read.parquet("input.parquet"), _.getAs[Long]("id")) > > // FALSE > > > > // in Spark 1.6.2 > > sqlContext.range(100000).toDF("id").registerTempTable("input") > > sqlContext.sql("SELECT id FROM input DISTRIBUTE BY id SORT BY > id").write.mode("overwrite").parquet("input.parquet") > > isSorted(sqlContext.read.parquet("input.parquet"), _.getAs[Long]("id")) > > // TRUE > > >