Hi, Shao & Pandey
      After repartitioning and sorting within each partition, the
application running on Spark is now faster than on MR. I will try to run
it on a much larger dataset as a benchmark.
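
      For reference, a minimal sketch of the pattern (assuming
"repartitioning and sorting within each partition" above refers to
repartitionAndSortWithinPartitions, available on key-value RDDs since
Spark 1.2; the String keys and sum aggregation are stand-ins, since
byte[] keys would need a custom Ordering):

      import org.apache.spark.HashPartitioner
      import org.apache.spark.rdd.RDD

      val pairs: RDD[(String, Long)] = sc.parallelize(
        Seq(("b", 1L), ("a", 2L), ("b", 3L)), numSlices = 2)

      // one shuffle that both partitions and sorts by key
      val sorted = pairs.repartitionAndSortWithinPartitions(
        new HashPartitioner(2))

      // equal keys are now adjacent within each partition, so they can
      // be aggregated in a single streaming pass, like an MR reducer
      val aggregated = sorted.mapPartitions { iter =>
        val buffered = iter.buffered
        new Iterator[(String, Long)] {
          def hasNext = buffered.hasNext
          def next() = {
            val (key, first) = buffered.next()
            var sum = first
            while (buffered.hasNext && buffered.head._1 == key)
              sum += buffered.next()._2
            (key, sum)
          }
        }
      }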
      Thanks again for the guidance.

On Fri, Sep 11, 2015 at 1:35 PM, 周千昊 <z.qian...@gmail.com> wrote:

> Hi, Shao & Pandey
>       Thanks for the tips. I will try to work around this.
>
> On Fri, Sep 11, 2015 at 1:23 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>
>> Hi Qianhao,
>>
>> I think you could sort the data yourself if you want to achieve the same
>> result as MR, e.g. rdd.reduceByKey(...).mapPartitions(/* sort within each
>> partition */). Do not call sortByKey again, since it will introduce
>> another shuffle (that is the reason it is slower than MR).
>>
>> The problem and difficulty is that you have to implement the external
>> sort yourself, since memory may not be enough to hold a whole partition.
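>>
>> A minimal sketch of that shape, assuming each partition fits in memory
>> (the String keys and sum aggregation are stand-ins; byte[] keys would
>> need a custom Ordering, e.g. one wrapping HBase's Bytes.BYTES_COMPARATOR):
>>
>>   import org.apache.spark.rdd.RDD
>>
>>   val pairs: RDD[(String, Long)] = sc.parallelize(
>>     Seq(("b", 1L), ("a", 2L), ("a", 3L)), numSlices = 2)
>>
>>   val result = pairs
>>     .reduceByKey(_ + _)                     // one shuffle
>>     .mapPartitions(iter =>                  // in-memory sort, no shuffle
>>       iter.toArray.sortBy(_._1).iterator,
>>       preservesPartitioning = true)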
>>
>> Spark's shuffle is different from MR's: it does not impose a key-ordering
>> restriction. So scenarios like the one you mentioned are not so easy to
>> address. SPARK-2926 tries to solve a scenario like yours, but it is not
>> merged yet, so you have to find a workaround at the application level.
>>
>> Thanks
>> Jerry
>>
>>
>>
>> On Fri, Sep 11, 2015 at 10:42 AM, Raghavendra Pandey <
>> raghavendra.pan...@gmail.com> wrote:
>>
>>> In MR jobs, the output is sorted only within each reducer. That can be
>>> better emulated by sorting each partition of the rdd rather than doing
>>> a total sort of the rdd.
>>> In RDD.mapPartitions you can sort the data within one partition and try
>>> that...
>>> On Sep 11, 2015 7:36 AM, "周千昊" <z.qian...@gmail.com> wrote:
>>>
>>>> Hi, all
>>>>      Can anyone give some tips about this issue?
>>>>
>>>> On Tue, Sep 8, 2015 at 4:46 PM, 周千昊 <qhz...@apache.org> wrote:
>>>>
>>>>> Hi, community
>>>>>      I have an application which I am trying to migrate from MR to
>>>>> Spark.
>>>>>      It does some calculation over data from Hive and outputs to
>>>>> hfiles, which are then bulk loaded into an HBase table. Details as
>>>>> follows:
>>>>>
>>>>>      val input: RDD[Element] = getSourceInputFromHive()
>>>>>      val mapSideResult: RDD[(Array[Byte], Array[Byte])] =
>>>>>        input.glom().mapPartitions(/* some calculation */)
>>>>>      // PS: the result in each partition has already been sorted
>>>>>      // lexicographically during the calculation
>>>>>      mapSideResult
>>>>>        .reduceByKey(/* some aggregations */)
>>>>>        .sortByKey(/* ... */)
>>>>>        .map(/* transform (Array[Byte], Array[Byte]) to
>>>>>                (ImmutableBytesWritable, KeyValue) */)
>>>>>        .saveAsNewAPIHadoopFile(/* write to hfile */)
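>>>>>
>>>>>      For completeness, the write step looks roughly like the
>>>>> following (a sketch only; hfileRdd, table, regionLocator, and the
>>>>> output path are assumed placeholders):
>>>>>
>>>>>      import org.apache.hadoop.hbase.KeyValue
>>>>>      import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>>>>>      import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
>>>>>      import org.apache.hadoop.mapreduce.Job
>>>>>
>>>>>      val job = Job.getInstance(sc.hadoopConfiguration)
>>>>>      // configures compression, bloom filters, and block size per
>>>>>      // column family, plus the total order partitioner settings
>>>>>      HFileOutputFormat2.configureIncrementalLoad(job, table,
>>>>>        regionLocator)
>>>>>      hfileRdd.saveAsNewAPIHadoopFile(
>>>>>        "/tmp/hfiles",                 // assumed output path
>>>>>        classOf[ImmutableBytesWritable],
>>>>>        classOf[KeyValue],
>>>>>        classOf[HFileOutputFormat2],
>>>>>        job.getConfiguration)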
>>>>>
>>>>>      Here is the problem: in MR, on the reducer side, the mapper
>>>>> output has already been sorted, so the reduce phase is a merge sort
>>>>> and writing to the hfile is sequential and fast.
>>>>>      However, in Spark the output of the reduceByKey phase has been
>>>>> shuffled, so I have to sort the rdd before writing the hfile, which
>>>>> makes it 2x slower running on Spark than on MR.
>>>>>      I am wondering whether there is anything I can leverage that has
>>>>> the same effect as MR. I happened to see the JIRA ticket
>>>>> https://issues.apache.org/jira/browse/SPARK-2926. Is it related to
>>>>> what I am looking for?
>>>>>
>>>> --
>>>> Best Regards
>>>> ZhouQianhao
>>>>
>>>
>> --
> Best Regards
> ZhouQianhao
>
-- 
Best Regards
ZhouQianhao
