Hi Shao & Pandey,
    After repartitioning and sorting within each partition, the application now runs faster on Spark than on MR. I will try it on a much larger dataset as a benchmark. Thanks again for the guidance.
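For reference, the effect of the fix discussed in this thread (partition by key, then sort each partition independently, instead of a global sortByKey) can be modeled without Spark. This is a minimal Python sketch, not Spark API: `partition_for` and `repartition_and_sort` are illustrative names, and hash partitioning stands in for Spark's default HashPartitioner. The key property it shows is that every partition is sorted by key, while no ordering exists across partitions, which is exactly the per-reducer ordering MR provides.

```python
def partition_for(key, num_partitions):
    """Route a key to a partition by hash, like Spark's default HashPartitioner."""
    return hash(key) % num_partitions

def repartition_and_sort(records, num_partitions):
    """Group (key, value) pairs into partitions, then sort each partition by key.

    Models repartition-and-sort-within-partitions: one shuffle, then a
    local sort per partition, with no second shuffle for a total order.
    """
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        partitions[partition_for(key, num_partitions)].append((key, value))
    for part in partitions:
        part.sort(key=lambda kv: kv[0])  # sort within this partition only
    return partitions

records = [(b"row3", b"c"), (b"row1", b"a"), (b"row2", b"b"), (b"row4", b"d")]
parts = repartition_and_sort(records, 2)
# Each partition is now sorted by key; there is no order across partitions.
```

Because each partition's output is already in key order, downstream writes (e.g. one HFile per partition) can proceed sequentially, which is the property the thread is after.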
周千昊 <z.qian...@gmail.com> wrote on Fri, Sep 11, 2015 at 1:35 PM:

> Hi Shao & Pandey,
>     Thanks for the tips. I will try to work around this.
>
> Saisai Shao <sai.sai.s...@gmail.com> wrote on Fri, Sep 11, 2015 at 1:23 PM:
>
>> Hi Qianhao,
>>
>> I think you could sort the data yourself if you want to achieve the same
>> result as MR, e.g. rdd.reduceByKey(...).mapPartitions(/* sort within each
>> partition */). Do not call sortByKey afterwards, since that introduces
>> another shuffle (which is why it is slower than MR).
>>
>> The difficulty is that you have to implement the external sort yourself,
>> since memory may not be enough to hold a whole partition.
>>
>> Spark's shuffle differs from MR's in that it has no key-ordering
>> guarantee, so scenarios like the one you describe are not so easy to
>> address. SPARK-2926 tries to solve this scenario, but it is not merged
>> yet, so you have to find a workaround at the application level.
>>
>> Thanks
>> Jerry
>>
>> On Fri, Sep 11, 2015 at 10:42 AM, Raghavendra Pandey
>> <raghavendra.pan...@gmail.com> wrote:
>>
>>> In MR jobs, the output is sorted only within each reducer. That is
>>> better emulated by sorting each partition of the RDD rather than
>>> total-sorting the whole RDD. In Rdd.mapPartition you can sort the data
>>> within one partition and try...
>>>
>>> On Sep 11, 2015 7:36 AM, "周千昊" <z.qian...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>     Can anyone give some tips about this issue?
>>>>
>>>> 周千昊 <qhz...@apache.org> wrote on Tue, Sep 8, 2015 at 4:46 PM:
>>>>
>>>>> Hi community,
>>>>>     I have an application which I am trying to migrate from MR to Spark.
>>>>> It does some calculations over data from Hive and outputs HFiles
>>>>> that are then bulk-loaded into an HBase table. Details as follows:
>>>>>
>>>>> Rdd<Element> input = getSourceInputFromHive()
>>>>> Rdd<Tuple2<byte[], byte[]>> mapSideResult =
>>>>>     input.glom().mapPartitions(/* some calculation */)
>>>>> // PS: the result in each partition has already been sorted in
>>>>> // lexicographical order during the calculation
>>>>> mapSideResult.reduceByKey(/* some aggregations */)
>>>>>     .sortByKey(/**/)
>>>>>     .map(/* transform Tuple2<byte[], byte[]> to
>>>>>            Tuple2<ImmutableBytesWritable, KeyValue> */)
>>>>>     .saveAsNewAPIHadoopFile(/* write to HFile */)
>>>>>
>>>>> Here is the problem: in MR, the mapper output arrives at the reducer
>>>>> already sorted, so the reducer performs a merge sort, which makes
>>>>> writing to the HFile sequential and fast.
>>>>> In Spark, however, the output of the reduceByKey phase has been
>>>>> shuffled, so I have to sort the RDD before writing the HFile, which
>>>>> makes it 2x slower on Spark than on MR.
>>>>> I am wondering whether there is anything in Spark I can leverage
>>>>> that has the same effect as MR. I happened to see a JIRA ticket,
>>>>> https://issues.apache.org/jira/browse/SPARK-2926. Is it related to
>>>>> what I am looking for?
>>>>
>>>> --
>>>> Best Regards
>>>> ZhouQianhao

--
Best Regards
ZhouQianhao
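A note on the MR behavior the original question describes: because each mapper's output reaches the reducer already sorted by key, the reducer never re-sorts anything; it only performs a streaming k-way merge of the sorted runs, so HFile writes stay sequential. The following is a minimal Python sketch of that merge step, using the standard library's heapq.merge as a stand-in for the MR merge phase; the sample data is illustrative, not from the thread.

```python
import heapq

# Two "mapper outputs", each already sorted by key, as MR guarantees
# on the reducer side. The reducer-side merge is a streaming k-way
# merge: it never buffers or re-sorts the full dataset.
sorted_map_outputs = [
    [(b"row1", b"a"), (b"row4", b"d")],
    [(b"row2", b"b"), (b"row3", b"c")],
]

# heapq.merge lazily merges the pre-sorted runs into one sorted stream,
# so records can be written out (e.g. to an HFile) in key order as they
# arrive, without a separate sort step.
merged = list(heapq.merge(*sorted_map_outputs, key=lambda kv: kv[0]))
```

This is the property Spark's shuffle does not provide by default, and why an extra sortByKey (a second shuffle) was needed in the original pipeline.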