Hi Jincheng,

Thanks for the summary! I like the approach with append() better than the implicit forwarding, as it clearly indicates which fields are forwarded. However, I don't see much benefit over the flatMap(Expression*) variant, as we would still need to analyze the full expression tree to ensure that at most (or exactly?) one Scalar / TableFunction is used.
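
The validation concern above can be sketched roughly as follows. This is a toy expression model, not Flink's actual Expression classes: Field, UdfCall, Append, and ExprCheck are hypothetical names invented for illustration, and counting nested calls is an assumption as well.

```scala
// Toy expression tree. All names are hypothetical, not Flink's real classes.
sealed trait Expr
// A reference to an input column, e.g. 'long.
final case class Field(name: String) extends Expr
// A call to a user-defined scalar or table function.
final case class UdfCall(name: String, args: Seq[Expr]) extends Expr
// Stand-in for the proposed built-in append(udf, fields...) pass-through.
final case class Append(children: Seq[Expr]) extends Expr

object ExprCheck {
  // Count UDF calls anywhere in the tree, including nested arguments.
  // Assumption: nested calls count too; the thread leaves this open.
  def countUdfCalls(e: Expr): Int = e match {
    case Field(_)         => 0
    case UdfCall(_, args) => 1 + args.map(countUdfCalls).sum
    case Append(children) => children.map(countUdfCalls).sum
  }

  // Accept the expression only if it contains at most `max` UDF calls.
  def validate(e: Expr, max: Int = 1): Either[String, Expr] = {
    val n = countUdfCalls(e)
    if (n <= max) Right(e)
    else Left(s"expected at most $max UDF call(s), found $n")
  }

  def main(args: Array[String]): Unit = {
    // Mirrors append(udf('string), 'long, 'proctime): one UDF, two pass-through fields.
    val single = Append(Seq(UdfCall("udf", Seq(Field("string"))), Field("long"), Field("proctime")))
    // Two UDFs in one expression, which the check is meant to reject.
    val double = Append(Seq(UdfCall("udfA", Seq(Field("a"))), UdfCall("udfB", Seq(Field("b")))))
    println(validate(single))
    println(validate(double))
  }
}
```

The same walk is needed whether the UDF arrives wrapped in append() or as one of several arguments to flatMap(Expression*), which is why the two variants look similar from the validation side.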

Best, Fabian

On Thu, Nov 8, 2018 at 19:25, jincheng sun <sunjincheng...@gmail.com> wrote:

> Hi all,
>
> We are discussing this proposal in great detail and trying to design the
> API from many angles (functionality, compatibility, ease of use, etc.).
> I think this is a very good process. Only with such a detailed discussion
> can we develop the PR clearly and smoothly at a later stage. I am very
> grateful to @Fabian and @Xiaowei for sharing a lot of good ideas.
>
> About the definition of the method signatures, I want to share my points
> here, which I am discussing with Fabian in the Google doc (not yet
> completed), as follows:
>
> Assume we have a table:
>
>   val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)
>
> Approach 1:
>
> Case 1: Map follows the source table
>
>   val result =
>     tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
>       .window(Tumble over 5.millis on 'proctime as 'w)
>
> Case 2: FlatAgg follows Window (Fabian mentioned this above)
>
>   val result =
>     tab.window(Tumble ... as 'w)
>       .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>       .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
>       .select('k1, 'col1, 'w.rowtime as 'rtime)
>
> Approach 2: Similar to Fabian's approach, in which the result schema is
> clearly defined, but with a built-in append UDF added. That makes the
> map/flatMap/agg/flatAgg interfaces accept only one Expression.
>
>   val result =
>     tab.map(append(udf('string), 'long, 'proctime)) as ('col1, 'col2, 'long, 'proctime)
>       .window(Tumble over 5.millis on 'proctime as 'w)
>
> Note: append is a special built-in UDF that can pass through any column.
>
> So maybe we can define table.map(Expression) first and, if necessary,
> extend it to table.map(Expression*) in the future? Of course, I also hope
> that we can further refine this proposal through discussion.
>
> Thanks,
> Jincheng
>
> Xiaowei Jiang <xiaow...@gmail.com> wrote on Wed, Nov 7, 2018 at 11:45 PM:
>
> > Hi Fabian,
> >
> > I think that the key question you raised is whether we allow extra
> > parameters in the methods map/flatMap/agg/flatAgg. I can see why
> > allowing that may appear more convenient in some cases. However, it
> > might also cause some confusion if we do that. For example, do we allow
> > multiple UDFs in these expressions? If we do, the semantics may be weird
> > to define, e.g. what does
> > table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Even
> > though not allowing it may appear less powerful, it can make things more
> > intuitive too. In the case of agg/flatAgg, we can define the keys to be
> > implied in the result table and to appear at the beginning. You can use
> > a select method if you want to modify this behavior. I think that
> > eventually we will have some API which allows other expressions as
> > additional parameters, but I think it's better to do that after we
> > introduce the concept of nested tables. A lot of things we suggested
> > here can be considered as special cases of that. But things are much
> > simpler if we leave that to later.
> >
> > Regards,
> > Xiaowei
> >
> > On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > * Re emit:
> > > I think we should start with the well-understood semantics of full
> > > replacement. This is how the other agg functions work.
> > > As was said before, there are open questions regarding an append mode
> > > (checkpointing, whether to support retractions or not and, if yes,
> > > how to declare them, ...).
> > > Since this seems to be an optimization, I'd postpone it.
> > >
> > > * Re grouping keys:
> > > I don't think we should automatically add them because the result
> > > schema would not be intuitive.
> > > Would they be added at the beginning of the tuple or at the end?
> > > What metadata fields of windows would be added? In which order would
> > > they be added?
> > >
> > > However, we could support syntax like this:
> > >
> > >   val t: Table = ???
> > >   t
> > >     .window(Tumble ... as 'w)
> > >     .groupBy('a, 'b)
> > >     .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
> > >
> > > The result schema would be clearly defined as
> > > [b, a, f1, f2, ..., fn, wend, rtime], where (f1, f2, ..., fn) are the
> > > result attributes of the UDF.
> > >
> > > * Re multi-staged evaluation:
> > > I think this should be an optimization that can be applied if the UDF
> > > implements the merge() method.
> > >
> > > Best, Fabian
> > >
> > > On Wed, Nov 7, 2018 at 08:01, Shaoxuan Wang <wshaox...@gmail.com> wrote:
> > >
> > > > Hi Xiaowei,
> > > >
> > > > Yes, I agree with you that the semantics of TableAggregateFunction
> > > > emit are much more complex than those of AggregateFunction. The
> > > > fundamental difference is that TableAggregateFunction emits a
> > > > "table" while AggregateFunction outputs (a column of) a "row".
> > > > AggregateFunction has only one mode, which is "replacing" (complete
> > > > update). A TableAggregateFunction, however, could use either an
> > > > incremental update (only emit the newly updated results) or a
> > > > complete update (always emit the entire table when "emit" is
> > > > triggered). From the performance perspective, we might want to use
> > > > the incremental update. But we need to review and design this
> > > > carefully, especially taking into account failover (instead of just
> > > > backing up the ACC, it may also need to remember the emit offset)
> > > > and retractions, as the semantics of TableAggregateFunction emit
> > > > are different from those of other UDFs. TableFunction also emits a
> > > > table, but it does not need to worry about this because it is
> > > > stateless.
> > > >
> > > > Regards,
> > > > Shaoxuan
> > > >
> > > > On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> > > >
> > > > > Hi Jincheng,
> > > > >
> > > > > Thanks for adding the public interfaces! I think that it's a very
> > > > > good start. There are a few points that we need to discuss
> > > > > further.
> > > > >
> > > > > - TableAggregateFunction - this is a very complex beast,
> > > > >   definitely the most complex user-defined object we have
> > > > >   introduced so far. I think there are quite a few interesting
> > > > >   questions here. For example, do we allow multi-staged
> > > > >   TableAggregate in this case? What are the semantics of emit? Is
> > > > >   it an amendment to the previous output, or does it replace it?
> > > > >   I think that this subject itself is worth a discussion to make
> > > > >   sure we get the details right.
> > > > > - GroupedTable.agg - do the group keys automatically appear in
> > > > >   the output? How about the case of windowing aggregation?
> > > > >
> > > > > Regards,
> > > > > Xiaowei
> > > > >
> > > > > On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > >
> > > > > > Hi Xiaowei,
> > > > > >
> > > > > > Thanks for bringing up the discussion of the Table API
> > > > > > Enhancement Outline!
> > > > > >
> > > > > > I quickly looked at the overall content; it captures our
> > > > > > offline discussions well. But from my point of view, we should
> > > > > > add the usage of the public interfaces that we will introduce
> > > > > > in this proposal. So I added the following usage description of
> > > > > > the interfaces and operators to the Google doc:
> > > > > >
> > > > > > 1. Map Operator
> > > > > >    Map is a new operator of Table. It can apply a scalar
> > > > > >    function and can return multiple columns.
> > > > > >    The usage is as follows:
> > > > > >
> > > > > >      val res = tab
> > > > > >        .map(fun: ScalarFunction).as('a, 'b, 'c)
> > > > > >        .select('a, 'c)
> > > > > >
> > > > > > 2. FlatMap Operator
> > > > > >    FlatMap is a new operator of Table. It can apply a table
> > > > > >    function and can return multiple rows. The usage is as
> > > > > >    follows:
> > > > > >
> > > > > >      val res = tab
> > > > > >        .flatMap(fun: TableFunction).as('a, 'b, 'c)
> > > > > >        .select('a, 'c)
> > > > > >
> > > > > > 3. Agg Operator
> > > > > >    Agg is a new operator of Table/GroupedTable. It can apply an
> > > > > >    aggregate function and can return multiple columns. The
> > > > > >    usage is as follows:
> > > > > >
> > > > > >      val res = tab
> > > > > >        .groupBy('a) // leave the groupBy clause out to define global aggregates
> > > > > >        .agg(fun: AggregateFunction).as('a, 'b, 'c)
> > > > > >        .select('a, 'c)
> > > > > >
> > > > > > 4. FlatAgg Operator
> > > > > >    FlatAgg is a new operator of Table/GroupedTable. It can
> > > > > >    apply a table aggregate function and can return multiple
> > > > > >    rows. The usage is as follows:
> > > > > >
> > > > > >      val res = tab
> > > > > >        .groupBy('a) // leave the groupBy clause out to define global table aggregates
> > > > > >        .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
> > > > > >        .select('a, 'c)
> > > > > >
> > > > > > 5. TableAggregateFunction
> > > > > >    Table aggregates behave most like GroupReduceFunction, which
> > > > > >    is computed over a group of elements and outputs a group of
> > > > > >    elements. The TableAggregateFunction can be applied on
> > > > > >    GroupedTable.flatAgg().
> > > > > >    The interface of TableAggregateFunction has a lot of
> > > > > >    content, so I won't copy it here. Please look at the details
> > > > > >    in the Google doc:
> > > > > >
> > > > > >    https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
> > > > > >
> > > > > > I would very much appreciate anyone reviewing and commenting.
> > > > > >
> > > > > > Best,
> > > > > > Jincheng