Thanks everyone for the discussion here. Regarding executing Java/Scala UDFs and built-in UDFs the current Flink way (directly in the JVM, not via RPC), I share the same thoughts as Max and Robert and I don't think it will be a big problem. From the design doc, I guess the main reason to take the Py4J way instead of the DAG way at present is that the DAG approach has limitations in some scenarios, such as interactive programming, which may be a strong requirement for data scientists.
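As a side note on the interactive point: with Py4J, every Python call is forwarded synchronously to a running JVM and the result comes straight back as a proxy object, which is what makes a REPL or notebook workflow natural. Below is a minimal sketch, assuming a gateway server is already running in the Flink JVM and exposes something like a Java TableEnvironment as its entry point; the Java-side methods shown are illustrative only.

    from py4j.java_gateway import JavaGateway

    # Connect to a gateway server assumed to be running inside the Flink JVM.
    gateway = JavaGateway()

    # The entry point is assumed to expose a Java TableEnvironment-like object.
    j_t_env = gateway.entry_point

    # Each call is executed immediately in the JVM and returns a Java proxy,
    # so intermediate results can be inspected step by step from a Python shell.
    j_table = j_t_env.scan("Orders")
    j_result = j_table.groupBy("user").select("user, amount.sum as total")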
> In addition (and I'll admit this is rather subjective) it seems to me one of the primary values of a table-like API in a given language (vs. just using (say) plain old SQL itself via a console) is the ability to embed it in a larger pipeline, or at least drop in operations that are not (as) naturally expressed in the "table way," including existing libraries. In other words, a full SDK. The Py4j wrapping doesn't extend itself to such integration nearly as easily.

Hi Robert, regarding "a larger pipeline", do you mean translating a table-like API job from/to another kind of API job, or embedding third-party libraries into a table-like API job via UDFs? Could you kindly explain why this would be a problem for Py4J but would not be a problem if the job were expressed as a DAG?

Thanks,
Dian

> On Apr 25, 2019, at 12:16 AM, Robert Bradshaw <rober...@google.com> wrote:
>
> Thanks for the meeting summary, Stephan. Sounds like you covered a lot of ground. Some more comments below, adding onto what Max has said.
>
> On Wed, Apr 24, 2019 at 3:20 PM Maximilian Michels <m...@apache.org> wrote:
> >
> > Hi Stephan,
> >
> > This is exciting! Thanks for sharing. The inter-process communication code looks like the most natural choice as a common ground. To go further, there are indeed some challenges to solve.
>
> It certainly does make sense to share this work, though it does to me seem like a rather low level to integrate at.
>
> > > => Biggest question is whether the language-independent DAG is expressive enough to capture all the expressions that we want to map directly to Table API expressions. Currently much is hidden in opaque UDFs. Kenn mentioned the structure should be flexible enough to capture more expressions transparently.
> >
> > Just to add some context on how this could be done: there is the concept of a FunctionSpec, which is part of a transform in the DAG. A FunctionSpec contains a URN and a payload. A FunctionSpec can either be (1) translated by the Runner directly, e.g. mapped to Table API concepts, or (2) used to run a user-defined function with an Environment. It could be feasible for Flink to choose the direct path, whereas Beam Runners would leverage the more generic approach using UDFs. Granted, compatibility across Flink and Beam would only work if both of the translation paths yielded the same semantics.
>
> To elaborate a bit on this, Beam DAGs are built up by applying Transforms (basically operations) to PCollections (the equivalent of dataset/datastream), but the key point here is that these transforms are often composite operations that expand out into smaller subtransforms. This expansion happens during pipeline construction, but with the recent work on cross-language pipelines it can happen out of process. This is one point of extensibility. Secondly, and importantly, this composite structure is preserved in the DAG, and so a runner is free to ignore the provided expansion and supply its own (so long as semantically it produces exactly the same output). These composite operations can be identified by arbitrary URNs + payloads, and any runner that does not understand them simply uses the pre-provided expansion.
>
> The existing Flink runner operates on exactly this principle, translating URNs for the leaf operations (Map, Flatten, ...) as well as some composites it can do better (e.g. Reshard). It is intentionally easy to define and add new ones. This actually seems the easier approach (to me at least, but that's probably heavily influenced by what I'm familiar with vs. what I'm not).
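To make the composite structure concrete for everyone following along, here is a minimal sketch in Python using the Beam SDK; the transform and its step names are made up for illustration. Because the expansion below is preserved in the DAG, a runner that recognizes this composite (e.g. by a URN attached to it) could substitute a native, semantically equivalent implementation, while any other runner simply executes the subtransforms produced by expand().

    import apache_beam as beam

    class CountPerKey(beam.PTransform):
        """An illustrative composite transform; its expansion is visible in
        the pipeline DAG, so a runner may replace it with its own
        implementation as long as the output is semantically identical."""

        def expand(self, pcoll):
            return (pcoll
                    | "PairWithOne" >> beam.Map(lambda element: (element, 1))
                    | "SumPerKey" >> beam.CombinePerKey(sum))

    # During pipeline construction it is applied like any built-in transform:
    # counts = words | "CountWords" >> CountPerKey()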
> As for how well this maps onto the Flink Table API, part of that depends on how much of the API is the operations themselves, and how much is concerning configuration/environment/etc., which is harder to talk about in an agnostic way.
>
> Using something like Py4j is an easy way to get up and running, especially for a very faithful API, but the instant one wants to add UDFs one hits a cliff of sorts (which is surmountable, but likely a lot harder than having gone the above approach). In addition (and I'll admit this is rather subjective) it seems to me one of the primary values of a table-like API in a given language (vs. just using (say) plain old SQL itself via a console) is the ability to embed it in a larger pipeline, or at least drop in operations that are not (as) naturally expressed in the "table way," including existing libraries. In other words, a full SDK. The Py4j wrapping doesn't extend itself to such integration nearly as easily.
>
> But I really do understand the desire to not block immediate work (and value) for a longer term solution.
>
> > > If the DAG is generic enough to capture the additional information, we probably still need some standardization, so that all the different language APIs represent their expressions the same way
> >
> > I wonder whether that's necessary as a first step. I think it would be fine for Flink to have its own way to represent API concepts in the Beam DAG which Beam Runners may not be able to understand. We could then successively add the capability for these transforms to run with Beam.
>
> > > Similarly, it makes sense to standardize the type system (and type inference) as far as built-in expressions and their interaction with UDFs are concerned. The Flink Table API and Blink teams found this to be essential for a consistent API behavior. This would not prevent all-UDF programs from still using purely binary/opaque types.
> >
> > Beam has a set of standard coders which can be used across languages. We will have to expand those to play well with Flink's:
> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#data-types
> >
> > I think we will need to exchange more ideas to work out a model that will work for both Flink and Beam. A regular meeting could be helpful.
>
> +1, I think this would be really good for both this effort and general collaboration between the Beam and Flink communities.
>
> > Thanks,
> > Max
> >
> > On 23.04.19 21:23, Stephan Ewen wrote:
> > > Hi all!
> > >
> > > Below are my notes on the discussion last week on how to collaborate between Beam and Flink.
> > > The discussion was between Tyler, Kenn, Luke, Ahmed, Xiaowei, Shaoxuan, Jincheng, and me.
> > >
> > > This represents my understanding of the discussion, please augment this where I missed something or where your conclusion was different.
> > >
> > > Best,
> > > Stephan
> > >
> > > =======================================================
> > >
> > > *Beam's Python and Portability Framework*
> > >
> > > - Portability is core to Beam
> > > - Language-independent dataflow DAG that is defined via ProtoBuf
> > > - DAG can be generated from various languages (Java, Python, Go)
> > > - The DAG describes the pipelines and contains additional parameters to describe each operator, and contains artifacts that need to be deployed / executed as part of an operator execution.
> > > - Operators execute in language-specific containers, data is exchanged between the language-specific container and the runner container (JVM) via gRPC.
> > >
> > > *Flink's desiderata for Python API*
> > >
> > > - Python API should mirror the Java / Scala Table API
> > > - All relational expressions that correspond to built-in functions should be translated to corresponding expressions in the Table API. That way the planner generates Java code for the data types and built-in expressions, meaning no Python code is necessary during execution
> > > - UDFs should be supported and run similarly as in Beam's approach
> > > - Python programs should be similarly created and submitted/deployed as Java / Scala programs (CLI, web, containerized, etc.)
> > >
> > > *Consensus to share inter-process communication code*
> > >
> > > - Crucial code for robust setup and high-performance data exchange across processes
> > > - The code for the SDK harness, the artifact bootstrapping, and the data exchange makes sense to share.
> > > - Ongoing discussion whether this can be a dedicated module with slim dependencies in Beam
> > >
> > > *Potential Long Term Perspective: Share language-independent DAG representation*
> > >
> > > - Beam's language-independent DAG could become a standard representation used in both projects
> > > - Flink would need a way to receive that DAG, map it to the Table API, and execute it from there
> > > - The DAG would need to have a standardized representation of functions and expressions that then get mapped to Table API expressions to let the planner optimize those and generate Java code for those
> > > - Similar to how UDFs are supported in the Table API, there would be additional "external UDFs" that would go through the above-mentioned inter-process communication layer
> > >
> > > - _Advantages:_
> > >   => Flink and Beam could share more language bindings
> > >   => Flink would execute Beam portability programs fast, without intermediate abstraction and directly in the JVM for many operators. Abstraction is necessary around UDFs and to bridge between serializers / coders, etc.
> > >
> > > - _Open issues:_
> > >   => Biggest question is whether the language-independent DAG is expressive enough to capture all the expressions that we want to map directly to Table API expressions. Currently much is hidden in opaque UDFs. Kenn mentioned the structure should be flexible enough to capture more expressions transparently.
> > >   => If the DAG is generic enough to capture the additional information, we probably still need some standardization, so that all the different language APIs represent their expressions the same way
> > >   => Similarly, it makes sense to standardize the type system (and type inference) as far as built-in expressions and their interaction with UDFs are concerned. The Flink Table API and Blink teams found this to be essential for a consistent API behavior. This would not prevent all-UDF programs from still using purely binary/opaque types.
> > >
> > >   => We need to create a Python API that follows the same structure as Flink's Table API and that produces the language-independent DAG
> > >
> > > *Short-term approach in Flink*
> > >
> > > - Goal is to not block Flink's Python effort on the long-term approach and the necessary design and evolution of the language-independent DAG.
> > > - Depending on the outcome of the above investigation, Flink may initially go with a simple approach that maps the Python Table API to the Java Table API via Py4J, as outlined in FLIP-38:
> > > https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8
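For illustration, here is a rough sketch of what the FLIP-38 style mapping could look like on the Python side: each Python class would simply hold a reference to its Java counterpart obtained through a Py4J gateway and forward calls to it, so that planning and execution stay entirely in the JVM. The class and method names below are my own assumptions for the sketch, not the proposed API; FLIP-38 is authoritative on the actual design.

    from py4j.java_gateway import JavaGateway

    class Table(object):
        """Thin wrapper around a Java Table; every call is delegated to the JVM."""
        def __init__(self, j_table):
            self._j_table = j_table

        def group_by(self, fields):
            return Table(self._j_table.groupBy(fields))

        def select(self, fields):
            return Table(self._j_table.select(fields))

    class TableEnvironment(object):
        """Thin wrapper around a Java TableEnvironment reached via Py4J."""
        def __init__(self, j_tenv):
            self._j_tenv = j_tenv

        def scan(self, table_name):
            return Table(self._j_tenv.scan(table_name))

    # Usage sketch, assuming the gateway's entry point exposes a Java
    # TableEnvironment; the relational logic is planned and executed in the JVM:
    # t_env = TableEnvironment(JavaGateway().entry_point)
    # result = t_env.scan("Orders").group_by("user").select("user, amount.sum as total")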