Hi

We are doing similar stuff, but with a large number of small-ish files. What
we do is write a function to parse a complete file, similar to your parse
function, but we use yield instead of return and flatMap on top of it. Can you
give it a try and let us know if it works?
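
A minimal sketch of what I mean, using your two-column layout as an example
(the names parse_file, id and line are just placeholders):

from pyspark.sql import Row

def parse_file(pair):
    # pair is (file_path, whole_file_text) as returned by wholeTextFiles
    path, text = pair
    for line in text.split("\n"):
        # yield one small Row at a time instead of building a big list
        yield Row(id=path, line=line)

rdd1 = sc.wholeTextFiles("/mnt/temp")
rdd2 = rdd1.flatMap(parse_file)   # flatMap drains the generator lazily
rdd2.count()

One more guess: in your second process_file, the dict d keeps every
WARC-Record-ID it has seen as a key, so each Row(**d) carries all of them as
columns; that alone could explain the memory blow-up.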

On Mon, Feb 27, 2017 at 9:02 AM, Koert Kuipers <ko...@tresata.com> wrote:

> using wholeTextFiles to process formats that cannot be split per line is not
> "old"
>
> and there are plenty of problems for which RDD is still better suited than
> Dataset or DataFrame currently (this might change in the near future, when
> Dataset gets some crucial optimizations fixed).
>
> On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi Henry,
>>
>> Those guys at the Databricks training are nuts and still use Spark 1.x for
>> their exams. Learning Spark that way is a very, very old way of solving
>> problems with Spark.
>>
>> The core engine of Spark, as even I understand it, has gone through
>> several fundamental changes.
>>
>> Just try reading the file using DataFrames, and try using Spark 2.1.
>>
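>> A minimal sketch of that, assuming Spark 2.x (where the shell gives you a
>> spark session) and the same path as below; input_file_name() adds the
>> source path of each row:
>>
>> from pyspark.sql.functions import input_file_name
>>
>> df = spark.read.text("/mnt/temp").withColumn("path", input_file_name())
>> df.count()
>>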
>> In other words, it may be of tremendous benefit to learn to solve problems
>> that still exist rather than problems that no longer exist.
>>
>> Please let me know in case I can be of any further help.
>>
>> Regards,
>> Gourav
>>
>> On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay <paulhtremb...@gmail.com>
>> wrote:
>>
>>> The file is so small that a standalone Python script, independent of
>>> Spark, can process it in under a second.
>>>
>>> Also, the following fails (a rough sketch in code follows the list):
>>>
>>> 1. Read the whole file in with wholeTextFiles.
>>>
>>> 2. Use flatMap to get 50,000 rows that look like: Row(id="path",
>>> line="line").
>>>
>>> 3. Save the results as CSV to HDFS.
>>>
>>> 4. Read the files (there are 20) from HDFS into a df using
>>> sqlContext.read.csv(<path>).
>>>
>>> 5. Convert the df to an RDD.
>>>
>>> 6. Create key-value pairs with the key being the file path and the value
>>> being the line.
>>>
>>> 7. Iterate through the values.
>>>
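>>> Roughly, in code (paths and variable names here are placeholders only):
>>>
>>> rdd = sc.wholeTextFiles("/mnt/temp")
>>> rows = rdd.flatMap(process_file)              # Row(id="path", line="line")
>>> sqlContext.createDataFrame(rows).write.csv("/mnt/csv_out")
>>> df = sqlContext.read.csv("/mnt/csv_out")
>>> pairs = df.rdd.map(lambda r: (r[0], r[1]))    # (file path, line)
>>> for path, line in pairs.toLocalIterator():
>>>     pass                                      # step 7: iterate the values
>>>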
>>> What happens is that Spark either runs out of memory or, in my last try
>>> with a slight variation, just hangs for 12 hours.
>>>
>>> Henry
>>>
>>> On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
>>>
>>> Hi, Tremblay.
>>> Your file is in .gz format, which is not splittable for Hadoop. Perhaps the
>>> file is loaded by only one executor.
>>> How many executors did you start?
>>> Perhaps the repartition method could solve it, I guess.
>>>
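>>> For example, on your rdd1 (8 is only a guess at a partition count):
>>>
>>> rdd3 = rdd1.flatMap(process_file).repartition(8)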
>>>
>>> On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay <paulhtremb...@gmail.com
>>> > wrote:
>>>
>>>> I am reading in a single small file from Hadoop with wholeTextFiles. If I
>>>> process each line and create a row with two cells, the first cell equal to
>>>> the name of the file and the second equal to the line, that code runs fine.
>>>>
>>>> But if I just add two lines of code and change the first cell based on
>>>> parsing a line, Spark runs out of memory. Any idea why such a simple
>>>> process, which would succeed quickly in a non-Spark application, fails?
>>>>
>>>> Thanks!
>>>>
>>>> Henry
>>>>
>>>> CODE:
>>>>
>>>> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
>>>> 3816096 /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz
>>>>
>>>>
>>>> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
>>>> In [2]: rdd1.count()
>>>> Out[2]: 1
>>>>
>>>>
>>>> In [4]: def process_file(s):
>>>>    ...:     text = s[1]
>>>>    ...:     the_id = s[0]
>>>>    ...:     d = {}
>>>>    ...:     l =  text.split("\n")
>>>>    ...:     final = []
>>>>    ...:     for line in l:
>>>>    ...:         d[the_id] = line
>>>>    ...:         final.append(Row(**d))
>>>>    ...:     return final
>>>>    ...:
>>>>
>>>> In [5]: rdd2 = rdd1.map(process_file)
>>>>
>>>> In [6]: rdd2.count()
>>>> Out[6]: 1
>>>>
>>>> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>>>>
>>>> In [8]: rdd3.count()
>>>> Out[8]: 508310
>>>>
>>>> In [9]: rdd3.take(1)
>>>> Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]
>>>>
>>>> In [10]: def process_file(s):
>>>>     ...:     text = s[1]
>>>>     ...:     d = {}
>>>>     ...:     l =  text.split("\n")
>>>>     ...:     final = []
>>>>     ...:     the_id = "init"
>>>>     ...:     for line in l:
>>>>     ...:         if line[0:15] == 'WARC-Record-ID:':
>>>>     ...:             the_id = line[15:]
>>>>     ...:         d[the_id] = line
>>>>     ...:         final.append(Row(**d))
>>>>     ...:     return final
>>>>
>>>> In [12]: rdd2 = rdd1.map(process_file)
>>>>
>>>> In [13]: rdd2.count()
>>>> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
>>>> ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN
>>>> for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used.
>>>> Consider boosting spark.yarn.executor.memoryOverhead.
>>>> 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
>>>> Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB
>>>> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>>>> 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID
>>>> 5, ip-172-31-41-89.us-west-2.compute.internal, executor 5):
>>>> ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
>>>> Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of
>>>> 10.3 GB physical memory used. Consider boosting
>>>> spark.yarn.executor.memoryOverhead.
>>>>
>>>>
>>>> --
>>>> Henry Tremblay
>>>> Robert Half Technology
>>>>
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>
>>>
>>> --
>>> Henry Tremblay
>>> Robert Half Technology
>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha
