Hi Robert,

In addition to the questions described by Dian, I also want to know what difficult problems the Py4j solution will encounter in adding UDF support, which you mentioned as follows:
> Using something like Py4j is an easy way to get up and running, especially
> for a very faithful API, but the instant one wants to add UDFs one hits a
> cliff of sorts (which is surmountable, but likely a lot harder than having
> gone the above approach).

I would appreciate it if you could share more specific cases.

Thanks,
Jincheng

On Thu, Apr 25, 2019 at 11:53 AM, Dian Fu <dian0511...@gmail.com> wrote:

> Thanks everyone for the discussion here.
>
> Regarding the Java/Scala UDFs and the built-in UDFs executing in the current Flink way (directly in the JVM, not via RPC), I share the same thoughts with Max and Robert and I think it will not be a big problem. From the design doc, I guess the main reason to take the Py4J way instead of the DAG way at present is that the DAG has some limitations in certain scenarios, such as interactive programming, which may be a strong requirement for data scientists.
>
> > In addition (and I'll admit this is rather subjective) it seems to me one of the primary values of a table-like API in a given language (vs. just using (say) plain old SQL itself via a console) is the ability to embed it in a larger pipeline, or at least drop in operations that are not (as) naturally expressed in the "table way," including existing libraries. In other words, a full SDK. The Py4j wrapping doesn't extend itself to such integration nearly as easily.
>
> Hi Robert, regarding "a larger pipeline", do you mean translating table-like API jobs from/to another kind of API job, or embedding third-party libraries into table-like API jobs via UDFs? Could you kindly explain why this would be a problem for Py4J but would not be a problem if the job were expressed with a DAG?
>
> Thanks,
> Dian
>
> On Apr 25, 2019, at 12:16 AM, Robert Bradshaw <rober...@google.com> wrote:
>
> > Thanks for the meeting summary, Stephan. Sounds like you covered a lot of ground. Some more comments below, adding onto what Max has said.
> >
> > On Wed, Apr 24, 2019 at 3:20 PM Maximilian Michels <m...@apache.org> wrote:
> >
> > > Hi Stephan,
> > >
> > > This is exciting! Thanks for sharing. The inter-process communication code looks like the most natural choice as a common ground. To go further, there are indeed some challenges to solve.
> >
> > It certainly does make sense to share this work, though it does to me seem like a rather low level to integrate at.
> >
> > > > => Biggest question is whether the language-independent DAG is expressive enough to capture all the expressions that we want to map directly to Table API expressions. Currently much is hidden in opaque UDFs. Kenn mentioned the structure should be flexible enough to capture more expressions transparently.
> > >
> > > Just to add some context on how this could be done: there is the concept of a FunctionSpec, which is part of a transform in the DAG. A FunctionSpec contains a URN and a payload. A FunctionSpec can either be (1) translated by the Runner directly, e.g. mapped to Table API concepts, or (2) run as a user-defined function with an Environment. It could be feasible for Flink to choose the direct path, whereas Beam Runners would leverage the more generic approach using UDFs. Granted, compatibility across Flink and Beam would only work if both of the translation paths yielded the same semantics.
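(To make the two translation paths concrete, here is a minimal, self-contained sketch of how a runner might branch on a FunctionSpec's URN. The FunctionSpec class, the URN strings, and the return values below are invented for illustration; they are not the actual Beam protos or Flink translation code.)

    # Toy sketch only: a runner inspecting a FunctionSpec-like (urn, payload)
    # pair and choosing between (1) direct translation of URNs it understands
    # and (2) the generic fallback of executing the payload in an SDK harness.
    from dataclasses import dataclass

    @dataclass
    class FunctionSpec:          # stand-in for the real proto message
        urn: str
        payload: bytes

    # URNs a hypothetical Table-API-aware runner can translate natively.
    NATIVE_URNS = {"example:expr:project:v1", "example:expr:filter:v1"}

    def translate(spec: FunctionSpec) -> str:
        if spec.urn in NATIVE_URNS:
            # Path (1): map onto a built-in Table API expression; the planner
            # can optimize it and no Python is needed at execution time.
            return "native operator for " + spec.urn
        # Path (2): treat the payload as an opaque UDF and hand it to a
        # language-specific environment (SDK harness) over gRPC.
        return "harness execution of opaque payload (%d bytes)" % len(spec.payload)

    print(translate(FunctionSpec("example:expr:filter:v1", b"")))
    print(translate(FunctionSpec("example:udf:python:v1", b"pickled fn")))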
> > To elaborate a bit on this, Beam DAGs are built up by applying Transforms (basically operations) to PCollections (the equivalent of dataset/datastream), but the key point here is that these transforms are often composite operations that expand out into smaller subtransforms. This expansion happens during pipeline construction, but with the recent work on cross-language pipelines it can happen out of process. This is one point of extensibility. Secondly, and importantly, this composite structure is preserved in the DAG, and so a runner is free to ignore the provided expansion and supply its own (so long as semantically it produces exactly the same output). These composite operations can be identified by arbitrary URNs + payloads, and any runner that does not understand them simply uses the pre-provided expansion.
> >
> > The existing Flink runner operates on exactly this principle, translating URNs for the leaf operations (Map, Flatten, ...) as well as some composites it can do better (e.g. Reshard). It is intentionally easy to define and add new ones. This actually seems the easier approach (to me at least, but that's probably heavily influenced by what I'm familiar with vs. what I'm not).
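(A rough, self-contained illustration of that principle -- not Beam's actual model or the Flink runner's code. The Composite class, URNs, and expansions below are made up; the point is only that a runner may substitute its own implementation for a URN it recognizes and otherwise fall back to the pre-provided expansion.)

    # Invented example: a composite carries a URN plus a default expansion
    # into smaller steps; a runner that recognizes the URN may replace the
    # expansion with a native implementation, otherwise it runs the default.
    from dataclasses import dataclass

    @dataclass
    class Composite:
        urn: str
        default_expansion: list   # primitive step names, for illustration only

    # Composites this hypothetical runner knows how to do better itself.
    RUNNER_OVERRIDES = {
        "example:transform:reshard:v1": ["runner-native reshard"],
    }

    def expand_for_runner(t: Composite) -> list:
        # Either way the output semantics must be identical; only the
        # physical plan differs.
        return RUNNER_OVERRIDES.get(t.urn, t.default_expansion)

    reshard = Composite("example:transform:reshard:v1",
                        ["pair-with-random-key", "group-by-key", "drop-key"])
    print(expand_for_runner(reshard))                     # runner-supplied plan
    print(expand_for_runner(Composite("example:transform:custom:v1",
                                      ["step-a", "step-b"])))   # default expansion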
> > As for how well this maps onto the Flink Table API, part of that depends on how much of the API is the operations themselves, and how much is concerning configuration/environment/etc., which is harder to talk about in an agnostic way.
> >
> > Using something like Py4j is an easy way to get up and running, especially for a very faithful API, but the instant one wants to add UDFs one hits a cliff of sorts (which is surmountable, but likely a lot harder than having gone the above approach). In addition (and I'll admit this is rather subjective) it seems to me one of the primary values of a table-like API in a given language (vs. just using (say) plain old SQL itself via a console) is the ability to embed it in a larger pipeline, or at least drop in operations that are not (as) naturally expressed in the "table way," including existing libraries. In other words, a full SDK. The Py4j wrapping doesn't extend itself to such integration nearly as easily.
> >
> > But I really do understand the desire to not block immediate work (and value) for a longer-term solution.
> >
> > > > If the DAG is generic enough to capture the additional information, we probably still need some standardization, so that all the different language APIs represent their expressions the same way.
> > >
> > > I wonder whether that's necessary as a first step. I think it would be fine for Flink to have its own way to represent API concepts in the Beam DAG which Beam Runners may not be able to understand. We could then successively add the capability for these transforms to run with Beam.
> >
> > > > Similarly, it makes sense to standardize the type system (and type inference) as far as built-in expressions and their interaction with UDFs are concerned. The Flink Table API and Blink teams found this to be essential for a consistent API behavior. This would not prevent all-UDF programs from still using purely binary/opaque types.
> > >
> > > Beam has a set of standard coders which can be used across languages. We will have to expand those to play well with Flink's:
> > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#data-types
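(Purely as an illustration of what "expanding to play well with Flink's data types" might mean in practice -- the coder URNs and the mapping below are invented, not the actual Beam coder registry or any agreed Flink mapping.)

    # Invented example: a lookup from language-independent coder URNs to the
    # Flink Table API types they might map onto. The real mapping would have
    # to be agreed on by both communities and cover far more cases.
    CODER_URN_TO_FLINK_TYPE = {
        "example:coder:varint:v1":      "Types.LONG",
        "example:coder:string_utf8:v1": "Types.STRING",
        "example:coder:double:v1":      "Types.DOUBLE",
        "example:coder:bytes:v1":       "Types.PRIMITIVE_ARRAY(Types.BYTE)",
    }

    def flink_type_for(coder_urn: str) -> str:
        # Coders without a known mapping stay opaque: the data is carried
        # through as raw bytes, i.e. the binary/opaque-type fallback
        # mentioned above.
        return CODER_URN_TO_FLINK_TYPE.get(
            coder_urn, "Types.PRIMITIVE_ARRAY(Types.BYTE)")

    print(flink_type_for("example:coder:string_utf8:v1"))   # Types.STRING
    print(flink_type_for("example:coder:custom:v1"))        # opaque fallback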
> > > I think we will need to exchange more ideas to work out a model that will work for both Flink and Beam. A regular meeting could be helpful.
> >
> > +1, I think this would be really good for both this effort and general collaboration between the Beam and Flink communities.
> >
> > > Thanks,
> > > Max
> > >
> > > On 23.04.19 21:23, Stephan Ewen wrote:
> > > > Hi all!
> > > >
> > > > Below are my notes on the discussion last week on how to collaborate between Beam and Flink. The discussion was between Tyler, Kenn, Luke, Ahmed, Xiaowei, Shaoxuan, Jincheng, and me.
> > > >
> > > > This represents my understanding of the discussion; please augment this where I missed something or where your conclusion was different.
> > > >
> > > > Best,
> > > > Stephan
> > > >
> > > > =======================================================
> > > >
> > > > *Beam's Python and Portability Framework*
> > > >
> > > > - Portability is core to Beam
> > > > - Language-independent dataflow DAG that is defined via ProtoBuf
> > > > - The DAG can be generated from various languages (Java, Python, Go)
> > > > - The DAG describes the pipelines, contains additional parameters to describe each operator, and contains artifacts that need to be deployed / executed as part of an operator execution
> > > > - Operators execute in language-specific containers; data is exchanged between the language-specific container and the runner container (JVM) via gRPC
> > > >
> > > > *Flink's desiderata for the Python API*
> > > >
> > > > - The Python API should mirror the Java / Scala Table API
> > > > - All relational expressions that correspond to built-in functions should be translated to corresponding expressions in the Table API. That way the planner generates Java code for the data types and built-in expressions, meaning no Python code is necessary during execution
> > > > - UDFs should be supported and run similarly as in Beam's approach
> > > > - Python programs should be similarly created and submitted/deployed as Java / Scala programs (CLI, web, containerized, etc.)
> > > >
> > > > *Consensus to share inter-process communication code*
> > > >
> > > > - Crucial code for robust setup and high-performance data exchange across processes
> > > > - The code for the SDK harness, the artifact bootstrapping, and the data exchange make sense to share
> > > > - Ongoing discussion whether this can be a dedicated module with slim dependencies in Beam
> > > >
> > > > *Potential Long-Term Perspective: Share the language-independent DAG representation*
> > > >
> > > > - Beam's language-independent DAG could become a standard representation used in both projects
> > > > - Flink would need a way to receive that DAG, map it to the Table API, and execute it from there
> > > > - The DAG would need to have a standardized representation of functions and expressions that then get mapped to Table API expressions, to let the planner optimize those and generate Java code for them
> > > > - Similar to how UDFs are supported in the Table API, there would be additional "external UDFs" that would go through the above-mentioned inter-process communication layer
> > > >
> > > > - _Advantages:_
> > > > => Flink and Beam could share more language bindings
> > > > => Flink would execute Beam portability programs fast, without intermediate abstraction and directly in the JVM for many operators. Abstraction is necessary around UDFs and to bridge between serializers / coders, etc.
> > > >
> > > > - _Open issues:_
> > > > => Biggest question is whether the language-independent DAG is expressive enough to capture all the expressions that we want to map directly to Table API expressions. Currently much is hidden in opaque UDFs. Kenn mentioned the structure should be flexible enough to capture more expressions transparently.
> > > > => If the DAG is generic enough to capture the additional information, we probably still need some standardization, so that all the different language APIs represent their expressions the same way.
> > > > => Similarly, it makes sense to standardize the type system (and type inference) as far as built-in expressions and their interaction with UDFs are concerned. The Flink Table API and Blink teams found this to be essential for a consistent API behavior. This would not prevent all-UDF programs from still using purely binary/opaque types.
> > > > => We need to create a Python API that follows the same structure as Flink's Table API and that produces the language-independent DAG.
> > > >
> > > > *Short-term approach in Flink*
> > > >
> > > > - Goal is to not block Flink's Python effort on the long-term approach and the necessary design and evolution of the language-independent DAG
> > > > - Depending on what the outcome of the above investigation is, Flink may initially go with a simple approach to map the Python Table API to the Java Table API via Py4J, as outlined in FLIP-38:
> > > > https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8
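(For reference, a heavily simplified sketch of what that Py4J-based approach could look like from the Python side. The gateway entry point and the getTableEnvironment/scan method names are assumptions for illustration; FLIP-38 defines the actual API surface.)

    # Simplified sketch of the Py4J idea: the Python "Table" is a thin wrapper
    # that forwards every call to the corresponding Java Table API object in
    # the JVM, so planning and execution stay entirely on the Java side.
    # Entry-point and method names are assumed for illustration (see FLIP-38).
    from py4j.java_gateway import JavaGateway

    gateway = JavaGateway()   # connects to a GatewayServer started in the JVM
    j_tenv = gateway.entry_point.getTableEnvironment()   # hypothetical entry point

    class Table:
        """Python-side mirror of a Java Table; calls are delegated via Py4J."""
        def __init__(self, j_table):
            self._j_table = j_table

        def select(self, fields):
            return Table(self._j_table.select(fields))

        def group_by(self, fields):
            return Table(self._j_table.groupBy(fields))

    orders = Table(j_tenv.scan("Orders"))   # assumes a registered "Orders" table
    result = orders.group_by("user").select("user, amount.sum as total")

(Presumably the UDF question arises exactly here: as long as every call has a Java counterpart to delegate to, the wrapper stays thin, but a Python function has no such counterpart and would need to be shipped to and executed in a Python process.)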