Hi,
I need some help to figure out why one method of mine in a pipeline stops the 
execution on the hdfs.
I am working with the 10.0-SNAPSHOT and the code is the following (see below). 
The method stops on the hdfs by calling the collect method 
(JoinPredictionAndOriginal.collect) creating a data sink, which is why the 
program stops before the two output files at the ends can be created. What am I 
missing?
Thank you for your time.

Best wishes,
Flo

// method calculates the prediction error
def CalcPredError(predictions: DataSet[LabeledVector], original: 
DataSet[LabeledVector],
     outputPath: String, outputPath2: String, outputPath3: String): 
(DataSet[LabeledVector], Double) ={

  var iter = 0

  val transformPred = predictions
    .map { tuple =>
    iter = iter + 1
    LabeledVector(iter, DenseVector(BigDecimal(tuple.label).setScale(0, 
BigDecimal.RoundingMode.HALF_UP).toDouble))
  }

  iter = 0

  val tranformOrg = original
    .map { tuple =>
    iter = iter + 1
    LabeledVector(iter, DenseVector(tuple.label))
  }

  val JoinPredictionAndOriginal = 
transformPred.join(tranformOrg).where(0).equalTo(0) {
    (l, r) => (l.vector.head._2, r.vector.head._2)
  }

  val list_JoinPredictionAndOriginal = JoinPredictionAndOriginal.collect

  val N = list_JoinPredictionAndOriginal.length

  val residualSum = list_JoinPredictionAndOriginal.map {
    num => pow((num._1 - num._2), 2)
  }.sum

  val predictionError = sqrt(residualSum / N)

  original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
  transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)

  (predictions,predictionError)
}




 

Reply via email to