Hi Shao & Pandey,
      Thanks for the tips. I will try to work around this.
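
      A minimal sketch of the per-partition sort suggested below, in plain Java without Spark so that it runs on its own. PartitionSort and its members are hypothetical names used only for illustration. It assumes that a whole partition fits in memory and that keys should be ordered lexicographically as unsigned bytes. The body of sortPartition is roughly what would go inside mapPartitions after reduceByKey.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Spark or HBase.
class PartitionSort {

    // Lexicographic order on byte arrays, comparing each byte as unsigned.
    // If one array is a prefix of the other, the shorter one sorts first.
    static final Comparator<byte[]> LEXICOGRAPHIC = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    // Sorts the records of one partition by key, entirely in memory.
    // The input list is left untouched; a sorted copy is returned.
    static List<Map.Entry<byte[], byte[]>> sortPartition(
            List<Map.Entry<byte[], byte[]>> records) {
        List<Map.Entry<byte[], byte[]>> sorted = new ArrayList<>(records);
        sorted.sort(Map.Entry.<byte[], byte[]>comparingByKey(LEXICOGRAPHIC));
        return sorted;
    }

    public static void main(String[] args) {
        List<Map.Entry<byte[], byte[]>> partition = new ArrayList<>();
        partition.add(new SimpleEntry<>(new byte[] {(byte) 0x80}, new byte[] {1}));
        partition.add(new SimpleEntry<>(new byte[] {0x01, 0x02}, new byte[] {2}));
        partition.add(new SimpleEntry<>(new byte[] {0x01}, new byte[] {3}));
        for (Map.Entry<byte[], byte[]> e : sortPartition(partition)) {
            System.out.println(java.util.Arrays.toString(e.getKey()));
        }
    }
}
```

      This only covers the case where the partition fits in memory; larger partitions would need spilling to disk.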

Saisai Shao <sai.sai.s...@gmail.com> wrote on Fri, Sep 11, 2015 at 1:23 PM:

> Hi Qianhao,
>
> I think you could sort the data yourself if you want to achieve the same
> result as MR, e.g. rdd.reduceByKey(...).mapPartitions(/* sort within each
> partition */). Do not call sortByKey again, since it introduces another
> shuffle (that is the reason why it is slower than MR).
>
> The difficulty is that you have to implement an external sort yourself,
> since memory may not be enough to hold the whole partition.
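
The merge phase of such an external sort could be sketched as a k-way merge over sorted runs, which is also roughly what the MR reduce side does with sorted mapper output. SortedRunMerger is a hypothetical name used only for illustration. The sketch assumes every run is already sorted; in a real external sort each run would be read back from a spill file rather than from memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not part of Spark or Hadoop.
class SortedRunMerger {

    // One cursor per run: the current head element plus the rest of the run.
    private static final class Cursor<T> {
        T head;
        final Iterator<T> rest;

        Cursor(Iterator<T> run) {
            this.rest = run;
            this.head = run.next();
        }
    }

    // Merges already-sorted runs into one sorted stream and passes each
    // element to the sink. Only one element per run is held at a time.
    static <T> void merge(List<Iterator<T>> runs,
                          Comparator<? super T> order,
                          Consumer<? super T> sink) {
        Comparator<Cursor<T>> byHead = (x, y) -> order.compare(x.head, y.head);
        PriorityQueue<Cursor<T>> heap = new PriorityQueue<>(byHead);
        for (Iterator<T> run : runs) {
            if (run.hasNext()) {
                heap.add(new Cursor<>(run));
            }
        }
        while (!heap.isEmpty()) {
            Cursor<T> smallest = heap.poll();
            sink.accept(smallest.head);
            if (smallest.rest.hasNext()) {
                // Advance this run and put it back into the heap.
                smallest.head = smallest.rest.next();
                heap.add(smallest);
            }
        }
    }

    public static void main(String[] args) {
        List<Iterator<Integer>> runs = new ArrayList<>();
        runs.add(Arrays.asList(1, 4, 9).iterator());
        runs.add(Arrays.asList(2, 3, 10).iterator());
        merge(runs, Comparator.<Integer>naturalOrder(), System.out::println);
    }
}
```

Because the sink receives elements one at a time, the merged output can be written out sequentially without holding the whole partition in memory.
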
>
> Spark's shuffle is different from MR's in that it does not impose any key
> ordering. So a scenario like the one you mentioned is not so easy to
> address. SPARK-2926 tries to solve scenarios like yours, but it is not
> merged yet, so you have to find a workaround at the application level.
>
> Thanks
> Jerry
>
>
>
> On Fri, Sep 11, 2015 at 10:42 AM, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> In MR jobs, the output is sorted only within each reducer. That can be
>> better emulated by sorting each partition of the RDD rather than doing a
>> total sort of the RDD.
>> In RDD.mapPartitions you can sort the data within one partition; try that.
>> On Sep 11, 2015 7:36 AM, "周千昊" <z.qian...@gmail.com> wrote:
>>
>>> Hi, all
>>>      Can anyone give some tips about this issue?
>>>
>>> 周千昊 <qhz...@apache.org> wrote on Tue, Sep 8, 2015 at 4:46 PM:
>>>
>>>> Hi, community
>>>>      I have an application which I am trying to migrate from MR to Spark.
>>>>      It does some calculations on data from Hive and writes the output to
>>>> HFiles, which are then bulk loaded into an HBase table. Details as follows:
>>>>
>>>>      Rdd<Element> input = getSourceInputFromHive()
>>>>      Rdd<Tuple2<byte[], byte[]>> mapSideResult =
>>>>          input.glom().mapPartitions(/* some calculation */)
>>>>      // PS: the result in each partition has already been sorted
>>>>      // in lexicographical order during the calculation
>>>>      mapSideResult.reduceByKey(/* some aggregations */)
>>>>          .sortByKey(/**/)
>>>>          .map(/* transform Tuple2<byte[], byte[]> to Tuple2<ImmutableBytesWritable, KeyValue> */)
>>>>          .saveAsNewAPIHadoopFile(/* write to hfile */)
>>>>
>>>>      Here is the problem: in MR, on the reducer side, the mapper output
>>>> has already been sorted, so the reduce is a merge sort, which makes
>>>> writing to the hfile sequential and fast.
>>>>      However, in Spark the output of the reduceByKey phase has been
>>>> shuffled, so I have to sort the RDD in order to write the hfile, which
>>>> makes the job 2x slower on Spark than on MR.
>>>>      I am wondering whether there is anything I can leverage that has the
>>>> same effect as in MR. I happened to see a JIRA ticket,
>>>> https://issues.apache.org/jira/browse/SPARK-2926. Is it related to what I
>>>> am looking for?
>>>>
>>> --
>>> Best Regard
>>> ZhouQianhao
>>>
>>
> --
Best Regard
ZhouQianhao
