+1 to upgrading Spark. 1.2.1 has none of the memory management improvements that were added in 1.4-1.6. It also still uses the old static split between storage and shuffle memory, which goes a long way toward answering the questions below: with the 1.2 defaults (if I remember them right), each of the 2 concurrent tasks in a 6GB executor gets at most about 6GB * 0.2 (spark.shuffle.memoryFraction) * 0.8 (spark.shuffle.safetyFraction) / 2 = ~480MB of shuffle memory before it has to spill or dies with OOM, while the 60% storage pool sits idle because nothing is cached. From 1.6 onwards execution and storage share a single pool, so a shuffle can borrow the unused cache memory.
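For reference, on 1.6 the storage/shuffle memoryFraction settings discussed below are superseded by just two knobs. A minimal sketch of a 1.6-style driver setup (the values shown are simply the stock 1.6 defaults, not a tuning recommendation):

from pyspark import SparkConf, SparkContext

# Spark 1.6+ unified memory management (SPARK-10000): execution and
# storage share one pool and can borrow from each other.
conf = (SparkConf()
        .setAppName("Hive_Join")
        # fraction of (heap - 300MB reserve) shared by execution and storage
        .set("spark.memory.fraction", "0.75")
        # share of that pool shielded from eviction, for cached blocks
        .set("spark.memory.storageFraction", "0.5"))
sc = SparkContext(conf=conf)

With that, an executor no longer reserves 60% of its heap for a cache that is never used.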
On Mon, Mar 14, 2016 at 2:03 AM, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:

> The issue is that the query hits OOM on a stage when reading the shuffle
> output of the previous stage. How come increasing the shuffle memory
> helps to avoid the OOM?
>
> On Mon, Mar 14, 2016 at 2:28 PM, Sabarish Sasidharan <sabarish....@gmail.com> wrote:
>
>> That's a pretty old version of Spark SQL. It is devoid of all the
>> improvements introduced in the last few releases.
>>
>> You should try bumping your spark.sql.shuffle.partitions to a value
>> higher than the default (5x or 10x). Also increase your shuffle memory
>> fraction, as you really are not explicitly caching anything. You could
>> simply swap the two fractions in your case.
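>>
>> For example, something along these lines in the driver (the numbers are
>> only illustrative: 2000 is ~10x the spark.sql.shuffle.partitions default
>> of 200, and the two fractions are simply the 0.6/0.2 defaults swapped):
>>
>> from pyspark import SparkConf, SparkContext
>> from pyspark.sql import HiveContext
>>
>> conf = (SparkConf().setAppName("Hive_Join")
>>         .set("spark.storage.memoryFraction", "0.2")   # was 0.6; nothing is cached
>>         .set("spark.shuffle.memoryFraction", "0.6"))  # was 0.2
>> sc = SparkContext(conf=conf)
>> hiveCtx = HiveContext(sc)
>> # more, smaller shuffle partitions mean smaller per-task shuffle buffers
>> hiveCtx.hql("SET spark.sql.shuffle.partitions=2000")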
>>
>> Regards
>> Sab
>>
>> On Mon, Mar 14, 2016 at 2:20 PM, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:
>>
>>> It is Spark SQL, and the version used is Spark 1.2.1.
>>>
>>> On Mon, Mar 14, 2016 at 2:16 PM, Sabarish Sasidharan <sabarish.sasidha...@manthan.com> wrote:
>>>
>>>> I believe the OP is using Spark SQL and not Hive on Spark.
>>>>
>>>> Regards
>>>> Sab
>>>>
>>>> On Mon, Mar 14, 2016 at 1:55 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> I think the only version of Spark that works OK with Hive (Hive on
>>>>> Spark engine) is version 1.3.1. I also get OOM from time to time and
>>>>> have to revert to using MR.
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>> LinkedIn:
>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>> On 14 March 2016 at 08:06, Sabarish Sasidharan <sabarish.sasidha...@manthan.com> wrote:
>>>>>
>>>>>> Which version of Spark are you using? The configuration varies by
>>>>>> version.
>>>>>>
>>>>>> Regards
>>>>>> Sab
>>>>>>
>>>>>> On Mon, Mar 14, 2016 at 10:53 AM, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> A Hive join query which runs fine, and faster, in MapReduce takes a
>>>>>>> lot of time with Spark and finally fails with OOM.
>>>>>>>
>>>>>>> *Query: hivejoin.py*
>>>>>>>
>>>>>>> from pyspark import SparkContext, SparkConf
>>>>>>> from pyspark.sql import HiveContext
>>>>>>> conf = SparkConf().setAppName("Hive_Join")
>>>>>>> sc = SparkContext(conf=conf)
>>>>>>> hiveCtx = HiveContext(sc)
>>>>>>> hiveCtx.hql("INSERT OVERWRITE TABLE D SELECT <80 columns> FROM A a "
>>>>>>>             "INNER JOIN B b ON a.item_id = b.item_id "
>>>>>>>             "LEFT JOIN C c ON c.instance_id = a.instance_id")
>>>>>>> results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
>>>>>>> print results
>>>>>>>
>>>>>>> *Data Study:*
>>>>>>>
>>>>>>> Number of rows:
>>>>>>>
>>>>>>> Table A has 1,002,093,508
>>>>>>> Table B has 5,371,668
>>>>>>> Table C has 1,000
>>>>>>>
>>>>>>> No data skew:
>>>>>>>
>>>>>>> item_id in B is unique and A has multiple rows with the same
>>>>>>> item_id, so after the first INNER JOIN the result set is the same
>>>>>>> 1,002,093,508 rows.
>>>>>>>
>>>>>>> instance_id in C is unique and A has multiple rows with the same
>>>>>>> instance_id (the maximum number of rows sharing one instance_id
>>>>>>> is 250).
>>>>>>>
>>>>>>> The Spark job runs with 90 executors, each with 2 cores and 6GB
>>>>>>> memory. YARN allotted all the requested resources immediately, and
>>>>>>> no other job is running on the cluster.
>>>>>>>
>>>>>>> spark.storage.memoryFraction 0.6
>>>>>>> spark.shuffle.memoryFraction 0.2
>>>>>>>
>>>>>>> Stage 2 - reads data from Hadoop; tasks run NODE_LOCAL and
>>>>>>> shuffle-write 500GB of intermediate data.
>>>>>>>
>>>>>>> Stage 3 - does the shuffle read of that 500GB; tasks run
>>>>>>> PROCESS_LOCAL and shuffle-write 400GB of output.
>>>>>>>
>>>>>>> Stage 4 - tasks fail with OOM while reading the shuffled output,
>>>>>>> after reading just 40GB of it.
>>>>>>>
>>>>>>> First of all, what kinds of Hive queries get better performance when
>>>>>>> run on Spark than on MapReduce, and what kinds won't perform well
>>>>>>> in Spark?
>>>>>>>
>>>>>>> How do we calculate the optimal executor heap size and number of
>>>>>>> executors for a given input data size? We don't ask the Spark
>>>>>>> executors to cache any data, so how come Stage 3 tasks report
>>>>>>> PROCESS_LOCAL? And why does Stage 4 fail immediately after reading
>>>>>>> just 40GB of data? Is it caching data in memory?
>>>>>>>
>>>>>>> Also, in a Spark job some stages need a lot of memory for shuffle
>>>>>>> and some need a lot of memory for cache. When an executor has a lot
>>>>>>> of memory set aside for cache that it never uses, but then needs to
>>>>>>> do a big shuffle, will it use only the shuffle fraction, or will it
>>>>>>> use the free cache memory as well?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Prabhu Joseph
>>>>>>
>>>>>> --
>>>>>> Architect - Big Data
>>>>>> Ph: +91 99805 99458
>>>>>>
>>>>>> Manthan Systems | *Company of the year - Analytics (2014 Frost and
>>>>>> Sullivan India ICT)*