On Wed, Feb 11, 2015 at 2:43 PM, Rok Roskar <rokros...@gmail.com> wrote:
> the runtime for each consecutive iteration is still roughly twice as long as 
> for the previous one -- is there a way to reduce whatever overhead is 
> accumulating?

Sorry, I didn't fully understand your question -- which two are you comparing?

PySpark will combine the multiple map() calls into a single stage, so you end
up with one task that needs all of the lookup_tables (the same total size as before).

You could checkpoint the RDD after some of the iterations to truncate the lineage.
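
For example, a minimal sketch (the checkpoint directory and the
every-5-iterations interval are just placeholders to adjust; create_func is
the helper from my earlier mail below):

sc.setCheckpointDir("/tmp/spark-checkpoints")

for i, lookup_table in enumerate(lookup_tables):
    rdd = rdd.map(create_func(lookup_table))
    if i % 5 == 0:
        rdd = rdd.cache()   # cache first, otherwise checkpoint() recomputes the RDD
        rdd.checkpoint()    # mark it to be written to stable storage, cutting the lineage
        rdd.count()         # an action is needed to actually materialize the checkpoint

Checkpointing truncates the lineage, so later tasks no longer carry every
previous closure (and its lookup table) along with them.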

> On Feb 11, 2015, at 8:11 PM, Davies Liu <dav...@databricks.com> wrote:
>
>> On Wed, Feb 11, 2015 at 10:47 AM, rok <rokros...@gmail.com> wrote:
>>> I was having trouble with memory exceptions when broadcasting a large lookup
>>> table, so I've resorted to processing it iteratively -- but how can I modify
>>> an RDD iteratively?
>>>
>>> I'm trying something like :
>>>
>>> rdd = sc.parallelize(...)
>>> lookup_tables = {...}
>>>
>>> for lookup_table in lookup_tables:
>>>     rdd = rdd.map(lambda x: func(x, lookup_table))
>>>
>>> If I leave it as is, then only the last "lookup_table" is applied instead of
>>> all the maps being strung together. However, if I add a .cache() to the .map
>>> then it seems to work fine.
>>
>> This is related to how Python closures are implemented; you should
>> do it like this:
>>
>> def create_func(lookup_table):
>>     # each call gets its own lookup_table binding
>>     return lambda x: func(x, lookup_table)
>>
>> for lookup_table in lookup_tables:
>>     rdd = rdd.map(create_func(lookup_table))
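>>
>> Equivalently, you can bind the value with a default argument (a standard
>> Python idiom; the default is evaluated once, when the lambda is defined):
>>
>> for lookup_table in lookup_tables:
>>     rdd = rdd.map(lambda x, lt=lookup_table: func(x, lt))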
>>
>> A Python closure remembers the variable itself, not a copy of its value.
>> In the loop, `lookup_table` is always the same variable, so when we serialize
>> the final RDD, all the closures refer to that one `lookup_table`, which by
>> then points to the last value.
>>
>> When the closure is created inside a function, Python creates a fresh
>> variable for each call, so each closure captures its own value and it works.
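>>
>> You can see the same late-binding behavior in plain Python, without Spark:
>>
>> funcs = [lambda x: x + i for i in range(3)]
>> print([f(0) for f in funcs])   # [2, 2, 2] -- every closure sees the final i
>>
>> def make(i):
>>     return lambda x: x + i
>>
>> funcs = [make(i) for i in range(3)]
>> print([f(0) for f in funcs])   # [0, 1, 2] -- each closure has its own i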
>>
>>> A second problem is that the runtime roughly doubles at each iteration, so
>>> this clearly doesn't seem to be the way to do it. What is the preferred way
>>> of doing such repeated modifications to an RDD, and how can the accumulation
>>> of overhead be minimized?
>>>
>>> Thanks!
>>>
>>> Rok
>>>
>>>
>>>
>
