How about this:

    import org.apache.spark.{Partition, TaskContext}
    import org.apache.spark.rdd.RDD

    val prev: RDD[V] = rdd.mapPartitions(partition => { /* setup() */ partition })

    new RDD[V](prev) {
      protected def getPartitions = prev.partitions

      def compute(split: Partition, context: TaskContext) = {
        // Run cleanup() when the task for this partition completes
        context.addOnCompleteCallback(() => { /* cleanup() */ })
        firstParent[V].iterator(split, context)
      }
    }
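Incidentally, if depending on a @DeveloperApi is acceptable, roughly the same effect should be possible without subclassing RDD, via mapPartitionsWithContext, which hands the function a TaskContext directly. A sketch, with setup()/cleanup() again standing in for your own resource handling:

    rdd.mapPartitionsWithContext { (context, partition) =>
      /* setup() */
      context.addOnCompleteCallback(() => { /* cleanup() */ })
      partition
    }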
On Tue, Aug 19, 2014 at 11:56 AM, Sean Owen <so...@cloudera.com> wrote:
> I think you're looking for foreachPartition(). You've kinda hacked it
> out of mapPartitions(). Your case has a simple solution, yes. After
> saving to the DB, you know you can close the connection, since you
> know the use of the connection has definitely just finished. But it's
> not a simpler solution for mapPartitions(), since that's not really
> what you are using :)
>
> In general, mapPartitions() creates an Iterator from another Iterator.
> Of course you could consume the input iterator, open the connection,
> perform operations, close the connection, and return an iterator over
> the result. That works, but requires reading the entire input no
> matter what, and reading it all into memory. These may not be OK in
> all cases.
>
> Where possible, it's nicest to return an Iterator that accesses the
> source Iterator only as needed to produce elements. This means
> returning that Iterator before any work has been done. So you have to
> close the connection later, when the Iterator has been exhausted.
> Really, Tobias's method is trying to shim a "cleanup()" lifecycle
> method into the Iterator. I suppose it could be done a little more
> cleanly using Guava's Iterators library, which would give you a more
> explicit way to execute something when done.
>
> On Tue, Aug 19, 2014 at 7:36 PM, Yana Kadiyska <yana.kadiy...@gmail.com>
> wrote:
> > Sean, would this work --
> >
> >     rdd.mapPartitions { partition => Iterator(partition) }.foreach(
> >       // Some setup code here
> >       // save partition to DB
> >       // Some cleanup code here
> >     )
> >
> > I tried a pretty simple example ... I can see that the setup and
> > cleanup are executed on the executor node, once per partition (I used
> > mapPartitionsWithIndex instead of mapPartitions to track this a little
> > better). Seems like an easier solution than Tobias's, but I'm
> > wondering if it's perhaps incorrect.
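For completeness, the foreachPartition() shape Sean points to might look roughly like this -- not code from the thread, just a sketch assuming an RDD[String] plus a hypothetical JDBC URL and insertRow() helper:

    def insertRow(conn: java.sql.Connection, row: String): Unit = ???  // your actual DB write

    rdd.foreachPartition { partition =>
      val conn = java.sql.DriverManager.getConnection("jdbc:...")  // setup, once per partition
      try {
        partition.foreach(row => insertRow(conn, row))  // consume the whole partition
      } finally {
        conn.close()  // cleanup; safe here because the partition is fully consumed
      }
    }

Since foreachPartition() consumes the whole partition inside the function, the connection can be closed right there, which is exactly the "simple solution" Sean describes.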
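The lazy-Iterator approach Sean describes might be sketched like this, again with hypothetical names and String elements for simplicity. The wrapper is returned immediately, pulls from the source only on demand, and closes the connection once the source is exhausted:

    rdd.mapPartitions { partition =>
      val conn = java.sql.DriverManager.getConnection("jdbc:...")  // setup
      var closed = false
      new Iterator[String] {
        def hasNext: Boolean = {
          val more = partition.hasNext
          if (!more && !closed) { conn.close(); closed = true }  // cleanup on exhaustion
          more
        }
        def next(): String = {
          val row = partition.next()
          insertRow(conn, row)  // hypothetical DB write, as above
          row
        }
      }
    }

One caveat: if a downstream consumer never exhausts the iterator (e.g. under take()), the cleanup never runs -- which is the gap the TaskContext callback in the code at the top closes.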