I believe the OP is using Spark SQL and not Hive on Spark.

Regards
Sab
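(For context, a rough sketch of the difference being pointed out here; it assumes an existing SparkContext `sc` and simply reuses the table name from the query quoted below.)

# Spark SQL over Hive tables (what hivejoin.py below does): Spark plans and
# executes the SQL itself, using the Hive metastore only for table metadata.
from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)                  # `sc` is an existing SparkContext
hiveCtx.sql("SELECT COUNT(1) FROM D").collect()

# Hive on Spark is a different stack: the query is submitted to Hive, and Hive
# uses Spark (instead of MapReduce) as its execution engine, e.g. from beeline:
#   SET hive.execution.engine=spark;
#   SELECT COUNT(1) FROM D;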
On Mon, Mar 14, 2016 at 1:55 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> I think the only version of Spark that works OK with Hive (Hive on Spark
> engine) is version 1.3.1. I also get OOM from time to time and have to
> revert to using MR.
>
> Dr Mich Talebzadeh
>
> On 14 March 2016 at 08:06, Sabarish Sasidharan
> <sabarish.sasidha...@manthan.com> wrote:
>
>> Which version of Spark are you using? The configuration varies by version.
>>
>> Regards
>> Sab
>>
>> On Mon, Mar 14, 2016 at 10:53 AM, Prabhu Joseph
>> <prabhujose.ga...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> A Hive join query which runs fine and fast in MapReduce takes a lot of
>>> time with Spark and finally fails with OOM.
>>>
>>> *Query: hivejoin.py*
>>>
>>> from pyspark import SparkContext, SparkConf
>>> from pyspark.sql import HiveContext
>>>
>>> conf = SparkConf().setAppName("Hive_Join")
>>> sc = SparkContext(conf=conf)
>>> hiveCtx = HiveContext(sc)
>>> hiveCtx.hql("INSERT OVERWRITE TABLE D SELECT <80 columns> FROM A a "
>>>             "INNER JOIN B b ON a.item_id = b.item_id "
>>>             "LEFT JOIN C c ON c.instance_id = a.instance_id")
>>> results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
>>> print results
>>>
>>> *Data study:*
>>>
>>> Number of rows:
>>>
>>> A table has 1002093508
>>> B table has 5371668
>>> C table has 1000
>>>
>>> No data skew:
>>>
>>> item_id in B is unique and A has multiple rows with the same item_id, so
>>> after the first INNER JOIN the result set is the same 1002093508 rows.
>>>
>>> instance_id in C is unique and A has multiple rows with the same
>>> instance_id (the maximum number of rows sharing one instance_id is 250).
>>>
>>> The Spark job runs with 90 executors, each with 2 cores and 6 GB memory.
>>> YARN allotted all the requested resources immediately and no other job is
>>> running on the cluster.
>>>
>>> spark.storage.memoryFraction 0.6
>>> spark.shuffle.memoryFraction 0.2
>>>
>>> Stage 2 reads data from Hadoop; tasks are NODE_LOCAL and shuffle-write
>>> 500 GB of intermediate data.
>>>
>>> Stage 3 does a shuffle read of that 500 GB; tasks are PROCESS_LOCAL and
>>> 400 GB of output is shuffled.
>>>
>>> Stage 4 tasks fail with OOM while reading the shuffled output, after only
>>> about 40 GB has been read.
>>>
>>> First of all, what kinds of Hive queries get better performance when run
>>> on Spark than on MapReduce, and which queries won't perform well on Spark?
>>>
>>> How do we calculate the optimal executor heap size and the number of
>>> executors for a given input data size? We don't ask the Spark executors to
>>> cache any data, so how come the Stage 3 tasks say PROCESS_LOCAL? And why
>>> does Stage 4 fail after reading just 40 GB of data? Is it caching data in
>>> memory?
>>>
>>> Also, in a Spark job some stages need a lot of memory for shuffle and some
>>> need a lot of memory for cache. When an executor has plenty of memory set
>>> aside for cache but does not use it, and it then needs to do a lot of
>>> shuffle, will it only use the shuffle fraction configured for shuffling, or
>>> will it also use the free memory available for cache?
>>>
>>> Thanks,
>>> Prabhu Joseph
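On the last question above: in Spark releases before 1.6 the legacy static memory manager sizes the storage and shuffle pools independently, so a shuffle cannot borrow the unused storage fraction; the unified memory manager that allows such borrowing only arrived in 1.6, which is one reason the Spark version matters here. As a minimal illustration (PySpark, assuming a pre-1.6 Spark where these property names apply; the fraction values are example choices, not settings taken from the thread), a join-heavy job that caches nothing could shift memory from storage to shuffle like this:

# Sketch only: Spark 1.x static memory management, where the storage and
# shuffle pools are sized independently by these two fractions.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("Hive_Join")
        .set("spark.executor.memory", "6g")        # heap per executor, as in the thread
        .set("spark.executor.cores", "2")
        .set("spark.executor.instances", "90")
        # nothing is cached, so shrink the storage pool (default 0.6) ...
        .set("spark.storage.memoryFraction", "0.2")
        # ... and give the shuffle pool more room than the default 0.2
        .set("spark.shuffle.memoryFraction", "0.5")
        .set("spark.shuffle.spill", "true"))       # spill to disk rather than OOM

sc = SparkContext(conf=conf)

For rough sizing: with the defaults unchanged, 6 GB x 0.2 is about 1.2 GB of shuffle memory per executor, shared by 2 concurrent tasks, so each task gets on the order of 600 MB (less once Spark's internal safety fractions are applied), which is easy to exhaust when a join has to aggregate hundreds of GB of shuffled data.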