Hi all,

We are discussing this proposal in great detail and weighing the API design
from many angles (functionality, compatibility, ease of use, etc.). I think
this is a very good process: only with such a detailed discussion can we
develop the PRs clearly and smoothly at a later stage. I am very grateful to
@Fabian and @Xiaowei for sharing a lot of good ideas.

Regarding the definition of the method signatures, I want to share my points
here, which I am also discussing with Fabian in the Google doc (not yet
completed), as follows:

Assume we have a table:

val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)

Approach 1:

case1: Map follows Source Table

val result =
  tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
    .window(Tumble over 5.millis on 'proctime as 'w)

case2: FlatAgg follows Window (Fabian mentioned above)

val result =
  tab.window(Tumble ... as 'w)
    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
    .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
    .select('k1, 'col1, 'w.rowtime as 'rtime)

Approach 2: Similar to Fabian's approach, in which the result schema is
clearly defined, but with an added built-in append UDF. That makes the
map/flatMap/agg/flatAgg interfaces accept only one Expression.

val result =
  tab.map(append(udf('string), 'long, 'proctime)).as('col1, 'col2, 'long, 'proctime)
    .window(Tumble over 5.millis on 'proctime as 'w)

Note: append is a special built-in UDF that can pass through any column.

So maybe we can define table.map(Expression) first and, if necessary, extend
it to table.map(Expression*) in the future?

Of course, I also hope that we can further refine this proposal through
discussion.

Thanks,
Jincheng

Xiaowei Jiang <xiaow...@gmail.com> wrote on Wed, Nov 7, 2018 at 11:45 PM:

> Hi Fabian,
>
> I think that the key question you raised is whether we allow extra
> parameters in the methods map/flatMap/agg/flatAgg. I can see why allowing
> that may appear more convenient in some cases. However, it might also
> cause some confusion if we do that. For example, do we allow multiple UDFs
> in these expressions? If we do, the semantics may be weird to define, e.g.
> what does table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean?
> Even though not allowing it may appear less powerful, it can make things
> more intuitive too. In the case of agg/flatAgg, we can define the keys to
> be implied in the result table and appear at the beginning.
> You can use a select method if you want to modify this behavior. I think
> that eventually we will have some API which allows other expressions as
> additional parameters, but I think it's better to do that after we
> introduce the concept of nested tables. A lot of the things we suggested
> here can be considered special cases of that. But things are much simpler
> if we leave that for later.
>
> Regards,
> Xiaowei
>
> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi,
> >
> > * Re emit:
> > I think we should start with the well-understood semantics of full
> > replacement. This is how the other agg functions work.
> > As was said before, there are open questions regarding an append mode
> > (checkpointing, whether to support retractions and, if so, how to
> > declare them, ...).
> > Since this seems to be an optimization, I'd postpone it.
> >
> > * Re grouping keys:
> > I don't think we should automatically add them because the result
> > schema would not be intuitive.
> > Would they be added at the beginning of the tuple or at the end? What
> > metadata fields of windows would be added? In which order would they be
> > added?
> >
> > However, we could support syntax like this:
> > val t: Table = ???
> > t
> >   .window(Tumble ... as 'w)
> >   .groupBy('a, 'b)
> >   .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
> >
> > The result schema would be clearly defined as
> > [b, a, f1, f2, ..., fn, wend, rtime]. (f1, f2, ..., fn) are the result
> > attributes of the UDF.
> >
> > * Re Multi-staged evaluation:
> > I think this should be an optimization that can be applied if the UDF
> > implements the merge() method.
> >
> > Best, Fabian
> >
> > On Wed, Nov 7, 2018 at 08:01, Shaoxuan Wang <wshaox...@gmail.com> wrote:
> >
> > > Hi Xiaowei,
> > >
> > > Yes, I agree with you that the semantics of TableAggregateFunction
> > > emit are much more complex than those of AggregateFunction.
> > > The fundamental difference is that TableAggregateFunction emits a
> > > "table" while AggregateFunction outputs (a column of) a "row". In the
> > > case of AggregateFunction there is only one mode, which is "replacing"
> > > (complete update). But for TableAggregateFunction, it could be an
> > > incremental update (only emit the newly updated results) or a complete
> > > update (always emit the entire table when "emit" is triggered). From
> > > the performance perspective, we might want to use incremental update.
> > > But we need to review and design this carefully, especially taking
> > > into account the cases of failover (instead of just backing up the
> > > ACC, it may also need to remember the emit offset) and retractions, as
> > > the semantics of TableAggregateFunction emit are different from those
> > > of other UDFs. TableFunction also emits a table, but it does not need
> > > to worry about this because it is stateless.
> > >
> > > Regards,
> > > Shaoxuan
> > >
> > > On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> > >
> > > > Hi Jincheng,
> > > >
> > > > Thanks for adding the public interfaces! I think that it's a very
> > > > good start. There are a few points that we need to discuss further.
> > > >
> > > >    - TableAggregateFunction - this is a very complex beast,
> > > >    definitely the most complex user-defined object we have
> > > >    introduced so far. I think there are quite a few interesting
> > > >    questions here. For example, do we allow multi-staged
> > > >    TableAggregate in this case? What are the semantics of emit? Is
> > > >    it an amendment to the previous output, or does it replace it? I
> > > >    think that this subject itself is worth a discussion to make sure
> > > >    we get the details right.
> > > >    - GroupedTable.agg - do the group keys automatically appear in
> > > >    the output? How about the case of windowing aggregation?
> > > >
> > > > Regards,
> > > > Xiaowei
> > > >
> > > > On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > >
> > > > > Hi, Xiaowei,
> > > > >
> > > > > Thanks for bringing up the discussion of the Table API Enhancement
> > > > > Outline!
> > > > >
> > > > > I quickly looked at the overall content; it is a good expression
> > > > > of our offline discussions. But from my point of view, we should
> > > > > add the usage of the public interfaces that we will introduce in
> > > > > this proposal. So I added the following usage descriptions of the
> > > > > interfaces and operators to the Google doc:
> > > > >
> > > > > 1. Map Operator
> > > > >     Map operator is a new operator of Table. It can apply a scalar
> > > > > function and can return multiple columns. The usage is as follows:
> > > > >
> > > > >   val res = tab
> > > > >      .map(fun: ScalarFunction).as('a, 'b, 'c)
> > > > >      .select('a, 'c)
> > > > >
> > > > > 2. FlatMap Operator
> > > > >     FlatMap operator is a new operator of Table. It can apply a
> > > > > table function and can return multiple rows. The usage is as
> > > > > follows:
> > > > >
> > > > >   val res = tab
> > > > >       .flatMap(fun: TableFunction).as('a, 'b, 'c)
> > > > >       .select('a, 'c)
> > > > >
> > > > > 3. Agg Operator
> > > > >     Agg operator is a new operator of Table/GroupedTable. It can
> > > > > apply an aggregate function and can return multiple columns. The
> > > > > usage is as follows:
> > > > >
> > > > >    val res = tab
> > > > >       .groupBy('a) // leave groupBy-Clause out to define global aggregates
> > > > >       .agg(fun: AggregateFunction).as('a, 'b, 'c)
> > > > >       .select('a, 'c)
> > > > >
> > > > > 4. FlatAgg Operator
> > > > >     FlatAgg operator is a new operator of Table/GroupedTable. It
> > > > > can apply a table aggregate function and can return multiple rows.
> > > > > The usage is as follows:
> > > > >
> > > > >     val res = tab
> > > > >        .groupBy('a) // leave groupBy-Clause out to define global table aggregates
> > > > >        .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
> > > > >        .select('a, 'c)
> > > > >
> > > > > 5. TableAggregateFunction
> > > > >     The behavior of table aggregates is most like that of
> > > > > GroupReduceFunction: it is computed for a group of elements and
> > > > > outputs a group of elements. The TableAggregateFunction can be
> > > > > applied on GroupedTable.flatAgg(). The interface of
> > > > > TableAggregateFunction has a lot of content, so I don't copy it
> > > > > here. Please look at the details in the Google doc:
> > > > >
> > > > > https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
> > > > >
> > > > > I would very much appreciate anyone reviewing and commenting.
> > > > >
> > > > > Best,
> > > > > Jincheng
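
For reference, here is a minimal plain-Scala sketch of the "full replacement"
emit semantics and the merge() idea discussed in this thread. It does not use
Flink at all: the TableAgg trait, the Top2 object and the flatAgg helper are
hypothetical names made up for illustration, not the interface from the Google
doc. Putting the group key first in each output row is only one of the options
under discussion.

```scala
object FlatAggSketch {
  // Hypothetical stand-in for a table aggregate function (NOT Flink's interface).
  trait TableAgg[IN, ACC, OUT] {
    def createAccumulator(): ACC
    def accumulate(acc: ACC, in: IN): ACC
    def merge(a: ACC, b: ACC): ACC   // what would allow multi-staged evaluation
    def emit(acc: ACC): Seq[OUT]     // full replacement: the whole result for the group
  }

  // Top-2 values of a group: one group of rows in, up to two rows out.
  object Top2 extends TableAgg[Long, List[Long], Long] {
    def createAccumulator(): List[Long] = Nil
    def accumulate(acc: List[Long], in: Long): List[Long] =
      (in :: acc).sorted(Ordering[Long].reverse).take(2)
    def merge(a: List[Long], b: List[Long]): List[Long] =
      (a ++ b).sorted(Ordering[Long].reverse).take(2)
    def emit(acc: List[Long]): Seq[Long] = acc
  }

  // Roughly groupBy('k).flatAgg(top2('v)) over in-memory (key, value) rows.
  // Assumption for this sketch: the group key is placed first in the output.
  def flatAgg(rows: Seq[(String, Long)]): Seq[(String, Long)] =
    rows.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (key, group) =>
      val acc = group.map(_._2).foldLeft(Top2.createAccumulator())(Top2.accumulate)
      Top2.emit(acc).map(v => (key, v))
    }
}
```

An incremental (append) mode would instead emit only the rows that changed
since the previous emit, which is where the failover and retraction questions
raised above come in.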