Hi, Henry

In the first example the dict d always contains only one entry because the_id
stays the same; in the second case the dict grows very quickly.
So I would suggest first applying a flatMap to split your file string into
rows, then repartitioning, and then applying your custom logic.

Example:

def splitf(s):
    # s is a (path, text) pair from wholeTextFiles
    return s[1].split("\n")

rdd.flatMap(splitf).repartition(1000).map(your_function)
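
Applied to your WARC file, a fuller sketch might look like the following
(untested; 1000 partitions is only a placeholder, and rdd1 is assumed to be
your wholeTextFiles RDD):

from pyspark.sql import Row

def to_rows(pair):
    path, text = pair                     # wholeTextFiles gives (path, text)
    the_id = "init"
    rows = []
    for line in text.split("\n"):
        if line.startswith("WARC-Record-ID:"):
            the_id = line[len("WARC-Record-ID:"):]
        rows.append(Row(id=the_id, line=line))   # one small Row per line, no growing dict
    return rows

rdd2 = rdd1.flatMap(to_rows).repartition(1000)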

Best,
Pavel

On Mon, 27 Feb 2017, 06:28 Henry Tremblay, <paulhtremb...@gmail.com> wrote:

> Not sure where you want me to put yield. My first try caused an error in
> Spark saying it could not pickle generator objects.
>
> On 02/26/2017 03:25 PM, ayan guha wrote:
>
> Hi
>
> We are doing similar stuff, but with a large number of small-ish files. What
> we do is write a function to parse a complete file, similar to your parse
> function. But we use yield instead of return, and flatMap on top of it. Can
> you give it a try and let us know if it works?
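>
> A minimal sketch of what I mean (untested, assuming the input comes from
> wholeTextFiles and your per-line parsing goes inside the loop):
>
> def parse_file(pair):
>     path, text = pair
>     for line in text.split("\n"):
>         yield (path, line)          # yield each record instead of returning a list
>
> rows = sc.wholeTextFiles("/mnt/temp").flatMap(parse_file)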
>
> On Mon, Feb 27, 2017 at 9:02 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
> using wholeTextFiles to process formats that cannot be split per line is not
> "old"
>
> and there are plenty of problems for which RDD is still better suited than
> Dataset or DataFrame currently (this might change in the near future when
> Dataset gets some crucial optimizations fixed).
>
> On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
> Hi Henry,
>
> Those guys in Databricks training are nuts and still use Spark 1.x for
> their exams. Learning SPARK is a VERY VERY VERY old way of solving problems
> using SPARK.
>
> The core engine of SPARK, which even I understand, has gone through
> several fundamental changes.
>
> Just try reading the file using DataFrames with Spark 2.1.
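>
> For example (just a sketch, using the path from your session):
>
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.getOrCreate()
> df = spark.read.text("/mnt/temp")   # one row per line, in a column named "value"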
>
> In other words, it may be of tremendous benefit if you were learning to
> solve problems which exist rather than problems which do not exist any
> more.
>
> Please let me know in case I can be of any further help.
>
> Regards,
> Gourav
>
> On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay <paulhtremb...@gmail.com>
> wrote:
>
> The file is so small that a standalone Python script, independent of
> Spark, can process the file in under a second.
>
> Also, the following fails:
>
> 1. Read the whole file in with wholeTextFiles
>
> 2. Use flatMap to get 50,000 rows that look like: Row(id="path",
> line="line")
>
> 3. Save the results as CSV to HDFS
>
> 4. Read the files (there are 20) from HDFS into a df using
> sqlContext.read.csv(<path>)
>
> 5. Convert the df to an rdd.
>
> 6. Create key-value pairs with the key being the file path and the value
> being the line.
>
> 7. Iterate through the values
>
> What happens is that Spark either runs out of memory or, in my last try with
> a slight variation, simply hangs for 12 hours.
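>
> In outline, the first steps look something like this (simplified, not my
> exact code; the output path is illustrative):
>
> from pyspark.sql import Row
>
> rdd = sc.wholeTextFiles("/mnt/temp")
> rows = rdd.flatMap(lambda kv: [Row(id=kv[0], line=line)
>                                for line in kv[1].split("\n")])
> sqlContext.createDataFrame(rows).write.csv("/mnt/csv_out")   # steps 1-3
> df = sqlContext.read.csv("/mnt/csv_out")                     # step 4
> pairs = df.rdd.map(lambda r: (r[0], r[1]))                   # steps 5-6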
>
> Henry
>
> On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
>
> Hi, Tremblay.
> Your file is in .gz format, which is not splittable for Hadoop. Perhaps the
> file is loaded by only one executor.
> How many executors do you start?
> Perhaps the repartition method could solve it, I guess.
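>
> For example (a sketch; the partition count is just a guess):
>
> print(rdd1.getNumPartitions())   # likely 1 for a single .gz file
> rdd3 = rdd3.repartition(200)     # spread the per-line records across executors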
>
>
> On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay <paulhtremb...@gmail.com>
> wrote:
>
> I am reading in a single small file from Hadoop with wholeTextFiles. If I
> process each line and create a row with two cells, the first cell equal to
> the name of the file and the second cell equal to the line, that code runs
> fine.
>
> But if I just add two lines of code and change the first cell based on
> parsing a line, Spark runs out of memory. Any idea why such a simple
> process that would succeed quickly in a non-Spark application fails?
>
> Thanks!
>
> Henry
>
> CODE:
>
> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
> 3816096
> /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz
>
>
> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
> In [2]: rdd1.count()
> Out[2]: 1
>
>
> In [4]: def process_file(s):
>    ...:     text = s[1]
>    ...:     the_id = s[0]
>    ...:     d = {}
>    ...:     l =  text.split("\n")
>    ...:     final = []
>    ...:     for line in l:
>    ...:         d[the_id] = line
>    ...:         final.append(Row(**d))
>    ...:     return final
>    ...:
>
> In [5]: rdd2 = rdd1.map(process_file)
>
> In [6]: rdd2.count()
> Out[6]: 1
>
> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>
> In [8]: rdd3.count()
> Out[8]: 508310
>
> In [9]: rdd3.take(1)
> Out[9]: [Row(hdfs://ip-172-31-35-67.us
> -west-2.compute.internal:8020/mnt/temp/CC-MAIN-2017011609512
> 3-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]
>
> In [10]: def process_file(s):
>     ...:     text = s[1]
>     ...:     d = {}
>     ...:     l =  text.split("\n")
>     ...:     final = []
>     ...:     the_id = "init"
>     ...:     for line in l:
>     ...:         if line[0:15] == 'WARC-Record-ID:':
>     ...:             the_id = line[15:]
>     ...:         d[the_id] = line
>     ...:         final.append(Row(**d))
>     ...:     return final
>
> In [12]: rdd2 = rdd1.map(process_file)
>
> In [13]: rdd2.count()
> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
> ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN for
> exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider
> boosting spark.yarn.executor.memoryOverhead.
> 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
> Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB
> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5,
> ip-172-31-41-89.us-west-2.compute.internal, executor 5):
> ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
> Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of
> 10.3 GB physical memory used. Consider boosting
> spark.yarn.executor.memoryOverhead.
>
>
> --
> Henry Tremblay
> Robert Half Technology
>
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
> --
> Henry Tremblay
> Robert Half Technology
>
>
>
>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
> --
> Henry Tremblay
> Robert Half Technology
>
>
