Hi,

We are doing similar stuff, but with a large number of small-ish files. What we do is write a function to parse a complete file, similar to your parse function, but we use yield instead of return, and then flatMap on top of it. Can you give it a try and let us know if it works?
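Roughly something like this, as an untested sketch; it assumes the same (path, text) pairs that wholeTextFiles gives you and keeps a fixed two-column Row(id, line) instead of building a dict per file:

from pyspark.sql import Row

def process_file(s):
    # s is one (file path, whole file text) pair from wholeTextFiles
    path, text = s
    the_id = path
    for line in text.split("\n"):
        # same header check as in your second version
        if line.startswith('WARC-Record-ID:'):
            the_id = line[len('WARC-Record-ID:'):]
        # yield one small Row at a time instead of appending to a list
        yield Row(id=the_id, line=line)

rdd1 = sc.wholeTextFiles("/mnt/temp")
rdd2 = rdd1.flatMap(process_file)   # flatMap flattens the generator output
rdd2.take(1)

With flatMap over a generator the executor never has to hold the full per-file list of rows (or an ever-growing dict of columns) in memory at once.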
On Mon, Feb 27, 2017 at 9:02 AM, Koert Kuipers <ko...@tresata.com> wrote:

> using wholeFiles to process formats that can not be split per line is not "old"
>
> and there are plenty of problems for which RDD is still better suited than Dataset or DataFrame currently (this might change in the near future when Dataset gets some crucial optimizations fixed).
>
> On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>
>> Hi Henry,
>>
>> Those guys in Databricks training are nuts and still use Spark 1.x for their exams. Learning SPARK is a VERY VERY VERY old way of solving problems using SPARK.
>>
>> The core engine of SPARK, which even I understand, has gone through several fundamental changes.
>>
>> Just try reading the file using dataframes and try using SPARK 2.1.
>>
>> In other words it may be of tremendous benefit if you were learning to solve problems which exist rather than problems which do not exist any more.
>>
>> Please let me know in case I can be of any further help.
>>
>> Regards,
>> Gourav
>>
>> On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay <paulhtremb...@gmail.com> wrote:
>>
>>> The file is so small that a standalone Python script, independent of Spark, can process the file in under a second.
>>>
>>> Also, the following fails:
>>>
>>> 1. Read the whole file in with wholeFiles
>>>
>>> 2. Use flatMap to get 50,000 rows that look like: Row(id="path", line="line")
>>>
>>> 3. Save the results as CSV to HDFS
>>>
>>> 4. Read the files (there are 20) from HDFS into a df using sqlContext.read.csv(<path>)
>>>
>>> 5. Convert the df to an rdd.
>>>
>>> 6. Create key-value pairs with the key being the file path and the value being the line.
>>>
>>> 7. Iterate through the values.
>>>
>>> What happens is Spark either runs out of memory, or, in my last try with a slight variation, just hangs for 12 hours.
>>>
>>> Henry
>>>
>>> On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
>>>
>>> Hi, Tremblay.
>>> Your file is in .gz format, which is not splittable for Hadoop. Perhaps the file is loaded by only one executor.
>>> How many executors do you start?
>>> Perhaps the repartition method could solve it, I guess.
>>>
>>> On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay <paulhtremb...@gmail.com> wrote:
>>>
>>>> I am reading in a single small file from Hadoop with wholeTextFiles. If I process each line and create a row with two cells, the first cell equal to the name of the file and the second cell equal to the line, that code runs fine.
>>>>
>>>> But if I just add two lines of code and change the first cell based on parsing a line, Spark runs out of memory. Any idea why such a simple process that would succeed quickly in a non-Spark application fails?
>>>>
>>>> Thanks!
>>>>
>>>> Henry
>>>>
>>>> CODE:
>>>>
>>>> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
>>>> 3816096  /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz
>>>>
>>>> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
>>>> In [2]: rdd1.count()
>>>> Out[2]: 1
>>>>
>>>> In [4]: def process_file(s):
>>>>    ...:     text = s[1]
>>>>    ...:     the_id = s[0]
>>>>    ...:     d = {}
>>>>    ...:     l = text.split("\n")
>>>>    ...:     final = []
>>>>    ...:     for line in l:
>>>>    ...:         d[the_id] = line
>>>>    ...:         final.append(Row(**d))
>>>>    ...:     return final
>>>>    ...:
>>>>
>>>> In [5]: rdd2 = rdd1.map(process_file)
>>>>
>>>> In [6]: rdd2.count()
>>>> Out[6]: 1
>>>>
>>>> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>>>>
>>>> In [8]: rdd3.count()
>>>> Out[8]: 508310
>>>>
>>>> In [9]: rdd3.take(1)
>>>> Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]
>>>>
>>>> In [10]: def process_file(s):
>>>>     ...:     text = s[1]
>>>>     ...:     d = {}
>>>>     ...:     l = text.split("\n")
>>>>     ...:     final = []
>>>>     ...:     the_id = "init"
>>>>     ...:     for line in l:
>>>>     ...:         if line[0:15] == 'WARC-Record-ID:':
>>>>     ...:             the_id = line[15:]
>>>>     ...:         d[the_id] = line
>>>>     ...:         final.append(Row(**d))
>>>>     ...:     return final
>>>>
>>>> In [12]: rdd2 = rdd1.map(process_file)
>>>>
>>>> In [13]: rdd2.count()
>>>> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>>>> 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>>>> 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5, ip-172-31-41-89.us-west-2.compute.internal, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>>>>
>>>> --
>>>> Henry Tremblay
>>>> Robert Half Technology
>>>
>>> --
>>> Henry Tremblay
>>> Robert Half Technology
>>
>

--
Best Regards,
Ayan Guha
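For completeness, here is a rough, untested sketch of the DataFrame route Gourav suggests above, combined with the repartition idea from Yan: spark.read.text gives one row per line, input_file_name() recovers the source path, and an explicit repartition spreads the lines out, since a gzipped file otherwise stays in a single partition (the partition count of 20 is only an illustrative value):

from pyspark.sql.functions import input_file_name

df = (spark.read.text("/mnt/temp")                 # one row per line, in column "value"
        .withColumn("path", input_file_name())     # tag each line with its source file
        .repartition(20))                          # .gz is unsplittable, so fan out explicitly
df.show(1, truncate=False)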