Hi Henry,

In your first example the dict d always contains only one key, because the_id is the same for every line. In your second example the dict grows very quickly: every new WARC-Record-ID adds another key, and each Row(**d) you append carries every key seen so far, so the rows get wider and wider.
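As a toy illustration of that growth (plain Python, no cluster needed, only pyspark's Row is imported, and the ids are made up):

    from pyspark.sql import Row

    d = {}
    rows = []
    for i in range(3):
        line = "WARC-Record-ID:id_%d" % i   # stand-in for a real WARC header line
        d[line[15:]] = line                 # the dict never shrinks
        rows.append(Row(**d))               # this row carries i + 1 fields

    print([len(r) for r in rows])           # [1, 2, 3] -- and so on, line after line

With hundreds of thousands of lines and many record ids per file, those ever-wider rows are what pushes the executor past its memory limit.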
So I would suggest first applying a map/flatMap to split the whole-file text into rows, then repartitioning, and then applying your custom logic.

Example:

    def splitf(s):
        return s.split("\n")

    # here rdd is assumed to be an RDD of whole-file strings
    rdd.flatMap(splitf).repartition(1000).map(your_function)

Best,
Pavel

On Mon, 27 Feb 2017, 06:28 Henry Tremblay, <paulhtremb...@gmail.com> wrote:

> Not sure where you want me to put yield. My first try caused an error in
> Spark saying that it could not pickle generator objects.
>
> On 02/26/2017 03:25 PM, ayan guha wrote:
>
> Hi,
>
> We are doing similar stuff, but with a large number of small-ish files. What
> we do is write a function that parses a complete file, similar to your parse
> function, but we use yield instead of return and then flatMap on top of it.
> Can you give it a try and let us know if it works?
>
> On Mon, Feb 27, 2017 at 9:02 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
> Using wholeTextFiles to process formats that cannot be split per line is not
> "old",
>
> and there are plenty of problems for which RDD is still better suited than
> Dataset or DataFrame currently (this might change in the near future when
> Dataset gets some crucial optimizations fixed).
>
> On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>
> Hi Henry,
>
> Those guys in Databricks training are nuts and still use Spark 1.x for
> their exams. Learning Spark that way is a very, very old way of solving
> problems with Spark.
>
> The core engine of Spark, which even I understand, has gone through
> several fundamental changes.
>
> Just try reading the file using DataFrames and Spark 2.1.
>
> In other words, it may be of tremendous benefit if you learn to solve
> problems which exist rather than problems which no longer exist.
>
> Please let me know in case I can be of any further help.
>
> Regards,
> Gourav
>
> On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay <paulhtremb...@gmail.com> wrote:
>
> The file is so small that a standalone Python script, independent of
> Spark, can process it in under a second.
>
> Also, the following fails:
>
> 1. Read the whole file in with wholeTextFiles.
>
> 2. Use flatMap to get 50,000 rows that look like Row(id="path", line="line").
>
> 3. Save the results as CSV to HDFS.
>
> 4. Read the files (there are 20) from HDFS into a df using
> sqlContext.read.csv(<path>).
>
> 5. Convert the df to an rdd.
>
> 6. Create key-value pairs with the key being the file path and the value
> being the line.
>
> 7. Iterate through the values.
>
> What happens is that Spark either runs out of memory or, in my last try with
> a slight variation, just hangs for 12 hours.
>
> Henry
>
> On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
>
> Hi, Tremblay.
> Your file is in .gz format, which is not splittable for Hadoop. Perhaps the
> file is loaded by only one executor.
> How many executors do you start?
> Perhaps the repartition method could solve it, I guess.
>
> On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay <paulhtremb...@gmail.com> wrote:
>
> I am reading a single small file from Hadoop with wholeTextFiles. If I
> process each line and create a row with two cells, the first cell equal to
> the name of the file and the second cell equal to the line, that code runs
> fine.
>
> But if I just add two lines of code and change the first cell based on
> parsing a line, Spark runs out of memory. Any idea why such a simple
> process that would succeed quickly in a non-Spark application fails?
>
> Thanks!
>
> Henry
>
> CODE:
>
> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
> 3816096  /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz
>
> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
>
> In [2]: rdd1.count()
> Out[2]: 1
>
> In [4]: def process_file(s):
>    ...:     text = s[1]
>    ...:     the_id = s[0]
>    ...:     d = {}
>    ...:     l = text.split("\n")
>    ...:     final = []
>    ...:     for line in l:
>    ...:         d[the_id] = line
>    ...:         final.append(Row(**d))
>    ...:     return final
>    ...:
>
> In [5]: rdd2 = rdd1.map(process_file)
>
> In [6]: rdd2.count()
> Out[6]: 1
>
> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>
> In [8]: rdd3.count()
> Out[8]: 508310
>
> In [9]: rdd3.take(1)
> Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]
>
> In [10]: def process_file(s):
>    ...:     text = s[1]
>    ...:     d = {}
>    ...:     l = text.split("\n")
>    ...:     final = []
>    ...:     the_id = "init"
>    ...:     for line in l:
>    ...:         if line[0:15] == 'WARC-Record-ID:':
>    ...:             the_id = line[15:]
>    ...:         d[the_id] = line
>    ...:         final.append(Row(**d))
>    ...:     return final
>
> In [12]: rdd2 = rdd1.map(process_file)
>
> In [13]: rdd2.count()
> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5, ip-172-31-41-89.us-west-2.compute.internal, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>
> --
> Henry Tremblay
> Robert Half Technology
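P.S. To make the suggestions above concrete, here is a rough, untested sketch that combines Ayan's yield-plus-flatMap idea with a fixed two-field schema (so the dict never grows) and a repartition, since a single .gz file arrives in one partition as Yan points out. It assumes the same wholeTextFiles input and an active SparkContext/SparkSession; parse_warc, the column names, and the partition count are my own placeholders:

    from pyspark.sql import Row

    def parse_warc(pair):
        # Yield one fixed-width Row per line. flatMap consumes the generator,
        # so no per-file list of ever-wider rows is built up in executor memory.
        path, text = pair
        the_id = "init"
        for line in text.split("\n"):
            if line.startswith("WARC-Record-ID:"):
                the_id = line[15:]
            yield Row(record_id=the_id, line=line)

    rdd1 = sc.wholeTextFiles("/mnt/temp")
    rdd2 = rdd1.flatMap(parse_warc).repartition(200)   # spread the ~500,000 rows across executors
    df = rdd2.toDF()                                   # two columns: record_id, line
    df.show(5)

The single gzipped file still has to be read and split by one task, but after the repartition the downstream work is spread out, and every row keeps the same two-field width. From there it is a plain two-column DataFrame, in line with Gourav's suggestion to work with DataFrames on Spark 2.x.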