This is an optimization to avoid overloading the scheduler with many small tasks: it bin-packs data into tasks based on file size. You can disable it by setting spark.sql.files.openCostInBytes very high (higher than spark.sql.files.maxPartitionBytes), so that no two files get packed into the same task.
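For example, in a spark-shell session (an untested sketch; the exact value is illustrative, anything comfortably larger than spark.sql.files.maxPartitionBytes should do):

// Defaults in 2.0 are maxPartitionBytes = 128 MB and openCostInBytes = 4 MB.
// Raising the open cost above the partition size means no two files can be
// bin-packed into one task, so each part-file roughly keeps its own partition.
spark.conf.set("spark.sql.files.maxPartitionBytes", 128L * 1024 * 1024)       // default, shown for context
spark.conf.set("spark.sql.files.openCostInBytes", 512L * 1024 * 1024 * 1024)  // illustrative "very high" value

val df = spark.read.parquet("input.parquet")
println(df.rdd.getNumPartitions)  // should now be roughly the number of part-files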
On Thu, Aug 11, 2016 at 4:27 AM, Hyukjin Kwon <gurwls...@gmail.com> wrote:

> I just took a quick look at this. It seems to be not a parquet-specific
> problem but one for data sources implementing FileFormat.
>
> In 1.6, partitions are apparently made per file, but in 2.0 a partition
> can hold multiple files.
>
> So, in your case the files are multiple but the partitions are fewer,
> meaning each partition is not sorted although each part-file is sorted.
>
> It seems this PR https://github.com/apache/spark/pull/12095 is related.
>
> I could not test before/after this PR because I don't have access to my
> computer for now (I'm on my phone), but I am sure the PR is related.
>
> Maybe we need an option to enable/disable this?
>
> I'd appreciate it if anyone could give some feedback.
>
> Thanks!
>
> On 11 Aug 2016 3:23 p.m., "Jason Moore" <jason.mo...@quantium.com.au> wrote:
>
>> Hi,
>>
>> It seems that something changed between Spark 1.6.2 and 2.0.0 that I
>> wasn't expecting.
>>
>> If I have a DataFrame with records sorted within each partition, and I
>> write it to parquet and read it back from the parquet, previously the
>> records would be iterated through in the same order they were written
>> (assuming no shuffle has taken place). But this doesn't seem to be the
>> case anymore. Below is the code to reproduce in a spark-shell.
>>
>> Was this change expected?
>>
>> Thanks,
>> Jason.
>>
>> import org.apache.spark.sql._
>>
>> def isSorted[T](self: DataFrame, mapping: Row => T)(implicit ordering: Ordering[T]) = {
>>   import self.sqlContext.implicits._
>>   import ordering._
>>   self
>>     .mapPartitions(rows => {
>>       val isSorted = rows
>>         .map(mapping)
>>         .sliding(2) // all adjacent pairs
>>         .forall {
>>           case x :: y :: Nil => x <= y
>>           case x :: Nil => true
>>           case Nil => true
>>         }
>>
>>       Iterator(isSorted)
>>     })
>>     .reduce(_ && _)
>> }
>>
>> // in Spark 2.0.0
>> spark.range(100000).toDF("id").registerTempTable("input")
>> spark.sql("SELECT id FROM input DISTRIBUTE BY id SORT BY id").write.mode("overwrite").parquet("input.parquet")
>> isSorted(spark.read.parquet("input.parquet"), _.getAs[Long]("id"))
>> // FALSE
>>
>> // in Spark 1.6.2
>> sqlContext.range(100000).toDF("id").registerTempTable("input")
>> sqlContext.sql("SELECT id FROM input DISTRIBUTE BY id SORT BY id").write.mode("overwrite").parquet("input.parquet")
>> isSorted(sqlContext.read.parquet("input.parquet"), _.getAs[Long]("id"))
>> // TRUE