On Thu, Apr 25, 2019 at 5:59 AM Dian Fu <dian0511...@gmail.com> wrote:
>
> Thanks everyone for the discussion here.
>
> Regarding the Java/Scala UDFs and the built-in UDFs executing in the
> current Flink way (directly in the JVM, not via RPC), I share the same thoughts
> as Max and Robert and I think it will not be a big problem. From the design
> doc, I guess the main reason to take the Py4J way instead of the DAG way at
> present is that the DAG has some limitations in some scenarios, such as
> interactive programming, which may be a strong requirement for data scientists.

I definitely agree that interactivity is a strong requirement for the
data scientist (and others). I don't think this is incompatible with
the DAG model, and it's something I want to see more of. (For one
exploration, see Beam Python's (still WIP) InteractiveRunner.) There
are lots of interesting challenges here (e.g. sampling, partial
results, optimal caching of results vs. re-execution, especially in
the face of fusion) that would be worth working out together.
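
For concreteness, a minimal sketch of what interactive use looks like with
Beam Python's InteractiveRunner today. The module paths and the
interactive_beam helpers reflect recent versions of the SDK and are still
evolving, so take the exact names with a grain of salt:

    import apache_beam as beam
    from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
    import apache_beam.runners.interactive.interactive_beam as ib

    p = beam.Pipeline(InteractiveRunner())

    words = p | beam.Create(['to', 'be', 'or', 'not', 'to', 'be'])
    counts = words | beam.combiners.Count.PerElement()

    # Let the interactive machinery track the PCollections defined above.
    ib.watch(locals())

    # Materialize just this PCollection; previously computed results are
    # cached and reused rather than re-executing the whole pipeline.
    print(ib.collect(counts))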

> In addition (and I'll admit this is rather subjective) it seems to me one of
> the primary values of a table-like API in a given language (vs. just using
> (say) plain old SQL itself via a console) is the ability to embed it in a
> larger pipeline, or at least drop in operations that are not (as) naturally
> expressed in the "table way," including existing libraries. In other words, a
> full SDK. The Py4J wrapping doesn't lend itself to such integration nearly
> as easily.
>
> Hi Robert, regarding "a larger pipeline", do you mean translating a
> table-like API job from/to another kind of API job, or embedding third-party
> libraries into a table-like API job via UDFs? Could you kindly explain why
> this would be a problem for Py4J and would not be a problem if expressing the
> job with a DAG?

I'm talking about anything one would want to do after
tableEnv.toDataSet() or before tableEnv.registerTable(...), unless you
plan on also wrapping the DataSet/DataStream APIs, which is a much
taller task, let alone wrapping all the libraries one might want to
use that are built on top of these APIs.

If this is instead integrated at a higher level, you could swap back
and forth between the new Table API and the existing Python SDK
(including libraries such as TFX, and cross-language capabilities)
almost for free.
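
To make that concrete, here is a rough sketch of the kind of mixing I mean,
using Beam's (more recent) cross-language SqlTransform as a stand-in for a
table-like API embedded in a larger Python pipeline. It needs a Java
expansion service at runtime, and it's only an illustration of the shape of
the argument, not a proposal for the Flink API:

    import apache_beam as beam
    from apache_beam.transforms.sql import SqlTransform

    with beam.Pipeline() as p:
        events = (
            p
            | beam.Create([('alice', 3), ('bob', 5), ('alice', 1)])
            # Arbitrary Python code, as in any Beam pipeline.
            | beam.Map(lambda kv: beam.Row(user_id=kv[0], clicks=kv[1])))

        # Drop into a relational view for the part that is natural there...
        totals = events | SqlTransform(
            'SELECT user_id, SUM(clicks) AS total FROM PCOLLECTION GROUP BY user_id')

        # ...and back out to the full SDK: libraries, further transforms, I/O.
        totals | beam.Map(lambda row: '%s: %d' % (row.user_id, row.total)) | beam.Map(print)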

> On Apr 25, 2019, at 12:16 AM, Robert Bradshaw <rober...@google.com> wrote:
>
> Thanks for the meeting summary, Stephan. Sounds like you covered a lot of
> ground. Some more comments below, adding onto what Max has said.
>
> On Wed, Apr 24, 2019 at 3:20 PM Maximilian Michels <m...@apache.org> wrote:
> >
> > Hi Stephan,
> >
> > This is exciting! Thanks for sharing. The inter-process communication
> > code looks like the most natural choice as a common ground. To go
> > further, there are indeed some challenges to solve.
>
> It certainly does make sense to share this work, though it does seem to me
> like a rather low level at which to integrate.
>
> > > => Biggest question is whether the language-independent DAG is expressive 
> > > enough to capture all the expressions that we want to map directly to 
> > > Table API expressions. Currently much is hidden in opaque UDFs. Kenn 
> > > mentioned the structure should be flexible enough to capture more 
> > > expressions transparently.
> >
> > Just to add some context on how this could be done, there is the concept of
> > a FunctionSpec which is part of a transform in the DAG. A FunctionSpec
> > contains a URN and a payload. A FunctionSpec can either be (1)
> > translated by the Runner directly, e.g. mapped to Table API concepts, or (2)
> > run as a user-defined function within an Environment. It could be feasible
> > for Flink to choose the direct path, whereas Beam Runners would leverage
> > the more generic approach using UDFs. Granted, compatibility across
> > Flink and Beam would only work if both of the translation paths yielded
> > the same semantics.
>
> To elaborate a bit on this, Beam DAGs are built up by applying Transforms
> (basically operations) to PCollections (the equivalent of DataSet/DataStream),
> but the key point here is that these transforms are often composite
> operations that expand out into smaller subtransforms. This expansion happens
> during pipeline construction, but with the recent work on cross-language
> pipelines it can happen out of process. This is one point of extensibility.
> Secondly, and importantly, this composite structure is preserved in the DAG,
> so a runner is free to ignore the provided expansion and supply its own
> (so long as it produces semantically exactly the same output). These
> composite operations can be identified by arbitrary URNs + payloads, and any
> runner that does not understand them simply uses the pre-provided expansion.
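
As a tiny illustration of that composite structure (each transform node in
the proto carries a FunctionSpec, i.e. a URN plus payload, together with its
expansion into subtransforms), here is what a composite looks like in the
Python SDK. A runner that recognizes such a node may swap in a native
implementation in place of the expansion below, as long as the semantics are
identical:

    import apache_beam as beam

    class CountPerKey(beam.PTransform):
        """Composite transform; its default expansion is given in expand()."""

        def expand(self, pcoll):
            # The pre-provided expansion into smaller subtransforms. This
            # structure is preserved in the portable pipeline proto.
            return (
                pcoll
                | 'PairWithOne' >> beam.Map(lambda k: (k, 1))
                | 'SumPerKey' >> beam.CombinePerKey(sum))

    with beam.Pipeline() as p:
        (p
         | beam.Create(['a', 'b', 'a'])
         | CountPerKey()
         | beam.Map(print))
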
>
> The existing Flink runner operates on exactly this principle, translating
> URNs for the leaf operations (Map, Flatten, ...) as well as some composites
> it can do better (e.g. Reshard). It is intentionally easy to define and add
> new ones. This actually seems like the easier approach (to me at least, but
> that's probably heavily influenced by what I'm familiar with vs. what I'm not).
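
Purely as an illustration of that dispatch (the real FlinkRunner translation
code is Java; the sketch below only shows the idea), a runner keeps a
registry from transform URNs to native translators and falls back to the
pre-provided expansion for everything else:

    # The URN strings are Beam's well-known URNs for these primitives.
    def translate_flatten(transform_proto):
        ...  # build a native Flink union here

    def translate_group_by_key(transform_proto):
        ...  # build a native Flink grouping here

    TRANSLATORS = {
        'beam:transform:flatten:v1': translate_flatten,
        'beam:transform:group_by_key:v1': translate_group_by_key,
    }

    def translate(urn, transform_proto, expand_default):
        translator = TRANSLATORS.get(urn)
        if translator is not None:
            return translator(transform_proto)   # runner-native path
        return expand_default(transform_proto)   # use the composite's own expansion
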
>
> As for how well this maps onto the Flink Table API, part of that depends on
> how much of the API is the operations themselves, and how much concerns
> configuration/environment/etc., which is harder to talk about in an agnostic
> way.
>
> Using something like Py4J is an easy way to get up and running, especially for
> a very faithful API, but the instant one wants to add UDFs one hits a cliff
> of sorts (which is surmountable, but likely a lot harder than having gone the
> above approach). In addition (and I'll admit this is rather subjective) it
> seems to me one of the primary values of a table-like API in a given language
> (vs. just using (say) plain old SQL itself via a console) is the ability to
> embed it in a larger pipeline, or at least drop in operations that are not
> (as) naturally expressed in the "table way," including existing libraries. In
> other words, a full SDK. The Py4J wrapping doesn't lend itself to such
> integration nearly as easily.
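
For comparison, the Py4J approach is roughly the following (the entry point
and method names here are hypothetical; FLIP-38 will define the real ones).
Each Python call is a thin proxy onto the corresponding Java Table API object
living in the JVM:

    from py4j.java_gateway import JavaGateway

    # Connects to a GatewayServer that the Flink client would start JVM-side.
    gateway = JavaGateway()
    t_env = gateway.entry_point.getTableEnvironment()  # assumed entry point

    orders = t_env.scan('Orders')  # proxies to the Java Table API
    result = orders.groupBy('user').select('user, amount.sum as total')
    result.insertInto('Results')
    t_env.execute('py4j_job')  # hypothetical; actual submission API per FLIP-38

    # The cliff: a Python UDF can't be passed across this boundary as-is; the
    # JVM-side planner has no way to run Python code, which is exactly where
    # Beam's out-of-process harness would come in.
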
>
> But I really do understand the desire to not block immediate work (and value) 
> for a longer term solution.
>
> > >  If the DAG is generic enough to capture the additional information, we 
> > > probably still need some standardization, so that all the different 
> > > language APIs represent their expressions the same way
> >
> > I wonder whether that's necessary as a first step. I think it would be
> > fine for Flink to have its own way to represent API concepts in the Beam
> > DAG which Beam Runners may not be able to understand. We could then
> > successively add the capability for these transforms to run with Beam.
> >
> > >  Similarly, it makes sense to standardize the type system (and type 
> > > inference) as far as built-in expressions and their interaction with UDFs 
> > > are concerned. The Flink Table API and Blink teams found this to be 
> > > essential for a consistent API behavior. This would not prevent all-UDF 
> > > programs from still using purely binary/opaque types.
> >
> > Beam has a set of standard coders which can be used across languages. We
> > will have to expand those to play well with Flink's:
> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#data-types
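
For context, Beam's standard coders are identified by URNs in the portability
protos (e.g. beam:coder:bytes:v1, beam:coder:varint:v1, beam:coder:string_utf8:v1,
beam:coder:kv:v1, beam:coder:iterable:v1). How these should line up with the
Flink Table API types linked above is exactly the open question; something like
the mapping below is a first guess on my part, not anything agreed upon:

    # Rough, speculative correspondence between standard Beam coder URNs and
    # Flink Table API / SQL types; composites need their component coders mapped too.
    CODER_URN_TO_FLINK_TYPE = {
        'beam:coder:bool:v1': 'BOOLEAN',
        'beam:coder:varint:v1': 'BIGINT',
        'beam:coder:double:v1': 'DOUBLE',
        'beam:coder:string_utf8:v1': 'VARCHAR',
        'beam:coder:bytes:v1': 'VARBINARY',
        'beam:coder:kv:v1': 'ROW',         # composite
        'beam:coder:iterable:v1': 'ARRAY', # composite
    }
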
> >
> > I think we will need to exchange more ideas to work out a model that
> > will work for both Flink and Beam. A regular meeting could be helpful.
>
> +1, I think this would be really good for both this effort and general 
> collaboration between the Beam and Flink communities.
>
> > Thanks,
> > Max
> >
> > On 23.04.19 21:23, Stephan Ewen wrote:
> > > Hi all!
> > >
> > > Below are my notes on the discussion last week on how to collaborate
> > > between Beam and Flink.
> > > The discussion was between Tyler, Kenn, Luke, Ahmed, Xiaowei, Shaoxuan,
> > > Jincheng, and me.
> > >
> > > This represents my understanding of the discussion, please augment this
> > > where I missed something or where your conclusion was different.
> > >
> > > Best,
> > > Stephan
> > >
> > > =======================================================
> > >
> > > *Beam's Python and Portability Framework*
> > >
> > >    - Portability core to Beam
> > >    - Language independent dataflow DAG that is defined via ProtoBuf
> > >    - DAG can be generated from various languages (Java, Python, Go)
> > >    - The DAG describes the pipelines and contains additional parameters
> > > to describe each operator, and contains artifacts that need to be
> > > deployed / executed as part of an operator execution.
> > >    - Operators execute in language-specific containers, data is
> > > exchanged between the language-specific container and the runner
> > > container (JVM) via gRPC.
> > >
> > > *Flink's desiderata for Python API*
> > >
> > >    - Python API should mirror Java / Scala Table API
> > >    - All relational expressions that correspond to built-in functions
> > > should be translated to corresponding expressions in the Table API. That
> > > way the planner generates Java code for the data types and built-in
> > > expressions, meaning no Python code is necessary during execution
> > >    - UDFs should be supported and run similarly to Beam's approach
> > >    - Python programs should be similarly created and submitted/deployed
> > > as Java / Scala programs (CLI, web, containerized, etc.)
> > >
> > > *Consensus to share inter-process communication code*
> > >
> > >    - Crucial code for robust setup and high performance data exchange
> > > across processes
> > >    - The code for the SDK harness, the artifact bootstrapping, and the
> > > data exchange makes sense to share.
> > >    - Ongoing discussion whether this can be a dedicated module with slim
> > > dependencies in Beam
> > >
> > > *Potential Long Term Perspective: Share language-independent DAG
> > > representation*
> > >
> > >    - Beam's language independent DAG could become a standard
> > > representation used in both projects
> > >    - Flink would need a way to receive that DAG, map it to the Table
> > > API, and execute it from there
> > >    - The DAG would need to have a standardized representation of
> > > functions and expressions that then get mapped to Table API expressions
> > > to let the planner optimize those and generate Java code for those
> > >    - Similar to how UDFs are supported in the Table API, there would be
> > > additional "external UDFs" that would go through the above-mentioned
> > > inter-process communication layer
> > >
> > >    - _Advantages:_
> > >      => Flink and Beam could share more language bindings
> > >      => Flink would execute Beam portability programs fast, without
> > > intermediate abstraction and directly in the JVM for many operators.
> > >           Abstraction is necessary around UDFs and to bridge between
> > > serializers / coders, etc.
> > >
> > >    - _Open issues:_
> > >      => Biggest question is whether the language-independent DAG is
> > > expressive enough to capture all the expressions that we want to map
> > > directly to Table API expressions. Currently much is hidden in opaque
> > > UDFs. Kenn mentioned the structure should be flexible enough to capture
> > > more expressions transparently.
> > >
> > >      => If the DAG is generic enough to capture the additional
> > > information, we probably still need some standardization, so that all
> > > the different language APIs represent their expressions the same way
> > >      => Similarly, it makes sense to standardize the type system (and
> > > type inference) as far as built-in expressions and their interaction
> > > with UDFs are concerned. The Flink Table API and Blink teams found this
> > > to be essential for a consistent API behavior. This would not prevent
> > > all-UDF programs from still using purely binary/opaque types.
> > >
> > >      => We need to create a Python API that follows the same structure as
> > > Flink's Table API and produces the language-independent DAG
> > >
> > > *Short-term approach in Flink*
> > >
> > >    - Goal is to not block Flink's Python effort on the long term
> > > approach and the necessary design and evolution of the
> > > language-independent DAG.
> > >    - Depending on what the outcome of the above investigation is, Flink may
> > > initially go with a simple approach to map the Python Table API to the
> > > Java Table API via Py4J, as outlined in FLIP-38:
> > > https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8
>
>
