Cool! Now I understand how to approach this problem. At my last position, I don't think we did it very efficiently. Maybe this is worth a blog post.

Henry


On 02/28/2017 01:22 AM, 颜发才(Yan Facai) wrote:
Google is your friend, Henry.
http://stackoverflow.com/questions/21185092/apache-spark-map-vs-mappartitions

On Tue, Feb 28, 2017 at 2:17 AM, Henry Tremblay <paulhtremb...@gmail.com> wrote:

    Thanks! That works:

    from pyspark.sql import Row

    def process_file(my_iter):
        # my_iter yields this partition's (path, contents) pairs, so
        # the_id carries over from one line to the next
        the_id = "init"
        final = []
        for chunk in my_iter:
            lines = chunk[1].split("\n")
            for line in lines:
                if line[0:15] == 'WARC-Record-ID:':
                    the_id = line[15:]
                final.append(Row(the_id=the_id, line=line))
        return iter(final)

    rdd2 = rdd.mapPartitions(process_file)

    Can anyone explain why this solution works? I am aware that an
    iterator is lazily evaluated, but my understanding of exactly what
    happens in this case is vague.

    Henry


    On 02/27/2017 12:50 AM, 颜发才(Yan Facai) wrote:
    Hi, Tremblay,
    map processes the RDD one element at a time (here, one line at a
    time), so it is not the method you need.

    However, mapPartitions hands your function an iterator over a whole
    partition, which lets you maintain state across lines. For example:

    http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#mapPartitions
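
    For instance, a minimal sketch of the difference (sc, tag_lines, and
    the toy data here are made up purely for illustration): map calls your
    function once per element, while mapPartitions calls it once per
    partition with a lazy iterator over that partition's elements, so a
    local variable can carry state from one element to the next.

    from pyspark.sql import Row

    def tag_lines(lines_iter):
        # lines_iter lazily yields every element of one partition,
        # so current_id survives from one element to the next
        current_id = "init"
        for line in lines_iter:
            if line.startswith("id:"):
                current_id = line[3:]
            yield Row(the_id=current_id, line=line)

    toy = sc.parallelize(["id:a", "x", "y", "id:b", "z"], 1)
    toy.mapPartitions(tag_lines).collect()
    # -> one Row per line, each tagged with the most recent id seen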




    On Mon, Feb 27, 2017 at 4:24 PM, Henry Tremblay
    <paulhtremb...@gmail.com> wrote:

        This won't work:

        rdd2 = rdd.flatMap(splitf)

        rdd2.take(1)

        [u'WARC/1.0\r']

        rdd2.count()

        508310

        If I then try to apply a map to rdd2, the map only works on
        each individual line. I need to create a state machine as in
        my second function. That is, I need to apply a key to each
        line, but the key is determined by a previous line.

        My first function below always has the same id. That was the
        point: to show that the first function succeeded while the
        second failed. In the second, the dictionary grows, but it has
        at most 508,310 keys; in fact, it will most likely have only
        about 1/10th of that or less. I used the exact same code on the
        same file in pure Python, without Spark, and the process ran in
        under 1 second.

        Thanks!


        Henry

        On 02/26/2017 11:37 PM, Pavel Plotnikov wrote:

        Hi, Henry

        In the first example the dict d always contains only one value,
        because the_id stays the same; in the second case the dict
        grows very quickly. So I suggest first applying a map function
        to split your file string into rows, then repartitioning, and
        then applying your custom logic.

        Example:

        def splitf(s):
            return s.split("\n")

        rdd.flatMap(splitf).repartition(1000).map(your_function)  # your per-line logic

        Best,
        Pavel


        On Mon, 27 Feb 2017, 06:28 Henry Tremblay,
        <paulhtremb...@gmail.com>
        wrote:

            Not sure where you want me to put yield. My first try
            caused an error in Spark that it could not pickle
            generator objects.


            On 02/26/2017 03:25 PM, ayan guha wrote:
            Hi

            We are doing similar stuff, but with a large number of
            small-ish files. What we do is write a function that parses
            a complete file, much like yours, but it uses yield instead
            of return, and we run flatMap on top of it. Can you give it
            a try and let us know if it works?
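
            For example, something along these lines (a minimal sketch,
            not the exact code; the WARC prefix just mirrors the parsing
            shown elsewhere in this thread):

            from pyspark.sql import Row

            def parse_file(path_and_text):
                # generator: yield one Row per line instead of building a list
                path, text = path_and_text
                the_id = "init"
                for line in text.split("\n"):
                    if line.startswith("WARC-Record-ID:"):
                        the_id = line[len("WARC-Record-ID:"):]
                    yield Row(the_id=the_id, line=line)

            # pass the generator function itself to flatMap; Spark pickles the
            # function, so no generator object ever needs to be pickled
            rdd2 = sc.wholeTextFiles("/mnt/temp").flatMap(parse_file)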

            On Mon, Feb 27, 2017 at 9:02 AM, Koert Kuipers
            <ko...@tresata.com> wrote:

                using wholeTextFiles to process formats that cannot be
                split per line is not "old"

                and there are plenty of problems for which RDD is
                still better suited than Dataset or DataFrame
                currently (this might change in the near future when
                Dataset gets some crucial optimizations fixed).

                On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta
                <gourav.sengu...@gmail.com> wrote:

                    Hi Henry,

                    Those guys in Databricks training are nuts and
                    still use Spark 1.x for their exams. Learning
                    SPARK is a VERY VERY VERY old way of solving
                    problems using SPARK.

                    The core engine of SPARK, which even I
                    understand, has gone through several
                    fundamental changes.

                    Just try reading the file using DataFrames in
                    SPARK 2.1.
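
                    For instance, a rough sketch (assuming a
                    SparkSession named spark and the gzipped WARC file
                    from this thread):

                    # Spark 2.x: each line of the (gzipped) file becomes
                    # a row with a single 'value' column
                    path = "/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz"
                    df = spark.read.text(path)
                    df.count()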

                    In other words, it may be of tremendous benefit
                    if you were learning to solve problems which
                    exist rather than problems which no longer
                    exist.

                    Please let me know in case I can be of any
                    further help.

                    Regards,
                    Gourav

                    On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay
                    <paulhtremb...@gmail.com> wrote:

                        The file is so small that a stand-alone
                        Python script, independent of Spark, can
                        process the file in under a second.

                        Also, the following fails:

                        1. Read the whole file in with wholeTextFiles.

                        2. Use flatMap to get 50,000 rows that
                        look like: Row(id="path", line="line")

                        3. Save the results as CSV to HDFS.

                        4. Read the files (there are 20) from HDFS
                        into a df using sqlContext.read.csv(<path>)

                        5. Convert the df to an rdd.

                        6. Create key value pairs with the key being
                        the file path and the value being the line.

                        7. Iterate through the values.

                        What happens is Spark either runs out of
                        memory, or, in my last try with a slight
                        variation, just hangs for 12 hours.
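
                        For reference, a rough sketch of those steps
                        (the paths and helper names are placeholders,
                        not the exact code that failed):

                        from pyspark.sql import Row

                        def to_rows(kv):
                            # kv is a (file_path, file_contents) pair
                            return [Row(id=kv[0], line=l)
                                    for l in kv[1].split("\n")]

                        # 1-3: whole files -> one Row per line -> CSV on HDFS
                        rows = sc.wholeTextFiles("/mnt/temp").flatMap(to_rows)
                        sqlContext.createDataFrame(rows).write.csv("/mnt/temp_csv")

                        # 4-5: read the CSV back into a df, then drop to an rdd
                        rdd2 = sqlContext.read.csv("/mnt/temp_csv").rdd

                        # 6-7: (path, line) key/value pairs, then iterate values
                        pairs = rdd2.map(lambda r: (r[0], r[1]))
                        grouped = pairs.groupByKey()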

                        Henry


                        On 02/26/2017 03:31 AM, 颜发才(Yan Facai)
                        wrote:
                        Hi, Tremblay.
                        Your file is in .gz format, which is not
                        splittable by Hadoop, so perhaps the whole
                        file is loaded by only one executor.
                        How many executors do you start?
                        Perhaps the repartition method could solve
                        it, I guess.
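
                        A minimal sketch of that check (the partition
                        count of 8 is arbitrary):

                        # a .gz file is not splittable, so see how many
                        # partitions the input really has, then
                        # repartition before the heavy per-line work
                        rdd1 = sc.wholeTextFiles("/mnt/temp")
                        rdd1.getNumPartitions()
                        rdd1 = rdd1.repartition(8)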


                        On Sun, Feb 26, 2017 at 3:33 AM, Henry
                        Tremblay <paulhtremb...@gmail.com> wrote:

                            I am reading in a single small file
                            from Hadoop with wholeTextFiles. If I
                            process each line and create a row
                            with two cells, the first cell equal
                            to the name of the file and the second
                            cell equal to the line, that code runs
                            fine.

                            But if I just add two lines of code and
                            change the first cell based on parsing
                            a line, Spark runs out of memory. Any
                            idea why such a simple process, which
                            would succeed quickly in a non-Spark
                            application, fails?

                            Thanks!

                            Henry

                            CODE:

                            [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
                            3816096  /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz


                            In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")

                            In [2]: rdd1.count()
                            Out[2]: 1


                            In [4]: def process_file(s):
                               ...:     text = s[1]
                               ...:     the_id = s[0]
                               ...:     d = {}
                               ...:     l = text.split("\n")
                               ...:     final = []
                               ...:     for line in l:
                               ...:         d[the_id] = line
                               ...:         final.append(Row(**d))
                               ...:     return final
                               ...:

                            In [5]: rdd2 = rdd1.map(process_file)

                            In [6]: rdd2.count()
                            Out[6]: 1

                            In [7]: rdd3 = rdd2.flatMap(lambda x: x)

                            In [8]: rdd3.count()
                            Out[8]: 508310

                            In [9]: rdd3.take(1)
                            Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]

                            In [10]: def process_file(s):
                                ...:     text = s[1]
                                ...:     d = {}
                                ...:     l = text.split("\n")
                                ...:     final = []
                                ...:     the_id = "init"
                                ...:     for line in l:
                                ...:         if line[0:15] == 'WARC-Record-ID:':
                                ...:             the_id = line[15:]
                                ...:         d[the_id] = line
                                ...:         final.append(Row(**d))
                                ...:     return final

                            In [12]: rdd2 = rdd1.map(process_file)

                            In [13]: rdd2.count()
                            17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
                            17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
                            17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5, ip-172-31-41-89.us-west-2.compute.internal, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.




                            









            --
            Best Regards,
            Ayan Guha









--
Henry Tremblay
Robert Half Technology
