On Thu, Apr 25, 2019 at 5:59 AM Dian Fu <dian0511...@gmail.com> wrote:

> Thanks everyone for the discussion here.

> Regarding the Java/Scala UDFs and the built-in UDFs executing in the current Flink way (directly in the JVM, not via RPC), I share the same thoughts as Max and Robert and I think it will not be a big problem. From the design doc, I guess the main reason to take the Py4J way instead of the DAG way at present is that the DAG has some limitations in scenarios such as interactive programming, which may be a strong requirement for data scientists.
I definitely agree that interactive use is a strong requirement for data scientists (and others). I don't think this is incompatible with the DAG model, and it is something I want to see more of. (For one exploration, see Beam Python's (still WIP) InteractiveRunner.) There are lots of interesting challenges here (e.g. sampling, partial results, optimal caching of results vs. re-execution, especially in the face of fusion) that would be worth working out together.

> In addition (and I'll admit this is rather subjective) it seems to me one of the primary values of a table-like API in a given language (vs. just using (say) plain old SQL itself via a console) is the ability to embed it in a larger pipeline, or at least drop in operations that are not (as) naturally expressed in the "table way," including existing libraries. In other words, a full SDK. The Py4J wrapping doesn't lend itself to such integration nearly as easily.

> Hi Robert, regarding "a larger pipeline", do you mean translating table-like API jobs from/to another kind of API job, or embedding third-party libraries into table-like API jobs via UDFs? Could you kindly explain why this would be a problem for Py4J and would not be a problem if the job were expressed as a DAG?

I'm talking about anything one would want to do after tableEnv.toDataSet() or before tableEnv.registerTable(...), unless you plan on wrapping the DataSet/DataStream APIs as well, which is a much taller task, to say nothing of wrapping all the libraries one might want to use that are built on those APIs. If this is instead integrated at a higher level, you could swap back and forth between the new Table API and the existing Python SDK (including libraries such as TFX, and cross-language capabilities) almost for free.

> On Apr 25, 2019, at 12:16 AM, Robert Bradshaw <rober...@google.com> wrote:

> Thanks for the meeting summary, Stephan. Sounds like you covered a lot of ground. Some more comments below, adding onto what Max has said.

> On Wed, Apr 24, 2019 at 3:20 PM Maximilian Michels <m...@apache.org> wrote:

> > Hi Stephan,

> > This is exciting! Thanks for sharing. The inter-process communication code looks like the most natural choice as a common ground. To go further, there are indeed some challenges to solve.

> It certainly does make sense to share this work, though it does to me seem like a rather low level to integrate at.

> > > => Biggest question is whether the language-independent DAG is expressive enough to capture all the expressions that we want to map directly to Table API expressions. Currently much is hidden in opaque UDFs. Kenn mentioned the structure should be flexible enough to capture more expressions transparently.

> > Just to add some context on how this could be done, there is the concept of a FunctionSpec which is part of a transform in the DAG. A FunctionSpec contains a URN and a payload. A FunctionSpec can be either (1) translated by the Runner directly, e.g. mapped to Table API concepts, or (2) run as a user-defined function with an Environment. It could be feasible for Flink to choose the direct path, whereas Beam Runners would leverage the more generic approach using UDFs. Granted, compatibility across Flink and Beam would only work if both of the translation paths yielded the same semantics.
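[Editor's note] To make the two FunctionSpec paths above concrete, here is a minimal, self-contained sketch. It is illustrative only: the FunctionSpec dataclass stands in for Beam's proto message of the same name, and the URN strings and helper logic are made up for the example.

    # Minimal sketch (not actual Beam or Flink code) of the two translation paths:
    # a FunctionSpec carries a URN plus an opaque payload; a runner either
    # translates a URN it understands directly, or falls back to executing the
    # user-defined function in a language-specific environment (SDK harness).
    from dataclasses import dataclass

    @dataclass
    class FunctionSpec:
        urn: str
        payload: bytes

    # URNs this hypothetical runner can map straight onto native Table API operations.
    DIRECTLY_TRANSLATABLE = {
        "example:transform:project:v1",
        "example:transform:filter:v1",
    }

    def translate(spec: FunctionSpec) -> str:
        if spec.urn in DIRECTLY_TRANSLATABLE:
            # Path (1): translate directly; the planner generates code and no
            # user code runs at execution time.
            return f"native Table API operator for {spec.urn}"
        # Path (2): opaque UDF; hand the payload to an SDK-harness environment
        # that executes the user function over gRPC.
        return f"SDK-harness execution for {spec.urn}"

    print(translate(FunctionSpec("example:transform:filter:v1", b"")))
    print(translate(FunctionSpec("example:udf:python:v1", b"pickled fn")))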
> To elaborate a bit on this, Beam DAGs are built up by applying Transforms (basically operations) to PCollections (the equivalent of DataSet/DataStream), but the key point here is that these transforms are often composite operations that expand out into smaller sub-transforms. This expansion happens during pipeline construction, but with the recent work on cross-language pipelines it can happen out of process. This is one point of extensibility. Secondly, and importantly, this composite structure is preserved in the DAG, and so a runner is free to ignore the provided expansion and supply its own (so long as semantically it produces exactly the same output). These composite operations can be identified by arbitrary URNs + payloads, and any runner that does not understand them simply uses the pre-provided expansion.

> The existing Flink runner operates on exactly this principle, translating URNs for the leaf operations (Map, Flatten, ...) as well as some composites it can do better (e.g. Reshard). It is intentionally easy to define and add new ones. This actually seems the easier approach (to me at least, but that's probably heavily influenced by what I'm familiar with vs. what I'm not).

> As for how well this maps onto the Flink Table API, part of that depends on how much of the API is the operations themselves, and how much concerns configuration/environment/etc., which is harder to talk about in an agnostic way.

> Using something like Py4J is an easy way to get up and running, especially for a very faithful API, but the instant one wants to add UDFs one hits a cliff of sorts (which is surmountable, but likely a lot harder than having gone with the above approach). In addition (and I'll admit this is rather subjective) it seems to me one of the primary values of a table-like API in a given language (vs. just using (say) plain old SQL itself via a console) is the ability to embed it in a larger pipeline, or at least drop in operations that are not (as) naturally expressed in the "table way," including existing libraries. In other words, a full SDK. The Py4J wrapping doesn't lend itself to such integration nearly as easily.

> But I really do understand the desire to not block immediate work (and value) for a longer term solution.

> > > If the DAG is generic enough to capture the additional information, we probably still need some standardization, so that all the different language APIs represent their expressions the same way

> > I wonder whether that's necessary as a first step. I think it would be fine for Flink to have its own way to represent API concepts in the Beam DAG which Beam Runners may not be able to understand. We could then successively add the capability for these transforms to run with Beam.

> > > Similarly, it makes sense to standardize the type system (and type inference) as far as built-in expressions and their interaction with UDFs are concerned. The Flink Table API and Blink teams found this to be essential for a consistent API behavior. This would not prevent all-UDF programs from still using purely binary/opaque types.

> > Beam has a set of standard coders which can be used across languages. We will have to expand those to play well with Flink's: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#data-types
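[Editor's note] As a rough illustration of what that expansion could involve, a Flink-side translation might keep a table from Beam's standard coder URNs to Table API data types. This is purely a sketch: the URN strings below follow Beam's standard-coder naming and the type names mirror the linked Flink data-types table, but the actual, complete mapping (rows, arrays, time types, etc.) is exactly what would need to be worked out.

    # Hypothetical mapping from Beam standard coder URNs to Flink Table API types.
    # Illustrative only: the authoritative URN list lives in Beam's
    # beam_runner_api.proto, and the Flink side would use TypeInformation /
    # Types.* rather than plain strings.
    BEAM_CODER_TO_FLINK_TYPE = {
        "beam:coder:bool:v1": "BOOLEAN",
        "beam:coder:varint:v1": "LONG",
        "beam:coder:double:v1": "DOUBLE",
        "beam:coder:string_utf8:v1": "STRING",
        "beam:coder:bytes:v1": "BYTES",
    }

    def flink_type_for(coder_urn: str) -> str:
        # Anything without a standard mapping stays opaque (raw bytes), mirroring
        # the "purely binary/opaque types" fallback mentioned earlier.
        return BEAM_CODER_TO_FLINK_TYPE.get(coder_urn, "BYTES")

    print(flink_type_for("beam:coder:varint:v1"))  # LONG
    print(flink_type_for("beam:coder:kv:v1"))      # BYTES (no direct mapping in this sketch)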
> > I think we will need to exchange more ideas to work out a model that will work for both Flink and Beam. A regular meeting could be helpful.

> +1, I think this would be really good for both this effort and general collaboration between the Beam and Flink communities.

> > Thanks,
> > Max

> > On 23.04.19 21:23, Stephan Ewen wrote:

> > > Hi all!

> > > Below are my notes on the discussion last week on how to collaborate between Beam and Flink. The discussion was between Tyler, Kenn, Luke, Ahmed, Xiaowei, Shaoxuan, Jincheng, and me.

> > > This represents my understanding of the discussion, please augment this where I missed something or where your conclusion was different.

> > > Best,
> > > Stephan

> > > =======================================================

> > > *Beam's Python and Portability Framework*

> > > - Portability is core to Beam
> > > - Language-independent dataflow DAG that is defined via ProtoBuf
> > > - The DAG can be generated from various languages (Java, Python, Go)
> > > - The DAG describes the pipeline and contains additional parameters to describe each operator, and contains artifacts that need to be deployed / executed as part of an operator's execution.
> > > - Operators execute in language-specific containers; data is exchanged between the language-specific container and the runner container (JVM) via gRPC.

> > > *Flink's desiderata for Python API*

> > > - Python API should mirror the Java / Scala Table API
> > > - All relational expressions that correspond to built-in functions should be translated to corresponding expressions in the Table API. That way the planner generates Java code for the data types and built-in expressions, meaning no Python code is necessary during execution
> > > - UDFs should be supported and run similarly to Beam's approach
> > > - Python programs should be created and submitted/deployed similarly to Java / Scala programs (CLI, web, containerized, etc.)

> > > *Consensus to share inter-process communication code*

> > > - Crucial code for robust setup and high-performance data exchange across processes
> > > - The code for the SDK harness, the artifact bootstrapping, and the data exchange makes sense to share.
> > > - Ongoing discussion about whether this can be a dedicated module with slim dependencies in Beam
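[Editor's note] Picking up the earlier point about composite transforms being preserved in the DAG, here is a small Beam Python example of a composite transform (an illustration, not code from the thread). Any runner can execute the expansion below as-is, while a runner that recognizes the composite could substitute a native implementation; the URN-registration machinery that would make it recognizable is omitted here.

    # A composite transform: expand() builds the sub-transforms that any runner
    # can execute. Because the composite structure is kept in the pipeline proto,
    # a runner that knows this composite could replace the expansion with a
    # native (e.g. Table API) implementation producing the same output.
    import apache_beam as beam

    class CountPerKey(beam.PTransform):
        def expand(self, pcoll):
            return (pcoll
                    | "PairWithOne" >> beam.Map(lambda word: (word, 1))
                    | "SumPerKey" >> beam.CombinePerKey(sum))

    with beam.Pipeline() as p:  # DirectRunner by default
        _ = (p
             | beam.Create(["a", "b", "a"])
             | "Count" >> CountPerKey()
             | beam.Map(print))  # prints ('a', 2) and ('b', 1)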
> > > *Potential Long Term Perspective: Share language-independent DAG representation*

> > > - Beam's language-independent DAG could become a standard representation used in both projects
> > > - Flink would need a way to receive that DAG, map it to the Table API, and execute it from there
> > > - The DAG would need to have a standardized representation of functions and expressions that then get mapped to Table API expressions, to let the planner optimize them and generate Java code for them
> > > - Similar to how UDFs are supported in the Table API, there would be additional "external UDFs" that go through the above-mentioned inter-process communication layer

> > > - _Advantages:_
> > >   => Flink and Beam could share more language bindings
> > >   => Flink would execute Beam portability programs fast, without intermediate abstraction and directly in the JVM for many operators. Abstraction is necessary around UDFs and to bridge between serializers / coders, etc.

> > > - _Open issues:_
> > >   => Biggest question is whether the language-independent DAG is expressive enough to capture all the expressions that we want to map directly to Table API expressions. Currently much is hidden in opaque UDFs. Kenn mentioned the structure should be flexible enough to capture more expressions transparently.
> > >   => If the DAG is generic enough to capture the additional information, we probably still need some standardization, so that all the different language APIs represent their expressions the same way
> > >   => Similarly, it makes sense to standardize the type system (and type inference) as far as built-in expressions and their interaction with UDFs are concerned. The Flink Table API and Blink teams found this to be essential for a consistent API behavior. This would not prevent all-UDF programs from still using purely binary/opaque types.
> > >   => We need to create a Python API that follows the same structure as Flink's Table API and produces the language-independent DAG

> > > *Short-term approach in Flink*

> > > - The goal is to not block Flink's Python effort on the long-term approach and the necessary design and evolution of the language-independent DAG.
> > > - Depending on the outcome of the above investigation, Flink may initially go with a simple approach of mapping the Python Table API to the Java Table API via Py4J, as outlined in FLIP-38: https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8
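[Editor's note] For reference, here is a rough sketch of what the FLIP-38-style Py4J approach could look like from the Python side. It is an assumption-laden illustration, not FLIP-38's actual code: the gateway entry point and its getTableEnvironment() method are hypothetical, and only the general shape (every Python call delegates to the Java Table API, with expressions passed as strings for the Java planner to handle) reflects the approach described above.

    # Sketch of a Py4J-based Python Table API wrapper. Hypothetical: assumes a JVM
    # process running a Py4J GatewayServer whose entry point exposes a Flink
    # TableEnvironment; the class and method names here are for illustration only.
    from py4j.java_gateway import JavaGateway

    class Table:
        """Thin wrapper delegating every call to the underlying Java Table."""
        def __init__(self, j_table):
            self._j_table = j_table

        def select(self, fields: str) -> "Table":
            # The expression string goes straight to the Java planner; no Python
            # code is involved at execution time.
            return Table(self._j_table.select(fields))

        def filter(self, predicate: str) -> "Table":
            return Table(self._j_table.filter(predicate))

    gateway = JavaGateway()  # connects to the already-running Java gateway
    j_table_env = gateway.entry_point.getTableEnvironment()  # hypothetical entry point
    orders = Table(j_table_env.scan("Orders"))
    result = orders.filter("amount > 10").select("user, amount")

The "cliff" mentioned earlier in the thread shows up as soon as select() needs to accept a Python function rather than an expression string.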