From what I can tell from your code, you are trying to execute a job
within a job. This just doesn't work.
Your main method should look like this:
|public static void main(String[] args) throws Exception {
    double pi = new classPI().compute();
    System.out.println("We estimate Pi to be: " + pi);
}|
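If you want to keep the transformations encapsulated in classPI anyway, one way to do it (just a sketch, not tested on a cluster) is to create a single ExecutionEnvironment in main and pass it into the class, instead of letting the class create a second one:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PiEstimation {

    public static void main(String[] args) throws Exception {
        // one ExecutionEnvironment for the whole program
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        double pi = new classPI(env).compute();
        System.out.println("We estimate Pi to be: " + pi);
    }

    // classPI now receives the environment instead of creating its own,
    // so compute() builds transformations on the same job graph as main
    public static final class classPI {
        private final ExecutionEnvironment env;
        private final int NumIter = 1000000;

        public classPI(ExecutionEnvironment env) {
            this.env = env;
        }

        public Double compute() throws Exception {
            DataSet<Long> count = env.generateSequence(1, NumIter)
                    .map(new Sampler())
                    .reduce(new SumReducer());
            return 4.0 * count.collect().get(0) / NumIter;
        }
    }

    public static class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long value1, Long value2) {
            return value1 + value2;
        }
    }
}
```

This way there is only one job, and the map/reduce inside compute() runs in parallel on the cluster like any other transformation.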
On 06.06.2016 21:14, Ser Kho wrote:
The question is how to encapsulate numerous transformations into one
object, or maybe a function, in the Apache Flink Java setting. I have
tried to investigate this question using an example of Pi calculation
(see below). I am wondering whether or not the suggested approach is
valid from Flink's point of view. It works on one computer; however, I
do not know how it will behave in a cluster setup. The code is given
below, and the main idea behind it is as follows:
1. Create a class, named classPI, whose method compute() does all the
data transformations; see more about it below.
2. In the main method, create a DataSet as in *DataSet<classPI> opi
= env.fromElements(new classPI());*
3. Create *DataSet<Double> PI*, which equals the output of the
transformation map() that calls the object's method compute(), as in
*DataSet<Double> PI = opi.map(new MapFunction<classPI,
Double>() { public Double map(classPI objPI) { return
objPI.compute(); }});*
4. Now about classPI:
*
Its constructor instantiates an ExecutionEnvironment, which is local
to this class, as in
*public classPI(){ this.NumIter = 1000000; env =
ExecutionEnvironment.getExecutionEnvironment(); }*
Thus, the code has two ExecutionEnvironment objects: one in main and
another in the class classPI.
*
It has a method compute() that runs all the data transformations (in
this example it is just several lines, but potentially it might
contain tons of Flink transformations):
*public Double compute(){ DataSet<Long> count = env.generateSequence(1,
NumIter) .map(new Sampler()) .reduce(new SumReducer()); PI =
4.0*count.collect().get(0)/NumIter;
return PI; }*
The whole code is given below. Again, the question is whether this is
a valid approach for encapsulating data transformations into a class
in a Flink setup that is supposed to be parallelizable to work on a
cluster. Is there a better way to hide the details of data
transformations?
Thanks a lot!
-------------------------The code ----------------------
|public class PiEstimation {

    public static void main(String[] args) throws Exception {
        // this is one ExecutionEnvironment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // this is the critical DataSet with my classPI that computes PI
        DataSet<classPI> opi = env.fromElements(new classPI());
        // this map calls the method compute() of class classPI that computes PI
        DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
            public Double map(classPI objPI) throws Exception {
                // this is how I call method compute() that calculates PI using transformations
                return objPI.compute();
            }
        });
        double pi = PI.collect().get(0);
        System.out.println("We estimate Pi to be: " + pi);
    }

    // this class is of no importance for my question; however, it is relevant for pi calculation
    public static class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    // this class is of no importance for my question; however, it is relevant for pi calculation
    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long value1, Long value2) {
            return value1 + value2;
        }
    }

    // this is my class that computes PI; my question is whether such a class
    // is valid in Flink on a cluster with parallel computation
    public static final class classPI {
        public Integer NumIter;
        private final ExecutionEnvironment env;
        public Double PI;

        // this is the constructor with another ExecutionEnvironment
        public classPI() {
            this.NumIter = 1000000;
            env = ExecutionEnvironment.getExecutionEnvironment();
        }

        // this is the method that contains all data transformations
        public Double compute() throws Exception {
            DataSet<Long> count = env.generateSequence(1, NumIter)
                    .map(new Sampler())
                    .reduce(new SumReducer());
            PI = 4.0 * count.collect().get(0) / NumIter;
            return PI;
        }
    }
}|