The issue is still not solved, even in newer versions of Spark.
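
A quick way to check this behaviour (a minimal sketch, not from the original thread: it assumes a SparkContext `sc` and a SQLContext `sqlContext` are in scope, and the column names are made up):

sqlContext.setConf("spark.sql.shuffle.partitions", "200")

// All rows share the same sort key, i.e. a maximally skewed key distribution.
val df = sqlContext.createDataFrame(
  sc.parallelize(1 to 1000000).map(i => ("sameKey", i))
).toDF("key", "value")

// RangePartitioning samples the keys to compute partition bounds, so with a
// constant key most bounds collapse and the sorted result ends up in far
// fewer partitions than spark.sql.shuffle.partitions (often just one).
val sortedDF = df.orderBy("key")
println(sortedDF.rdd.partitions.length)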
On Wed, Feb 10, 2016 at 1:36 AM, Cesar Flores <ces...@gmail.com> wrote:

> Well, actually I am observing a single partition no matter what my input
> is. I am using Spark 1.3.1.
>
> From what you both are saying, it appears that this sorting issue (going
> to a single partition after applying orderBy on a DF) is solved in later
> versions of Spark? Well, if that is the case, I guess I just need to wait
> until my workplace decides to update.
>
> Thanks a lot
>
> On Tue, Feb 9, 2016 at 9:39 AM, Takeshi Yamamuro <linguin....@gmail.com>
> wrote:
>
>> Hi,
>>
>> DataFrame#sort() uses `RangePartitioning` in `Exchange` instead of
>> `HashPartitioning`. `RangePartitioning` roughly samples the input data
>> and internally computes partition bounds to split the given rows into
>> `spark.sql.shuffle.partitions` partitions. Therefore, when the sort keys
>> are highly skewed, I think some partitions could end up being empty
>> (that is, the number of result partitions can be lower than
>> `spark.sql.shuffle.partitions`).
>>
>> On Tue, Feb 9, 2016 at 9:35 PM, Hemant Bhanawat <hemant9...@gmail.com>
>> wrote:
>>
>>> For SQL shuffle operations like groupBy, the number of output
>>> partitions is controlled by spark.sql.shuffle.partitions. But it seems
>>> orderBy does not honour this.
>>>
>>> In my small test, I could see that the number of partitions in the DF
>>> returned by orderBy was equal to the total number of distinct keys. Are
>>> you observing the same, i.e. do you have a single value for all rows in
>>> the column on which you are running orderBy? If so, you are better off
>>> not running the orderBy clause at all.
>>>
>>> Maybe someone from the Spark SQL team could answer how the partitioning
>>> of the output DF should be handled when doing an orderBy?
>>>
>>> Hemant
>>> www.snappydata.io
>>> https://github.com/SnappyDataInc/snappydata
>>>
>>> On Tue, Feb 9, 2016 at 4:00 AM, Cesar Flores <ces...@gmail.com> wrote:
>>>
>>>> I have a data frame which I sort using the orderBy function. This
>>>> operation causes my data frame to go to a single partition. After
>>>> using those results, I would like to re-partition to a larger number
>>>> of partitions. Currently I am just doing:
>>>>
>>>> // df is a dataframe with a single partition and around 14 million records
>>>> val rdd = df.rdd.coalesce(100, true)
>>>> val newDF = hc.createDataFrame(rdd, df.schema)
>>>>
>>>> This process is really slow. Is there any other way of achieving this
>>>> task, or of optimizing it (perhaps by tweaking a Spark configuration
>>>> parameter)?
>>>>
>>>> Thanks a lot
>>>> --
>>>> Cesar Flores
>>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>
> --
> Cesar Flores

--
---
Takeshi Yamamuro
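
As a possible alternative to the rdd / coalesce / createDataFrame round trip asked about above (a sketch only, not verified against 1.3.1; `df` and the column name are placeholders): DataFrame.repartition(n) shuffles straight into n partitions at the DataFrame level, although, like coalesce(100, true), it does not preserve the global ordering produced by orderBy.

// Shuffle the sorted result into 100 partitions without dropping to the RDD API.
// Note: the shuffle discards the global sort order, just as the
// coalesce(100, shuffle = true) workaround does.
val newDF = df.orderBy("someColumn").repartition(100)
println(newDF.rdd.partitions.length) // 100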