Hi, I changed my process flow: I am now processing one file per hour instead of processing everything at the end of the day. This decreased the memory consumption.
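Roughly, the new hourly job looks like the sketch below (the hourly S3 layout, the bucket placeholder, and the final aggregation are simplified assumptions for illustration, not the exact production code):

    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext

    AWS_BUCKET = 'my-bucket'   # placeholder

    conf = (SparkConf()
            .setAppName("LoadS3Hourly")
            .set("spark.executor.memory", "13g"))
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    def process_hour(date, hour, provider):
        # assumed layout: one batch of gzipped logs per hour under the date prefix
        path = 's3n://' + AWS_BUCKET + '/' + date + '/' + hour + '/*.log.gz'
        log = sqlContext.jsonFile(path)
        log.registerTempTable('log_test')
        out = sqlContext.sql(
            "SELECT user, tax FROM log_test "
            "WHERE provider = '" + provider + "' AND country <> ''"
        ).map(lambda row: (row.user, row.tax))
        # one hour of data is small, so a handful of partitions is enough
        return out.groupByKey(10).mapValues(list).collect()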
Regards,
Eduardo

On Thu, Mar 26, 2015 at 3:16 PM, Davies Liu <dav...@databricks.com> wrote:
> Could you narrow down to the step which causes the OOM, something like:
>
> log2 = self.sqlContext.jsonFile(path)
> log2.count()
> ...
> out.count()
> ...
>
> On Thu, Mar 26, 2015 at 10:34 AM, Eduardo Cusa
> <eduardo.c...@usmediaconsulting.com> wrote:
> > The last try was without log2.cache() and I am still getting out of memory.
> >
> > I am using the following conf, maybe it helps:
> >
> > conf = (SparkConf()
> >         .setAppName("LoadS3")
> >         .set("spark.executor.memory", "13g")
> >         .set("spark.driver.memory", "13g")
> >         .set("spark.driver.maxResultSize", "2g")
> >         .set("spark.default.parallelism", "200")
> >         .set("spark.kryoserializer.buffer.mb", "512"))
> > sc = SparkContext(conf=conf)
> > sqlContext = SQLContext(sc)
> >
> > On Thu, Mar 26, 2015 at 2:29 PM, Davies Liu <dav...@databricks.com> wrote:
> >>
> >> Could you try to remove the line `log2.cache()`?
> >>
> >> On Thu, Mar 26, 2015 at 10:02 AM, Eduardo Cusa
> >> <eduardo.c...@usmediaconsulting.com> wrote:
> >> > I am running on EC2:
> >> >
> >> > 1 master: 4 CPU, 15 GB RAM (2 GB swap)
> >> > 2 slaves: 4 CPU, 15 GB RAM
> >> >
> >> > The uncompressed dataset size is 15 GB.
> >> >
> >> > On Thu, Mar 26, 2015 at 10:41 AM, Eduardo Cusa
> >> > <eduardo.c...@usmediaconsulting.com> wrote:
> >> >>
> >> >> Hi Davies, I upgraded to 1.3.0 and am still getting Out of Memory.
> >> >>
> >> >> I ran the same code as before; do I need to make any changes?
> >> >>
> >> >> On Wed, Mar 25, 2015 at 4:00 PM, Davies Liu <dav...@databricks.com> wrote:
> >> >>>
> >> >>> With batchSize = 1, I think it will become even worse.
> >> >>>
> >> >>> I'd suggest going with 1.3 and having a taste of the new DataFrame API.
> >> >>>
> >> >>> On Wed, Mar 25, 2015 at 11:49 AM, Eduardo Cusa
> >> >>> <eduardo.c...@usmediaconsulting.com> wrote:
> >> >>> > Hi Davies, I am running 1.1.0.
> >> >>> >
> >> >>> > For now I am following this thread, which recommends using the batchSize
> >> >>> > parameter = 1:
> >> >>> >
> >> >>> > http://apache-spark-user-list.1001560.n3.nabble.com/pySpark-memory-usage-td3022.html
> >> >>> >
> >> >>> > If this does not work I will install 1.2.1 or 1.3.
> >> >>> >
> >> >>> > Regards
> >> >>> >
> >> >>> > On Wed, Mar 25, 2015 at 3:39 PM, Davies Liu <dav...@databricks.com> wrote:
> >> >>> >>
> >> >>> >> What's the version of Spark you are running?
> >> >>> >>
> >> >>> >> There is a bug in the SQL Python API [1]; it's fixed in 1.2.1 and 1.3.
> >> >>> >>
> >> >>> >> [1] https://issues.apache.org/jira/browse/SPARK-6055
> >> >>> >>
> >> >>> >> On Wed, Mar 25, 2015 at 10:33 AM, Eduardo Cusa
> >> >>> >> <eduardo.c...@usmediaconsulting.com> wrote:
> >> >>> >> > Hi guys, I am running the following function with spark-submit and the
> >> >>> >> > OS is killing my process:
> >> >>> >> >
> >> >>> >> > def getRdd(self, date, provider):
> >> >>> >> >     path = 's3n://' + AWS_BUCKET + '/' + date + '/*.log.gz'
> >> >>> >> >     log2 = self.sqlContext.jsonFile(path)
> >> >>> >> >     log2.registerTempTable('log_test')
> >> >>> >> >     log2.cache()
> >> >>> >>
> >> >>> >> You only visit the table once, so cache does not help here.
> >> >>> >>
> >> >>> >> >     out = self.sqlContext.sql("SELECT user, tax FROM log_test WHERE
> >> >>> >> > provider = '" + provider + "' AND country <> ''").map(
> >> >>> >> >         lambda row: (row.user, row.tax))
> >> >>> >> >     print "out1"
> >> >>> >> >     return map((lambda (x, y): (x, list(y))),
> >> >>> >> >                sorted(out.groupByKey(2000).collect()))
> >> >>> >>
> >> >>> >> 100 partitions (or less) will be enough for a 2 GB dataset.
> >> >>> >>
> >> >>> >> > The input dataset has 57 gzipped files (2 GB).
> >> >>> >> >
> >> >>> >> > The same process with a smaller dataset completed successfully.
> >> >>> >> >
> >> >>> >> > Any ideas to debug are welcome.
> >> >>> >> >
> >> >>> >> > Regards,
> >> >>> >> > Eduardo
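For the archives: after folding in the suggestions above (drop the cache(), use ~100 partitions instead of 2000, and add count() checkpoints to narrow down the failing step), getRdd ends up roughly like the sketch below. It assumes the same class and AWS_BUCKET constant as the original code and is not the exact fix:

    def getRdd(self, date, provider):
        path = 's3n://' + AWS_BUCKET + '/' + date + '/*.log.gz'

        log2 = self.sqlContext.jsonFile(path)
        # log2.count()   # uncomment to check whether the OOM happens while loading the JSON
        log2.registerTempTable('log_test')
        # no log2.cache(): the table is only read once, so caching only costs memory

        out = self.sqlContext.sql(
            "SELECT user, tax FROM log_test "
            "WHERE provider = '" + provider + "' AND country <> ''"
        ).map(lambda row: (row.user, row.tax))
        # out.count()    # uncomment to check whether the OOM happens in the query/map step

        # 100 partitions (or less) are enough for a ~2 GB dataset
        return sorted(out.groupByKey(100).mapValues(list).collect())

If the count() right after jsonFile already blows up, the problem is in loading and schema-inferring the JSON rather than in the groupByKey.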