This is an optimization to avoid overloading the scheduler with many small
tasks.  It bin-packs data into tasks based on the file size.

You can disable it by setting spark.sql.files.openCostInBytes very high
(higher than spark.sql.files.maxPartitionBytes).

On Thu, Aug 11, 2016 at 4:27 AM, Hyukjin Kwon <gurwls...@gmail.com> wrote:

> I just took a quick look for this. It seems not parquet-specific problem
> but for datasources implimenting FileFormat.
>
> In 1.6, it seems apparently partitions are made per file but in 2.0
> partition can hold multiple files.
>
> So, in your case files are miltiple but partitions are fewer, meaning each
> partition is not sorted although each part-file is sorted.
>
> It seems this PR https://github.com/apache/spark/pull/12095 is related.
>
> I could not test before/after this PR because I don't have the access to
> my computer for now (it's my phone) but I am sure the PR is related.
>
> Maybe we need an option to enable/disable this?
>
> I appreciate if any gives some feedback.
>
> Thanks!
>
> On 11 Aug 2016 3:23 p.m., "Jason Moore" <jason.mo...@quantium.com.au>
> wrote:
>
>> Hi,
>>
>>
>>
>> It seems that something changed between Spark 1.6.2 and 2.0.0 that I
>> wasn’t expecting.
>>
>>
>>
>> If I have a DataFrame with records sorted within each partition, and I
>> write it to parquet and read back from the parquet, previously the records
>> would be iterated through in the same order they were written (assuming no
>> shuffle has taken place).  But this doesn’t seem to be the case anymore.
>> Below is the code to reproduce in a spark-shell.
>>
>>
>>
>> Was this change expected?
>>
>>
>>
>> Thanks,
>>
>> Jason.
>>
>>
>>
>>
>>
>> import org.apache.spark.sql._
>>
>> def isSorted[T](self: DataFrame, mapping: Row => T)(implicit ordering:
>> Ordering[T]) = {
>>
>>   import self.sqlContext.implicits._
>>
>>   import ordering._
>>
>>   self
>>
>>     .mapPartitions(rows => {
>>
>>       val isSorted = rows
>>
>>         .map(mapping)
>>
>>         .sliding(2) // all adjacent pairs
>>
>>         .forall {
>>
>>           case x :: y :: Nil => x <= y
>>
>>           case x :: Nil => true
>>
>>           case Nil => true
>>
>>         }
>>
>>
>>
>>       Iterator(isSorted)
>>
>>     })
>>
>>     .reduce(_ && _)
>>
>> }
>>
>>
>>
>> // in Spark 2.0.0
>>
>> spark.range(100000).toDF("id").registerTempTable("input")
>>
>> spark.sql("SELECT id FROM input DISTRIBUTE BY id SORT BY
>> id").write.mode("overwrite").parquet("input.parquet")
>>
>> isSorted(spark.read.parquet("input.parquet"), _.getAs[Long]("id"))
>>
>> // FALSE
>>
>>
>>
>> // in Spark 1.6.2
>>
>> sqlContext.range(100000).toDF("id").registerTempTable("input")
>>
>> sqlContext.sql("SELECT id FROM input DISTRIBUTE BY id SORT BY
>> id").write.mode("overwrite").parquet("input.parquet")
>>
>> isSorted(sqlContext.read.parquet("input.parquet"), _.getAs[Long]("id"))
>>
>> // TRUE
>>
>>
>>
>

Reply via email to