The issue is that the query hits OOM in a stage while reading the shuffle output of the previous stage. How does increasing the shuffle memory fraction help to avoid the OOM?
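My rough understanding, and please correct me if this is wrong: with the static memory split in Spark 1.x, shuffle can only use about spark.shuffle.memoryFraction * spark.shuffle.safetyFraction of the executor heap, so raising the fraction raises that ceiling. A quick back-of-the-envelope with our settings (the 0.8 safety factor is the default as far as I know; we never set it, so it is an assumption on my part):

executor_heap_gb = 6.0     # heap of each executor in our job
shuffle_fraction = 0.2     # spark.shuffle.memoryFraction from my original mail below
safety_fraction  = 0.8     # assumed default of spark.shuffle.safetyFraction
print executor_heap_gb * shuffle_fraction * safety_fraction   # ~0.96 GB usable for shuffle per executor

Is it that the tasks OOM once they need more than roughly this 1GB for the shuffle data they are aggregating?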
On Mon, Mar 14, 2016 at 2:28 PM, Sabarish Sasidharan <sabarish....@gmail.com> wrote:
> That's a pretty old version of Spark SQL. It is devoid of all the
> improvements introduced in the last few releases.
>
> You should try bumping your spark.sql.shuffle.partitions to a value
> higher than the default (5x or 10x). Also increase your shuffle memory
> fraction, as you really are not explicitly caching anything. You could
> simply swap the fractions in your case.
>
> Regards
> Sab
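Just so I apply this correctly, is something like the sketch below what you mean? The 2000 partitions (10x the 200 default) and the swapped fractions are only illustrative values, and I am assuming the SET command is honoured by Spark SQL in 1.2.1.

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = (SparkConf()
        .setAppName("Hive_Join")
        # Swap the fractions: nothing is explicitly cached, so give shuffle the bigger share.
        .set("spark.storage.memoryFraction", "0.2")
        .set("spark.shuffle.memoryFraction", "0.6"))
sc = SparkContext(conf=conf)
hiveCtx = HiveContext(sc)
# 10x the default number of reduce-side partitions, so each shuffle partition is smaller.
hiveCtx.hql("SET spark.sql.shuffle.partitions=2000")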
> On Mon, Mar 14, 2016 at 2:20 PM, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:
>
>> It is Spark SQL, and the version used is Spark 1.2.1.
>>
>> On Mon, Mar 14, 2016 at 2:16 PM, Sabarish Sasidharan <sabarish.sasidha...@manthan.com> wrote:
>>
>>> I believe the OP is using Spark SQL and not Hive on Spark.
>>>
>>> Regards
>>> Sab
>>>
>>> On Mon, Mar 14, 2016 at 1:55 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>>> I think the only version of Spark that works OK with Hive (Hive on
>>>> Spark engine) is version 1.3.1. I also get OOM from time to time and
>>>> have to revert to using MR.
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>> On 14 March 2016 at 08:06, Sabarish Sasidharan <sabarish.sasidha...@manthan.com> wrote:
>>>>
>>>>> Which version of Spark are you using? The configuration varies by
>>>>> version.
>>>>>
>>>>> Regards
>>>>> Sab
>>>>>
>>>>> On Mon, Mar 14, 2016 at 10:53 AM, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> A Hive join query which runs fine and faster in MapReduce takes a lot
>>>>>> of time with Spark and finally fails with OOM.
>>>>>>
>>>>>> *Query: hivejoin.py*
>>>>>>
>>>>>> from pyspark import SparkContext, SparkConf
>>>>>> from pyspark.sql import HiveContext
>>>>>> conf = SparkConf().setAppName("Hive_Join")
>>>>>> sc = SparkContext(conf=conf)
>>>>>> hiveCtx = HiveContext(sc)
>>>>>> hiveCtx.hql("INSERT OVERWRITE TABLE D select <80 columns> from A a "
>>>>>>             "INNER JOIN B b ON a.item_id = b.item_id "
>>>>>>             "LEFT JOIN C c ON c.instance_id = a.instance_id")
>>>>>> results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
>>>>>> print results
>>>>>>
>>>>>> *Data Study:*
>>>>>>
>>>>>> Number of rows:
>>>>>>
>>>>>> Table A has 1,002,093,508 rows
>>>>>> Table B has 5,371,668 rows
>>>>>> Table C has 1,000 rows
>>>>>>
>>>>>> No data skewness:
>>>>>>
>>>>>> item_id in B is unique and A has multiple rows with the same item_id,
>>>>>> so after the first INNER JOIN the result set is still 1,002,093,508
>>>>>> rows.
>>>>>>
>>>>>> instance_id in C is unique and A has multiple rows with the same
>>>>>> instance_id (the maximum number of rows sharing an instance_id is 250).
>>>>>>
>>>>>> The Spark job runs with 90 executors, each with 2 cores and 6GB of
>>>>>> memory. YARN allotted all the requested resources immediately and no
>>>>>> other job is running on the cluster.
>>>>>>
>>>>>> spark.storage.memoryFraction 0.6
>>>>>> spark.shuffle.memoryFraction 0.2
>>>>>>
>>>>>> Stage 2 reads data from Hadoop; its tasks are NODE_LOCAL and it
>>>>>> shuffle-writes 500GB of intermediate data.
>>>>>>
>>>>>> Stage 3 does a shuffle read of that 500GB; its tasks are PROCESS_LOCAL
>>>>>> and it shuffles 400GB of output.
>>>>>>
>>>>>> Stage 4 tasks fail with OOM while reading the shuffled output, after
>>>>>> reading only 40GB of it.
>>>>>>
>>>>>> First of all, what kinds of Hive queries perform better on Spark than
>>>>>> on MapReduce, and what kinds of Hive queries won't perform well on
>>>>>> Spark?
>>>>>>
>>>>>> How do we calculate the optimal heap size for the executors and the
>>>>>> number of executors for a given input data size? We don't ask the
>>>>>> Spark executors to cache any data, so how come the Stage 3 tasks show
>>>>>> PROCESS_LOCAL? And why does Stage 4 fail immediately after reading
>>>>>> just 40GB of data? Is it caching data in memory?
>>>>>>
>>>>>> Also, in a Spark job some stages need a lot of memory for shuffle and
>>>>>> some need a lot of memory for cache. When a Spark executor has a lot
>>>>>> of memory available for cache but does not use it, and there is a need
>>>>>> to do a lot of shuffle, will the executors use only the shuffle
>>>>>> fraction that is set aside for shuffling, or will they use the free
>>>>>> memory available for cache as well?
>>>>>>
>>>>>> Thanks,
>>>>>> Prabhu Joseph
>>>>>
>>>>> --
>>>>>
>>>>> Architect - Big Data
>>>>> Ph: +91 99805 99458
>>>>>
>>>>> Manthan Systems | *Company of the year - Analytics (2014 Frost and
>>>>> Sullivan India ICT)*
>>>>> +++
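PS: a rough back-of-the-envelope check on the numbers above. The 200-partition default for spark.sql.shuffle.partitions and the ~0.96 GB shuffle heap from the top of this mail are my own assumptions, not values confirmed anywhere in this thread.

shuffle_read_gb   = 500.0   # shuffle read reported for Stage 3
reduce_partitions = 200     # assumed spark.sql.shuffle.partitions default, since we never changed it
concurrent_tasks  = 2       # 2 cores per executor
shuffle_heap_gb   = 0.96    # per-executor shuffle heap estimated at the top of this mail

per_task_gb     = shuffle_read_gb / reduce_partitions   # ~2.5 GB of shuffled data per reduce task
per_executor_gb = per_task_gb * concurrent_tasks        # ~5 GB handled concurrently per executor
print per_executor_gb, shuffle_heap_gb                  # roughly 5.0 vs 0.96

If the join has to hold even part of that per-partition data in memory, ~5 GB of input against a ~1 GB shuffle budget would explain the OOM, and it would also line up with the advice to raise spark.sql.shuffle.partitions and/or the shuffle memory fraction.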