That's a pretty old version of Spark SQL. It is devoid of all the improvements introduced in the last few releases.
You should try bumping your spark.sql.shuffle.partitions to a value higher than the default (5x or 10x). Also increase your shuffle memory fraction, since you really are not explicitly caching anything. You could simply swap the two fractions in your case.

Regards
Sab

On Mon, Mar 14, 2016 at 2:20 PM, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:

> It is Spark SQL and the version used is Spark 1.2.1.
>
> On Mon, Mar 14, 2016 at 2:16 PM, Sabarish Sasidharan <sabarish.sasidha...@manthan.com> wrote:
>
>> I believe the OP is using Spark SQL and not Hive on Spark.
>>
>> Regards
>> Sab
>>
>> On Mon, Mar 14, 2016 at 1:55 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> I think the only version of Spark that works OK with Hive (Hive on Spark engine) is version 1.3.1. I also get OOM from time to time and have to revert to using MR.
>>>
>>> Dr Mich Talebzadeh
>>>
>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>> On 14 March 2016 at 08:06, Sabarish Sasidharan <sabarish.sasidha...@manthan.com> wrote:
>>>
>>>> Which version of Spark are you using? The configuration varies by version.
>>>>
>>>> Regards
>>>> Sab
>>>>
>>>> On Mon, Mar 14, 2016 at 10:53 AM, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> A Hive join query which runs fine and fast in MapReduce takes a lot of time with Spark and finally fails with OOM.
>>>>>
>>>>> *Query: hivejoin.py*
>>>>>
>>>>> from pyspark import SparkContext, SparkConf
>>>>> from pyspark.sql import HiveContext
>>>>>
>>>>> conf = SparkConf().setAppName("Hive_Join")
>>>>> sc = SparkContext(conf=conf)
>>>>> hiveCtx = HiveContext(sc)
>>>>> hiveCtx.hql("INSERT OVERWRITE TABLE D SELECT <80 columns> FROM A a INNER JOIN B b ON a.item_id = b.item_id LEFT JOIN C c ON c.instance_id = a.instance_id")
>>>>> results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
>>>>> print results
>>>>>
>>>>> *Data Study:*
>>>>>
>>>>> Number of rows:
>>>>>
>>>>> Table A has 1002093508
>>>>> Table B has 5371668
>>>>> Table C has 1000
>>>>>
>>>>> No data skewness:
>>>>>
>>>>> item_id in B is unique and A has multiple rows with the same item_id, so after the first INNER JOIN the result set is the same 1002093508 rows.
>>>>>
>>>>> instance_id in C is unique and A has multiple rows with the same instance_id (the maximum number of rows with the same instance_id is 250).
>>>>>
>>>>> The Spark job runs with 90 executors, each with 2 cores and 6GB memory. YARN allotted all the requested resources immediately and no other job is running on the cluster.
>>>>>
>>>>> spark.storage.memoryFraction 0.6
>>>>> spark.shuffle.memoryFraction 0.2
>>>>>
>>>>> Stage 2 reads data from Hadoop; its tasks are NODE_LOCAL and shuffle-write 500GB of intermediate data.
>>>>>
>>>>> Stage 3 does a shuffle read of the 500GB of data; its tasks are PROCESS_LOCAL and shuffle 400GB of output.
>>>>>
>>>>> Stage 4 tasks fail with OOM while reading the shuffled output, after reading just 40GB of it.
>>>>>
>>>>> First of all, what kinds of Hive queries get better performance when run on Spark than in MapReduce? And which Hive queries won't perform well in Spark?
>>>>>
>>>>> How do we calculate the optimal heap for executor memory and the number of executors for a given input data size?
>>>>> We don't specify the Spark executors to cache any data, but how come the Stage 3 tasks say PROCESS_LOCAL? Why does Stage 4 fail immediately when it has read just 40GB of data; is it caching data in memory?
>>>>>
>>>>> And in a Spark job, some stages need a lot of memory for shuffle and some need a lot of memory for cache. So when a Spark executor has a lot of memory available for cache but does not use it, and there is a need to do a lot of shuffle, will the executors use only the shuffle fraction that is set for shuffling, or will they use the free memory available for cache as well?
>>>>>
>>>>> Thanks,
>>>>> Prabhu Joseph
>>>>>
>>>>
>>>> --
>>>> Architect - Big Data
>>>> Ph: +91 99805 99458
>>>>
>>>> Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan India ICT)*
>>>> +++
>>>
>>
>
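A back-of-the-envelope check of the numbers quoted above shows why Stage 4 runs out of memory and why raising spark.sql.shuffle.partitions and swapping the two fractions can help. This sketch assumes the Spark 1.x defaults of 200 for spark.sql.shuffle.partitions and roughly 0.8 for the shuffle safety fraction; the exact per-task accounting inside Spark is more involved, so treat this as an estimate only.

```python
# Rough sizing model for the job in this thread: 90 executors, 6GB heap,
# 2 cores each, 500GB of shuffle data. Shuffle memory per executor is
# approximately heap * spark.shuffle.memoryFraction * safetyFraction,
# shared by the tasks running concurrently (one per core).

def shuffle_memory_per_task_gb(heap_gb, shuffle_fraction, cores, safety=0.8):
    """Approximate shuffle memory available to one concurrently running task."""
    return heap_gb * shuffle_fraction * safety / cores

def shuffle_data_per_partition_gb(total_shuffle_gb, partitions):
    """Average shuffle-read size of one reduce partition."""
    return total_shuffle_gb / partitions

# Current setup: shuffle fraction 0.2, default 200 partitions (assumed).
available_now = shuffle_memory_per_task_gb(6, 0.2, 2)    # ~0.48GB per task
needed_now = shuffle_data_per_partition_gb(500, 200)     # 2.5GB per partition

# Suggested tuning: swap the fractions (shuffle 0.6) and use 10x partitions.
available_tuned = shuffle_memory_per_task_gb(6, 0.6, 2)  # ~1.44GB per task
needed_tuned = shuffle_data_per_partition_gb(500, 2000)  # 0.25GB per partition

print(available_now, needed_now)      # each task faces ~5x its memory budget
print(available_tuned, needed_tuned)  # a partition now fits within the budget
```

Spilling to disk absorbs some of the shortfall in practice, but when each reduce task must aggregate roughly 2.5GB against a ~0.5GB shuffle budget, OOM is unsurprising; more partitions plus a larger shuffle fraction brings the per-partition working set under the per-task budget.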