Hi, I need some help figuring out why one method in my pipeline stops the execution on HDFS. I am working with 10.0-SNAPSHOT, and the code is shown below. The method stops when it calls collect (JoinPredictionAndOriginal.collect), which creates a data sink, so the program stops before the two output files at the end can be created. What am I missing? Thank you for your time.
Best wishes,
Flo

// method calculates the prediction error
def CalcPredError(predictions: DataSet[LabeledVector], original: DataSet[LabeledVector],
    outputPath: String, outputPath2: String, outputPath3: String): (DataSet[LabeledVector], Double) = {

  var iter = 0
  val transformPred = predictions
    .map { tuple =>
      iter = iter + 1
      LabeledVector(iter, DenseVector(BigDecimal(tuple.label).setScale(0, BigDecimal.RoundingMode.HALF_UP).toDouble))
    }

  iter = 0
  val tranformOrg = original
    .map { tuple =>
      iter = iter + 1
      LabeledVector(iter, DenseVector(tuple.label))
    }

  val JoinPredictionAndOriginal = transformPred.join(tranformOrg).where(0).equalTo(0) {
    (l, r) => (l.vector.head._2, r.vector.head._2)
  }

  val list_JoinPredictionAndOriginal = JoinPredictionAndOriginal.collect
  val N = list_JoinPredictionAndOriginal.length

  val residualSum = list_JoinPredictionAndOriginal.map {
    num => pow((num._1 - num._2), 2)
  }.sum

  val predictionError = sqrt(residualSum / N)

  original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
  transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)

  (predictions,predictionError)
}
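
To make clear what the method above is supposed to return, here is a minimal sketch of the same error calculation on plain Scala collections, with no DataSet or HDFS involved. The object name, the helpers roundHalfUp and rmse, and the sample values are made up for illustration; only the rounding and the formula mirror the code above, and the pairing by position is an assumption standing in for the join on the running index.

```scala
import scala.math.{pow, sqrt}

object PredErrorSketch {
  // Round a prediction to the nearest integer (HALF_UP), as done in transformPred
  def roundHalfUp(x: Double): Double =
    BigDecimal(x).setScale(0, BigDecimal.RoundingMode.HALF_UP).toDouble

  // Square root of the mean squared difference between rounded predictions
  // and original labels; elements are paired by position (assumption)
  def rmse(predictions: Seq[Double], original: Seq[Double]): Double = {
    require(predictions.nonEmpty && predictions.length == original.length,
      "inputs must be non-empty and of equal length")
    val residualSum = predictions.zip(original).map {
      case (p, o) => pow(roundHalfUp(p) - o, 2)
    }.sum
    sqrt(residualSum / predictions.length)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample values, not taken from my data
    val predictions = Seq(0.6, 1.4, 2.5)
    val original = Seq(1.0, 1.0, 2.0)
    println(rmse(predictions, original))
  }
}
```

This only pins down the expected result on a small local sample; it does not touch the question of why the pipeline stops at collect.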