Thanks, I will have a look at your comments tomorrow and create a PR which should supersede 210. BTW, is there already a test case where I can see the suggested way to do staged execution with the new ExecutionEnvironment API?
I thought about your second remark as well. The following lines summarize some of these thoughts and may (or may not) be useful for future improvements in the API and the runtime design.

Ideally, a runtime for parallel collection processing should support at least two types of values:

- collections of type DataSet[T]; these are consumed and produced by the parallel dataflows;
- "simple types" of any type T; these are used to represent "broadcast variables" as well as results of global aggregates.

By "simple types" I don't mean that T should be a scalar. It could also be a complex type like a POJO or Tuple. Rather, it means that the system does not "understand" and cannot make use of its internal type structure.

At the moment, the runtime only supports values of type DataSet[T]. This is inconvenient because

- you have to wrap simple typed values into a DataSet[T] in order to expose them to your UDFs as broadcast variables; this is not visible to the user but makes for some confusing code in the internals;
- global aggregates produce a value of type DataSet[T] rather than T; this is inconsistent with the result type of the fold operator (and its variants) as seen in other programming languages.

I think that the ideas in your email go in that direction. I suggest the following hierarchy of types:

- Value[T] - an abstract base for all values
- Singleton[T] extends Value[T] - a container for exactly one value of type T
- DataSet[T] extends Value[T] - a container for a (parallelizable) homogeneous collection of values of type T

We should then rethink which of the runtime operators can consume or produce both value types and which can only consume/produce a Singleton[T] or a DataSet[T], and adapt their signatures accordingly.

2015-01-16 15:24 GMT+01:00 Stephan Ewen <se...@apache.org>:

> @Alex That sounds great. I added a few inline comments to PR 210 and then
> it is good to merge. If you want, feel free to fix it up and we will merge
> it.
>
> Feel free to also add (or suggest and stub) more of such functions. Is that
> what you meant by "designing interfaces"?
>
> Here is a thought that crossed my mind:
> - Should functions like reduce() and aggregate() (in their ungrouped
> version) produce a "SingleValuedDataSet" (or ScalarDataSet) that is known
> to have only a single value? That data set could offer an additional method
> "get()" that directly grabs that value (rather than collect() getting a
> list).
>
> Stephan
>
>
> On Thu, Jan 15, 2015 at 11:30 AM, Ufuk Celebi <u...@apache.org> wrote:
>
> >
> > On 13 Jan 2015, at 16:50, Stephan Ewen <se...@apache.org> wrote:
> >
> > > Hi!
> > >
> > > To follow up on what Ufuk explained:
> > >
> > > - Ufuk is right, the problem is not getting the data set.
> > > https://github.com/apache/flink/pull/210 does that for anything that
> > > is not too gigantic, which is a good start. I think we should merge
> > > this as soon as we agree on the signature and names of the API
> > > methods. We can swap the internal realization for something more
> > > robust later.
> > >
> > > - For anything that just issues a program and wants the result back,
> > > this is actually perfectly fine.
> > >
> > > - For true interactive programs, we need to back track to intermediate
> > > results (rather than to the source) to avoid re-executing large parts.
> > > This is the biggest missing piece, next to the persistent
> > > materialization of intermediate results (Ufuk is working on this). The
> > > logic is the same as for fault tolerance, so it is part of that
> > > development.
> > >
> > > @alexander: I want to create the feature branch for that on Thursday.
> > > Are you interested in contributing to that feature?
> > >
> > > - For streaming results continuously back, we need another mechanism
> > > than the accumulators. Let's create a design doc or thread and get
> > > working on that. Probably involves adding another set of akka messages
> > > from TM -> JM -> Client. Or something like an extension to the BLOB
> > > manager for streams?
> >
> > For streaming results back, we can use the same mechanisms used by the
> > task managers. Let me add documentation (FLINK-1373) for the network
> > stack this week.
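
For illustration, the Value[T] / Singleton[T] / DataSet[T] hierarchy proposed at the top of this message could be sketched in Java roughly as follows (Value[T] is written Value<T> there). This is only a local, single-threaded sketch under stated assumptions: none of these classes are the actual Flink classes, and the method name mapWithBroadcast as well as the choice to throw on an empty input are hypothetical. It shows an ungrouped reduce returning a Singleton with a direct get(), and a Singleton being passed to an operator as a broadcast value without wrapping it in a DataSet.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

/** Abstract base for all values (hypothetical, not a Flink class). */
abstract class Value<T> {}

/** Container for exactly one value of type T. */
final class Singleton<T> extends Value<T> {
    private final T value;

    Singleton(T value) { this.value = value; }

    /** Direct access to the single value, in the spirit of the get() idea quoted above. */
    T get() { return value; }
}

/** Local stand-in for a homogeneous collection; no parallelism in this sketch. */
final class DataSet<T> extends Value<T> {
    private final List<T> elements;

    DataSet(List<T> elements) { this.elements = new ArrayList<>(elements); }

    /** Returns a copy of all elements as a list. */
    List<T> collect() { return new ArrayList<>(elements); }

    /** Ungrouped reduce: the result type is Singleton<T>, not DataSet<T>. */
    Singleton<T> reduce(BinaryOperator<T> op) {
        // Assumption: an empty input is an error, since a Singleton holds exactly one value.
        T result = elements.stream().reduce(op)
            .orElseThrow(() -> new IllegalStateException("cannot reduce an empty data set"));
        return new Singleton<>(result);
    }

    /** Hypothetical operator that consumes a Singleton directly as a broadcast value. */
    <B, R> DataSet<R> mapWithBroadcast(Singleton<B> broadcast, BiFunction<T, B, R> f) {
        List<R> out = new ArrayList<>();
        for (T element : elements) {
            out.add(f.apply(element, broadcast.get()));
        }
        return new DataSet<>(out);
    }
}

class ValueSketch {
    public static void main(String[] args) {
        DataSet<Integer> numbers = new DataSet<>(Arrays.asList(1, 2, 3, 4));

        // Global aggregate yields a Singleton<Integer> instead of a one-element data set.
        Singleton<Integer> sum = numbers.reduce(Integer::sum);
        System.out.println(sum.get()); // 10

        // The Singleton is handed to the operator as-is, with no DataSet wrapper.
        DataSet<Integer> shifted = numbers.mapWithBroadcast(sum, (x, s) -> x + s);
        System.out.println(shifted.collect()); // [11, 12, 13, 14]
    }
}
```

With signatures like these, the type of each operator states whether it consumes or produces a single value or a collection, which is the distinction the proposal asks the runtime operators to make explicit.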