How about this:

val prev: RDD[V] = rdd.mapPartitions { partition =>
  /* setup() */
  partition
}
new RDD[V](prev) {
  override protected def getPartitions: Array[Partition] = firstParent[V].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[V] = {
    context.addOnCompleteCallback(() => { /* cleanup() */ })
    firstParent[V].iterator(split, context)
  }
}


On Tue, Aug 19, 2014 at 11:56 AM, Sean Owen <so...@cloudera.com> wrote:

> I think you're looking for foreachPartition(). You've kinda hacked it
> out of mapPartitions(). Your case has a simple solution, yes. After
> saving to the DB, you know you can close the connection, since you
> know the use of the connection has definitely just finished. But it's
> not a simpler solution for mapPartitions() since that's not really
> what you are using :)
>
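The foreachPartition() pattern described above can be sketched as follows. This is a minimal sketch, not code from the thread; `Conn`, `open`, and `save` are hypothetical stand-ins for a real DB client:

```scala
// Hypothetical stand-in for a real DB connection.
class Conn {
  var closed = false
  val saved = scala.collection.mutable.Buffer[Int]()
  def save(x: Int): Unit = saved += x
  def close(): Unit = closed = true
}

// The body you would pass to rdd.foreachPartition { partition => ... }:
// setup, save each record, then cleanup -- all on the executor, once per
// partition.
def savePartition(partition: Iterator[Int], open: () => Conn): Conn = {
  val conn = open()                 // setup()
  try partition.foreach(conn.save)  // save partition to DB
  finally conn.close()              // cleanup(), even if a save fails
  conn
}
```

Because foreachPartition() is an action with no output iterator, the connection can be closed as soon as the loop finishes, which is exactly the simplification Sean points out.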
> In general, mapPartitions creates an Iterator from another Iterator.
> Of course you could consume the input iterator, open the connection,
> perform operations, close the connection and return an iterator over
> the result. That works, but requires reading the entire input no
> matter what, and, reading it into memory. These may not be OK in all
> cases.
>
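The eager approach described above, which consumes the whole input before returning, might look like this (a sketch; `setup`, `f`, and `cleanup` are passed in as hypothetical hooks, not part of any Spark API):

```scala
// Eager variant of a mapPartitions body: forces the entire partition
// into memory so cleanup() can run before the result is returned.
def mapPartitionEagerly[A, B](partition: Iterator[A])
                             (setup: () => Unit,
                              f: A => B,
                              cleanup: () => Unit): Iterator[B] = {
  setup()
  val materialized =
    try partition.map(f).toList  // reads the whole input into memory
    finally cleanup()            // safe: input is fully consumed by now
  materialized.iterator
}
```

This is correct but, as noted, reads the entire partition into memory no matter what the downstream consumer needs.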
> Where possible, it's nicest to return an Iterator that accesses the
> source Iterator only as needed to produce elements. This means
> returning that Iterator before any work has been done. So you have to
> close the connection later when the Iterator has been exhausted.
> Tobias's method is really trying to shim a "cleanup()" lifecycle
> method into the Iterator. I suppose it could be done a little more
> cleanly with Guava's Iterators utilities, which would give you a more
> explicit way to execute something when done.
>
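A lazy wrapper along those lines (a sketch, not Tobias's exact code) hands the iterator back immediately and runs cleanup() the first time the source reports exhaustion:

```scala
// Lazy variant: return the wrapped iterator before doing any work, and
// run cleanup() exactly once, when the source is first seen to be empty.
def withCleanup[A](source: Iterator[A])(cleanup: () => Unit): Iterator[A] =
  new Iterator[A] {
    private var cleaned = false
    def hasNext: Boolean = {
      val more = source.hasNext
      if (!more && !cleaned) {
        cleaned = true
        cleanup()
      }
      more
    }
    def next(): A = source.next()
  }
```

One caveat: if the consumer never drains the iterator, cleanup() never fires, which is why a task-completion callback as in the RDD subclass at the top of this message is the more robust hook.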
>
> On Tue, Aug 19, 2014 at 7:36 PM, Yana Kadiyska <yana.kadiy...@gmail.com>
> wrote:
> > Sean, would this work --
> >
> > rdd.mapPartitions { partition => Iterator(partition) }.foreach(
> >
> >    // Some setup code here
> >    // save partition to DB
> >    // Some cleanup code here
> > )
> >
> >
> > I tried a pretty simple example ... I can see that the setup and cleanup
> > are executed on the executor node, once per partition (I used
> > mapPartitionsWithIndex instead of mapPartitions to track this a little
> > better). It seems like an easier solution than Tobias's, but I'm wondering
> > if it's perhaps incorrect.
>
