Hi Florian,

I believe that when you call *JoinPredictionAndOriginal.collect*, the
environment executes your program up to that point. The Csv writes come
after this point and only declare data sinks, so nothing runs them yet. To
execute these steps I think you have to call *<env>.execute()* after the
Csv writes (where <env> is the name of the variable pointing to your
ExecutionEnvironment).
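
Here is a minimal sketch of what I mean, assuming the Scala DataSet API. The
object name, the sample data, the job name and the output path are all made up
for illustration and are not taken from your program:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

// Hypothetical stand-alone example; names, data and path are assumptions.
object CollectThenWrite {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the joined (prediction, original) pairs.
    val pairs: DataSet[(Double, Double)] =
      env.fromElements((1.0, 1.0), (2.0, 3.0), (4.0, 4.0))

    // collect() runs the plan defined so far and returns the result locally.
    val local = pairs.collect()
    val predictionError =
      math.sqrt(local.map { case (p, o) => math.pow(p - o, 2) }.sum / local.length)
    println(s"prediction error: $predictionError")

    // writeAsCsv only declares a sink; no file is written at this point.
    pairs.writeAsCsv("/tmp/pairs.csv", "\n", " ", WriteMode.OVERWRITE)

    // A second execution is needed to actually run the sink declared above.
    env.execute("write csv output")
  }
}
```

In your case that would mean calling execute() on your environment once
CalcPredError has returned, or passing the environment into the method and
calling it right after the two writeAsCsv lines.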

I hope this helps :-)

- Pieter

2015-10-08 14:54 GMT+02:00 Florian Heyl <f.h...@gmx.de>:

> Hi,
> I need some help figuring out why one method in my pipeline stops the
> execution on HDFS.
> I am working with the 10.0-SNAPSHOT and the code is the following (see
> below). The method stops on HDFS when it calls the collect method (
> JoinPredictionAndOriginal.collect), which creates a data sink, so the
> program stops before the two output files at the end can be created. What
> am I missing?
> Thank you for your time.
>
> Best wishes,
> Flo
>
> // method calculates the prediction error
> def CalcPredError(predictions: DataSet[LabeledVector], original: DataSet[LabeledVector],
>      outputPath: String, outputPath2: String, outputPath3: String): (DataSet[LabeledVector], Double) = {
>
>   var iter = 0
>
>   val transformPred = predictions
>     .map { tuple =>
>     iter = iter + 1
>     LabeledVector(iter, DenseVector(BigDecimal(tuple.label).setScale(0, BigDecimal.RoundingMode.HALF_UP).toDouble))
>   }
>
>   iter = 0
>
>   val tranformOrg = original
>     .map { tuple =>
>     iter = iter + 1
>     LabeledVector(iter, DenseVector(tuple.label))
>   }
>
>
>   val JoinPredictionAndOriginal = transformPred.join(tranformOrg).where(0).equalTo(0) {
>     (l, r) => (l.vector.head._2, r.vector.head._2)
>   }
>
>   val list_JoinPredictionAndOriginal = JoinPredictionAndOriginal.collect
>
>   val N = list_JoinPredictionAndOriginal.length
>
>   val residualSum = list_JoinPredictionAndOriginal.map {
>     num => pow((num._1 - num._2), 2)
>   }.sum
>
>   val predictionError = sqrt(residualSum / N)
>
>   original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
>   transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
>
>   (predictions,predictionError)
> }
