"The question is how to encapsulate numerous transformations into one
object or may be a function in Apache Flink Java setting."

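Implement the CustomUnaryOperation interface. The operation can then be
applied to a DataSet by calling
`DataSet<OUT> result = input.runOperation(new MyOperation<>(...));`, where
`input` is the DataSet the operation consumes (runOperation is an instance
method of DataSet).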
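
As a rough sketch of that suggestion, the Pi pipeline from the quoted message could be wrapped as shown below. It assumes the DataSet API's CustomUnaryOperation interface with its setInput() and createResult() methods. The names EstimatePi and Scale are made up for illustration; Sampler and SumReducer mirror the classes in the quoted code.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.CustomUnaryOperation;

public class PiEstimation {

    // Hypothetical operation: bundles sample -> sum -> scale into one reusable unit.
    public static final class EstimatePi implements CustomUnaryOperation<Long, Double> {
        private final long numSamples;
        private DataSet<Long> input;

        public EstimatePi(long numSamples) {
            this.numSamples = numSamples;
        }

        @Override
        public void setInput(DataSet<Long> inputData) {
            // Called by DataSet.runOperation() with the DataSet it was invoked on.
            this.input = inputData;
        }

        @Override
        public DataSet<Double> createResult() {
            // Only builds the plan; nothing is executed here.
            return input
                    .map(new Sampler())
                    .reduce(new SumReducer())
                    .map(new Scale(numSamples));
        }
    }

    // Draws a random point in the unit square; emits 1 if it falls inside the quarter circle.
    public static final class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    // Sums the hit counts.
    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long value1, Long value2) {
            return value1 + value2;
        }
    }

    // Hypothetical helper: converts the hit count into the Pi estimate.
    public static final class Scale implements MapFunction<Long, Double> {
        private final long numSamples;

        public Scale(long numSamples) {
            this.numSamples = numSamples;
        }

        @Override
        public Double map(Long hits) {
            return 4.0 * hits / numSamples;
        }
    }

    public static void main(String[] args) throws Exception {
        final long numSamples = 1000000L;
        // The only ExecutionEnvironment in the program.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Double> pi = env.generateSequence(1, numSamples)
                .runOperation(new EstimatePi(numSamples));

        System.out.println("We estimate Pi to be: " + pi.collect().get(0));
    }
}
```

With this shape, the whole pipeline is built in the single environment created in main, so no second ExecutionEnvironment and no collect() call inside a map function are needed.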

On Mon, Jun 6, 2016 at 3:14 PM, Ser Kho <khov2...@yahoo.com> wrote:

> The question is how to encapsulate numerous transformations into one
> object, or maybe a function, in an Apache Flink Java setting. I have tried to
> investigate this question using an example of a Pi calculation (see below). I
> am wondering whether or not the suggested approach is valid from
> Flink's point of view. It works on one computer; however, I do not know how
> it will behave in a cluster setup. The code is given below, and the main
> idea behind it is as follows:
>
>    1. Create a class, named classPI, whose method compute() does all data
>    transformations (more about it below).
>    2. In the main method, create a DataSet as in *DataSet< classPI > opi =
>    env.fromElements(new classPI());*
>    3. Create *DataSet< Double > PI*, which is the output of a map()
>    transformation that calls the classPI object's method compute(), as in
>    *DataSet< Double > PI = opi.map(new MapFunction< classPI , Double>() {
>    public Double map(classPI objPI) { return objPI.compute(); }});*
>    4. Now about classPI:
>    - The constructor instantiates an ExecutionEnvironment, which is local to
>       this class, as in
>       *public classPI(){ this.NumIter=1000000; env =
>       ExecutionEnvironment.getExecutionEnvironment();}*
>
> Thus, the code has two ExecutionEnvironment objects: one in main and
> another in the class classPI.
>
>    - It has a method compute() that runs all data transformations (in this
>    example it is just several lines, but potentially it might contain tons of
>    Flink transformations)
>
> *public Double compute(){ DataSet< Long > count = env.generateSequence(1, NumIter)
>    .map(new Sampler()) .reduce(new SumReducer()); PI =
>    4.0*count.collect().get(0)/NumIter; return PI;}*
>
> The whole code is given below. Again, the question is whether this is a valid
> approach for encapsulating data transformations in a class in a Flink
> setup that is supposed to be parallelizable and work on a cluster. Is there
> a better way to hide the details of data transformations?
> Thanks a lot!
>
> -------------------------The code ----------------------
>
> public class PiEstimation {
>
>   public static void main(String[] args) throws Exception {
>     // this is one ExecutionEnvironment
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     // this is the critical DataSet with my classPI that computes PI
>     DataSet<classPI> opi = env.fromElements(new classPI());
>     // this map calls the method compute() of class classPI that computes PI
>     DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
>       public Double map(classPI objPI) throws Exception {
>         // this is how I call method compute() that calculates PI using transformations
>         return objPI.compute();
>       }
>     });
>
>     double pi = PI.collect().get(0);
>     System.out.println("We estimate Pi to be: " + pi);
>   }
>
>   // this class is of no importance for my question, however, it is relevant for the pi calculation
>   public static class Sampler implements MapFunction<Long, Long> {
>     @Override
>     public Long map(Long value) {
>       double x = Math.random();
>       double y = Math.random();
>       return (x * x + y * y) < 1 ? 1L : 0L;
>     }
>   }
>
>   // this class is of no importance for my question, however, it is relevant for the pi calculation
>   public static final class SumReducer implements ReduceFunction<Long> {
>     @Override
>     public Long reduce(Long value1, Long value2) {
>       return value1 + value2;
>     }
>   }
>
>   // this is my class that computes PI; my question is whether such a class is
>   // valid in Flink on a cluster with parallel computation
>   public static final class classPI {
>     public Integer NumIter;
>     private final ExecutionEnvironment env;
>     public Double PI;
>
>     // this is the constructor with another ExecutionEnvironment
>     public classPI() {
>       this.NumIter = 1000000;
>       env = ExecutionEnvironment.getExecutionEnvironment();
>     }
>
>     // this is the method that contains all data transformations
>     public Double compute() throws Exception {
>       DataSet<Long> count = env.generateSequence(1, NumIter)
>                                .map(new Sampler())
>                                .reduce(new SumReducer());
>       PI = 4.0 * count.collect().get(0) / NumIter;
>       return PI;
>     }
>   }
> }
>
>
