On Wed, Feb 11, 2015 at 2:43 PM, Rok Roskar <rokros...@gmail.com> wrote:
> the runtime for each consecutive iteration is still roughly twice as long
> as for the previous one -- is there a way to reduce whatever overhead is
> accumulating?

Sorry, I didn't fully understand your question -- which two runs are you
comparing? PySpark will try to combine the chained map() calls into a
single task, so that task still needs all of the lookup_tables at once
(the same total size as before). You could add a checkpoint after some of
the iterations to truncate the growing lineage.
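For illustration, here is a rough, untested sketch of that pattern,
reusing `func`, `lookup_tables`, and the `create_func` helper from the
quoted thread below; the checkpoint directory and the interval of 5 are
arbitrary choices:

    sc.setCheckpointDir("/tmp/spark-checkpoints")  # arbitrary example path

    rdd = sc.parallelize(...)
    for i, lookup_table in enumerate(lookup_tables):
        rdd = rdd.map(create_func(lookup_table))
        if (i + 1) % 5 == 0:       # every few iterations, cut the lineage
            rdd.cache()            # avoid recomputation when checkpointing
            rdd.checkpoint()
            rdd.count()            # an action forces the checkpoint to run

Checkpointing writes the current RDD to stable storage and drops the
lineage accumulated so far, so later stages no longer have to re-serialize
the whole chain of closures.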
> On Feb 11, 2015, at 8:11 PM, Davies Liu <dav...@databricks.com> wrote:
>
>> On Wed, Feb 11, 2015 at 10:47 AM, rok <rokros...@gmail.com> wrote:
>>> I was having trouble with memory exceptions when broadcasting a large
>>> lookup table, so I've resorted to processing it iteratively -- but how
>>> can I modify an RDD iteratively?
>>>
>>> I'm trying something like:
>>>
>>> rdd = sc.parallelize(...)
>>> lookup_tables = {...}
>>>
>>> for lookup_table in lookup_tables:
>>>     rdd = rdd.map(lambda x: func(x, lookup_table))
>>>
>>> If I leave it as is, then only the last "lookup_table" is applied
>>> instead of stringing together all the maps. However, if I add a
>>> .cache() to the .map then it seems to work fine.
>>
>> This is related to how Python closures are implemented; you should do
>> it like this:
>>
>> def create_func(lookup_table):
>>     return lambda x: func(x, lookup_table)
>>
>> for lookup_table in lookup_tables:
>>     rdd = rdd.map(create_func(lookup_table))
>>
>> A Python closure remembers the variable itself, not a copy of its
>> value. In the loop, `lookup_table` is always the same variable, so when
>> the final rdd is serialized, all the closures refer to that one
>> variable, which by then holds the last value.
>>
>> When the closure is created inside a function, Python creates a fresh
>> variable for each call, so it works.
>>
>>> A second problem is that the runtime roughly doubles at each
>>> iteration, so this clearly doesn't seem to be the way to do it. What
>>> is the preferred way of doing such repeated modifications to an RDD,
>>> and how can the accumulation of overhead be minimized?
>>>
>>> Thanks!
>>>
>>> Rok
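One more note on the closure fix quoted above: a default argument achieves
the same binding without a helper function. This is a standard Python
idiom rather than something from the thread, so treat it as a sketch:

    for lookup_table in lookup_tables:
        # lt is evaluated once, when the lambda is defined, so each
        # closure captures the value from its own iteration rather than
        # the shared loop variable
        rdd = rdd.map(lambda x, lt=lookup_table: func(x, lt))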