Hi,

Isn't the problem of multiple expressions limited only to the `flat***` functions and, to be more specific, only to passing two (or more) different table functions as expressions? `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))` seems to be well defined (the result of every scalar function is duplicated onto every record). Or am I missing something?
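A minimal sketch of the semantics described above, written in plain Scala rather than the Table API. `FlatAggSketch`, `Scalar` and `TableAgg` are hypothetical names and are not Flink classes. The sketch assumes that the scalar expressions reference only grouping keys, so each one evaluates to a single value per group. It rejects the ambiguous case of two or more table aggregates, as in `flatAgg(TableAggA('a), TableAggB('b))`.

```scala
// Hypothetical model of flatAgg(expr*) evaluation for one group; not Flink API.
object FlatAggSketch {
  type Row = Vector[Any]

  sealed trait Expr
  // Scalar expression over the group key, e.g. scalarFunction1('b) with 'b a grouping key.
  final case class Scalar(f: Row => Any) extends Expr
  // Table aggregate over all rows of the group, e.g. TableAggA('a); emits zero or more rows.
  final case class TableAgg(f: Seq[Row] => Seq[Row]) extends Expr

  // Returns an error message if more than one table aggregate is used.
  def validate(exprs: Seq[Expr]): Option[String] = {
    val n = exprs.count(_.isInstanceOf[TableAgg])
    if (n > 1) Some(s"at most one table aggregate allowed, found $n") else None
  }

  // Evaluates one group: every row emitted by the table aggregate is combined
  // with the scalar values, i.e. scalar results are duplicated onto every row.
  def evalGroup(key: Row, rows: Seq[Row], exprs: Seq[Expr]): Seq[Row] = {
    validate(exprs).foreach(msg => throw new IllegalArgumentException(msg))
    // Without a table aggregate, the group produces exactly one row.
    val emitted: Seq[Row] =
      exprs.collectFirst { case TableAgg(f) => f(rows) }.getOrElse(Seq(Vector.empty))
    emitted.map { out =>
      exprs.toVector.flatMap {
        case Scalar(f)   => Vector(f(key))
        case TableAgg(_) => out
      }
    }
  }
}
```

Take a hypothetical top-2 table aggregate combined with a scalar that upper-cases the key. Both emitted rows carry the same scalar value, and the columns follow the order in which the expressions are listed.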
Another remark: I would be in favour of not using abbreviations, i.e., renaming `agg` -> `aggregate` and `flatAgg` -> `flatAggregate`.

Piotrek

> On 15 Nov 2018, at 14:15, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Jincheng,
>
> I said before that I think the append() method is better than implicitly forwarding keys, but I still believe it adds unnecessary boilerplate code.
>
> Moreover, I haven't seen a convincing argument why map(Expression*) is worse than map(Expression). In either case we need to do all kinds of checks to prevent invalid use of functions. If the method is not used correctly, we can emit a good error message, and documenting map(Expression*) will be easier than documenting map(append(Expression*)), in my opinion. I think we should not add unnecessary syntax unless there is a good reason, and to be honest, I haven't seen this reason yet.
>
> Regarding the groupBy.agg() method, I think it should behave just like any other method, i.e., not do any implicit forwarding. Let's take the example of the windowed group by that you posted before.
>
> tab.window(Tumble ... as 'w)
>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>   .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>
> What happens if 'w.rowtime is not selected? What is the data type of the field 'w in the resulting Table? Is it a regular field at all, or just a system field that disappears if it is not selected?
>
> IMO, the following syntax is shorter, more explicit, and better aligned with the regular window.groupBy.select aggregations that are supported today.
>
> tab.window(Tumble ... as 'w)
>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>   .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
>
>
> Best, Fabian
>
> On Wed, 14 Nov 2018 at 08:37, jincheng sun <sunjincheng...@gmail.com> wrote:
>
>> Hi Fabian/Xiaowei,
>>
>> I am very sorry for my late reply! Glad to see your replies; the ideas sound pretty good!
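The proposals in this thread differ mainly in how the result schema is derived. The following is a hypothetical model in plain Scala, not Flink code. `SchemaSketch`, `Field` and `MultiColumn` are illustrative names. It contrasts three cases: the explicit form from Fabian's example above, the "keys implied at the beginning" form, and `append(...)`. It assumes that a multi-column call such as `agg('a)` expands in place into its output fields.

```scala
// Hypothetical model of result-schema derivation; names are illustrative, not Flink API.
object SchemaSketch {
  sealed trait Item
  // A field reference or aliased expression, e.g. 'k1 or 'w.rowtime as 'rtime.
  final case class Field(name: String) extends Item
  // A call that expands into several columns, e.g. agg('a) yielding (f1, ..., fn).
  final case class MultiColumn(outputs: Seq[String]) extends Item

  // Explicit form, e.g. agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a)):
  // columns appear exactly in the listed order, multi-column calls expanded in place.
  def explicitSchema(items: Seq[Item]): Seq[String] = items.flatMap {
    case Field(name)       => Seq(name)
    case MultiColumn(outs) => outs
  }

  // Implicit form: group keys are prepended to the call's outputs.
  def implicitSchema(groupKeys: Seq[String], call: MultiColumn): Seq[String] =
    groupKeys ++ call.outputs

  // append(call, cols*): the call's outputs followed by the forwarded columns,
  // which in this model equals listing them explicitly with the call first.
  def appendSchema(call: MultiColumn, forwarded: Seq[String]): Seq[String] =
    explicitSchema(Seq[Item](call) ++ forwarded.map(Field(_)))
}
```

In this model, Fabian's other example, `flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)`, corresponds to `explicitSchema` over `b`, `a`, the UDF outputs, `wend` and `rtime`. That yields `[b, a, f1, ..., fn, wend, rtime]`.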
>>
>> I agree with Fabian that the append() approach, which clearly defines the result schema, is better. In addition, append() can also forward non-time attributes, e.g.:
>>
>> // tab has the fields ('name, 'age, 'address, 'rowtime)
>> tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2, 'address, 'rowtime)
>>   .window(Tumble over 5.millis on 'rowtime as 'w)
>>   .groupBy('w, 'address)
>>
>> In this way append() is very useful, and its behavior is very similar to withForwardedFields() in the DataSet API. So +1 to using the append() approach for map() and flatMap()!
>>
>> But what about agg() and flatAgg()? For agg/flatAgg, I agree with Xiaowei's approach of defining the keys to be implied in the result table, appearing at the beginning, for example:
>>
>> tab.window(Tumble ... as 'w)
>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>   .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>
>> What do you think? @Fabian @Xiaowei
>>
>> Thanks,
>> Jincheng
>>
>> On Fri, 9 Nov 2018 at 18:35, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Jincheng,
>>>
>>> Thanks for the summary! I like the approach with append() better than the implicit forwarding, as it clearly indicates which fields are forwarded. However, I don't see much benefit over the flatMap(Expression*) variant, as we would still need to analyze the full expression tree to ensure that at most (or exactly?) one Scalar / TableFunction is used.
>>>
>>> Best,
>>> Fabian
>>>
>>> On Thu, 8 Nov 2018 at 19:25, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> We are discussing this proposal in great detail and trying to design the API from many angles (functionality, compatibility, ease of use, etc.). I think this is a very good process: only such a detailed discussion lets us develop the PR clearly and smoothly at a later stage. I am very grateful to @Fabian and @Xiaowei for sharing a lot of good ideas. Regarding the definition of the method signatures, I want to share my points here, which I am discussing with Fabian in the Google doc (not yet completed):
>>>>
>>>> Assume we have a table:
>>>>
>>>> val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)
>>>>
>>>> Approach 1:
>>>>
>>>> case1: Map follows source table
>>>>
>>>> val result =
>>>>   tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
>>>>   .window(Tumble over 5.millis on 'proctime as 'w)
>>>>
>>>> case2: FlatAgg follows window (Fabian mentioned above)
>>>>
>>>> val result =
>>>>   tab.window(Tumble ... as 'w)
>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>   .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>
>>>> Approach 2: Similar to Fabian's approach, in which the result schema is clearly defined, but with a built-in append UDF added. This makes the map/flatMap/agg/flatAgg interfaces accept only one Expression.
>>>>
>>>> val result =
>>>>   tab.map(append(udf('string), 'long, 'proctime)).as('col1, 'col2, 'long, 'proctime)
>>>>   .window(Tumble over 5.millis on 'proctime as 'w)
>>>>
>>>> Note: append is a special built-in UDF that can pass through any column.
>>>>
>>>> So maybe we can define table.map(Expression) first and, if necessary, extend it to table.map(Expression*) in the future? Of course, I also hope that we can refine this proposal further through discussion.
>>>>
>>>> Thanks,
>>>> Jincheng
>>>>
>>>> On Wed, 7 Nov 2018 at 23:45, Xiaowei Jiang <xiaow...@gmail.com> wrote:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> I think the key question you raised is whether we allow extra parameters in the methods map/flatMap/agg/flatAgg. I can see why allowing that may appear more convenient in some cases. However, it might also cause some confusion. For example, do we allow multiple UDFs in these expressions? If we do, the semantics may be weird to define: e.g., what does table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Not allowing it may appear less powerful, but it can also make things more intuitive. In the case of agg/flatAgg, we can define the keys to be implied in the result table, appearing at the beginning. You can use a select method if you want to modify this behavior. I think we will eventually have some API that allows other expressions as additional parameters, but it's better to do that after we introduce the concept of nested tables. A lot of the things we suggested here can be considered special cases of that. But things are much simpler if we leave that for later.
>>>>>
>>>>> Regards,
>>>>> Xiaowei
>>>>>
>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> * Re emit:
>>>>>> I think we should start with the well-understood semantics of full replacement. This is how the other agg functions work. As was said before, there are open questions regarding an append mode (checkpointing, whether to support retractions or not and, if yes, how to declare them, ...). Since this seems to be an optimization, I'd postpone it.
>>>>>>
>>>>>> * Re grouping keys:
>>>>>> I don't think we should add them automatically, because the result schema would not be intuitive. Would they be added at the beginning of the tuple or at the end? Which metadata fields of windows would be added? In which order would they be added?
>>>>>>
>>>>>> However, we could support syntax like this:
>>>>>>
>>>>>> val t: Table = ???
>>>>>> t
>>>>>>   .window(Tumble ... as 'w)
>>>>>>   .groupBy('w, 'a, 'b)
>>>>>>   .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
>>>>>>
>>>>>> The result schema would be clearly defined as [b, a, f1, f2, ..., fn, wend, rtime], where (f1, f2, ..., fn) are the result attributes of the UDF.
>>>>>>
>>>>>> * Re multi-staged evaluation:
>>>>>> I think this should be an optimization that can be applied if the UDF implements the merge() method.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> On Wed, 7 Nov 2018 at 08:01, Shaoxuan Wang <wshaox...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Xiaowei,
>>>>>>>
>>>>>>> Yes, I agree with you that the semantics of TableAggregateFunction emit are much more complex than those of AggregateFunction. The fundamental difference is that TableAggregateFunction emits a "table", while AggregateFunction outputs (a column of) a "row". AggregateFunction has only one mode, "replacing" (complete update). TableAggregateFunction, however, could use either an incremental update (emit only the newly updated results) or a complete update (always emit the entire table when "emit" is triggered). From a performance perspective, we might want to use incremental updates. But we need to review and design this carefully, especially taking into account failover (instead of just backing up the ACC, it may also need to remember the emit offset) and retractions, as the semantics of TableAggregateFunction emit differ from those of other UDFs. TableFunction also emits a table, but it does not need to worry about this because it is stateless.
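The sketch below makes the difference between the two emit modes concrete. It models a hypothetical top-2 table aggregate in plain Scala; it is not Flink's `TableAggregateFunction` interface, whose details are in the Google doc. Full replacement re-emits the whole result table on every emit. The incremental variant emits retractions and inserts relative to the previously emitted table. That previous output is extra state, beyond the accumulator, which is why failover becomes harder in the incremental mode.

```scala
// Hypothetical top-2 table aggregate in plain Scala; not Flink's TableAggregateFunction API.
object EmitSketch {
  // Accumulator: the (at most) two largest values seen so far, largest first.
  final case class Top2Acc(values: List[Int] = Nil) {
    def accumulate(v: Int): Top2Acc =
      Top2Acc((v :: values).sorted(Ordering[Int].reverse).take(2))
  }

  // Change records for the incremental mode.
  sealed trait Change
  final case class Insert(value: Int) extends Change
  final case class Retract(value: Int) extends Change

  // Full replacement: every emit produces the entire current result table.
  def emitFull(acc: Top2Acc): List[Int] = acc.values

  // Incremental update: only the difference to the previously emitted table.
  // `previous` must be kept as additional state (and restored on failover),
  // otherwise the retractions could not be computed.
  def emitIncremental(previous: List[Int], acc: Top2Acc): List[Change] =
    previous.diff(acc.values).map(Retract(_)) ++ acc.values.diff(previous).map(Insert(_))
}
```

Suppose the values 3 and 5 arrive first. Full mode emits `List(5, 3)`. After 4 arrives, full mode emits `List(5, 4)` in its entirety, while incremental mode emits only `Retract(3)` and `Insert(4)`.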
>>>>>>>
>>>>>>> Regards,
>>>>>>> Shaoxuan
>>>>>>>
>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Jincheng,
>>>>>>>>
>>>>>>>> Thanks for adding the public interfaces! I think it's a very good start. There are a few points that need more discussion.
>>>>>>>>
>>>>>>>> - TableAggregateFunction - this is a very complex beast, definitely the most complex user-defined object we have introduced so far. I think there are quite a few interesting questions here. For example, do we allow multi-staged TableAggregate in this case? What are the semantics of emit? Is it an amendment to the previous output, or does it replace it? I think this subject itself is worth a discussion to make sure we get the details right.
>>>>>>>> - GroupedTable.agg - do the group keys automatically appear in the output? What about the case of windowed aggregation?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Xiaowei
>>>>>>>>
>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Xiaowei,
>>>>>>>>>
>>>>>>>>> Thanks for bringing up the discussion of the Table API Enhancement Outline!
>>>>>>>>>
>>>>>>>>> I quickly looked at the overall content; it is a good summary of our offline discussions. But from my point of view, we should add the usage of the public interfaces that we will introduce in this proposal. So I added the following usage descriptions of the interfaces and operators to the Google doc:
>>>>>>>>>
>>>>>>>>> 1. Map Operator
>>>>>>>>> Map is a new operator on Table. It applies a scalar function and can return multiple columns. Usage:
>>>>>>>>>
>>>>>>>>> val res = tab
>>>>>>>>>   .map(fun: ScalarFunction).as('a, 'b, 'c)
>>>>>>>>>   .select('a, 'c)
>>>>>>>>>
>>>>>>>>> 2. FlatMap Operator
>>>>>>>>> FlatMap is a new operator on Table. It applies a table function and can return multiple rows. Usage:
>>>>>>>>>
>>>>>>>>> val res = tab
>>>>>>>>>   .flatMap(fun: TableFunction).as('a, 'b, 'c)
>>>>>>>>>   .select('a, 'c)
>>>>>>>>>
>>>>>>>>> 3. Agg Operator
>>>>>>>>> Agg is a new operator on Table/GroupedTable. It applies an aggregate function and can return multiple columns. Usage:
>>>>>>>>>
>>>>>>>>> val res = tab
>>>>>>>>>   .groupBy('a) // leave out the groupBy clause to define global aggregates
>>>>>>>>>   .agg(fun: AggregateFunction).as('a, 'b, 'c)
>>>>>>>>>   .select('a, 'c)
>>>>>>>>>
>>>>>>>>> 4. FlatAgg Operator
>>>>>>>>> FlatAgg is a new operator on Table/GroupedTable. It applies a table aggregate function and can return multiple rows. Usage:
>>>>>>>>>
>>>>>>>>> val res = tab
>>>>>>>>>   .groupBy('a) // leave out the groupBy clause to define global table aggregates
>>>>>>>>>   .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
>>>>>>>>>   .select('a, 'c)
>>>>>>>>>
>>>>>>>>> 5. TableAggregateFunction
>>>>>>>>> Table aggregates behave most like a GroupReduceFunction: they compute over a group of elements and output a group of elements. A TableAggregateFunction can be applied with GroupedTable.flatAgg(). The interface of TableAggregateFunction has a lot of content, so I don't copy it here. Please look at the details in the Google doc:
>>>>>>>>>
>>>>>>>>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
>>>>>>>>>
>>>>>>>>> I would very much appreciate anyone reviewing and commenting.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jincheng
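The output cardinalities of the four proposed operators can be summarized with a small in-memory model:

- map: one row out per input row.
- flatMap: zero or more rows out per input row.
- agg: one row out per group.
- flatAgg: zero or more rows out per group.

This is a hypothetical sketch over Scala collections, assuming rows are `Vector[Any]`; `OperatorSketch` and its methods are illustrative and are not the proposed Table API. It deliberately leaves it to the user function whether grouping keys appear in the output, since that question is still open in the thread. Groups are returned in key order only to keep the model deterministic.

```scala
// Hypothetical model of the four operators' cardinalities on in-memory rows; not Flink code.
object OperatorSketch {
  type Row = Vector[Any]

  // map: exactly one output row (possibly several columns) per input row.
  def mapOp(rows: Seq[Row])(f: Row => Row): Seq[Row] = rows.map(f)

  // flatMap: zero or more output rows per input row, like a TableFunction.
  def flatMapOp(rows: Seq[Row])(f: Row => Seq[Row]): Seq[Row] = rows.flatMap(f)

  // agg: exactly one output row per group, like an AggregateFunction.
  def aggOp[K: Ordering](rows: Seq[Row])(key: Row => K)(f: Seq[Row] => Row): Seq[Row] =
    rows.groupBy(key).toSeq.sortBy(_._1).map { case (_, group) => f(group) }

  // flatAgg: zero or more output rows per group, similar to a GroupReduceFunction.
  def flatAggOp[K: Ordering](rows: Seq[Row])(key: Row => K)(f: Seq[Row] => Seq[Row]): Seq[Row] =
    rows.groupBy(key).toSeq.sortBy(_._1).flatMap { case (_, group) => f(group) }
}
```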