Your Spark solution first reduces the partial results into a single partition, computes the final result there, and then collects it to the driver. This involves a shuffle and two waves of network traffic. Instead, you can collect the partial results directly to the driver and compute the final result on the driver side:
val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
val partialResults = data.mapPartitions(points => skyline(points.toArray).iterator).collect()
val results = skyline(partialResults)

On Wed, Apr 16, 2014 at 1:03 AM, Yanzhe Chen <yanzhe...@gmail.com> wrote:

> Hi all,
>
> As in a previous thread, I am asking how to implement a divide-and-conquer
> algorithm (skyline) in Spark.
> Here is my current solution:
>
> val data = sc.textFile(…).map(line => line.split(",").map(_.toDouble))
>
> val result = data.mapPartitions(points => skyline(points.toArray).iterator)
>   .coalesce(1, true)
>   .mapPartitions(points => skyline(points.toArray).iterator).collect()
>
> where skyline is a local algorithm to compute the results:
>
> def skyline(points: Array[Point]) : Array[Point]
>
> Basically, I find this implementation runs slower than the corresponding Hadoop
> version (the identity map phase plus local skyline for both combine and
> reduce phases).
>
> Below are my questions:
>
> 1. Why is this implementation much slower than the Hadoop one?
>
> I can find two possible reasons: one is the shuffle overhead in coalesce,
> another is calling toArray and iterator repeatedly when invoking the
> local skyline algorithm. But I am not sure which one is true.
>

I haven't seen your Hadoop version, but if your assumption is right, the version above should help.

> 2. One observation is that while the Hadoop version used up almost all the CPU
> resources during execution, the CPU seems not that hot on Spark. Is that a
> clue to prove that the shuffling might be the real bottleneck?
>

How many parallel tasks are there when running your Spark code? I suspect the tasks are queued and run sequentially.

> 3. Is there any difference between coalesce(1, true) and repartition? It
> seems that both operations need to shuffle data. What are the proper
> situations for using the coalesce method?
>

repartition(n) is just an alias of coalesce(n, true), so yes, they both involve data shuffling.
coalesce can be used to reduce the number of partitions when the dataset shrinks dramatically after operations like filter. Say you have an RDD containing 1TB of data in 100 partitions, and after a .filter(...) call only 20GB of data is left; then you may want to coalesce to 2 partitions rather than keep 100.

> 4. More generally, I am trying to implement some core geometry
> computation operators on Spark (like skyline, convex hull etc). In my
> understanding, since Spark is more capable of handling iterative
> computations on datasets, the above solution apparently doesn't exploit what
> Spark is good at. Any comments on how to do geometry computations on Spark
> (if it is possible)?
>

Although Spark is good at iterative algorithms, it also performs better than Hadoop MapReduce in batch computing, thanks to much lower scheduling overhead and thread-level parallelism. Theoretically, you can always accelerate your MapReduce job by rewriting it in Spark.

> Thanks for any insight.
>
> Yanzhe
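
P.S. To make the divide-and-conquer step concrete without a cluster, here is a minimal plain-Scala sketch of a local skyline and the two-level merge (per-partition skyline first, then one final skyline over the collected partial results). Everything in it is an assumption for illustration, not your actual code: I represent a point as Vector[Double] rather than an array, I assume smaller is better in every dimension, the naive quadratic skyline stands in for whatever local algorithm you use, and the partitions are simulated with grouped on a local Seq instead of an RDD.

```scala
object SkylineSketch {
  // Assumption: a point is a coordinate vector and smaller values are better.
  type Point = Vector[Double]

  // p dominates q if p is no worse in every dimension and strictly better in one.
  def dominates(p: Point, q: Point): Boolean =
    p.zip(q).forall { case (a, b) => a <= b } &&
      p.zip(q).exists { case (a, b) => a < b }

  // Naive O(n^2) local skyline: keep the points that no other point dominates.
  def skyline(points: Seq[Point]): Seq[Point] =
    points.filter(p => !points.exists(q => dominates(q, p)))

  def main(args: Array[String]): Unit = {
    val points: Seq[Point] = Seq(
      Vector(1.0, 9.0), Vector(2.0, 7.0), Vector(3.0, 8.0),
      Vector(4.0, 4.0), Vector(5.0, 5.0), Vector(9.0, 1.0))

    // Simulated partitions; with Spark this would be mapPartitions(...).collect().
    val partitions = points.grouped(2).toSeq
    val partialResults = partitions.flatMap(part => skyline(part))

    // Final merge, which would run on the driver.
    val results = skyline(partialResults)

    println(results.mkString(", "))
    // The merged result should match the skyline of the whole dataset.
    println(results.toSet == skyline(points).toSet)
  }
}
```

The point of the sketch is that a point dominated inside its own partition is also dominated globally, so dropping it early is safe, and the final pass only has to look at the (usually much smaller) union of the partial skylines. That is why a single collect() followed by a driver-side skyline can replace the coalesce(1, true) step, as long as the partial results fit in driver memory.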