Re: Join highly skewed datasets

2015-06-28 Thread Koert Kuipers
other people might disagree, but i have had better luck with a model that looks more like traditional map-red if you use spark for disk-to-disk computations: more cores per executor (and so less RAM per core/task). so i would suggest trying --executor-cores 4 and adjust numPartitions accordingly.

Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
Regarding # of executors. I get 342 executors in parallel each time and i set executor-cores to 1. Hence i need to set 342 * 2 * x (x = 1,2,3, ..) as number of partitions while running blockJoin. Is this correct. And is my assumptions on replication levels correct. Did you get a chance to look a

Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
I am unable to run my application or sample application with prebuilt spark 1.4 and wit this custom 1.4. In both cases i get this error 15/06/28 15:30:07 WARN ipc.Client: Exception encountered while connecting to the server : java.lang.IllegalArgumentException: Server has invalid Kerberos principa

Re: Join highly skewed datasets

2015-06-28 Thread Koert Kuipers
regarding your calculation of executors... RAM in executor is not really comparable to size on disk. if you read from from file and write to file you do not have to set storage level. in the join or blockJoin specify number of partitions as a multiple (say 2 times) of number of cores available t

Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
My code: val viEvents = details.filter(_.get(14).asInstanceOf[Long] != NULL_VALUE).map { vi => (vi.get(14).asInstanceOf[Long], vi) } //AVRO (150G) val lstgItem = DataUtil.getDwLstgItem(sc, DateUtil.addDaysToDate(startDate, -89)).filter(_.getItemId().toLong != NULL_VALUE).map { lstg => (ls

Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
Could you please suggest and help me understand further. This is the actual sizes -sh-4.1$ hadoop fs -count dw_lstg_item 1 764 2041084436189 /sys/edw/dw_lstg_item/snapshot/2015/06/25/00 *This is not skewed there is exactly one etntry for each item but its 2TB* So should i

Re: Join highly skewed datasets

2015-06-28 Thread Koert Kuipers
specify numPartitions or partitioner for operations that shuffle. so use: def join[W](other: RDD[(K, W)], numPartitions: Int) or def blockJoin[W]( other: JavaPairRDD[K, W], leftReplication: Int, rightReplication: Int, partitioner: Partitioner) for example: left.blockJoin(right, 3, 1, new

Re: Join highly skewed datasets

2015-06-28 Thread Koert Kuipers
a blockJoin spreads out one side while replicating the other. i would suggest replicating the smaller side. so if lstgItem is smaller try 3,1 or else 1,3. this should spread the "fat" keys out over multiple (3) executors... On Sun, Jun 28, 2015 at 5:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote: > I am able to us

Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
You mentioned storage levels must be (should be memory-and-disk or disk-only), number of partitions (should be large, multiple of num executors), how do i specify that ? On Sun, Jun 28, 2015 at 2:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote: > I am able to use blockjoin API and it does not throw compilation erro

Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
I am able to use blockjoin API and it does not throw compilation error val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = lstgItem.blockJoin(viEvents,1,1).map { } Here viEvents is highly skewed and both are on HDFS. What should be the optimal values of replication, i

Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver Build was successful but the script faild. Is there a way to pass the incremented version ? [INFO] BUILD SUCCESS [INFO] --

Re: Join highly skewed datasets

2015-06-28 Thread Koert Kuipers
you need 1) to publish to inhouse maven, so your application can depend on your version, and 2) use the spark distribution you compiled to launch your job (assuming you run with yarn so you can launch multiple versions of spark on same cluster) On Sun, Jun 28, 2015 at 4:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote

Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
How can i import this pre-built spark into my application via maven as i want to use the block join API. On Sun, Jun 28, 2015 at 1:31 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote: > I ran this w/o maven options > > ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive > -Phive-thriftserver > > I got this spark-1

Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
I ran this w/o maven options ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver I got this spark-1.4.0-bin-2.4.0.tgz in the same working directory. I hope this is built with 2.4.x hadoop as i did specify -P On Sun, Jun 28, 2015 at 1:10 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote: > ./

Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
./make-distribution.sh --tgz --*mvn* "-Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package" or ./make-distribution.sh --tgz --*mvn* -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package" ​Both fail with +

Re: Join highly skewed datasets

2015-06-28 Thread Ted Yu
maven command needs to be passed through --mvn option. Cheers On Sun, Jun 28, 2015 at 12:56 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote: > Running this now > > ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0 > -Phive -Phive-thriftserver -DskipTests clean package > > > Waiting for it to co

Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
Running this now ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package Waiting for it to complete. There is no progress after initial log messages //LOGS $ ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Dhadoop.versi

Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
I just did that, where can i find that "spark-1.4.0-bin-hadoop2.4.tgz" file ? On Sun, Jun 28, 2015 at 12:15 PM, Ted Yu wrote: > You can use the following command to build Spark after applying the pull > request: > > mvn -DskipTests -Phadoop-2.4 -Pyarn -Phive clean package > > > Cheers > > > On S

Re: Join highly skewed datasets

2015-06-28 Thread Ted Yu
You can use the following command to build Spark after applying the pull request: mvn -DskipTests -Phadoop-2.4 -Pyarn -Phive clean package Cheers On Sun, Jun 28, 2015 at 11:43 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote: > I see that block support did not make it to spark 1.4 release. > > Can you share instruct

Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
I see that block support did not make it to spark 1.4 release. Can you share instructions of building spark with this support for hadoop 2.4.x distribution. appreciate. On Fri, Jun 26, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote: > This is nice. Which version of Spark has this support ? Or do I need

Re: Join highly skewed datasets

2015-06-26 Thread ๏̯͡๏
This is nice. Which version of Spark has this support ? Or do I need to build it. I have never built Spark from git, please share instructions for Hadoop 2.4.x YARN. I am struggling a lot to get a join work between 200G and 2TB datasets. I am constantly getting this exception 1000s of executors a

Re: Join highly skewed datasets

2015-06-26 Thread Koert Kuipers
we went through a similar process, switching from scalding (where everything just works on large datasets) to spark (where it does not). spark can be made to work on very large datasets, it just requires a little more effort. pay attention to your storage levels (should be memory-and-disk or disk-

Re: Join highly skewed datasets

2015-06-26 Thread ๏̯͡๏
Not far at all. On large data sets everything simply fails with Spark. Worst is am not able to figure out the reason of failure, the logs run into millions of lines and i do not know the keywords to search for failure reason On Mon, Jun 15, 2015 at 6:52 AM, Night Wolf wrote: > How far did you g

Re: Join highly skewed datasets

2015-06-15 Thread Night Wolf
How far did you get? On Tue, Jun 2, 2015 at 4:02 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote: > We use Scoobi + MR to perform joins and we particularly use blockJoin() > API of scoobi > > > /** Perform an equijoin with another distributed list where this list is > considerably smaller > * than the right (but too la