Hey Stephan and Pieter,
That is what I thought as well, so I simply changed the code like this:

original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)

env.execute()

transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)

env.execute()
But it still does not execute the two write commands.
Thank you for your time.

Flo


On 08.10.2015 at 17:41, Stephan Ewen <se...@apache.org> wrote:

> Yes, sinks in Flink are lazy and do not trigger execution automatically. We 
> made this choice to allow multiple concurrent sinks (splitting the streams and 
> writing to many outputs concurrently). That requires explicit execution 
> triggers (env.execute()).
> 
> The exceptions are, as mentioned, the "eager" methods "collect()", "count()" 
> and "print()". They need to be eager because the driver program needs, for 
> example, the "count()" value before it can proceed.
> 
> Stephan
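
A minimal sketch of the behaviour described above, assuming the Scala DataSet API of that Flink line. The object name, the sample tuples and the /tmp output paths are hypothetical and only serve as illustration:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object LazySinkSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical sample data standing in for the real input.
    val data: DataSet[(Int, Double)] = env.fromElements((1, 2.0), (2, 3.5), (3, 1.0))

    // Eager: count() runs a job right away, because the driver needs the value.
    val n = data.count()
    println(s"number of records: $n")

    // Lazy: these calls only declare sinks, nothing is written yet.
    data.writeAsCsv("/tmp/lazy-sink-a", "\n", " ", WriteMode.OVERWRITE)
    data
      .map(t => (t._1, t._2 * 2))
      .writeAsCsv("/tmp/lazy-sink-b", "\n", " ", WriteMode.OVERWRITE)

    // A single explicit trigger runs all sinks declared since the last execution.
    env.execute("write both outputs")
  }
}
```

In this sketch one env.execute() call after both writeAsCsv calls is enough, since both sinks are declared before the trigger.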
> 
> 
> On Thu, Oct 8, 2015 at 5:22 PM, Pieter Hameete <phame...@gmail.com> wrote:
> Hi Florian,
> 
> I believe that when you call JoinPredictionAndOriginal.collect the 
> environment will execute your program up until that point. The Csv writes are 
> after this point, so in order to execute these steps I think you would have 
> to call <env>.execute() after the Csv writes to trigger the execution (where 
> <env> is the name of the variable pointing to your ExecutionEnvironment).
> 
> I hope this helps :-)
> 
> - Pieter
> 
> 2015-10-08 14:54 GMT+02:00 Florian Heyl <f.h...@gmx.de>:
> Hi,
> I need some help figuring out why one method in my pipeline stops the 
> execution on HDFS.
> I am working with 0.10-SNAPSHOT and the code is the following (see 
> below). The method stops on HDFS when it calls the collect method 
> (JoinPredictionAndOriginal.collect), which creates a data sink, so the 
> program stops before the two output files at the end can be created. What am 
> I missing?
> Thank you for your time.
> 
> Best wishes,
> Flo
> 
> // method calculates the prediction error
> def CalcPredError(predictions: DataSet[LabeledVector],
>     original: DataSet[LabeledVector],
>     outputPath: String, outputPath2: String, outputPath3: String):
>     (DataSet[LabeledVector], Double) = {
> 
>   var iter = 0
> 
>   val transformPred = predictions
>     .map { tuple =>
>     iter = iter + 1
>     LabeledVector(iter, DenseVector(
>       BigDecimal(tuple.label).setScale(0, BigDecimal.RoundingMode.HALF_UP).toDouble))
>   }
> 
>   iter = 0
> 
>   val tranformOrg = original
>     .map { tuple =>
>     iter = iter + 1
>     LabeledVector(iter, DenseVector(tuple.label))
>   }
> 
>   val JoinPredictionAndOriginal =
>     transformPred.join(tranformOrg).where(0).equalTo(0) {
>     (l, r) => (l.vector.head._2, r.vector.head._2)
>   }
> 
>   val list_JoinPredictionAndOriginal = JoinPredictionAndOriginal.collect
> 
>   val N = list_JoinPredictionAndOriginal.length
> 
>   val residualSum = list_JoinPredictionAndOriginal.map {
>     num => pow((num._1 - num._2), 2)
>   }.sum
> 
>   val predictionError = sqrt(residualSum / N)
> 
>   original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
>   transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)
> 
>   (predictions,predictionError)
> }