The issue is that the query hits OOM in a stage when reading shuffle output from
the previous stage. How does increasing the shuffle memory help to avoid the OOM?

On Mon, Mar 14, 2016 at 2:28 PM, Sabarish Sasidharan <sabarish....@gmail.com
> wrote:

> That's a pretty old version of Spark SQL. It is devoid of all the
> improvements introduced in the last few releases.
>
> You should try bumping your spark.sql.shuffle.partitions to a value
> higher than the default (5x or 10x). Also increase your shuffle memory
> fraction, since you are not explicitly caching anything; you could simply
> swap the two fractions in your case.
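>
> As a rough PySpark sketch only (the 2000 partitions and the swapped
> fraction values below are placeholders to tune, not recommendations):
>
> from pyspark import SparkContext, SparkConf
> from pyspark.sql import HiveContext
>
> conf = (SparkConf().setAppName("Hive_Join")
>         .set("spark.shuffle.memoryFraction", "0.6")   # swapped: more room for shuffle
>         .set("spark.storage.memoryFraction", "0.2"))  # less for caching, since nothing is cached
> sc = SparkContext(conf=conf)
> hiveCtx = HiveContext(sc)
> hiveCtx.hql("SET spark.sql.shuffle.partitions=2000")  # ~10x the default of 200
>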
>
> Regards
> Sab
>
> On Mon, Mar 14, 2016 at 2:20 PM, Prabhu Joseph <prabhujose.ga...@gmail.com
> > wrote:
>
>> It is Spark SQL, and the version used is Spark 1.2.1.
>>
>> On Mon, Mar 14, 2016 at 2:16 PM, Sabarish Sasidharan <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> I believe the OP is using Spark SQL and not Hive on Spark.
>>>
>>> Regards
>>> Sab
>>>
>>> On Mon, Mar 14, 2016 at 1:55 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> I think the only version of Spark that works OK with Hive (Hive on
>>>> Spark engine) is version 1.3.1. I also get OOM from time to time and have
>>>> to revert to using MR.
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>> On 14 March 2016 at 08:06, Sabarish Sasidharan <
>>>> sabarish.sasidha...@manthan.com> wrote:
>>>>
>>>>> Which version of Spark are you using? The configuration varies by
>>>>> version.
>>>>>
>>>>> Regards
>>>>> Sab
>>>>>
>>>>> On Mon, Mar 14, 2016 at 10:53 AM, Prabhu Joseph <
>>>>> prabhujose.ga...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> A Hive join query that runs fine and faster in MapReduce takes a lot
>>>>>> of time with Spark and finally fails with OOM.
>>>>>>
>>>>>> *Query:  hivejoin.py*
>>>>>>
>>>>>> from pyspark import SparkContext, SparkConf
>>>>>> from pyspark.sql import HiveContext
>>>>>>
>>>>>> conf = SparkConf().setAppName("Hive_Join")
>>>>>> sc = SparkContext(conf=conf)
>>>>>> hiveCtx = HiveContext(sc)
>>>>>>
>>>>>> # Join A with B and C and write the result into table D
>>>>>> hiveCtx.hql("INSERT OVERWRITE TABLE D SELECT <80 columns> FROM A a "
>>>>>>             "INNER JOIN B b ON a.item_id = b.item_id "
>>>>>>             "LEFT JOIN C c ON c.instance_id = a.instance_id")
>>>>>>
>>>>>> # Count the rows written to D
>>>>>> results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
>>>>>> print results
>>>>>>
>>>>>>
>>>>>> *Data Study:*
>>>>>>
>>>>>> Number of Rows:
>>>>>>
>>>>>> A table has 1002093508
>>>>>> B table has    5371668
>>>>>> C table has          1000
>>>>>>
>>>>>> No Data Skewness:
>>>>>>
>>>>>> item_id is unique in B, and A has multiple rows with the same item_id,
>>>>>> so after the first INNER JOIN the result set is still 1002093508 rows.
>>>>>>
>>>>>> instance_id is unique in C, and A has multiple rows with the same
>>>>>> instance_id (at most 250 rows share the same instance_id).
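>>>>>>
>>>>>> (For reference, one quick way to sanity-check skew on a join key with
>>>>>> the same HiveContext - just a sketch, reusing the table and column names
>>>>>> above:)
>>>>>>
>>>>>> top = hiveCtx.hql(
>>>>>>     "SELECT instance_id, COUNT(1) AS cnt FROM A "
>>>>>>     "GROUP BY instance_id ORDER BY cnt DESC LIMIT 10").collect()
>>>>>> print top   # per the numbers above, the largest cnt is 250, i.e. no heavy skew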
>>>>>>
>>>>>> The Spark job runs with 90 executors, each with 2 cores and 6GB of
>>>>>> memory. YARN allotted all the requested resources immediately, and no
>>>>>> other job is running on the cluster.
>>>>>>
>>>>>> spark.storage.memoryFraction     0.6
>>>>>> spark.shuffle.memoryFraction     0.2
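>>>>>>
>>>>>> For reference, a back-of-the-envelope sketch of what those fractions
>>>>>> mean per executor (my own arithmetic, assuming the Spark 1.x defaults of
>>>>>> spark.storage.safetyFraction=0.9, spark.shuffle.safetyFraction=0.8 and
>>>>>> 200 spark.sql.shuffle.partitions):
>>>>>>
>>>>>> heap_gb = 6.0
>>>>>> storage_budget = heap_gb * 0.6 * 0.9   # ~3.2GB reserved for cached blocks
>>>>>> shuffle_budget = heap_gb * 0.2 * 0.8   # ~1.0GB shared by running shuffle tasks
>>>>>> per_task = shuffle_budget / 2          # 2 cores => ~0.5GB per concurrent task
>>>>>> per_partition = 500.0 / 200            # ~2.5GB of shuffle data per reduce partition
>>>>>> print storage_budget, shuffle_budget, per_task, per_partition
>>>>>>
>>>>>> (The gap between ~0.5GB of in-memory shuffle budget per task and ~2.5GB
>>>>>> of shuffle data per partition is what the suggestions earlier in the
>>>>>> thread - more partitions, swapped fractions - are aimed at.)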
>>>>>>
>>>>>> Stage 2 - reads data from Hadoop; tasks have NODE_LOCAL locality and
>>>>>> shuffle-write 500GB of intermediate data.
>>>>>>
>>>>>> Stage 3 - does a shuffle read of the 500GB of data; tasks have
>>>>>> PROCESS_LOCAL locality and shuffle-write 400GB of output.
>>>>>>
>>>>>> Stage 4 - tasks fail with OOM while reading the shuffled output, after
>>>>>> reading only about 40GB of it.
>>>>>>
>>>>>> First of all, what kinds of Hive queries get better performance when
>>>>>> run on Spark than on MapReduce, and which Hive queries won't perform
>>>>>> well on Spark?
>>>>>>
>>>>>> How do we calculate the optimal executor heap size and the number of
>>>>>> executors for a given input data size? We don't ask the Spark executors
>>>>>> to cache any data, so why do the Stage 3 tasks report PROCESS_LOCAL?
>>>>>> And why does Stage 4 fail right after reading just 40GB of data - is it
>>>>>> caching data in memory?
>>>>>>
>>>>>> In a Spark job, some stages need a lot of memory for shuffle and others
>>>>>> need a lot of memory for caching. When an executor has a lot of memory
>>>>>> set aside for the cache but is not using it, and a stage needs to do a
>>>>>> lot of shuffling, will the executor use only the configured shuffle
>>>>>> fraction, or will it also use the free memory set aside for caching?
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Prabhu Joseph
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Architect - Big Data
>>>>> Ph: +91 99805 99458
>>>>>
>>>>> Manthan Systems | *Company of the year - Analytics (2014 Frost and
>>>>> Sullivan India ICT)*
>>>>> +++
>>>>>
