Hi Sung,

On Fri, Apr 18, 2014 at 5:11 PM, Sung Hwan Chung
<coded...@cs.stanford.edu> wrote:
> while (true) {
>   rdd.map((row : Array[Double]) => {
>     row[numCols - 1] = computeSomething(row)
>   }).reduce(...)
> }
>
> If it fails at some point, I'd imagine that the intermediate info being
> stored in row[numCols - 1] will be lost. And unless Spark runs this whole
> thing from the very first iteration, things will get out of sync.

I'm not sure I completely follow what you're trying to achieve here.
But modifications to the "row" argument to the closure won't be seen
anywhere outside that closure. What the reduce() step will see will be
the output of the map step; in your code above, you're mapping the
input to nothing.

If you did this instead:

   rdd.map((row : Array[Double]) => {
     row(numCols - 1) = computeSomething(row)
     row
   }).reduce(...)

Then you'd be mapping the input to an array which just happens to be
the same as the input of the map function, with a modified item at
"numCols - 1". Note there isn't any state being kept in "row"; the
only "state" here is the output of the map function, which might be
serialized and sent somewhere else to be processed by the closure
passed to reduce().

If there's an error while processing the map function, Spark will
re-run it on a different worker node, and you should get the same
output (given that the input of your RDD is the same).

Also note that your example is always running exactly the same
computation, since it's always using the same RDD. If you want the
loop to do some sort of chaining, you'd need to create a new RDD from
the results of the reduce, and then perform your map/reduce operations
on that new RDD. And then the same rules as above would apply.

Just to illustrate what I'm trying to say, try running this in a spark shell:

var input = List(Array(1,2,3), Array(4,5,6))
var rows = sc.parallelize(input)

rows.map(row => {
  row(1) = row(1) * 2
  row
}).reduce((a1, a2) => Array(a1(0) + a2(0), a1(1) + a2(1), a1(2) + a2(2)))


And note how, after the computation runs, "input" still holds its
original value, even though the map function modified its input.


-- 
Marcelo

Reply via email to