Hey Stephan and Pieter,

That is what I thought as well, so I simply changed the code like this:
original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
env.execute()
transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
env.execute()

But it still does not execute the two write commands.

Thank you for your time.
Flo

On 08.10.2015 at 17:41, Stephan Ewen <se...@apache.org> wrote:
> Yes, sinks in Flink are lazy and do not trigger execution automatically. We
> made this choice to allow multiple concurrent sinks (splitting the streams and
> writing to many outputs concurrently). That requires explicit execution
> triggers (env.execute()).
>
> The exceptions are, as mentioned, the "eager" methods "collect()", "count()"
> and "print()". They need to be eager, because the driver program needs, for
> example, the "count()" value before it can possibly progress...
>
> Stephan
>
>
> On Thu, Oct 8, 2015 at 5:22 PM, Pieter Hameete <phame...@gmail.com> wrote:
> Hi Florian,
>
> I believe that when you call JoinPredictionAndOriginal.collect, the
> environment will execute your program up until that point. The CSV writes come
> after this point, so in order to execute these steps I think you would have
> to call <env>.execute() after the CSV writes to trigger the execution (where
> <env> is the name of the variable pointing to your ExecutionEnvironment).
>
> I hope this helps :-)
>
> - Pieter
>
> 2015-10-08 14:54 GMT+02:00 Florian Heyl <f.h...@gmx.de>:
> Hi,
> I need some help to figure out why one method of mine in a pipeline stops the
> execution on HDFS.
> I am working with the 10.0-SNAPSHOT and the code is the following (see
> below). The method stops on HDFS at the call to the collect method
> (JoinPredictionAndOriginal.collect), which creates a data sink, and that is why
> the program stops before the two output files at the end can be created. What am
> I missing?
> Thank you for your time.
>
> Best wishes,
> Flo
>
> // method calculates the prediction error
> def CalcPredError(predictions: DataSet[LabeledVector], original: DataSet[LabeledVector],
>                   outputPath: String, outputPath2: String, outputPath3: String): (DataSet[LabeledVector], Double) = {
>
>   var iter = 0
>
>   val transformPred = predictions
>     .map { tuple =>
>       iter = iter + 1
>       LabeledVector(iter, DenseVector(BigDecimal(tuple.label).setScale(0, BigDecimal.RoundingMode.HALF_UP).toDouble))
>     }
>
>   iter = 0
>
>   val tranformOrg = original
>     .map { tuple =>
>       iter = iter + 1
>       LabeledVector(iter, DenseVector(tuple.label))
>     }
>
>   val JoinPredictionAndOriginal = transformPred.join(tranformOrg).where(0).equalTo(0) {
>     (l, r) => (l.vector.head._2, r.vector.head._2)
>   }
>
>   val list_JoinPredictionAndOriginal = JoinPredictionAndOriginal.collect
>
>   val N = list_JoinPredictionAndOriginal.length
>
>   val residualSum = list_JoinPredictionAndOriginal.map {
>     num => pow((num._1 - num._2), 2)
>   }.sum
>
>   val predictionError = sqrt(residualSum / N)
>
>   original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
>   transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
>
>   (predictions, predictionError)
> }
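
(For reference, below is a minimal standalone sketch of the lazy-sink behaviour discussed in this thread, not Florian's actual pipeline: collect() triggers a job eagerly, while writeAsCsv only registers sinks that run when env.execute() is called afterwards. The names env, outputPath, outputPath2 and the sample data are placeholders.)

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object LazySinkSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Placeholder output locations.
    val outputPath  = "hdfs:///tmp/out1"
    val outputPath2 = "hdfs:///tmp/out2"

    val data = env.fromElements((1, 1.0), (2, 2.0), (3, 3.0))

    // Eager operation: collect() submits a job immediately, so everything
    // needed to produce `data` runs here and the result comes back locally.
    val local = data.collect()
    println(s"collected ${local.size} records")

    // Lazy sinks: these calls only register the outputs ...
    data.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
    data.map(t => (t._2, t._1)).writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)

    // ... and a single env.execute() afterwards runs both of them in one job.
    env.execute("write both CSV outputs")
  }
}

If the CSV writes live inside a helper method that has no handle on the environment, the ExecutionEnvironment can either be passed in as an extra parameter or obtained via ExecutionEnvironment.getExecutionEnvironment, so that execute() can be called after the sinks are registered.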