Hi Florian,

I believe that calling *JoinPredictionAndOriginal.collect* makes the environment execute your program up to that point. The CSV writes are defined after the collect call, so they are not part of that run. To execute those steps, I think you have to call *<env>.execute()* after the CSV writes (where <env> is the name of the variable pointing to your ExecutionEnvironment).
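A minimal sketch of what I mean, assuming the Scala DataSet API. The object name, the sample data and the output path are made up for illustration and are not taken from your code:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

// Hypothetical stand-alone example; names and paths are assumptions.
object CollectThenExecute {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Small in-memory data set standing in for the joined predictions.
    val pairs: DataSet[(Int, Double)] =
      env.fromElements((1, 1.0), (2, 4.0), (3, 9.0))

    // collect() is eager: it runs the plan defined so far and
    // brings the result back to the client as a local collection.
    val local: Seq[(Int, Double)] = pairs.collect()
    println(s"collected ${local.length} elements")

    // writeAsCsv only declares a sink; nothing is written at this point.
    pairs.writeAsCsv("/tmp/pairs-out", "\n", " ", WriteMode.OVERWRITE)

    // execute() runs the sinks that were declared after the collect() call.
    env.execute("write CSV output")
  }
}
```

In your case that would mean calling execute() on your environment after the two writeAsCsv lines, either inside CalcPredError or in the code that calls it.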
I hope this helps :-)

- Pieter

2015-10-08 14:54 GMT+02:00 Florian Heyl <f.h...@gmx.de>:

> Hi,
> I need some help to figure out why one method of mine in a pipeline stops
> the execution on the hdfs.
> I am working with the 10.0-SNAPSHOT and the code is the following (see
> below). The method stops on the hdfs by calling the collect method
> (JoinPredictionAndOriginal.collect) creating a data sink, which is why the
> program stops before the two output files at the ends can be created. What
> am I missing?
> Thank you for your time.
>
> Best wishes,
> Flo
>
> // method calculates the prediction error
> def CalcPredError(predictions: DataSet[LabeledVector], original: DataSet[LabeledVector],
>     outputPath: String, outputPath2: String, outputPath3: String): (DataSet[LabeledVector], Double) = {
>
>   var iter = 0
>
>   val transformPred = predictions
>     .map { tuple =>
>       iter = iter + 1
>       LabeledVector(iter, DenseVector(BigDecimal(tuple.label).setScale(0, BigDecimal.RoundingMode.HALF_UP).toDouble))
>     }
>
>   iter = 0
>
>   val tranformOrg = original
>     .map { tuple =>
>       iter = iter + 1
>       LabeledVector(iter, DenseVector(tuple.label))
>     }
>
>   val JoinPredictionAndOriginal = transformPred.join(tranformOrg).where(0).equalTo(0) {
>     (l, r) => (l.vector.head._2, r.vector.head._2)
>   }
>
>   val list_JoinPredictionAndOriginal = JoinPredictionAndOriginal.collect
>
>   val N = list_JoinPredictionAndOriginal.length
>
>   val residualSum = list_JoinPredictionAndOriginal.map {
>     num => pow((num._1 - num._2), 2)
>   }.sum
>
>   val predictionError = sqrt(residualSum / N)
>
>   original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
>   transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
>
>   (predictions, predictionError)
> }