Thanks, Stephan. A follow-up question: is it possible to cast an operator from the optimizer/common API back to the Java API? (The Java-to-common direction is possible via translateToDataFlow().) For example, from org.apache.flink.api.common.operators.base.JoinOperatorBase to org.apache.flink.api.java.operators.JoinOperator.
JavaPlan, too, in turn gives us the list of common API operators. Thanks for your help.

Thanks and Regards
Amit Pawar


On Mon, Apr 27, 2015 at 3:18 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> It seems you are actually working on the optimizer's representation of the
> program. The optimizer (and the runtime as well) thinks about Flink
> programs in terms of data flows, not data sets any more (that is only in
> the Java/Scala API).
>
> The Java API operations get translated to the data flow operators, for
> example in
> "org.apache.flink.api.java.operators.SingleInputOperator#translateToDataFlow()".
>
> Actually, even the Java API is in some sense only a fluid data flow
> builder pattern. All DataSets are actually the results from operators, and
> the concrete implementations of the abstract data set class are the
> operator classes.
>
> Greetings,
> Stephan


On Mon, Apr 27, 2015 at 2:51 PM, Amit Pawar <amitpawar5...@gmail.com> wrote:

> Hi
>
> For a given Operator<?>, it is easily possible to find the input and
> output TypeInformation<?>, which gives the datatype and other information
> about the dataset. For example:
>
>     if (operator instanceof SingleInputOperator) {
>         inputTypes.add(((SingleInputOperator) operator).getOperatorInfo().getInputType());
>     }
>
> For a given DAG connection, we can determine the source and target, also
> in terms of Operator.
>
> Is it possible to determine which dataset the operator is working on?
>
> Thanks and Regards
> Amit Pawar


On Thu, Apr 23, 2015 at 3:01 PM, Stephan Ewen <se...@apache.org> wrote:

> Okay, nice to hear!
>
> Ping us if you run into other trouble...


On Thu, Apr 23, 2015 at 2:28 PM, Amit Pawar <amitpawar5...@gmail.com> wrote:

> Thanks Stephan.
> Using the flink-dist jar solves the issue.
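As an aside on Stephan's point above that the Java API is "only a fluid data flow builder pattern": the idea can be mimicked in a few lines of plain, self-contained Java. Everything here (Node, map, join, walk) is made up for illustration; these are not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of a fluent data-flow builder: each "data set" is really
// the result node of an operator, and chaining calls only builds a DAG.
public class FluentDagDemo {

    static class Node {
        final String name;
        final List<Node> inputs = new ArrayList<>();

        Node(String name, Node... ins) {
            this.name = name;
            for (Node in : ins) inputs.add(in);
        }

        // Fluent "DataSet-style" methods: each returns a new result node.
        Node map(String fn)   { return new Node("map(" + fn + ")", this); }
        Node join(Node other) { return new Node("join", this, other); }
    }

    // Depth-first walk of the DAG, visiting inputs before their consumer,
    // roughly what a translate-to-data-flow pass does with real operators.
    static void walk(Node node, List<String> out) {
        for (Node in : node.inputs) walk(in, out);
        out.add(node.name);
    }

    public static void main(String[] args) {
        Node a = new Node("source-A");
        Node b = new Node("source-B");
        Node result = a.map("parse").join(b.map("filter"));

        List<String> order = new ArrayList<>();
        walk(result, order);
        System.out.println(order);
        // prints [source-A, map(parse), source-B, map(filter), join]
    }
}
```

In real Flink, the analogous handle is DataSet, and the translation step is e.g. SingleInputOperator#translateToDataFlow(), as Stephan notes above.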
> Thanks and Regards
> Amit Pawar


On Thu, Apr 23, 2015 at 2:02 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> What you describe sounds pretty much like a version mixup - NoSuchMethod
> indicates one part of the code is out of sync with the other. Can you make
> sure that you have all jars from the same Flink version in the classpath?
>
> For the optimizer exception: the cause may be a similar issue (version
> mixup) or a completely missing jar file. If you use the big jar file from
> flink-dist in version 0.9, does that error occur?
>
> Greetings,
> Stephan


On Thu, Apr 23, 2015 at 2:01 AM, Amit Pawar <amitpawar5...@gmail.com> wrote:

> Many thanks Stephan.
> I followed your instructions, and it was working fine while I had the
> required Flink projects in the build path. Later, when I substituted that
> by adding the respective snapshot dependencies in the pom, I got the
> exception below at
>
>     OptimizedPlan opPlan = op.compile(env.createProgramPlan());
>
>     Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>     Class 'org.apache.flink.compiler.postpass.JavaApiPostPass' is not an
>     optimizer post-pass.
>         at org.apache.flink.optimizer.Optimizer.getPostPassFromPlan(Optimizer.java:573)
>         at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>         at thesis.examples.SampleTest.main(SampleTest.java:189)
>     Caused by: java.lang.ClassCastException: class
>     org.apache.flink.compiler.postpass.JavaApiPostPass
>         at java.lang.Class.asSubclass(Class.java:3208)
>         at org.apache.flink.optimizer.Optimizer.getPostPassFromPlan(Optimizer.java:557)
>         ... 2 more
>
> I can work around that by keeping the necessary Flink projects in the
> Eclipse build path, but then I face a different issue with Scala: a
> NoSuchMethod exception on env.execute().
>
> Please advise.
>
> Thanks and Regards
> Amit Pawar


On Mon, Mar 30, 2015 at 6:26 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Amit!
>
> The DataSet API is basically a fluent builder for the internal DAG of
> operations, the "Plan". This plan is built when you call "env.execute()".
>
> You can directly get the Plan by calling
> ExecutionEnvironment#createProgramPlan().
>
> The JSON plan has, in addition, the information inserted by the optimizer
> (what partitioning to use where, what keys to use). This is called the
> "OptimizedPlan".
> To obtain that, you have to push the Plan through the Optimizer:
>
>     OptimizedPlan op = new Optimizer(new DataStatistics(),
>             new DefaultCostEstimator()).compile(plan);
>
> That optimized plan has all the information needed for the execution. The
> JSON is created from that OptimizedPlan via
>
>     new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optimizedPlan);
>
> Note: these class names and instructions refer to Flink 0.9. For version
> 0.8, the names are a bit different.
>
> Greetings,
> Stephan


On Mon, Mar 30, 2015 at 5:22 PM, Amit Pawar <amitpawar5...@gmail.com> wrote:

> Hi
>
> I am trying to extract/retrieve the Flink execution plan. I managed to get
> it as a JSON string in the following ways:
> 1. Using the JAR - via PackagedProgram, using getPreviewPlan(); or
> 2. Directly in the program - via the ExecutionEnvironment's
>    getExecutionPlan()
>
> My question is: is it possible to retrieve the Plan object directly?
> I tried this but was not successful, as submitting the jar takes us into
> interactive mode, and in order to use the other mode, programEntryPoint,
> the main class needs to implement the Program interface with a getPlan()
> method.
>
> Even if we manage to get the execution plan as a Plan object, will it be
> different from what we have in the JSON string? For example, in terms of:
> 1. What datatypes are used in the dataset's tuples
> 2. On what key the join takes place
> 3. The filtering predicate
> 4. The field for distinct, and so on
> (The JSON plan does have the operator tree, but the "contents" field
> points to the line of code in the class, which is not that helpful.)
>
> If not, is it possible (by some other way) to get the above details just
> by using the Flink job/jar as an input?
>
> Thanks and Regards
> Amit Pawar
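Pulling the steps from this thread together, a rough end-to-end sketch (class names are the Flink 0.9-era ones quoted above; imports and exact packages depend on the Flink version, and the Flink jars must be on the classpath, so treat this as an unverified sketch, not a runnable example):

```java
// Any small job, just so the environment has a sink.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.readTextFile("input").distinct().writeAsText("output");

// 1) The raw Plan: the internal DAG of common API operators, pre-optimization.
Plan plan = env.createProgramPlan();

// 2) Push it through the optimizer to get the OptimizedPlan
//    (partitioning, key choices, etc.).
OptimizedPlan optimized =
        new Optimizer(new DataStatistics(), new DefaultCostEstimator()).compile(plan);

// 3) Dump the optimized plan as a JSON string.
String json = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optimized);

// Alternatively, env.getExecutionPlan() returns a JSON plan preview directly,
// without going through the Optimizer yourself.
```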