Hi Shao & Pandey, thanks for the tips. I will try to work around this.
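Here is roughly what I plan to try, following the reduceByKey + in-partition-sort suggestion below. This is only a minimal Scala sketch: String keys and Long values stand in for the real byte[] row keys and aggregated values, and the HFile-writing step (map to ImmutableBytesWritable/KeyValue + saveAsNewAPIHadoopFile) is left out.

    import org.apache.spark.{SparkConf, SparkContext}

    // Minimal sketch: aggregate with reduceByKey, then sort inside each
    // partition via mapPartitions instead of calling sortByKey, so no extra
    // shuffle is introduced.
    object PartitionSortSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("partition-sort-sketch").setMaster("local[2]"))

        // Stand-in for the Hive input: (rowKey, value) pairs.
        val input = sc.parallelize(
          Seq("b" -> 2L, "a" -> 1L, "c" -> 3L, "a" -> 4L), 2)

        val perPartitionSorted = input
          .reduceByKey(_ + _)                    // the aggregation step
          .mapPartitions(                        // sort within each partition only
            iter => iter.toArray.sortBy(_._1).iterator,
            preservesPartitioning = true)

        // Each partition is now sorted by key, like a single MR reducer's
        // output, but the RDD as a whole is not totally ordered. Note that
        // toArray buffers the whole partition in memory; very large partitions
        // would need an external sort, as Jerry points out below.
        perPartitionSorted.foreachPartition(_.foreach(println))

        sc.stop()
      }
    }

For the real byte[] row keys the sortBy would need an unsigned lexicographic Ordering (e.g. one wrapping HBase's Bytes.BYTES_COMPARATOR) instead of the String ordering used here.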
Saisai Shao <sai.sai.s...@gmail.com> wrote on Fri, Sep 11, 2015 at 1:23 PM:

> Hi Qianhao,
>
> I think you could sort the data yourself if you want to achieve the same
> result as MR, e.g. rdd.reduceByKey(...).mapPartitions(// sort within each
> partition). Do not call sortByKey again, since it will introduce another
> shuffle (that's the reason why it is slower than MR).
>
> The problem and difficulty is that you have to implement the external sort
> yourself, since memory may not be enough to hold the whole partition.
>
> Spark's shuffle is different from MR's: it does not guarantee key ordering.
> So scenarios like the one you mentioned are not so easy to address.
> SPARK-2926 tries to solve scenarios like yours, but it is not merged yet,
> so you have to find a workaround at the application level.
>
> Thanks
> Jerry
>
> On Fri, Sep 11, 2015 at 10:42 AM, Raghavendra Pandey
> <raghavendra.pan...@gmail.com> wrote:
>
>> In MR jobs, the output is sorted only within each reducer. That can be
>> better emulated by sorting each partition of the RDD rather than doing a
>> total sort of the RDD.
>> In Rdd.mapPartition you can sort the data within one partition and try that.
>> On Sep 11, 2015 7:36 AM, "周千昊" <z.qian...@gmail.com> wrote:
>>
>>> Hi, all
>>> Can anyone give some tips about this issue?
>>>
>>> 周千昊 <qhz...@apache.org> wrote on Tue, Sep 8, 2015 at 4:46 PM:
>>>
>>>> Hi, community
>>>> I have an application which I am trying to migrate from MR to Spark.
>>>> It does some calculation on Hive data and writes the output to HFiles,
>>>> which are then bulk loaded into an HBase table. Details as follows:
>>>>
>>>> Rdd<Element> input = getSourceInputFromHive()
>>>> Rdd<Tuple2<byte[], byte[]>> mapSideResult =
>>>>     input.glom().mapPartitions(/*some calculation*/)
>>>> // PS: the result in each partition has already been sorted in
>>>> // lexicographical order during the calculation
>>>> mapSideResult.reduceByKey(/*some aggregations*/)
>>>>     .sortByKey(/**/)
>>>>     .map(/*transform Tuple2<byte[], byte[]> to
>>>>            Tuple2<ImmutableBytesWritable, KeyValue>*/)
>>>>     .saveAsNewAPIHadoopFile(/*write to hfile*/)
>>>>
>>>> Here is the problem: in MR, on the reducer side the mapper output has
>>>> already been sorted, so the reduce is a merge sort, which makes writing
>>>> to the HFile sequential and fast.
>>>> However, in Spark the output of the reduceByKey phase has been shuffled,
>>>> so I have to sort the RDD in order to write the HFile, which makes it
>>>> run 2x slower on Spark than on MR.
>>>> I am wondering whether there is anything I can leverage that has the
>>>> same effect as in MR. I happened to see a JIRA ticket,
>>>> https://issues.apache.org/jira/browse/SPARK-2926. Is it related to what
>>>> I am looking for?
>>>
>>> --
>>> Best Regards
>>> ZhouQianhao
>>
>

--
Best Regards
ZhouQianhao