"The question is how to encapsulate numerous transformations into one object, or maybe a function, in an Apache Flink Java setting."
Implement CustomUnaryOperation. An implementation can then be applied to a DataSet by calling `DataSet result = dataSet.runOperation(new MyOperation<>(...));`, where `dataSet` is the input DataSet.

On Mon, Jun 6, 2016 at 3:14 PM, Ser Kho <khov2...@yahoo.com> wrote:
> The question is how to encapsulate numerous transformations into one
> object, or maybe a function, in an Apache Flink Java setting. I have tried to
> investigate this question using an example of Pi calculation (see below). I
> am wondering whether or not the suggested approach is valid from
> Flink's point of view. It works on one computer; however, I do not know how
> it will behave in a cluster setup. The code is given below, and the main
> idea behind it is as follows:
>
> 1. Create a class, named classPI, whose method compute() does all data
>    transformations (see more about it below).
> 2. In the main method, create a DataSet as in
>        DataSet<classPI> opi = env.fromElements(new classPI());
> 3. Create DataSet<Double> PI, which equals the output of the
>    transformation map() that calls the classPI object's method compute(), as in
>        DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
>            public Double map(classPI objPI) throws Exception { return objPI.compute(); }});
> 4. Now about classPI:
>    - The constructor instantiates an ExecutionEnvironment, which is local to
>      this class, as in
>          public classPI(){ this.NumIter=1000000; env = ExecutionEnvironment.getExecutionEnvironment();}
>
>      Thus, the code has two ExecutionEnvironment objects: one in main and
>      another in the class classPI.
>
>    - It has a method compute() that runs all data transformations (in this
>      example it is just several lines, but potentially it might contain tons of
>      Flink transformations):
>          public Double compute() throws Exception { DataSet<Long> count = env.generateSequence(1, NumIter)
>              .map(new Sampler()).reduce(new SumReducer());
>              PI = 4.0*count.collect().get(0)/NumIter; return PI;}
>
> The whole code is given below.
> Again, the question is whether this is a valid approach for encapsulating
> data transformations into a class in a Flink setup that is supposed to be
> parallelizable and work on a cluster. Is there a better way to hide the
> details of data transformations?
> Thanks a lot!
>
> -------------------------The code ----------------------
>
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.functions.ReduceFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
>
> public class PiEstimation {
>
>     public static void main(String[] args) throws Exception {
>         // this is one ExecutionEnvironment
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         // this is the critical DataSet with my classPI that computes PI
>         DataSet<classPI> opi = env.fromElements(new classPI());
>
>         // this map calls the method compute() of class classPI that computes PI
>         DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
>             public Double map(classPI objPI) throws Exception {
>                 // this is how I call the method compute() that calculates PI using transformations
>                 return objPI.compute();
>             }
>         });
>
>         double pi = PI.collect().get(0);
>         System.out.println("We estimate Pi to be: " + pi);
>     }
>
>     // this class is of no importance for my question; however, it is relevant for the pi calculation
>     public static class Sampler implements MapFunction<Long, Long> {
>         @Override
>         public Long map(Long value) {
>             double x = Math.random();
>             double y = Math.random();
>             return (x * x + y * y) < 1 ? 1L : 0L;
>         }
>     }
>
>     // this class is of no importance for my question; however, it is relevant for the pi calculation
>     public static final class SumReducer implements ReduceFunction<Long> {
>         @Override
>         public Long reduce(Long value1, Long value2) {
>             return value1 + value2;
>         }
>     }
>
>     // this is my class that computes PI; my question is whether such a class is
>     // valid in Flink on a cluster with parallel computation
>     public static final class classPI {
>         public Integer NumIter;
>         private final ExecutionEnvironment env;
>         public Double PI;
>
>         // this is the constructor with another ExecutionEnvironment
>         public classPI() {
>             this.NumIter = 1000000;
>             env = ExecutionEnvironment.getExecutionEnvironment();
>         }
>
>         // this is the method that contains all data transformations
>         public Double compute() throws Exception {
>             DataSet<Long> count = env.generateSequence(1, NumIter)
>                 .map(new Sampler())
>                 .reduce(new SumReducer());
>             PI = 4.0 * count.collect().get(0) / NumIter;
>             return PI;
>         }
>     }
> }
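A minimal sketch of the CustomUnaryOperation approach from the reply, applied to the Pi example. It assumes the Flink 1.x DataSet API (`org.apache.flink.api.java`) is on the classpath; the class names `PiEstimationOperation`, `EstimatePi`, `PairSum` and `ToPi` are hypothetical, and only `CustomUnaryOperation` and `DataSet.runOperation` come from Flink. Unlike the quoted code, every transformation is added to the single job built from the one ExecutionEnvironment, and `collect()` is called once, outside any user function. Each input element stands for one random sample, and the operation counts the samples itself instead of relying on a separate NumIter field.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical class names; only CustomUnaryOperation and runOperation are Flink API.
public class PiEstimationOperation {

    /** Encapsulates the whole Monte Carlo pipeline: one input element = one random sample. */
    public static final class EstimatePi implements CustomUnaryOperation<Long, Double> {
        private DataSet<Long> input;

        @Override
        public void setInput(DataSet<Long> inputData) {
            this.input = inputData; // called by runOperation before createResult
        }

        @Override
        public DataSet<Double> createResult() {
            // Only wires transformations onto the input; nothing executes here.
            return input
                .map(new Sampler())     // (hit, 1) per sample
                .reduce(new PairSum())  // (total hits, total samples)
                .map(new ToPi());       // 4 * hits / samples
        }
    }

    /** Emits (1, 1) if a random point falls inside the unit quarter circle, else (0, 1). */
    public static final class Sampler implements MapFunction<Long, Tuple2<Long, Long>> {
        @Override
        public Tuple2<Long, Long> map(Long ignored) {
            double x = Math.random();
            double y = Math.random();
            long hit = (x * x + y * y) < 1 ? 1L : 0L;
            return new Tuple2<>(hit, 1L);
        }
    }

    /** Sums hits and sample counts field by field. */
    public static final class PairSum implements ReduceFunction<Tuple2<Long, Long>> {
        @Override
        public Tuple2<Long, Long> reduce(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    }

    /** Turns (hits, samples) into the Pi estimate. */
    public static final class ToPi implements MapFunction<Tuple2<Long, Long>, Double> {
        @Override
        public Double map(Tuple2<Long, Long> counts) {
            return 4.0 * counts.f0 / counts.f1;
        }
    }

    /** Builds one job from one ExecutionEnvironment and runs it via collect(). */
    public static double estimatePi(long numSamples) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Double> pi = env.generateSequence(1, numSamples)
                                .runOperation(new EstimatePi());
        return pi.collect().get(0); // triggers execution of the single job
    }

    public static void main(String[] args) throws Exception {
        System.out.println("We estimate Pi to be: " + estimatePi(1_000_000L));
    }
}
```

Because `createResult()` only attaches transformations to whatever DataSet it receives, the same operation can be reused on any `DataSet<Long>` (for example `otherSequence.runOperation(new EstimatePi())`, where `otherSequence` is hypothetical), and the caller still decides when the job is executed.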