Hi Robert,

In addition to the questions described by Dian, I also want to know what
difficult problems Py4j's solution will encounter in add UDF support, which
you mentioned as follows:

Using something like Py4j is an easy way to get up an running, especially
> for a very faithful API, but the instant one wants to add UDFs one hits a
> cliff of sorts (which is surmountable, but likely a lot harder than having
> gone the above approach).


I appreciate if you can share more specific cases?

Thanks,
Jincheng

Dian Fu <dian0511...@gmail.com> 于2019年4月25日周四 上午11:53写道:

> Thanks everyone for the discussion here.
>
> Regarding to the Java/Scala UDF and the built-in UDF to execute in the
> current Flink way (directly in JVM, not via RPC), I share the same thoughts
> with Max and Robert and I think it will not be a big problem. From the
> design doc, I guess the main reason to take the Py4J way instead of the DAG
> way at present is that DAG has some limitations in some scenarios such as
> interactive programing which may be a strong requirement for data scientist.
>
> > In addition (and I'll admit this is rather subjective) it seems to me
> one of the primary values of a table-like API in a given language (vs. just
> using (say) plain old SQL itself via a console) is the ability to embed it
> in a larger pipeline, or at least drop in operations that are not (as)
> naturally expressed in the "table way," including existing libraries. In
> other words, a full SDK. The Py4j wrapping doesn't extend itself to such
> integration nearly as easily.
>
>
> Hi Robert, regarding to "a larger pipeline", do you mean translating a
> table-like API jobs from/to another kind of API job or embedding third-part
> libraries into a table-like API jobs via UDF? Could you kindly explain why
> this would be a problem for Py4J and will not be a problem if expressing
> the job with DAG?
>
> Thanks,
> Dian
>
>
> > 在 2019年4月25日,上午12:16,Robert Bradshaw <rober...@google.com> 写道:
> >
> > Thanks for the meeting summary, Stephan. Sound like you covered a lot of
> ground. Some more comments below, adding onto what Max has said.
> >
> > On Wed, Apr 24, 2019 at 3:20 PM Maximilian Michels <m...@apache.org
> <mailto:m...@apache.org>> wrote:
> > >
> > > Hi Stephan,
> > >
> > > This is excited! Thanks for sharing. The inter-process communication
> > > code looks like the most natural choice as a common ground. To go
> > > further, there are indeed some challenges to solve.
> >
> > It certainly does make sense to share this work, though it does to me
> seem like a rather low level to integrate at.
> >
> > > > => Biggest question is whether the language-independent DAG is
> expressive enough to capture all the expressions that we want to map
> directly to Table API expressions. Currently much is hidden in opaque UDFs.
> Kenn mentioned the structure should be flexible enough to capture more
> expressions transparently.
> > >
> > > Just to add some context how this could be done, there is the concept
> of
> > > a FunctionSpec which is part of a transform in the DAG. FunctionSpec
> > > contains a URN and with a payload. FunctionSpec can be either (1)
> > > translated by the Runner directly, e.g. map to table API concepts or
> (2)
> > > run a user-defined function with an Environment. It could be feasible
> > > for Flink to choose the direct path, whereas Beam Runners would
> leverage
> > > the more generic approach using UDFs. Granted, compatibility across
> > > Flink and Beam would only work if both of the translation paths yielded
> > > the same semantics.
> >
> > To elaborate a bit on this, Beam DAGs are built up by applying
> Transforms (basically operations) to PColections (the equivalent of
> dataset/datastream), but the key point here is that these transforms are
> often composite operations that expand out into smaller subtransforms. This
> expansion happens during pipeline construction, but with the recent work on
> cross language pipelines can happen out of process. This is one point of
> extendability. Secondly, and importantly, this composite structure is
> preserved in the DAG, and so a runner is free to ignore the provided
> expansion and supply its own (so long as semantically it produces exactly
> the same output). These composite operations can be identified by arbitrary
> URNs + payloads, and any runner that does not understand them simply uses
> the pre-provided expansion.
> >
> > The existing Flink runner operates on exactly this principle,
> translating URNs for the leaf operations (Map, Flatten, ...) as well as
> some composites it can do better (e.g. Reshard). It is intentionally easy
> to define and add new ones. This actually seems the easier approach (to me
> at least, but that's probably heavily influenced by what I'm familiar with
> vs. what I'm not).
> >
> > As for how well this maps onto the Flink Tables API, part of that
> depends on how much of the API is the operations themselves, and how much
> is concerning configuration/environment/etc. which is harder to talk about
> in an agnostic way.
> >
> > Using something like Py4j is an easy way to get up an running,
> especially for a very faithful API, but the instant one wants to add UDFs
> one hits a cliff of sorts (which is surmountable, but likely a lot harder
> than having gone the above approach). In addition (and I'll admit this is
> rather subjective) it seems to me one of the primary values of a table-like
> API in a given language (vs. just using (say) plain old SQL itself via a
> console) is the ability to embed it in a larger pipeline, or at least drop
> in operations that are not (as) naturally expressed in the "table way,"
> including existing libraries. In other words, a full SDK. The Py4j wrapping
> doesn't extend itself to such integration nearly as easily.
> >
> > But I really do understand the desire to not block immediate work (and
> value) for a longer term solution.
> >
> > > >  If the DAG is generic enough to capture the additional information,
> we probably still need some standardization, so that all the different
> language APIs represent their expressions the same way
> > >
> > > I wonder whether that's necessary as a first step. I think it would be
> > > fine for Flink to have its own way to represent API concepts in the
> Beam
> > > DAG which Beam Runners may not be able to understand. We could then
> > > successively add the capability for these transforms to run with Beam.
> > >
> > > >  Similarly, it makes sense to standardize the type system (and type
> inference) as far as built-in expressions and their interaction with UDFs
> are concerned. The Flink Table API and Blink teams found this to be
> essential for a consistent API behavior. This would not prevent all-UDF
> programs from still using purely binary/opaque types.
> > >
> > > Beam has a set of standard coders which can be used across languages.
> We
> > > will have to expand those to play well with Flink's:
> > >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#data-types
> <
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#data-types
> >
> > >
> > > I think we will need to exchange more ideas to work out a model that
> > > will work for both Flink and Beam. A regular meeting could be helpful.
> >
> > +1, I think this would be really good for both this effort and general
> collaboration between the Beam and Flink communities.
> >
> > > Thanks,
> > > Max
> > >
> > > On 23.04.19 21:23, Stephan Ewen wrote:
> > > > Hi all!
> > > >
> > > > Below are my notes on the discussion last week on how to collaborate
> > > > between Beam and Flink.
> > > > The discussion was between Tyler, Kenn, Luke, Ahmed, Xiaowei,
> Shaoxuan,
> > > > Jincheng, and me.
> > > >
> > > > This represents my understanding of the discussion, please augment
> this
> > > > where I missed something or where your conclusion was different.
> > > >
> > > > Best,
> > > > Stephan
> > > >
> > > > =======================================================
> > > >
> > > > *Beams Python and Portability Framework*
> > > >
> > > >    - Portability core to Beam
> > > >    - Language independent dataflow DAG that is defined via ProtoBuf
> > > >    - DAG can be generated from various languages (Java, Python, Go)
> > > >    - The DAG describes the pipelines and contains additional
> parameters
> > > > to describe each operator, and contains artifacts that need to be
> > > > deployed / executed as part of an operator execution.
> > > >    - Operators execute in language-specific containers, data is
> > > > exchanged between the language-specific container and the runner
> > > > container (JVM) via gRPC.
> > > >
> > > > *Flink's desiderata for Python API*
> > > >
> > > >    - Python API should mirror Java / Scala Table API
> > > >    - All relational expressions that correspond to built-in functions
> > > > should be translated to corresponding expressions in the Table API.
> That
> > > > way the planner generated Java code for the data types and built-in
> > > > expressions, meaning no Python code is necessary during execution
> > > >    - UDFs should be supported and run similarly as in Beam's approach
> > > >    - Python programs should be similarly created and
> submitted/deployed
> > > > as Java / Scala programs (CLI, web, containerized, etc.)
> > > >
> > > > *Consensus to share inter-process communication code*
> > > >
> > > >    - Crucial code for robust setup and high performance data exchange
> > > > across processes
> > > >    - The code for the SDK harness, the artifact boostrapping, and the
> > > > data exchange make sense to share.
> > > >    - Ongoing discussion whether this can be a dedicated module with
> slim
> > > > dependencies in Beam
> > > >
> > > > *Potential Long Term Perspective: Share language-independent DAG
> > > > representation*
> > > >
> > > >    - Beam's language independent DAG could become a standard
> > > > representation used in both projects
> > > >    - Flink would need an way to receive that DAG, map it to the Table
> > > > API, execute it from there
> > > >    - The DAG would need to have a standardized representation of
> > > > functions and expressions that then get mapped to Table API
> expressions
> > > > to let the planner optimize those and generate Java code for those
> > > >    - Similar as UDFs are supported in the Table API, there would be
> > > > additional "external UDFs" that would go through the above mentioned
> > > > inter-process communication layer
> > > >
> > > >    - _Advantages:_
> > > >      => Flink and Beam could share more language bindings
> > > >      => Flink would execute Beam portability programs fast, without
> > > > intermediate abstraction and directly in the JVM for many operators.
> > > >           Abstraction is necessary around UDFs and to bridge between
> > > > serializers / coders, etc.
> > > >
> > > >    - _Open issues:_
> > > >      => Biggest question is whether the language-independent DAG is
> > > > expressive enough to capture all the expressions that we want to map
> > > > directly to Table API expressions. Currently much is hidden in opaque
> > > > UDFs. Kenn mentioned the structure should be flexible enough to
> capture
> > > > more expressions transparently.
> > > >
> > > >      => If the DAG is generic enough to capture the additional
> > > > information, we probably still need some standardization, so that all
> > > > the different language APIs represent their expressions the same way
> > > >      => Similarly, it makes sense to standardize the type system (and
> > > > type inference) as far as built-in expressions and their interaction
> > > > with UDFs are concerned. The Flink Table API and Blink teams found
> this
> > > > to be essential for a consistent API behavior. This would not prevent
> > > > all-UDF programs from still using purely binary/opaque types.
> > > >
> > > >   =>  We need to create a Python API that follows the same structure
> as
> > > > Flink's Table API that produces the language-independent DAG
> > > >
> > > > *Short-term approach in Flink*
> > > >
> > > >    - Goal is to not block Flink's Python effort on the long term
> > > > approach and the necessary design and evolution of the
> > > > language-independent DAG.
> > > >    - Depending on what the outcome of above investigation is, Flink
> may
> > > > initially go with a simple approach to map the Python Table API to
> the
> > > > the Java Table API via Py4J, as outlined in FLIP-38:
> > > >
> https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8
> <
> https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8
> >
>

Reply via email to