From what I can tell from your code, you are trying to execute a job within a job, and that simply doesn't work: the map() of your outer job calls objPI.compute(), which in turn tries to trigger a second job (the collect() inside compute()) on its own ExecutionEnvironment.

Your main method should look like this:

public static void main(String[] args) throws Exception {
    double pi = new classPI().compute();
    System.out.println("We estimate Pi to be: " + pi);
}
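That way compute() builds and runs the only Flink job (the collect() inside it), so no job is submitted from within another job. If you also want main to own the environment, one option would be to hand the environment to classPI instead of creating a second one inside it. A rough, untested sketch on my side (Sampler and SumReducer are taken unchanged from your code; the constructor parameter is my addition):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PiEstimation {

    public static void main(String[] args) throws Exception {
        // the one and only ExecutionEnvironment of the program
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        double pi = new classPI(env).compute();
        System.out.println("We estimate Pi to be: " + pi);
    }

    // same role as your classPI, but it reuses the caller's environment
    public static final class classPI {
        private final ExecutionEnvironment env;
        private final int numIter = 1000000;

        public classPI(ExecutionEnvironment env) {
            this.env = env;
        }

        public Double compute() throws Exception {
            DataSet<Long> count = env.generateSequence(1, numIter)
                    .map(new Sampler())
                    .reduce(new SumReducer());
            // collect() triggers the single job execution
            return 4.0 * count.collect().get(0) / numIter;
        }
    }

    // Sampler copied from your code
    public static class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    // SumReducer copied from your code
    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long value1, Long value2) {
            return value1 + value2;
        }
    }
}

Keeping the environment inside classPI, as in your current code, works as well, as long as main calls compute() directly and does not wrap it into another map()/collect().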




On 06.06.2016 21:14, Ser Kho wrote:
The question is how to encapsulate numerous transformations into one object, or maybe a function, in an Apache Flink Java setting. I have tried to investigate this question using an example of Pi calculation (see below). I am wondering whether or not the suggested approach is valid from Flink's point of view. It works on one computer; however, I do not know how it will behave in a cluster setup. The code is given below, and the main idea behind it is as follows:

 1. Create a class, named classPI, whose method compute() does all the
    data transformations (see more about it below).

 2. In the main method, create a DataSet as in

        DataSet<classPI> opi = env.fromElements(new classPI());

 3. Create DataSet<Double> PI, which equals the output of the map()
    transformation that calls the object's method compute(), as in

        DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
            public Double map(classPI objPI) { return objPI.compute(); }
        });

 4. Now about classPI:

     * The constructor instantiates an ExecutionEnvironment, which is local
       to this class, as in

           public classPI() {
               this.NumIter = 1000000;
               env = ExecutionEnvironment.getExecutionEnvironment();
           }

Thus, the code has two ExecutionEnvironment objects: one in main and another in the class classPI.

     * It has a method compute() that runs all the data transformations (in
       this example it is just a few lines, but potentially it might contain
       tons of Flink transformations; a hypothetical sketch of such a longer
       chain is given right after this list):

           public Double compute() throws Exception {
               DataSet<Long> count = env.generateSequence(1, NumIter)
                   .map(new Sampler())
                   .reduce(new SumReducer());
               PI = 4.0 * count.collect().get(0) / NumIter;
               return PI;
           }
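Just to make the "tons of transformations" point concrete, here is a purely hypothetical sketch of a longer compute() (the extra filter step and the class names are invented for illustration only; Sampler and SumReducer are the same classes as in the full code below):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// hypothetical variant of classPI whose compute() chains a few more operators
public class ClassPILongerPipeline {

    private final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    private final int NumIter = 1000000;

    public static void main(String[] args) throws Exception {
        System.out.println("We estimate Pi to be: " + new ClassPILongerPipeline().compute());
    }

    public Double compute() throws Exception {
        DataSet<Long> count = env.generateSequence(1, NumIter)
            .map(new Sampler())                   // 1L if the random point falls inside the unit circle, else 0L
            .filter(new InsideCircleFilter())     // invented extra step: keep only the hits
            .reduce(new SumReducer());            // sum up the hits
        // collect() triggers the execution of this pipeline
        return 4.0 * count.collect().get(0) / NumIter;
    }

    // invented extra operator, only here to show a longer chain
    public static final class InsideCircleFilter implements FilterFunction<Long> {
        @Override
        public boolean filter(Long hit) {
            return hit == 1L;
        }
    }

    // same Sampler as in the full code below
    public static class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    // same SumReducer as in the full code below
    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long value1, Long value2) {
            return value1 + value2;
        }
    }
}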

The whole code is given below. Again, the question is whether this is a valid approach for encapsulating data transformations into a class in a Flink setup that is supposed to be parallelizable and to work on a cluster. Is there a better way to hide the details of the data transformations?
Thanks a lot!

-------------------------The code ----------------------

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PiEstimation {

    public static void main(String[] args) throws Exception {
        // this is one ExecutionEnvironment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // this is the critical DataSet with my classPI that computes PI
        DataSet<classPI> opi = env.fromElements(new classPI());
        // this map calls the method compute() of class classPI that computes PI
        DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
            public Double map(classPI objPI) throws Exception {
                // this is how I call the method compute() that calculates PI using transformations
                return objPI.compute();
            }
        });
        double pi = PI.collect().get(0);
        System.out.println("We estimate Pi to be: " + pi);
    }

    // this class is of no importance for my question; however, it is relevant for the pi calculation
    public static class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    // this class is of no importance for my question; however, it is relevant for the pi calculation
    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long value1, Long value2) {
            return value1 + value2;
        }
    }

    // this is my class that computes PI; my question is whether such a class is valid
    // in Flink on a cluster with parallel computation
    public static final class classPI {
        public Integer NumIter;
        private final ExecutionEnvironment env;
        public Double PI;

        // this is the constructor with another ExecutionEnvironment
        public classPI() {
            this.NumIter = 1000000;
            env = ExecutionEnvironment.getExecutionEnvironment();
        }

        // this is the method that contains all the data transformations
        public Double compute() throws Exception {
            DataSet<Long> count = env.generateSequence(1, NumIter)
                .map(new Sampler())
                .reduce(new SumReducer());
            PI = 4.0 * count.collect().get(0) / NumIter;
            return PI;
        }
    }
}
