Thanks, Stephan.

So the follow-up question -
Is it possible to convert an operator from the optimizer/common API back to
the Java API? (The Java-to-common direction is possible via
translateToDataFlow().)
For example:
org.apache.flink.api.common.operators.base.JoinOperatorBase
to
org.apache.flink.api.java.operators.JoinOperator

The JavaPlan, in turn, also gives us the list of common API operators.
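For context, here is a minimal sketch of the direction that does work (Java API down to the common API), using the Flink 0.9 class names discussed further down in this thread. The sample job, the output path, and the Visitor-based traversal are illustrative assumptions, not an authoritative recipe:

```java
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Visitor;

public class PlanInspection {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Illustrative job: join two small datasets on field 0.
        DataSet<Tuple2<Integer, String>> left = env.fromElements(new Tuple2<>(1, "a"));
        DataSet<Tuple2<Integer, String>> right = env.fromElements(new Tuple2<>(1, "b"));
        left.join(right).where(0).equalTo(0).writeAsText("/tmp/join-out");

        // Java API -> common API: translateToDataFlow() is invoked internally here.
        Plan plan = env.createProgramPlan();

        // Walk the common-API DAG; the Java-side JoinOperator now appears
        // as a JoinOperatorBase.
        plan.accept(new Visitor<Operator<?>>() {
            @Override
            public boolean preVisit(Operator<?> operator) {
                if (operator instanceof JoinOperatorBase) {
                    System.out.println("Found join: " + operator.getName());
                }
                return true;
            }

            @Override
            public void postVisit(Operator<?> operator) {}
        });
    }
}
```

The translation is one-way: the common operators keep the operator info (types, keys) but no back-reference to the Java API objects they came from.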
Thanks for your help.

Thanks and Regards
Amit Pawar


On Mon, Apr 27, 2015 at 3:18 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> It seems you are actually working on the Optimizer's representation of the
> program. The optimizer (and the runtime as well) think about Flink programs
> in terms of data flows, not data sets any more (that is only in the
> Java/Scala API).
>
> The Java API operations get translated to the data flow operators for
> example in
>
> "org.apache.flink.api.java.operators.SingleInputOperator#translateToDataFlow()".
>
> Actually, even the Java API is in some sense only a fluent data flow builder
> pattern. All DataSets are actually the results of operators, and the
> concrete implementations of the abstract DataSet class are the operator
> classes.
>
> Greetings,
> Stephan
>
>
>
> On Mon, Apr 27, 2015 at 2:51 PM, Amit Pawar <amitpawar5...@gmail.com>
> wrote:
>
> > Hi
> >
> > For a given Operator<?>, it is easily possible to find the input and output
> > TypeInformation<?>, which gives the datatype and other information of the
> > dataset. For example:
> >
> > if (operator instanceof SingleInputOperator) {
> >     inputTypes.add(((SingleInputOperator) operator).getOperatorInfo().getInputType());
> > }
> >
> > For a given DAG connection, we can determine source and target, also in
> > terms of Operator.
> >
> > Is it possible to determine which dataset the operator is working on?
> >
> > Thanks and Regards
> > Amit Pawar
> >
> >
> > On Thu, Apr 23, 2015 at 3:01 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > Okay, nice to hear!
> > >
> > > Ping us if you run into other trouble...
> > >
> > > On Thu, Apr 23, 2015 at 2:28 PM, Amit Pawar <amitpawar5...@gmail.com>
> > > wrote:
> > >
> > > > Thanks Stephan.
> > > > Using flink-dist jar solves the issue.
> > > >
> > > >
> > > > Thanks and Regards
> > > > Amit Pawar
> > > >
> > > >
> > > > On Thu, Apr 23, 2015 at 2:02 PM, Stephan Ewen <se...@apache.org>
> > wrote:
> > > >
> > > > > Hi!
> > > > >
> > > > > What you describe sounds pretty much like a version mixup -
> > > > > NoSuchMethod indicates one part of the code is out of sync with the
> > > > > other. Can you make sure that you have all jars from the same Flink
> > > > > version in the classpath?
> > > > >
> > > > > For the Optimizer exception: the cause may be a similar issue (version
> > > > > mixup) or a completely missing jar file. If you use the big jar file
> > > > > from flink-dist in version 0.9, does that error occur?
> > > > >
> > > > > Greetings,
> > > > > Stephan
> > > > >
> > > > >
> > > > > On Thu, Apr 23, 2015 at 2:01 AM, Amit Pawar <amitpawar5...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Many thanks Stephan.
> > > > > > I followed your instructions and it was working fine when I had the
> > > > > > required Flink projects in the build path. Later, when I substituted
> > > > > > them by adding the respective dependencies with the snapshots in the
> > > > > > pom, I got the below exception at
> > > > > >
> > > > > > OptimizedPlan opPlan = op.compile(env.createProgramPlan());
> > > > > >
> > > > > > Exception in thread "main" org.apache.flink.optimizer.CompilerException:
> > > > > > Class 'org.apache.flink.compiler.postpass.JavaApiPostPass' is not an
> > > > > > optimizer post-pass.
> > > > > >     at org.apache.flink.optimizer.Optimizer.getPostPassFromPlan(Optimizer.java:573)
> > > > > >     at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
> > > > > >     at thesis.examples.SampleTest.main(SampleTest.java:189)
> > > > > > Caused by: java.lang.ClassCastException: class
> > > > > > org.apache.flink.compiler.postpass.JavaApiPostPass
> > > > > >     at java.lang.Class.asSubclass(Class.java:3208)
> > > > > >     at org.apache.flink.optimizer.Optimizer.getPostPassFromPlan(Optimizer.java:557)
> > > > > >     ... 2 more
> > > > > >
> > > > > > I can work around that by having the necessary Flink projects in
> > > > > > the Eclipse build path, but then I face a different issue with
> > > > > > Scala: a NoSuchMethod exception on env.execute().
> > > > > >
> > > > > > Please advise.
> > > > > >
> > > > > > Thanks and Regards
> > > > > > Amit Pawar
> > > > > >
> > > > > >
> > > > > > On Mon, Mar 30, 2015 at 6:26 PM, Stephan Ewen <se...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Amit!
> > > > > > >
> > > > > > > The DataSet API is basically a fluent builder for the internal
> > > > > > > DAG of operations, the "Plan". This plan is built when you call
> > > > > > > "env.execute()".
> > > > > > >
> > > > > > > You can directly get the Plan by calling
> > > > > > > ExecutionEnvironment#createProgramPlan()
> > > > > > >
> > > > > > > The JSON plan has, in addition, the information inserted by the
> > > > > > > Optimizer (what partitioning to use where, what keys to use). This
> > > > > > > is called the "OptimizedPlan".
> > > > > > > To obtain that, you have to push the Plan through the Optimizer:
> > > > > > > "OptimizedPlan op = new Optimizer(new DataStatistics(), new
> > > > > > > DefaultCostEstimator()).compile(plan)"
> > > > > > >
> > > > > > > That optimized plan has all the information needed for execution.
> > > > > > > The JSON is created from that OptimizedPlan via "new
> > > > > > > PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optimizedPlan)"
> > > > > > >
> > > > > > > Note: These class names and instructions refer to Flink 0.9. For
> > > > > > > version 0.8, the names are a bit different.
> > > > > > >
> > > > > > > Greetings,
> > > > > > > Stephan
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Mar 30, 2015 at 5:22 PM, Amit Pawar <amitpawar5...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi
> > > > > > > >
> > > > > > > > I am trying to extract/retrieve the Flink execution plan. I
> > > > > > > > managed to get it as a JSON string in the following ways:
> > > > > > > > 1. Using a JAR - via PackagedProgram using getPreviewPlan(); or
> > > > > > > > 2. Directly in the program - via ExecutionEnvironment's
> > > > > > > > getExecutionPlan()
> > > > > > > >
> > > > > > > > My question is - is it possible to retrieve the Plan object
> > > > > > > > directly?
> > > > > > > > I tried this but was not successful, as submitting the jar takes
> > > > > > > > us into interactive mode, and in order to use the other mode,
> > > > > > > > programEntryPoint, the main class needs to implement the Program
> > > > > > > > interface with a getPlan method.
> > > > > > > >
> > > > > > > > Even if we manage to get the execution plan as a Plan object,
> > > > > > > > will it be different from what we have in the JSON string? For
> > > > > > > > example, in terms of:
> > > > > > > > 1. What datatypes are used in the dataset's tuples
> > > > > > > > 2. On what key the join takes place
> > > > > > > > 3. The filtering predicate
> > > > > > > > 4. The field for distinct, and so on
> > > > > > > > (The JSON plan does have the operator tree, but the contents
> > > > > > > > field points to the line of code in the class, which is not that
> > > > > > > > helpful.)
> > > > > > > >
> > > > > > > > If not, is it possible (by some other way) to get the above
> > > > > > > > details just by using the Flink job/jar as an input?
> > > > > > > >
> > > > > > > >
> > > > > > > > Thanks and Regards
> > > > > > > > Amit Pawar
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
