Hi Piotrek,

Thanks for your feedback, and thanks for sharing your thoughts!

#1) No, watermarks address the late-event problem. The performance problem here is caused by the update emit mode, i.e., whenever the current calculation result is output, the previous calculation result needs to be retracted.

#2) As I mentioned above, we should continue the discussion until we solve the problems raised by Xiaowei and Fabian.

#3) I still hope to keep the simplicity of select supporting only projected scalar expressions; we can hardly define the semantics of tab.select(flatmap('a), 'b, flatmap('d)).
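To make #1 concrete, here is a toy sketch in plain Scala. It is not Flink code, and all names (`top2`, `emitFull`, `emitIncremental`, `run`) are made up for illustration. It assumes a Top-2 table aggregate and counts the messages sent downstream when every update retracts the whole previous result, compared with emitting only the difference, which is roughly what an emitWithRetractValue-style interface would allow.

```scala
// Toy model only: plain Scala, no Flink dependency.
// Msg, top2, emitFull, emitIncremental and run are hypothetical names.
object RetractSketch {
  sealed trait Msg
  final case class Add(v: Int) extends Msg
  final case class Retract(v: Int) extends Msg

  // Accumulator update: keep the two largest values seen so far, largest first.
  def top2(prev: List[Int], in: Int): List[Int] =
    (in :: prev).sorted(Ordering[Int].reverse).take(2)

  // Update emit mode: retract every previously emitted row, then emit all current rows.
  def emitFull(prev: List[Int], cur: List[Int]): List[Msg] =
    prev.map(Retract(_)) ++ cur.map(Add(_))

  // Incremental mode: emit only the rows that left or entered the result.
  def emitIncremental(prev: List[Int], cur: List[Int]): List[Msg] =
    prev.diff(cur).map(Retract(_)) ++ cur.diff(prev).map(Add(_))

  // Feed the input record by record and collect every downstream message.
  def run(input: List[Int], emit: (List[Int], List[Int]) => List[Msg]): List[Msg] = {
    val (_, msgs) = input.foldLeft((List.empty[Int], List.empty[Msg])) {
      case ((prev, out), in) =>
        val cur = top2(prev, in)
        (cur, out ++ emit(prev, cur))
    }
    msgs
  }

  def main(args: Array[String]): Unit = {
    val input = List(5, 1, 3, 2, 4)
    println(s"full replacement: ${run(input, emitFull).size} messages")
    println(s"incremental:      ${run(input, emitIncremental).size} messages")
  }
}
```

Even in this tiny case the full-replacement mode sends several times more messages than the incremental one, and the gap grows with the size of the emitted table.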
Thanks,
Jincheng

Piotr Nowojski <pi...@data-artisans.com> wrote on Wed, Nov 21, 2018 at 5:24 PM:

> Hi,
>
> 1.
>
> > In fact, in addition to the design of APIs, there will be various
> > performance optimization details, such as: table Aggregate function
> > emitValue will generate multiple calculation results, in extreme cases,
> > each record will trigger a large number of retract messages, this will have
> > poor performance
>
> Can this be solved/mitigated by emitting the results only on watermarks? I
> think that was the path that we decided to take both for Temporal Joins and
> upsert stream conversion. I know that this increases the latency and there
> is a place for a future global setting/user preference “emit the data ASAP
> mode”, but emitting only on watermarks seems to me a better/more sane
> default.
>
> 2.
>
> With respect to the API discussion and implicit columns: the problem for
> me so far is that I’m not sure I like the additional complexity of the
> `append()` solution, while implicit columns are definitely not in the
> spirit of SQL. Neither joins nor aggregations add extra unexpected columns
> to the result without asking. This can definitely be confusing for
> users since it breaks the convention. Thus I would lean towards Fabian’s
> proposal of multi-argument `map(Expression*)` from those 3 options.
>
> 3.
>
> Another topic is that I’m not 100% convinced that we should be adding new
> API functions for `map`, `aggregate`, `flatMap` and `flatAggregate`. I think
> the same could be achieved by changing
>
> table.map(F('x))
>
> into
>
> table.select(F('x)).unnest()
> or
> table.select(F('x).unnest())
>
> where `unnest()` means unnesting a row/tuple type into a columnar table.
>
> table.flatMap(F('x))
>
> could, on the other hand, also be handled by
>
> table.select(F('x))
>
> by correctly deducing that F(x) is a multi-row output function.
>
> The same might apply to `aggregate(F('x))`, but this could maybe be replaced
> by:
>
> table.groupBy(…).select(F('x).unnest())
>
> Adding scalar functions should also be possible:
>
> table.groupBy('k).select(F('x).unnest(), 'k)
>
> Maybe such an approach would allow us to implement the same features in
> SQL as well?
>
> Piotrek
>
> > On 21 Nov 2018, at 09:43, Hequn Cheng <chenghe...@gmail.com> wrote:
> >
> > Hi,
> >
> > Thank you all for the great proposal and discussion!
> > I also prefer to move on to the next step, so +1 for opening the JIRAs to
> > start the work.
> > We can have more detailed discussion there. Btw, we can start with the JIRAs
> > which we have agreed on.
> >
> > Best,
> > Hequn
> >
> > On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <wshaox...@gmail.com> wrote:
> >
> >> +1. I agree that we should open the JIRAs to start the work. We may
> >> have better ideas on the flavor of the interface when we implement/review
> >> the code.
> >>
> >> Regards,
> >> shaoxuan
> >>
> >>
> >> On 11/20/18, jincheng sun <sunjincheng...@gmail.com> wrote:
> >>> Hi all,
> >>>
> >>> Thanks all for the feedback.
> >>>
> >>> @Piotr About not using abbreviated names: +1, I like
> >>> your proposal! Currently both the DataSet and DataStream APIs use
> >>> `aggregate`.
> >>> BTW, I find that other languages, such as R, also avoid abbreviated names.
> >>>
> >>> Sometimes the interface of an API is really difficult to perfect; we need
> >>> to spend a lot of time thinking, gather feedback from a large number of users,
> >>> and constantly improve. But for backward compatibility reasons, we have to
> >>> adopt the most conservative approach when designing the API (of course, I am
> >>> more in favor of developing richer features once we have discussed them clearly).
> >>> Therefore, I propose to split the implementation of
> >>> map/flatMap/agg/flatAgg into JIRAs for the basic functionality and JIRAs that
> >>> support time attributes and group keys. We can develop the features whose
> >>> design we have already agreed on, and continue to discuss
> >>> the uncertain parts of the design.
> >>>
> >>> In fact, in addition to the design of the APIs, there will be various
> >>> performance optimization details. For example, a table aggregate function's
> >>> emitValue generates multiple calculation results; in extreme cases,
> >>> each record will trigger a large number of retract messages, which will
> >>> perform poorly. So we will also optimize the interface design, for example by
> >>> adding the emitWithRetractValue interface (I have updated the Google doc)
> >>> to allow the user to optionally perform incremental calculations, thus
> >>> avoiding a large number of retracts. Details like this are difficult to
> >>> fully discuss on the mailing list, so I recommend creating JIRAs/FLIP first;
> >>> we develop the designs that have been agreed upon and continue to discuss
> >>> the undecided designs! What do you think? @Fabian & Piotr & Xiaowei
> >>>
> >>> Best,
> >>> Jincheng
> >>>
> >>> Xiaowei Jiang <xiaow...@gmail.com> wrote on Mon, Nov 19, 2018 at 12:07 AM:
> >>>
> >>>> Hi Fabian & Piotr, thanks for the feedback!
> >>>>
> >>>> I appreciate your concerns, both on timestamp attributes as well as on
> >>>> implicit group keys. At the same time, I'm also concerned with the proposed
> >>>> approach of allowing Expression* as parameters, especially for
> >>>> flatMap/flatAgg. So far, we never allowed a scalar expression to appear
> >>>> together with table expressions. With the Expression* approach, this will
> >>>> happen for the parameters to flatMap/flatAgg. I'm a bit concerned about
> >>>> whether we fully understand the consequences when we try to extend our
> >>>> system in the future.
> >>>> I would be extra cautious in doing this. To avoid this, I think an
> >>>> implicit group key for flatAgg is safer. For flatMap, if users want to keep
> >>>> the rowtime column, they can use crossApply/join instead. So we are not
> >>>> losing any real functionality here.
> >>>>
> >>>> Also a clarification on the following example:
> >>>> tab.window(Tumble ... as 'w)
> >>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> >>>>   .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> >>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
> >>>> If we did not have the select clause in this example, we would have 'w as a
> >>>> regular column in the output. It should not magically disappear.
> >>>>
> >>>> The concern is not as strong for Table.map/Table.agg because we are not
> >>>> mixing scalar and table expressions. But we also want to be a bit
> >>>> consistent with these methods. If we used implicit group keys for
> >>>> Table.flatAgg, we probably should do the same for Table.agg. Now we only
> >>>> have to choose what to do with Table.map. I can see good arguments from
> >>>> both sides. But starting with a single Expression seems safer because
> >>>> we can always extend to Expression* in the future.
> >>>>
> >>>> While thinking about this problem, it appears that we may need more work in
> >>>> our handling of watermarks for SQL/Table API. Our current way of
> >>>> propagating the watermarks from source all the way to sink might not be
> >>>> optimal. For example, after a tumbling window, the watermark can actually
> >>>> be advanced to just before the expiry of the next window. I think that in
> >>>> general, each operator may need to generate new watermarks instead of
> >>>> simply propagating them. Once we accept that watermarks may change during
> >>>> the execution, it appears that the timestamp columns may also change, as
> >>>> long as we have some way to associate a watermark with them.
> >>>> My intuition is
> >>>> that once we have a thorough solution for the watermark issue, we may be
> >>>> able to solve the problem we encountered for Table.map in a cleaner way.
> >>>> But this is a complex issue which deserves a discussion on its own.
> >>>>
> >>>> Regards,
> >>>> Xiaowei
> >>>>
> >>>>
> >>>> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <pi...@data-artisans.com>
> >>>> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> Isn’t the problem of multiple expressions limited only to `flat***`
> >>>>> functions and, to be more specific, only to having two (or more) different
> >>>>> table functions passed as expressions? `.flatAgg(TableAggA('a),
> >>>>> scalarFunction1('b), scalarFunction2('c))` seems to be well defined
> >>>>> (duplicate the result of every scalar function to every record). Or am I
> >>>>> missing something?
> >>>>>
> >>>>> Another remark: I would be in favour of not using abbreviations and naming
> >>>>> `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
> >>>>>
> >>>>> Piotrek
> >>>>>
> >>>>>> On 15 Nov 2018, at 14:15, Fabian Hueske <fhue...@gmail.com> wrote:
> >>>>>>
> >>>>>> Hi Jincheng,
> >>>>>>
> >>>>>> I said before that I think the append() method is better than
> >>>>>> implicitly forwarding keys, but still, I believe it adds unnecessary
> >>>>>> boilerplate code.
> >>>>>>
> >>>>>> Moreover, I haven't seen a convincing argument why map(Expression*) is
> >>>>>> worse than map(Expression). In either case we need to do all kinds of
> >>>>>> checks to prevent invalid use of functions.
> >>>>>> If the method is not correctly used, we can emit a good error message, and
> >>>>>> documenting map(Expression*) will be easier than map(append(Expression*)),
> >>>>>> in my opinion.
> >>>>>> I think we should not add unnecessary syntax unless there is a good reason
> >>>>>> and to be honest, I haven't seen this reason yet.
> >>>>>>
> >>>>>> Regarding the groupBy.agg() method, I think it should behave just like any
> >>>>>> other method, i.e., not do any implicit forwarding.
> >>>>>> Let's take the example of the windowed group by that you posted before.
> >>>>>>
> >>>>>> tab.window(Tumble ... as 'w)
> >>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> >>>>>>   .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> >>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
> >>>>>>
> >>>>>> What happens if 'w.rowtime is not selected? What is the data type of the
> >>>>>> field 'w in the resulting Table? Is it a regular field at all or just a
> >>>>>> system field that disappears if it is not selected?
> >>>>>>
> >>>>>> IMO, the following syntax is shorter, more explicit, and better aligned
> >>>>>> with the regular window.groupBy.select aggregations that are supported
> >>>>>> today.
> >>>>>>
> >>>>>> tab.window(Tumble ... as 'w)
> >>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> >>>>>>   .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
> >>>>>>
> >>>>>>
> >>>>>> Best, Fabian
> >>>>>>
> >>>>>> On Wed, Nov 14, 2018 at 08:37, jincheng sun <sunjincheng...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi Fabian/Xiaowei,
> >>>>>>>
> >>>>>>> I am very sorry for my late reply! Glad to see your replies; they sound
> >>>>>>> pretty good!
> >>>>>>> I agree with Fabian that the approach with append(), which clearly defines
> >>>>>>> the result schema, is better.
> >>>>>>> In addition, append() can also carry non-time attributes,
> >>>>>>> e.g.:
> >>>>>>>
> >>>>>>> tab('name, 'age, 'address, 'rowtime)
> >>>>>>> tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2,
> >>>>>>>     'address, 'rowtime)
> >>>>>>>   .window(Tumble over 5.millis on 'rowtime as 'w)
> >>>>>>>   .groupBy('w, 'address)
> >>>>>>>
> >>>>>>> In this way append() is very useful, and the behavior is very similar
> >>>>>>> to withForwardedFields() in DataSet.
> >>>>>>> So +1 to using the append() approach for map() and flatMap()!
> >>>>>>>
> >>>>>>> But how about agg() and flatAgg()? In the agg/flatAgg case I agree with
> >>>>>>> Xiaowei's approach of defining the keys to be implied in the result table
> >>>>>>> and to appear at the beginning, for example as follows:
> >>>>>>> tab.window(Tumble ... as 'w)
> >>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> >>>>>>>   .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> >>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
> >>>>>>>
> >>>>>>> What do you think? @Fabian @Xiaowei
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Jincheng
> >>>>>>>
> >>>>>>> Fabian Hueske <fhue...@gmail.com> wrote on Fri, Nov 9, 2018 at 6:35 PM:
> >>>>>>>
> >>>>>>>> Hi Jincheng,
> >>>>>>>>
> >>>>>>>> Thanks for the summary!
> >>>>>>>> I like the approach with append() better than the implicit forwarding as it
> >>>>>>>> clearly indicates which fields are forwarded.
> >>>>>>>> However, I don't see much benefit over the flatMap(Expression*) variant, as
> >>>>>>>> we would still need to analyze the full expression tree to ensure that at
> >>>>>>>> most (or exactly?) one Scalar / TableFunction is used.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Fabian
> >>>>>>>>
> >>>>>>>> On Thu, Nov 8, 2018 at 19:25, jincheng sun <sunjincheng...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> We are discussing very detailed content about this proposal.
> >>>>>>>>> We are trying
> >>>>>>>>> to design the API in many aspects (functionality, compatibility, ease of
> >>>>>>>>> use, etc.). I think this is a very good process. Only with such a detailed
> >>>>>>>>> discussion can we develop the PRs more clearly and smoothly in the later
> >>>>>>>>> stage. I am very grateful to @Fabian and @Xiaowei for sharing a lot of
> >>>>>>>>> good ideas.
> >>>>>>>>> About the definition of the method signatures, I want to share my points
> >>>>>>>>> here, which I am discussing with Fabian in the Google doc (not yet
> >>>>>>>>> completed), as follows:
> >>>>>>>>>
> >>>>>>>>> Assume we have a table:
> >>>>>>>>> val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string,
> >>>>>>>>>   'proctime.proctime)
> >>>>>>>>>
> >>>>>>>>> Approach 1:
> >>>>>>>>> case1: Map follows Source Table
> >>>>>>>>> val result =
> >>>>>>>>>   tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
> >>>>>>>>>     .window(Tumble over 5.millis on 'proctime as 'w)
> >>>>>>>>>
> >>>>>>>>> case2: FlatAgg follows Window (Fabian mentioned above)
> >>>>>>>>> val result =
> >>>>>>>>>   tab.window(Tumble ... as 'w)
> >>>>>>>>>     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> >>>>>>>>>     .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
> >>>>>>>>>     .select('k1, 'col1, 'w.rowtime as 'rtime)
> >>>>>>>>>
> >>>>>>>>> Approach 2: Similar to Fabian's approach, in which the result schema would
> >>>>>>>>> be clearly defined, but adding a built-in append UDF. That makes the
> >>>>>>>>> map/flatMap/agg/flatAgg interfaces accept only one Expression.
> >>>>>>>>> val result =
> >>>>>>>>>   tab.map(append(udf('string), 'long, 'proctime)) as ('col1, 'col2,
> >>>>>>>>>       'long, 'proctime)
> >>>>>>>>>     .window(Tumble over 5.millis on 'proctime as 'w)
> >>>>>>>>>
> >>>>>>>>> Note: append is a special built-in UDF that can pass through any
> >>>>>>>>> column.
> >>>>>>>>>
> >>>>>>>>> So, maybe we can define it as table.map(Expression) first. If
> >>>>>>>>> necessary, we can extend it to table.map(Expression*) in the future?
> >>>>>>>>> Of course, I also hope that we can further improve this proposal through
> >>>>>>>>> discussion.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Jincheng
> >>>>>>>>>
> >>>>>>>>> Xiaowei Jiang <xiaow...@gmail.com> wrote on Wed, Nov 7, 2018 at 11:45 PM:
> >>>>>>>>>
> >>>>>>>>>> Hi Fabian,
> >>>>>>>>>>
> >>>>>>>>>> I think that the key question you raised is whether we allow extra
> >>>>>>>>>> parameters in the methods map/flatMap/agg/flatAgg. I can see why allowing
> >>>>>>>>>> that may appear more convenient in some cases. However, it might also
> >>>>>>>>>> cause some confusion if we do that. For example, do we allow multiple
> >>>>>>>>>> UDFs in these expressions?
> >>>>>>>>>> If we do, the semantics may be weird to define, e.g. what does
> >>>>>>>>>> table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Even though
> >>>>>>>>>> not allowing it may appear less powerful, it can make things more
> >>>>>>>>>> intuitive too. In the case of agg/flatAgg, we can define the keys to be
> >>>>>>>>>> implied in the result table and to appear at the beginning. You can use a
> >>>>>>>>>> select method if you want to modify this behavior.
> >>>>>>>>>> I think that eventually
> >>>>>>>>>> we will have some API which allows other expressions as additional
> >>>>>>>>>> parameters, but I think it's better to do that after we introduce the
> >>>>>>>>>> concept of nested tables. A lot of things we suggested here can be
> >>>>>>>>>> considered as special cases of that. But things are much simpler if we
> >>>>>>>>>> leave that to later.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Xiaowei
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> * Re emit:
> >>>>>>>>>>> I think we should start with the well-understood semantics of full
> >>>>>>>>>>> replacement. This is how the other agg functions work.
> >>>>>>>>>>> As was said before, there are open questions regarding an append mode
> >>>>>>>>>>> (checkpointing, whether to support retractions or not and, if yes, how to
> >>>>>>>>>>> declare them, ...).
> >>>>>>>>>>> Since this seems to be an optimization, I'd postpone it.
> >>>>>>>>>>>
> >>>>>>>>>>> * Re grouping keys:
> >>>>>>>>>>> I don't think we should automatically add them because the result schema
> >>>>>>>>>>> would not be intuitive.
> >>>>>>>>>>> Would they be added at the beginning of the tuple or at the end? What
> >>>>>>>>>>> metadata fields of windows would be added? In which order would they be
> >>>>>>>>>>> added?
> >>>>>>>>>>>
> >>>>>>>>>>> However, we could support syntax like this:
> >>>>>>>>>>> val t: Table = ???
> >>>>>>>>>>> t
> >>>>>>>>>>>   .window(Tumble ... as 'w)
> >>>>>>>>>>>   .groupBy('a, 'b)
> >>>>>>>>>>>   .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
> >>>>>>>>>>>
> >>>>>>>>>>> The result schema would be clearly defined as [b, a, f1, f2, ..., fn, wend,
> >>>>>>>>>>> rtime]. (f1, f2, ..., fn) are the result attributes of the UDF.
> >>>>>>>>>>>
> >>>>>>>>>>> * Re Multi-staged evaluation:
> >>>>>>>>>>> I think this should be an optimization that can be applied if the UDF
> >>>>>>>>>>> implements the merge() method.
> >>>>>>>>>>>
> >>>>>>>>>>> Best, Fabian
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Nov 7, 2018 at 08:01, Shaoxuan Wang <wshaox...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Xiaowei,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Yes, I agree with you that the semantics of TableAggregateFunction emit is
> >>>>>>>>>>>> much more complex than AggregateFunction. The fundamental difference is
> >>>>>>>>>>>> that TableAggregateFunction emits a "table" while AggregateFunction outputs
> >>>>>>>>>>>> (a column of) a "row". In the case of AggregateFunction it only has one
> >>>>>>>>>>>> mode, which is “replacing” (complete update). But for
> >>>>>>>>>>>> TableAggregateFunction, it could be an incremental update (only emit the
> >>>>>>>>>>>> new updated results) or a complete update (always emit the entire table
> >>>>>>>>>>>> when “emit” is triggered). From the performance perspective, we might want
> >>>>>>>>>>>> to use incremental update.
> >>>>>>>>>>>> But we need to review and design this carefully,
> >>>>>>>>>>>> especially taking into account the cases of failover (instead of just
> >>>>>>>>>>>> backing up the ACC, it may also need to remember the emit offset) and
> >>>>>>>>>>>> retractions, as the semantics of TableAggregateFunction emit are different
> >>>>>>>>>>>> from other UDFs. TableFunction also emits a table, but it does not need to
> >>>>>>>>>>>> worry about this because it is stateless.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Shaoxuan
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Jincheng,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for adding the public interfaces! I think that it's a very good
> >>>>>>>>>>>>> start. There are a few points that we need to discuss more.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> - TableAggregateFunction - this is a very complex beast, definitely the
> >>>>>>>>>>>>>   most complex user-defined object we have introduced so far. I think
> >>>>>>>>>>>>>   there are quite some interesting questions here. For example, do we
> >>>>>>>>>>>>>   allow multi-staged TableAggregate in this case? What is the semantics
> >>>>>>>>>>>>>   of emit? Is it amendments to the previous output, or replacing it? I
> >>>>>>>>>>>>>   think that this subject itself is worth a discussion to make sure we
> >>>>>>>>>>>>>   get the details right.
> >>>>>>>>>>>>> - GroupedTable.agg - do the group keys automatically appear in the
> >>>>>>>>>>>>>   output? How about the case of windowing aggregation?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Xiaowei
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi, Xiaowei,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for bringing up the discussion of the Table API Enhancement Outline!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I quickly looked at the overall content; it is a good expression of our
> >>>>>>>>>>>>>> offline discussions. But from my point of view, we should add the
> >>>>>>>>>>>>>> usage of the public interfaces that we will introduce in this proposal.
> >>>>>>>>>>>>>> So, I added the following usage description of the interfaces and
> >>>>>>>>>>>>>> operators in the Google doc:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. Map Operator
> >>>>>>>>>>>>>>    Map operator is a new operator of Table. It can apply a
> >>>>>>>>>>>>>>    scalar function and can return multiple columns. The usage is as follows:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>    val res = tab
> >>>>>>>>>>>>>>      .map(fun: ScalarFunction).as('a, 'b, 'c)
> >>>>>>>>>>>>>>      .select('a, 'c)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2. FlatMap Operator
> >>>>>>>>>>>>>>    FlatMap operator is a new operator of Table. It can apply
> >>>>>>>>>>>>>>    a table function and can return multiple rows. The usage is as follows:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>    val res = tab
> >>>>>>>>>>>>>>      .flatMap(fun: TableFunction).as('a, 'b, 'c)
> >>>>>>>>>>>>>>      .select('a, 'c)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 3. Agg Operator
> >>>>>>>>>>>>>>    Agg operator is a new operator of Table/GroupedTable. It can
> >>>>>>>>>>>>>>    apply an aggregate function and can return multiple columns.
> >>>>>>>>>>>>>>    The usage is as
> >>>>>>>>>>>>>>    follows:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>    val res = tab
> >>>>>>>>>>>>>>      .groupBy('a) // leave the groupBy clause out to define global aggregates
> >>>>>>>>>>>>>>      .agg(fun: AggregateFunction).as('a, 'b, 'c)
> >>>>>>>>>>>>>>      .select('a, 'c)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 4. FlatAgg Operator
> >>>>>>>>>>>>>>    FlatAgg operator is a new operator of Table/GroupedTable. It
> >>>>>>>>>>>>>>    can apply a table aggregate function and can return multiple rows.
> >>>>>>>>>>>>>>    The usage is as follows:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>    val res = tab
> >>>>>>>>>>>>>>      .groupBy('a) // leave the groupBy clause out to define global table aggregates
> >>>>>>>>>>>>>>      .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
> >>>>>>>>>>>>>>      .select('a, 'c)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 5. TableAggregateFunction
> >>>>>>>>>>>>>>    The behavior of table aggregates is most like what GroupReduceFunction
> >>>>>>>>>>>>>>    does: it is computed for a group of elements and outputs a group of
> >>>>>>>>>>>>>>    elements. The TableAggregateFunction can be applied on
> >>>>>>>>>>>>>>    GroupedTable.flatAgg(). The interface of TableAggregateFunction has a
> >>>>>>>>>>>>>>    lot of content, so I don't copy it here. Please look at the details in
> >>>>>>>>>>>>>>    the Google doc:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I would very much appreciate anyone reviewing and commenting.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Jincheng
> >>
> >> --
> >> *Rome was not built in one day*