I am actually using Spark 2.1 and trying to solve a real-life problem. Unfortunately, some of the discussion of my problem went offline, so I started a new thread.

Here is my problem. I am parsing crawl data that is stored in a flat file format. It looks like this:

[u'WARC/1.0',
 u'WARC-Type: warcinfo',
 u'WARC-Date: 2016-12-08T13:00:23Z',
 u'WARC-Record-ID: <urn:uuid:f609f246-df68-46ef-a1c5-2f66e833ffd6>',
 u'Content-Length: 344',
 u'Content-Type: application/warc-fields',
u'WARC-Filename: CC-MAIN-20161202170900-00000-ip-10-31-129-80.ec2.internal.warc.gz',
 u'',
 u'robots: classic',
 u'hostname: ip-10-31-129-80.ec2.internal',
 u'software: Nutch 1.6 (CC)/CC WarcExport 1.0',
 u'isPartOf: CC-MAIN-2016-50',
 u'operator: CommonCrawl Admin',
 u'description: Wide crawl of the web for November 2016',
 u'publisher: CommonCrawl',
 u'format: WARC File Format 1.0',
 u'conformsTo: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf',
 u'',
 u'',
 u'WARC/1.0',
 u'WARC-Type: request',
 u'WARC-Date: 2016-12-02T17:54:09Z',
 u'WARC-Record-ID: <urn:uuid:cc7ddf8b-4646-4440-a70a-e253818cf10b>',
 u'Content-Length: 220',
 u'Content-Type: application/http; msgtype=request',
 u'WARC-Warcinfo-ID: <urn:uuid:f609f246-df68-46ef-a1c5-2f66e833ffd6>',
 u'WARC-IP-Address: 217.197.115.133',
 u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/',
 u'',
 u'GET /blog/ HTTP/1.0',
 u'Host: 1018201.vkrugudruzei.ru',
 u'Accept-Encoding: x-gzip, gzip, deflate',
 u'User-Agent: CCBot/2.0 (http://commoncrawl.org/faq/)',
 u'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
 u'',
 u'',
 u'',
 u'WARC/1.0',
 u'WARC-Type: response',
 u'WARC-Date: 2016-12-02T17:54:09Z',
 u'WARC-Record-ID: <urn:uuid:4c5e6d1a-e64f-4b6e-8101-c5e46feb84a0>',
 u'Content-Length: 577',
 u'Content-Type: application/http; msgtype=response',
 u'WARC-Warcinfo-ID: <urn:uuid:f609f246-df68-46ef-a1c5-2f66e833ffd6>',
 u'WARC-Concurrent-To: <urn:uuid:cc7ddf8b-4646-4440-a70a-e253818cf10b>',
 u'WARC-IP-Address: 217.197.115.133',
 u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/',
 u'WARC-Payload-Digest: sha1:Y4TZFLB6UTXHU4HUVONBXC5NZQW2LYMM',
 u'WARC-Block-Digest: sha1:3J7HHBMWTSC7W53DDB7BHTUVPM26QS4B',
 u'']

I want to turn it into something like this:

Row(warc-type='request', warc-date='2016-12-02', warc-record-id='<urn:uuid:cc7ddf8b-4646-4440-a70a-e253818cf10b...', ...)

In other words, I want to turn rows into columns. There are no keywords in the flat file.

From there I can read it in as a dataframe.
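Something like this minimal sketch is what I have in mind; the field list and the helper name lines_to_rows are only placeholders for illustration, and it assumes each record starts at a 'WARC/1.0' line as shown above:

from pyspark.sql import Row

# Every Row must carry the same fields so the result can later become a
# dataframe; this particular field list is an arbitrary example.
FIELDS = ['warc_type', 'warc_date', 'warc_record_id', 'content_length', 'content_type']

def lines_to_rows(lines):
    """Group the flat list of WARC lines into one Row per record."""
    rows, current = [], {}
    for line in lines:
        if line.startswith('WARC/'):          # a new record starts here
            if current:
                rows.append(Row(**{f: current.get(f) for f in FIELDS}))
            current = {}
        elif ': ' in line:
            key, _, value = line.partition(': ')
            current[key.lower().replace('-', '_')] = value
    if current:
        rows.append(Row(**{f: current.get(f) for f in FIELDS}))
    return rows

# e.g. df = spark.createDataFrame(lines_to_rows(lines))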


On 02/26/2017 12:14 PM, Gourav Sengupta wrote:
Hi Henry,

Those guys in Databricks training are nuts and still use Spark 1.x for their exams. Learning SPARK is a VERY VERY VERY old way of solving problems using SPARK.

The core engine of SPARK, which even I understand, has gone through several fundamental changes.

Just try reading the file using dataframes and try using SPARK 2.1.
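For what it is worth, a minimal sketch of that in Spark 2.1 (the path is only a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# read.text gives one row per line in a single string column named 'value';
# gzipped input is decompressed transparently.
df = spark.read.text("/mnt/temp/*.warc.gz")
df.show(5, truncate=False)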

In other words, it may be of tremendous benefit if you learn to solve problems which exist rather than problems which do not exist any more.

Please let me know in case I can be of any further help.

Regards,
Gourav

On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay <paulhtremb...@gmail.com> wrote:

    The file is so small that a standalone Python script, independent
    of Spark, can process the file in under a second.
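    For comparison, this is roughly the kind of standalone check I mean,
    using the file from the session quoted below (the path is relative to
    wherever the file was downloaded):

    import gzip

    # Count WARC records in the compressed file without Spark.
    path = 'CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz'
    n_records = 0
    with gzip.open(path, 'rb') as f:
        for raw_line in f:
            if raw_line.startswith(b'WARC/1.0'):
                n_records += 1
    print(n_records)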

    Also, the following fails:

    1. Read the whole file in with wholeFiles

    2. Use flatMap to get 50,000 rows that look like Row(id="path",
    line="line")

    3. Save the results as CSV to HDFS

    4. Read the files (there are 20) from HDFS into a df using
    sqlContext.read.csv(<path>)

    5. Convert the df to an rdd.

    6. Create key-value pairs with the key being the file path and the
    value being the line.

    7. Iterate through the values.

    What happens is Spark either runs out of memory, or, in my last
    try with a slight variation, just hangs for 12 hours.
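    For reference, a compressed sketch of roughly what steps 1-4 look like
    (the output path "/mnt/lines" and the column names are placeholders,
    not the exact code I ran):

    from pyspark.sql import Row

    rdd1 = sc.wholeTextFiles("/mnt/temp")                      # step 1: (path, whole_text) pairs
    rows = rdd1.flatMap(lambda kv: [Row(id=kv[0], line=line)   # step 2: one Row per line
                                    for line in kv[1].split("\n")])
    sqlContext.createDataFrame(rows).write.csv("/mnt/lines")   # step 3: save as CSV to HDFS
    df = sqlContext.read.csv("/mnt/lines")                     # step 4: read back as a dataframe
    pairs = df.rdd.map(lambda r: (r[0], r[1]))                 # steps 5-6: (path, line) pairs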

    Henry


    On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
    Hi, Tremblay.
    Your file is in .gz format, which is not splittable for Hadoop.
    Perhaps the file is loaded by only one executor.
    How many executors do you start?
    Perhaps the repartition method could solve it, I guess.
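    A minimal sketch of that suggestion, applied to the line-level RDD
    from the session further down (the partition count 20 is only an
    example):

    rdd3 = rdd2.flatMap(lambda x: x)   # one element per line, still in one partition
    rdd3 = rdd3.repartition(20)        # spread the lines over 20 partitions / executors
    rdd3.count()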


    On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay
    <paulhtremb...@gmail.com> wrote:

        I am reading in a single small file from Hadoop with
        wholeTextFiles. If I process each line and create a row with two
        cells, the first cell equal to the name of the file and the
        second cell equal to the line, that code runs fine.

        But if I just add two lines of code and change the first cell
        based on parsing a line, Spark runs out of memory. Any idea
        why such a simple process that would succeed quickly in a
        non-Spark application fails?

        Thanks!

        Henry

        CODE:

        [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
        3816096  /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz


        In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
        In [2]: rdd1.count()
        Out[2]: 1


        In [4]: def process_file(s):
           ...:     text = s[1]
           ...:     the_id = s[0]
           ...:     d = {}
           ...:     l =  text.split("\n")
           ...:     final = []
           ...:     for line in l:
           ...:         d[the_id] = line
           ...:         final.append(Row(**d))
           ...:     return final
           ...:

        In [5]: rdd2 = rdd1.map(process_file)

        In [6]: rdd2.count()
        Out[6]: 1

        In [7]: rdd3 = rdd2.flatMap(lambda x: x)

        In [8]: rdd3.count()
        Out[8]: 508310

        In [9]: rdd3.take(1)
        Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]

        In [10]: def process_file(s):
            ...:     text = s[1]
            ...:     d = {}
            ...:     l =  text.split("\n")
            ...:     final = []
            ...:     the_id = "init"
            ...:     for line in l:
            ...:         if line[0:15] == 'WARC-Record-ID:':
            ...:             the_id = line[15:]
            ...:         d[the_id] = line
            ...:         final.append(Row(**d))
            ...:     return final

        In [12]: rdd2 = rdd1.map(process_file)

        In [13]: rdd2.count()
        17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
        ip-172-31-41-89.us-west-2.compute.internal: Container killed
        by YARN for exceeding memory limits. 10.3 GB of 10.3 GB
        physical memory used. Consider boosting
        spark.yarn.executor.memoryOverhead.
        17/02/25 19:03:03 WARN
        YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed
        by YARN for exceeding memory limits. 10.3 GB of 10.3 GB
        physical memory used. Consider boosting
        spark.yarn.executor.memoryOverhead.
        17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage
        5.0 (TID 5, ip-172-31-41-89.us-west-2.compute.internal,
        executor 5): ExecutorLostFailure (executor 5 exited caused by
        one of the running tasks) Reason: Container killed by YARN
        for exceeding memory limits. 10.3 GB of 10.3 GB physical
        memory used. Consider boosting
        spark.yarn.executor.memoryOverhead.
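        For reference, as far as I understand that overhead setting has to
        be supplied before the application starts; a minimal sketch of one
        way to pass it (the value 4096 MB is only an example):

        from pyspark import SparkConf, SparkContext

        conf = SparkConf().set("spark.yarn.executor.memoryOverhead", "4096")  # MB, example value
        sc = SparkContext(conf=conf)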


        --
        Henry Tremblay
        Robert Half Technology





    --
    Henry Tremblay
    Robert Half Technology



--
Henry Tremblay
Robert Half Technology
