+1. I agree that we should open the JIRAs to start the work. We may come up with better ideas on the flavor of the interface when we implement and review the code.
Regards,
Shaoxuan

On 11/20/18, jincheng sun <sunjincheng...@gmail.com> wrote:
> Hi all,
>
> Thanks all for the feedback.
>
> @Piotr About not using abbreviated names: +1, I like your proposal! Currently both the DataSet and DataStream APIs use `aggregate`. BTW, other languages such as R do not use abbreviated names either.
>
> An API is sometimes really difficult to get right: we need to spend a lot of time thinking, gather feedback from many users, and keep improving it. But because of backward compatibility, we have to take the most conservative approach when designing the API (of course, I am in favor of developing richer features once we have discussed them clearly). Therefore, I propose to split the implementation of map/flatMap/agg/flatAgg into JIRAs for the basic functions and JIRAs for supporting time attributes and group keys. We can develop the features whose design we have already agreed on, and continue to discuss the uncertain parts of the design.
>
> In fact, in addition to the API design, there are various performance optimization details. For example, the emitValue method of a table aggregate function emits multiple results; in extreme cases, every record will trigger a large number of retract messages, which performs poorly. So we will also optimize the interface design, e.g., by adding an emitWithRetractValue interface (I have updated the Google doc) that lets the user optionally perform incremental calculations and thus avoid a large number of retractions. Details like this are difficult to discuss fully on the mailing list, so I recommend creating JIRAs/a FLIP first: we develop the designs that have been agreed upon and continue to discuss the undecided ones. What do you think? @Fabian & Piotr & Xiaowei
>
> Best,
> Jincheng
>
> Xiaowei Jiang <xiaow...@gmail.com> wrote on Mon, Nov 19, 2018 at 12:07 AM:
>
>> Hi Fabian & Piotr, thanks for the feedback!
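The retraction cost Jincheng describes for `emitValue` (full replacement) versus the proposed `emitWithRetractValue` (incremental) can be sketched in plain Scala. This is a minimal model under stated assumptions, not Flink code: `Msg`, `Insert`, `Retract`, `Top2`, `FullEmitTop2`, `IncrementalEmitTop2` and `EmitDemo` are hypothetical names, and a Top-2 function stands in for any table aggregate. Full replacement retracts every previously emitted row and re-emits the whole result on each update; the incremental variant touches only the ranks that changed, which means it must keep the last emitted rows in its state (the extra failover state Shaoxuan mentions in this thread).

```scala
// Hypothetical model of a Top-2 table aggregate; this is NOT Flink's TableAggregateFunction API.
sealed trait Msg
final case class Insert(rank: Int, value: Int) extends Msg
final case class Retract(rank: Int, value: Int) extends Msg

object Top2 {
  // Accumulate step shared by both variants: keep the two largest values, descending.
  def update(top: List[Int], v: Int): List[Int] =
    (v :: top).sorted(Ordering[Int].reverse).take(2)

  // Result rows as rank -> value, ranks starting at 1.
  def ranked(values: List[Int]): Map[Int, Int] =
    values.zipWithIndex.map { case (x, i) => (i + 1) -> x }.toMap
}

// Full replacement (like emitValue): retract everything emitted before, re-emit the whole table.
final class FullEmitTop2 {
  private var top: List[Int] = Nil
  private var emitted: List[Int] = Nil // rows currently visible downstream

  def accumulate(v: Int): List[Msg] = {
    top = Top2.update(top, v)
    val retracts = Top2.ranked(emitted).toList.sorted.map { case (r, x) => Retract(r, x) }
    val inserts = Top2.ranked(top).toList.sorted.map { case (r, x) => Insert(r, x) }
    emitted = top
    retracts ++ inserts
  }
}

// Incremental (like the proposed emitWithRetractValue): only ranks whose value changed
// are retracted and re-emitted. `emitted` is part of the state, so a restore after a
// failover needs it in addition to the accumulator `top`.
final class IncrementalEmitTop2 {
  private var top: List[Int] = Nil
  private var emitted: List[Int] = Nil

  def accumulate(v: Int): List[Msg] = {
    top = Top2.update(top, v)
    val before = Top2.ranked(emitted)
    val after = Top2.ranked(top)
    val msgs: List[Msg] = (before.keySet ++ after.keySet).toList.sorted.flatMap { r =>
      (before.get(r), after.get(r)) match {
        case (Some(o), Some(n)) if o == n => Nil
        case (o, n) => o.map(Retract(r, _)).toList ++ n.map(Insert(r, _)).toList
      }
    }
    emitted = top
    msgs
  }
}

object EmitDemo {
  // Total number of messages sent downstream for an input sequence.
  def fullCount(input: Seq[Int]): Int = {
    val f = new FullEmitTop2
    input.map(v => f.accumulate(v).size).sum
  }

  def incrementalCount(input: Seq[Int]): Int = {
    val f = new IncrementalEmitTop2
    input.map(v => f.accumulate(v).size).sum
  }
}
```

For the input `1, 5, 3, 0`, the full-replacement model sends 12 messages and the incremental one sends 6; the gap tends to grow with the size of the emitted table.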
>>
>> I appreciate your concerns, both about timestamp attributes and about implicit group keys. At the same time, I'm also concerned about the proposed approach of allowing Expression* as parameters, especially for flatMap/flatAgg. So far, we have never allowed a scalar expression to appear together with table expressions. With the Expression* approach, this will happen for the parameters of flatMap/flatAgg. I'm not sure we fully understand the consequences for extending the system in the future, so I would be extra cautious here. To avoid this, I think an implicit group key for flatAgg is safer. For flatMap, users who want to keep the rowtime column can use crossApply/join instead, so we are not losing any real functionality.
>>
>> Also, a clarification on the following example:
>>
>>     tab.window(Tumble ... as 'w)
>>       .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>       .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>       .select('k1, 'col1, 'w.rowtime as 'rtime)
>>
>> Without the select clause in this example, 'w would be a regular column in the output. It should not magically disappear.
>>
>> The concern is not as strong for Table.map/Table.agg because there we do not mix scalar and table expressions. But we also want to stay consistent across these methods: if we use implicit group keys for Table.flatAgg, we should probably do the same for Table.agg. That leaves only the choice for Table.map. I can see good arguments on both sides, but starting with a single Expression seems safer because we can always extend it to Expression* in the future.
>>
>> While thinking about this problem, it appears that we may need more work on the handling of watermarks in the SQL/Table API. Our current way of propagating watermarks from the source all the way to the sink might not be optimal.
>> For example, after a tumbling window, the watermark can actually be advanced to just before the next window expires. I think that, in general, each operator may need to generate new watermarks instead of simply propagating them. Once we accept that watermarks may change during execution, it appears that the timestamp columns may also change, as long as we have some way to associate a watermark with them. My intuition is that once we have a thorough solution for the watermark issue, we may be able to solve the problem we encountered for Table.map in a cleaner way. But this is a complex issue that deserves a discussion of its own.
>>
>> Regards,
>> Xiaowei
>>
>> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> Isn't the problem of multiple expressions limited to the `flat***` functions, and more specifically to passing two (or more) different table functions as expressions? `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))` seems to be well defined (the result of every scalar function is duplicated to every record). Or am I missing something?
>>>
>>> Another remark: I would be in favour of not using abbreviations and naming `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
>>>
>>> Piotrek
>>>
>>> On 15 Nov 2018, at 14:15, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Jincheng,
>>>>
>>>> I said before that I think the append() method is better than implicitly forwarding keys, but I still believe it adds unnecessary boilerplate code.
>>>>
>>>> Moreover, I haven't seen a convincing argument why map(Expression*) is worse than map(Expression). In either case we need to do all kinds of checks to prevent invalid use of functions.
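The checks Fabian mentions, and the explicit result schema he argues for, can be illustrated with a small self-contained model of an `agg(Expression*)` / `flatAgg(Expression*)` argument list. This is a sketch under assumptions: the `Expr` hierarchy, `ExplicitAgg.resultSchema`, and the `w_rowtime` default naming are hypothetical and are not Flink's expression classes. It enforces exactly one aggregate call (one possible answer to the "at most (or exactly?) one" question raised in this thread), returns a descriptive error otherwise, and derives the schema in argument order, expanding the aggregate call into its result fields.

```scala
// Hypothetical expression model for an agg(Expression*) / flatAgg(Expression*) call.
// These classes are illustrative only; they are not Flink's expression types.
sealed trait Expr
sealed trait ScalarExpr extends Expr
final case class Field(name: String) extends ScalarExpr                       // e.g. 'k1
final case class WindowProp(window: String, prop: String) extends ScalarExpr  // e.g. 'w.rowtime
final case class Alias(child: ScalarExpr, name: String) extends ScalarExpr    // e.g. 'w.end as 'wend
final case class AggCall(fn: String, resultFields: List[String]) extends Expr // e.g. myAgg(row('*))

object ExplicitAgg {
  // Validates the argument list and derives the result schema in argument order.
  def resultSchema(args: List[Expr]): Either[String, List[String]] = {
    val aggCalls = args.collect { case a: AggCall => a }
    if (aggCalls.size != 1)
      Left(s"expected exactly one aggregate function call, but found ${aggCalls.size}")
    else
      Right(args.flatMap(outputNames))
  }

  // An aggregate call expands into all of its result fields; other expressions yield one field.
  private def outputNames(e: Expr): List[String] = e match {
    case Field(n)           => List(n)
    case Alias(_, n)        => List(n)
    case WindowProp(w, p)   => List(s"${w}_$p") // hypothetical default name for an unaliased property
    case AggCall(_, fields) => fields
  }
}
```

For Fabian's example `flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)` with a UDF producing two fields, this model yields `[b, a, f1, f2, wend, rtime]`.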
>>>> If the method is not used correctly, we can emit a good error message, and documenting map(Expression*) will be easier than map(append(Expression*)), in my opinion. I think we should not add unnecessary syntax unless there is a good reason, and to be honest, I haven't seen that reason yet.
>>>>
>>>> Regarding the groupBy.agg() method, I think it should behave just like any other method, i.e., not do any implicit forwarding. Let's take the example of the windowed group by that you posted before:
>>>>
>>>>     tab.window(Tumble ... as 'w)
>>>>       .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>       .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>       .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>
>>>> What happens if 'w.rowtime is not selected? What is the data type of the field 'w in the resulting Table? Is it a regular field at all, or just a system field that disappears if it is not selected?
>>>>
>>>> IMO, the following syntax is shorter, more explicit, and better aligned with the regular window.groupBy.select aggregations that are supported today:
>>>>
>>>>     tab.window(Tumble ... as 'w)
>>>>       .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>       .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
>>>>
>>>> Best, Fabian
>>>>
>>>> On Wed, Nov 14, 2018 at 08:37, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>
>>>>> Hi Fabian/Xiaowei,
>>>>>
>>>>> I am very sorry for my late reply! Glad to see your replies; they sound pretty good!
>>>>> I agree with Fabian that the approach with append() is better, because it clearly defines the result schema.
>>>>> In addition, append() can also carry non-time attributes, e.g.:
>>>>>
>>>>>     // tab has the fields ('name, 'age, 'address, 'rowtime)
>>>>>     tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2, 'address, 'rowtime)
>>>>>       .window(Tumble over 5.millis on 'rowtime as 'w)
>>>>>       .groupBy('w, 'address)
>>>>>
>>>>> Used this way, append() is very useful, and its behavior is very similar to withForwardedFields() in the DataSet API. So +1 to the append() approach for map() & flatMap()!
>>>>>
>>>>> But what about agg() and flatAgg()? For agg/flatAgg I agree with Xiaowei's approach of implying the keys in the result table, placed at the beginning, for example:
>>>>>
>>>>>     tab.window(Tumble ... as 'w)
>>>>>       .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>       .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>       .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>
>>>>> What do you think? @Fabian @Xiaowei
>>>>>
>>>>> Thanks,
>>>>> Jincheng
>>>>>
>>>>> Fabian Hueske <fhue...@gmail.com> wrote on Fri, Nov 9, 2018 at 6:35 PM:
>>>>>
>>>>>> Hi Jincheng,
>>>>>>
>>>>>> Thanks for the summary!
>>>>>> I like the approach with append() better than the implicit forwarding, as it clearly indicates which fields are forwarded. However, I don't see much benefit over the flatMap(Expression*) variant, as we would still need to analyze the full expression tree to ensure that at most (or exactly?) one Scalar/TableFunction is used.
>>>>>>
>>>>>> Best,
>>>>>> Fabian
>>>>>>
>>>>>> On Thu, Nov 8, 2018 at 19:25, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> We are discussing this proposal in great detail.
>>>>>>> We are trying to design the API from many angles (functionality, compatibility, ease of use, etc.). I think this is a very good process: only such a detailed discussion will let us develop the PRs clearly and smoothly later on. I am very grateful to @Fabian and @Xiaowei for sharing a lot of good ideas.
>>>>>>> About the definition of the method signatures, I want to share my points here, which I am discussing with Fabian in the Google doc (not yet complete):
>>>>>>>
>>>>>>> Assume we have a table:
>>>>>>>
>>>>>>>     val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)
>>>>>>>
>>>>>>> Approach 1:
>>>>>>> case1: Map follows the source table
>>>>>>>
>>>>>>>     val result =
>>>>>>>       tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
>>>>>>>         .window(Tumble over 5.millis on 'proctime as 'w)
>>>>>>>
>>>>>>> case2: FlatAgg follows a window (Fabian mentioned above)
>>>>>>>
>>>>>>>     val result =
>>>>>>>       tab.window(Tumble ... as 'w)
>>>>>>>         .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>         .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
>>>>>>>         .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>
>>>>>>> Approach 2: Similar to Fabian's approach, in which the result schema is clearly defined, but with a built-in append UDF added. This makes the map/flatMap/agg/flatAgg interfaces accept only one Expression.
>>>>>>>
>>>>>>>     val result =
>>>>>>>       tab.map(append(udf('string), 'long, 'proctime)).as('col1, 'col2, 'long, 'proctime)
>>>>>>>         .window(Tumble over 5.millis on 'proctime as 'w)
>>>>>>>
>>>>>>> Note: append is a special built-in UDF that can pass through any column.
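The pass-through behavior of the proposed built-in `append` can be modeled row by row in plain Scala. This is a hypothetical sketch, not the proposed implementation: rows are modeled as `Map[String, Any]`, and `AppendSketch.mapWithAppend` is an illustrative name. The UDF's result columns come first, followed by the forwarded input columns unchanged, and the `names` parameter plays the role of `.as(...)`, mirroring `tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2, 'address, 'rowtime)` from Jincheng's example.

```scala
// Hypothetical row-level model of the proposed built-in append() used inside map().
// Rows are modeled as Map[String, Any]; none of this is Flink API.
object AppendSketch {
  type Row = Map[String, Any]

  // `udf` stands in for a scalar function returning several columns; `forwarded`
  // lists input fields passed through unchanged (similar in spirit to DataSet's
  // withForwardedFields()); `names` plays the role of .as(...).
  def mapWithAppend(rows: Seq[Row], udf: Row => Seq[Any],
                    forwarded: Seq[String], names: Seq[String]): Seq[Row] =
    rows.map { row =>
      val values = udf(row) ++ forwarded.map(f => row(f))
      require(values.size == names.size,
        s".as(...) expects ${values.size} names but got ${names.size}")
      names.zip(values).toMap
    }
}
```

Fields that are not forwarded (for example `age` in Jincheng's table) do not appear in the output, which is what makes the result schema explicit.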
>>>>>>>
>>>>>>> So maybe we can define table.map(Expression) first and, if necessary, extend it to table.map(Expression*) in the future? Of course, I also hope we can refine this proposal further through discussion.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Jincheng
>>>>>>>
>>>>>>> Xiaowei Jiang <xiaow...@gmail.com> wrote on Wed, Nov 7, 2018 at 11:45 PM:
>>>>>>>
>>>>>>>> Hi Fabian,
>>>>>>>>
>>>>>>>> I think the key question you raised is whether we allow extra parameters in the methods map/flatMap/agg/flatAgg. I can see why allowing that may appear more convenient in some cases. However, it might also cause some confusion. For example, do we allow multiple UDFs in these expressions? If we do, the semantics may be weird to define, e.g., what does table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Even though not allowing it may appear less powerful, it can also make things more intuitive. In the case of agg/flatAgg, we can define the keys to be implied in the result table, appearing at the beginning. You can use a select method if you want to modify this behavior. I think that eventually we will have some API that allows other expressions as additional parameters, but it's better to do that after we introduce the concept of nested tables. A lot of the things we suggested here can be considered special cases of that. But things are much simpler if we leave that for later.
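Xiaowei's question about `flatAgg(TableAggA('a), TableAggB('b))` and Piotr's remark that one table-valued function combined with scalar functions is well defined can both be seen in a small model. This is a hypothetical sketch: each argument's output for one group is modeled as a list of values (a scalar yields exactly one value), and the two combination rules below are illustrative candidates, not semantics anyone in the thread proposed. With one table-valued argument the two rules agree (the scalar is repeated for every row, as Piotr describes); with two table-valued arguments they disagree, which is the ambiguity Xiaowei points out.

```scala
// Hypothetical model: for one group, each argument of flatAgg(...) yields a list of
// values (a scalar function yields exactly one). Two candidate ways to combine them:
object MultiArgSemantics {
  // Interpretation 1: cross product of all argument outputs.
  def cross(outputs: List[List[Any]]): List[List[Any]] =
    outputs.foldRight(List(List.empty[Any])) { (vals, acc) =>
      for (v <- vals; rest <- acc) yield v :: rest
    }

  // Interpretation 2: positional zip; single values are repeated for every row and
  // shorter outputs are padded with null (an arbitrary choice made for this sketch).
  def zipRepeat(outputs: List[List[Any]]): List[List[Any]] = {
    require(outputs.nonEmpty, "at least one argument is required")
    val n = outputs.map(_.size).max
    (0 until n).toList.map { i =>
      outputs.map(vals => if (vals.size == 1) vals.head else vals.lift(i).orNull)
    }
  }
}
```

For two table-valued outputs of sizes 2 and 3, the cross product yields 6 rows while the positional zip yields 3, so such a call has no single obvious meaning.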
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Xiaowei
>>>>>>>>
>>>>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> * Re emit:
>>>>>>>>> I think we should start with the well-understood semantics of full replacement. This is how the other agg functions work.
>>>>>>>>> As was said before, there are open questions regarding an append mode (checkpointing, whether to support retractions and, if so, how to declare them, ...).
>>>>>>>>> Since this seems to be an optimization, I'd postpone it.
>>>>>>>>>
>>>>>>>>> * Re grouping keys:
>>>>>>>>> I don't think we should add them automatically, because the result schema would not be intuitive.
>>>>>>>>> Would they be added at the beginning of the tuple or at the end? Which metadata fields of windows would be added? In which order would they be added?
>>>>>>>>>
>>>>>>>>> However, we could support syntax like this:
>>>>>>>>>
>>>>>>>>>     val t: Table = ???
>>>>>>>>>     t
>>>>>>>>>       .window(Tumble ... as 'w)
>>>>>>>>>       .groupBy('a, 'b)
>>>>>>>>>       .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
>>>>>>>>>
>>>>>>>>> The result schema would be clearly defined as [b, a, f1, f2, ..., fn, wend, rtime], where (f1, f2, ..., fn) are the result attributes of the UDF.
>>>>>>>>>
>>>>>>>>> * Re multi-staged evaluation:
>>>>>>>>> I think this should be an optimization that can be applied if the UDF implements the merge() method.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> On Wed, Nov 7, 2018 at 08:01, Shaoxuan Wang <wshaox...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Xiaowei,
>>>>>>>>>>
>>>>>>>>>> Yes, I agree with you that the emit semantics of TableAggregateFunction are much more complex than those of AggregateFunction. The fundamental difference is that a TableAggregateFunction emits a "table", while an AggregateFunction outputs (a column of) a "row". An AggregateFunction has only one mode, which is "replacing" (complete update). A TableAggregateFunction, however, could use either an incremental update (emit only the newly updated results) or a complete update (always emit the entire table when "emit" is triggered). From a performance perspective, we might want to use incremental updates. But we need to review and design this carefully, especially taking into account failover (instead of just backing up the ACC, it may also need to remember the emit offset) and retractions, as the emit semantics of TableAggregateFunction differ from those of other UDFs. TableFunction also emits a table, but it does not need to worry about this because it is stateless.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Shaoxuan
>>>>>>>>>>
>>>>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for adding the public interfaces! I think it's a very good start.
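Fabian's suggestion to treat multi-staged evaluation as an optimization that applies when the UDF implements `merge()` can be sketched with a Top-2 table aggregate. This is a plain-Scala model assuming the usual accumulate/merge pattern; `Top2Acc` and `TwoPhase` are hypothetical names, not Flink's `TableAggregateFunction` API. Each partition pre-aggregates locally, the partial accumulators are merged, and the result must equal single-stage evaluation, which holds here because keeping the top two values of a multiset is associative and commutative.

```scala
// Hypothetical sketch of two-phase evaluation for a Top-2 table aggregate.
// The accumulate/merge names follow the usual aggregate-function pattern; this is
// plain Scala, not Flink's TableAggregateFunction API.
final case class Top2Acc(values: List[Int]) {
  private def top2(xs: List[Int]): List[Int] = xs.sorted(Ordering[Int].reverse).take(2)

  def accumulate(v: Int): Top2Acc = Top2Acc(top2(v :: values))
  def merge(other: Top2Acc): Top2Acc = Top2Acc(top2(values ++ other.values))
  def emit: List[Int] = values // full-replacement emit: the complete current result
}

object TwoPhase {
  val empty: Top2Acc = Top2Acc(Nil)

  // Single stage: one accumulator sees every input value.
  def singleStage(input: Seq[Int]): List[Int] =
    input.foldLeft(empty)((acc, v) => acc.accumulate(v)).emit

  // Phase 1: each partition pre-aggregates locally.
  // Phase 2: the partial accumulators are merged into one.
  def twoStage(partitions: Seq[Seq[Int]]): List[Int] =
    partitions
      .map(p => p.foldLeft(empty)((acc, v) => acc.accumulate(v)))
      .foldLeft(empty)((acc, partial) => acc.merge(partial))
      .emit
}
```

For `4, 9, 1, 7, 3` split into `(4, 9)` and `(1, 7, 3)`, both paths emit `List(9, 7)`.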
>>>>>>>>>>> There are a few points that need more discussion:
>>>>>>>>>>>
>>>>>>>>>>> - TableAggregateFunction: this is a very complex beast, definitely the most complex user-defined object we have introduced so far. I think there are quite a few interesting questions here. For example, do we allow multi-staged TableAggregate in this case? What are the semantics of emit? Does it amend the previous output, or replace it? I think this subject is worth a discussion of its own to make sure we get the details right.
>>>>>>>>>>> - GroupedTable.agg: do the group keys automatically appear in the output? What about windowed aggregation?
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Xiaowei
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Xiaowei,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for bringing up the discussion of the Table API Enhancement Outline!
>>>>>>>>>>>>
>>>>>>>>>>>> I quickly looked over the overall content; it captures our offline discussions well. But from my point of view, we should add the usage of the public interfaces that we will introduce in this proposal. So I added the following usage description of the interfaces and operators to the Google doc:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Map Operator
>>>>>>>>>>>> Map is a new operator of Table. It applies a scalar function and can return multiple columns.
The usage as >> > >>>> follows: >> > >>>>>>>>> >> > >>>>>>>>> val res = tab >> > >>>>>>>>> .map(fun: ScalarFunction).as(‘a, ‘b, ‘c) >> > >>>>>>>>> .select(‘a, ‘c) >> > >>>>>>>>> >> > >>>>>>>>> 2. FlatMap Operator >> > >>>>>>>>> FaltMap operator is a new operator of Table, FlatMap >> > >>> operator >> > >>>>> can >> > >>>>>>>> apply >> > >>>>>>>>> a table function, and can return multi-row. The usage as >> > >>> follows: >> > >>>>>>>>> >> > >>>>>>>>> val res = tab >> > >>>>>>>>> .flatMap(fun: TableFunction).as(‘a, ‘b, ‘c) >> > >>>>>>>>> .select(‘a, ‘c) >> > >>>>>>>>> >> > >>>>>>>>> 3. Agg Operator >> > >>>>>>>>> Agg operator is a new operator of Table/GroupedTable, Agg >> > >>>>>> operator >> > >>>>>>>> can >> > >>>>>>>>> apply a aggregate function, and can return multi-column. The >> > >>>> usage >> > >>>>> as >> > >>>>>>>>> follows: >> > >>>>>>>>> >> > >>>>>>>>> val res = tab >> > >>>>>>>>> .groupBy(‘a) // leave groupBy-Clause out to define >> > >> global >> > >>>>>>>> aggregates >> > >>>>>>>>> .agg(fun: AggregateFunction).as(‘a, ‘b, ‘c) >> > >>>>>>>>> .select(‘a, ‘c) >> > >>>>>>>>> >> > >>>>>>>>> 4. FlatAgg Operator >> > >>>>>>>>> FlatAgg operator is a new operator of Table/GroupedTable, >> > >>>>> FaltAgg >> > >>>>>>>>> operator can apply a table aggregate function, and can return >> > >>>>>>> multi-row. >> > >>>>>>>>> The usage as follows: >> > >>>>>>>>> >> > >>>>>>>>> val res = tab >> > >>>>>>>>> .groupBy(‘a) // leave groupBy-Clause out to define >> > >>> global >> > >>>>>> table >> > >>>>>>>>> aggregates >> > >>>>>>>>> .flatAgg(fun: TableAggregateFunction).as(‘a, ‘b, ‘c) >> > >>>>>>>>> .select(‘a, ‘c) >> > >>>>>>>>> >> > >>>>>>>>> 5. TableAggregateFunction >> > >>>>>>>>> The behavior of table aggregates is most like >> > >>>>>> GroupReduceFunction >> > >>>>>>>> did, >> > >>>>>>>>> which computed for a group of elements, and output a group >> > >> of >> > >>>>>>> elements. 
>>>>>>>>>>>> A TableAggregateFunction can be applied with GroupedTable.flatAgg(). The TableAggregateFunction interface is fairly large, so I don't copy it here; please see the details in the Google doc:
>>>>>>>>>>>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
>>>>>>>>>>>>
>>>>>>>>>>>> I would very much appreciate it if anyone could review and comment.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jincheng

--
*Rome was not built in one day*
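The cardinalities of the four operators Jincheng lists in his 6 Nov message (map, flatMap, agg, flatAgg) can be summarized with Scala collections. This is only an analogy under stated assumptions: `OperatorShapes` is a hypothetical helper working on in-memory lists, not the Table API, and the group key is paired with each flatAgg result only to make the grouping visible. Whether keys appear in the real output is exactly what this thread debates.

```scala
// Hypothetical collection-level analogy for the four proposed operators. Only the
// cardinalities are modeled; the real operators work on tables, not Scala lists.
object OperatorShapes {
  // map: each input row -> exactly one output row (possibly with several columns).
  def map[A, B](rows: List[A])(f: A => B): List[B] = rows.map(f)

  // flatMap: each input row -> zero or more output rows (like a TableFunction).
  def flatMap[A, B](rows: List[A])(f: A => List[B]): List[B] = rows.flatMap(f)

  // agg: each group -> exactly one result (like an AggregateFunction).
  def agg[A, K, B](rows: List[A])(key: A => K)(f: List[A] => B): Map[K, B] =
    rows.groupBy(key).map { case (k, g) => k -> f(g) }

  // flatAgg: each group -> zero or more results (like a TableAggregateFunction, or a
  // GroupReduceFunction in the DataSet API). The key is paired with each result only
  // to make the grouping visible in this sketch.
  def flatAgg[A, K, B](rows: List[A])(key: A => K)(f: List[A] => List[B]): List[(K, B)] =
    rows.groupBy(key).toList.flatMap { case (k, g) => f(g).map(b => (k, b)) }
}
```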