Hi Shaoxuan & Hequn,

Thanks for your suggestion, I'll file the JIRAs later. We can prepare PRs while continuing to move the ongoing discussion forward.

Regards,
Jincheng

On Wed, Nov 21, 2018 at 7:07 PM jincheng sun <sunjincheng...@gmail.com> wrote:

> Hi Piotrek,
>
> Thanks for your feedback, and thanks for sharing your thoughts!
>
> #1) No, watermarks address the problem of late events. Here, the performance
> problem is caused by the update emit mode, i.e., whenever the current
> calculation result is emitted, the previous result has to be retracted.
> #2) As I mentioned above, we should continue the discussion until we solve
> the problems raised by Xiaowei and Fabian.
> #3) I would still like to keep select() simple and have it support only
> projected scalars; we can hardly tell what tab.select(flatmap('a), 'b,
> flatmap('d)) would mean.
>
> Thanks,
> Jincheng

On Wed, Nov 21, 2018 at 5:24 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hi,
>
> 1.
>
> > In fact, in addition to the design of the APIs, there will be various
> > performance optimization details. For example, the emitValue method of a
> > table aggregate function generates multiple results; in extreme cases,
> > each record will trigger a large number of retract messages, which leads
> > to poor performance
>
> Can this be solved or mitigated by emitting the results only on watermarks?
> I think that was the path we decided to take for both Temporal Joins
> and the upsert stream conversion. I know that this increases the latency, and
> there is room for a future global setting/user preference, an “emit the data
> ASAP” mode, but emitting only on watermarks seems to me a better and saner
> default.
>
> 2.
>
> With respect to the API discussion and implicit columns: the problem for
> me so far is that I’m not sure I like the additional complexity of the
> `append()` solution, while implicit columns are definitely not in the
> spirit of SQL. Neither joins nor aggregations add extra, unexpected columns
> to the result without being asked. This can definitely confuse users,
> since it breaks the convention.
> Thus, of those three options, I would lean towards Fabian’s proposal of a
> multi-argument `map(Expression*)`.
>
> 3.
>
> Another topic is that I’m not 100% convinced that we should be adding new
> API functions for `map`, `aggregate`, `flatMap` and `flatAggregate`. I think
> the same could be achieved by changing
>
>   table.map(F('x))
>
> into
>
>   table.select(F('x)).unnest()
> or
>   table.select(F('x).unnest())
>
> where `unnest()` means unnesting a row/tuple type into a columnar table.
>
>   table.flatMap(F('x))
>
> could, on the other hand, also be handled by
>
>   table.select(F('x))
>
> by correctly deducing that F('x) is a multi-row output function.
>
> The same might apply to `aggregate(F('x))`, but this could perhaps be
> replaced by:
>
>   table.groupBy(…).select(F('x).unnest())
>
> Adding scalar functions should also be possible:
>
>   table.groupBy('k).select(F('x).unnest(), 'k)
>
> Maybe such an approach would allow us to implement the same features in
> SQL as well?
>
> Piotrek

On 21 Nov 2018, at 09:43, Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi,
>
> Thank you all for the great proposal and discussion!
> I also prefer to move on to the next step, so +1 for opening the JIRAs to
> start the work.
> We can have more detailed discussions there. By the way, we can start with
> the JIRAs we have already agreed on.
>
> Best,
> Hequn

On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <wshaox...@gmail.com> wrote:

> +1. I agree that we should open the JIRAs to start the work. We may come up
> with better ideas on the flavor of the interface while implementing and
> reviewing the code.
>
> Regards,
> shaoxuan

On 11/20/18, jincheng sun <sunjincheng...@gmail.com> wrote:

> Hi all,
>
> Thanks all for the feedback.
>
> @Piotr About not using abbreviations in naming: +1, I like your proposal!
> Currently both the DataSet and DataStream APIs use `aggregate`. BTW, I find
> that other languages, such as R, also avoid abbreviated names.
>
> Sometimes the interface of an API is really difficult to perfect: we need to
> spend a lot of time thinking, collect feedback from a large number of users,
> and improve it constantly. But because of backward compatibility, we have to
> adopt the most conservative approach when designing the API (of course, I am
> in favor of developing richer features once we have discussed them clearly).
> Therefore, I propose to split the implementation of map/flatMap/agg/flatAgg
> into JIRAs for the basic functionality and JIRAs that add support for time
> attributes and group keys. We can develop the features whose design we have
> already agreed on, and continue to discuss the uncertain parts.
>
> In fact, in addition to the design of the APIs, there will be various
> performance optimization details. For example, the emitValue method of a
> table aggregate function generates multiple results; in extreme cases, each
> record will trigger a large number of retract messages, which leads to poor
> performance. So we will also optimize the interface design, e.g., by adding
> an emitWithRetractValue interface (I have updated the Google doc) that lets
> the user optionally perform incremental calculations, thus avoiding a large
> number of retractions. Details like this are difficult to discuss fully on
> the mailing list, so I recommend creating JIRAs/a FLIP first: we develop the
> designs that have been agreed upon and continue to discuss the undecided
> ones! What do you think?
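The retraction cost jincheng describes, and the benefit of an incremental emit such as the proposed emitWithRetractValue, can be illustrated with a plain-Scala model of a Top-2 table aggregate. The types and functions below are hypothetical and do not use the Flink API; they only count the change messages that each emit strategy produces.

```scala
// Plain-Scala model, not the Flink API: change messages of a Top-2 table aggregate.
sealed trait Change extends Product with Serializable
final case class Insert(value: Int) extends Change
final case class Retract(value: Int) extends Change

// Top-2 (descending) result table after adding one value to the previous result.
def top2(prev: List[Int], v: Int): List[Int] =
  (v :: prev).sorted(Ordering[Int].reverse).take(2)

// Feeds the input through top2 and collects the messages produced by `emit`
// for every (previous result, new result) pair.
def run(input: Seq[Int])(emit: (List[Int], List[Int]) => Seq[Change]): Seq[Change] =
  input.foldLeft((List.empty[Int], Vector.empty[Change])) { case ((prev, out), v) =>
    val next = top2(prev, v)
    (next, out ++ emit(prev, next))
  }._2

// Full replacement: retract every previously emitted row, then emit the whole new table.
def fullReplacement(input: Seq[Int]): Seq[Change] =
  run(input)((prev, next) => prev.map(Retract(_)) ++ next.map(Insert(_)))

// Incremental: retract only rows that left the result, insert only rows that entered it.
def incremental(input: Seq[Int]): Seq[Change] =
  run(input)((prev, next) => prev.diff(next).map(Retract(_)) ++ next.diff(prev).map(Insert(_)))
```

For the input 1, 2, 3, 0, full replacement produces 12 messages, while the incremental variant produces 4; it emits nothing at all for 0, which does not change the Top-2.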
> @Fabian & Piotr & XiaoWei
>
> Best,
> Jincheng

On Mon, Nov 19, 2018 at 12:07 AM Xiaowei Jiang <xiaow...@gmail.com> wrote:

> Hi Fabian & Piotr, thanks for the feedback!
>
> I appreciate your concerns, both about timestamp attributes and about
> implicit group keys. At the same time, I'm also concerned about the proposed
> approach of allowing Expression* as parameters, especially for
> flatMap/flatAgg. So far, we have never allowed a scalar expression to appear
> together with table expressions. With the Expression* approach, this will
> happen for the parameters of flatMap/flatAgg. I'm a bit concerned whether we
> fully understand the consequences when we try to extend our system in the
> future, so I would be extra cautious here. To avoid this, I think an
> implicit group key for flatAgg is safer. For flatMap, if users want to keep
> the rowtime column, they can use crossApply/join instead. So we are not
> losing any real functionality here.
>
> Also, a clarification on the following example:
>
>   tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>     .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>     .select('k1, 'col1, 'w.rowtime as 'rtime)
>
> If we did not have the select clause in this example, we would have 'w as a
> regular column in the output. It should not magically disappear.
>
> The concern is not as strong for Table.map/Table.agg because we are not
> mixing scalar and table expressions. But we also want to be somewhat
> consistent across these methods. If we used implicit group keys for
> Table.flatAgg, we should probably do the same for Table.agg. Now we only
> have to choose what to do with Table.map. I can see good arguments on both
> sides.
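Xiaowei's suggestion of an implicit group key for flatAgg, with the keys appearing at the beginning of each result row, can be pictured with a plain-Scala model. `flatAggImplicitKey` is a hypothetical helper, not part of Flink, and the sketch ignores window properties such as 'w entirely.

```scala
// Hypothetical model of groupBy(key).flatAgg(tableAgg) with an implicit group key:
// every row emitted by the table aggregate is prefixed with its group key.
def flatAggImplicitKey[K, V](input: Seq[(K, V)])(tableAgg: Seq[V] => Seq[Seq[Any]]): Seq[Seq[Any]] =
  input.groupBy(_._1).toSeq.flatMap { case (k, rows) =>
    // Key first, then the fields produced by the table aggregate.
    tableAgg(rows.map(_._2)).map(fields => k +: fields)
  }
```

Renaming such a result with `.as(...)` then has to list the keys before the UDF fields, which is the arity `.as('w, 'k1, 'k2, 'col1, 'col2)` reflects in the example above.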
> But starting with a single Expression seems safer, because we can always
> extend it to Expression* in the future.
>
> While thinking about this problem, it appears that we may need more work on
> our handling of watermarks in the SQL/Table API. Our current way of
> propagating watermarks from the source all the way to the sink might not be
> optimal. For example, after a tumbling window, the watermark can actually be
> advanced to just before the next window expires. I think that, in general,
> each operator may need to generate new watermarks instead of simply
> propagating them. Once we accept that watermarks may change during
> execution, it appears that the timestamp columns may also change, as long as
> we have some way to associate the watermark with them. My intuition is that
> once we have a thorough solution for the watermark issue, we may be able to
> solve the problem we encountered for Table.map in a cleaner way. But this is
> a complex issue which deserves a discussion of its own.
>
> Regards,
> Xiaowei

On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hi,
>
> Isn’t the problem of multiple expressions limited to the `flat***`
> functions, and, more specifically, only to passing two (or more) different
> table functions as expressions? `.flatAgg(TableAggA('a),
> scalarFunction1('b), scalarFunction2('c))` seems to be well defined (the
> result of every scalar function is duplicated into every record). Or am I
> missing something?
>
> Another remark: I would be in favour of not using abbreviations and of
> naming `agg` -> `aggregate` and `flatAgg` -> `flatAggregate`.
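Piotr's reading of `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))`, where each scalar result is copied into every emitted record, can be sketched as follows. This model rests on an explicit assumption: the scalar functions depend only on the group key, so each has a single value per group. If they read non-key columns, which input row they should see is exactly the open question. `flatAggWithScalars` is a hypothetical helper, not Flink API.

```scala
// Hypothetical model of flatAgg(tableAgg, scalar1, scalar2, ...).
// Assumption: scalar functions only depend on the group key.
def flatAggWithScalars[K, A, T](
    groups: Map[K, Seq[A]],
    tableAgg: Seq[A] => Seq[T],
    scalars: Seq[K => Any]): Seq[(T, Seq[Any])] =
  groups.toSeq.flatMap { case (k, values) =>
    val scalarResults = scalars.map(f => f(k))        // evaluated once per group
    tableAgg(values).map(row => (row, scalarResults)) // duplicated into every output row
  }
```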
>
> Piotrek

On 15 Nov 2018, at 14:15, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Jincheng,
>
> I said before that I think the append() method is better than implicitly
> forwarding keys, but I still believe it adds unnecessary boilerplate code.
>
> Moreover, I haven't seen a convincing argument why map(Expression*) is
> worse than map(Expression). In either case we need to do all kinds of
> checks to prevent invalid use of functions.
> If the method is not used correctly, we can emit a good error message, and
> documenting map(Expression*) will be easier than map(append(Expression*)),
> in my opinion.
> I think we should not add unnecessary syntax unless there is a good reason,
> and to be honest, I haven't seen such a reason yet.
>
> Regarding the groupBy.agg() method, I think it should behave just like any
> other method, i.e., not do any implicit forwarding.
> Let's take the example of the windowed group by that you posted before:
>
>   tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>     .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>     .select('k1, 'col1, 'w.rowtime as 'rtime)
>
> What happens if 'w.rowtime is not selected? What is the data type of the
> field 'w in the resulting Table? Is it a regular field at all, or just a
> system field that disappears if it is not selected?
>
> IMO, the following syntax is shorter, more explicit, and better aligned
> with the regular window.groupBy.select aggregations that are supported
> today:
>
>   tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>     .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
>
> Best, Fabian

On Wed, Nov 14, 2018 at 8:37 AM jincheng sun <sunjincheng...@gmail.com> wrote:

> Hi Fabian/Xiaowei,
>
> I am very sorry for my late reply! Glad to see your replies, and they sound
> pretty good!
> I agree with Fabian that the append() approach, which clearly defines the
> result schema, is better.
> In addition, append() can also carry non-time attributes, e.g.:
>
>   // tab has the fields 'name, 'age, 'address, 'rowtime
>   tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2, 'address, 'rowtime)
>     .window(Tumble over 5.millis on 'rowtime as 'w)
>     .groupBy('w, 'address)
>
> In this way append() is very useful, and its behavior is very similar to
> withForwardedFields() in DataSet.
> So +1 to using the append() approach for map() & flatMap()!
>
> But what about agg() and flatAgg()? In the agg/flatAgg case I agree with
> Xiaowei's approach of defining the keys to be implied in the result table
> and to appear at the beginning, for example:
>
>   tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>     .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>     .select('k1, 'col1, 'w.rowtime as 'rtime)
>
> What do you think? @Fabian @Xiaowei
>
> Thanks,
> Jincheng

On Fri, Nov 9, 2018 at 6:35 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Jincheng,
>
> Thanks for the summary!
> I like the append() approach better than implicit forwarding, as it clearly
> indicates which fields are forwarded.
> However, I don't see much benefit over the flatMap(Expression*) variant, as
> we would still need to analyze the full expression tree to ensure that at
> most (or exactly?) one Scalar / TableFunction is used.
>
> Best,
> Fabian

On Thu, Nov 8, 2018 at 7:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:

> Hi all,
>
> We are discussing this proposal in great detail, trying to design the API
> from many angles (functionality, compatibility, ease of use, etc.). I think
> this is a very good process: only such a detailed discussion lets us develop
> the PRs clearly and smoothly in the later stages. I am very grateful to
> @Fabian and @Xiaowei for sharing a lot of good ideas.
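Fabian's observation that even an append()-based flatMap still requires analyzing the full expression tree can be illustrated with a small validator over a hypothetical expression ADT (`Field`, `ScalarCall` and `TableCall` are not Flink's Expression classes). The sketch assumes the stricter of his two options: exactly one table function, placed at the top level of the argument list.

```scala
// Hypothetical expression tree, not Flink's Expression classes.
sealed trait Expr
final case class Field(name: String) extends Expr
final case class ScalarCall(name: String, args: Seq[Expr]) extends Expr
final case class TableCall(name: String, args: Seq[Expr]) extends Expr

// Counts table-function calls anywhere in the tree, including nested ones.
def tableCalls(e: Expr): Int = e match {
  case Field(_)            => 0
  case ScalarCall(_, args) => args.map(tableCalls).sum
  case TableCall(_, args)  => 1 + args.map(tableCalls).sum
}

// flatMap(Expression*) check under the "exactly one, top-level" assumption.
def validateFlatMap(exprs: Seq[Expr]): Either[String, Unit] = {
  val total = exprs.map(tableCalls).sum
  val topLevel = exprs.count(_.isInstanceOf[TableCall])
  if (total != 1) Left(s"expected exactly one table function, found $total")
  else if (topLevel != 1) Left("the table function must not be nested inside another expression")
  else Right(())
}
```

The same walk is needed whether the arguments arrive as `Expression*` or wrapped in `append(...)`, which is the point Fabian makes.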
>
> About the definition of the method signatures, I want to share my points
> here, which I am discussing with Fabian in the Google doc (not yet
> completed):
>
> Assume we have a table:
>
>   val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)
>
> Approach 1:
> case1: Map follows the source table
>
>   val result =
>     tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
>       .window(Tumble over 5.millis on 'proctime as 'w)
>
> case2: FlatAgg follows a window (as Fabian mentioned above)
>
>   val result =
>     tab.window(Tumble ... as 'w)
>       .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>       .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
>       .select('k1, 'col1, 'w.rowtime as 'rtime)
>
> Approach 2: Similar to Fabian's approach, in which the result schema is
> clearly defined, but with a built-in append UDF added. This makes the
> map/flatMap/agg/flatAgg interfaces accept only one Expression.
>
>   val result =
>     tab.map(append(udf('string), 'long, 'proctime)).as('col1, 'col2, 'long, 'proctime)
>       .window(Tumble over 5.millis on 'proctime as 'w)
>
> Note: append is a special built-in UDF that can pass through any column.
>
> So maybe we can define table.map(Expression) first and, if necessary,
> extend it to table.map(Expression*) in the future? Of course, I also hope
> that we can refine this proposal further through discussion.
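The pass-through behavior of the proposed built-in append UDF (Approach 2) can be modeled with rows as name-to-value maps. `mapWithAppend` is a hypothetical helper, not Flink API: the UDF's result columns come first, the forwarded columns follow unchanged, and the names given via `.as(...)` must match the total arity.

```scala
// Hypothetical model of map(append(udf('string), 'long, 'proctime)).as(...).
type Row = Map[String, Any]

def mapWithAppend(
    rows: Seq[Row],
    udf: Row => Seq[Any],      // result columns of the UDF
    passThrough: Seq[String],  // columns forwarded unchanged, like withForwardedFields()
    names: Seq[String]         // names given via .as(...)
): Seq[Row] =
  rows.map { r =>
    val values = udf(r) ++ passThrough.map(r)
    require(values.size == names.size, s"as(...) needs ${values.size} names, got ${names.size}")
    names.zip(values).toMap
  }
```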
>
> Thanks,
> Jincheng

On Wed, Nov 7, 2018 at 11:45 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:

> Hi Fabian,
>
> I think that the key question you raised is whether we allow extra
> parameters in the methods map/flatMap/agg/flatAgg. I can see why allowing
> that may appear more convenient in some cases. However, it might also cause
> some confusion. For example, do we allow multiple UDFs in these expressions?
> If we do, the semantics may be weird to define, e.g., what does
> table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Not allowing
> it may appear less powerful, but it can also make things more intuitive. In
> the case of agg/flatAgg, we can define the keys to be implied in the result
> table and to appear at the beginning. You can use a select method if you
> want to modify this behavior. I think that eventually we will have some API
> which allows other expressions as additional parameters, but I think it's
> better to do that after we introduce the concept of nested tables. A lot of
> the things we suggested here can be considered special cases of that. But
> things are much simpler if we leave that for later.
>
> Regards,
> Xiaowei

On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> * Re emit:
> I think we should start with the well-understood semantics of full
> replacement. This is how the other agg functions work.
> As was said before, there are open questions regarding an append mode
> (checkpointing, whether to support retractions and, if so, how to declare
> them, ...).
> Since this seems to be an optimization, I'd postpone it.
>
> * Re grouping keys:
> I don't think we should add them automatically, because the result schema
> would not be intuitive.
> Would they be added at the beginning of the tuple or at the end? Which
> metadata fields of windows would be added? In which order would they be
> added?
>
> However, we could support syntax like this:
>
>   val t: Table = ???
>   t
>     .window(Tumble ... as 'w)
>     .groupBy('a, 'b)
>     .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
>
> The result schema would be clearly defined as [b, a, f1, f2, ..., fn, wend,
> rtime], where (f1, f2, ..., fn) are the result attributes of the UDF.
>
> * Re multi-staged evaluation:
> I think this should be an optimization that can be applied if the UDF
> implements the merge() method.
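Fabian's point that multi-staged evaluation is an optimization available when the UDF implements merge() can be illustrated with a plain-Scala average. The function names loosely mirror the accumulate/merge/getValue idea, but they are stand-alone functions here, not Flink's AggregateFunction signatures. The sketch shows that local pre-aggregation followed by merge() yields the same result as a single stage.

```scala
// Illustrative accumulator for an average; not Flink's AggregateFunction API.
final case class AvgAcc(sum: Long, count: Long)

def createAccumulator(): AvgAcc = AvgAcc(0L, 0L)
def accumulate(acc: AvgAcc, v: Long): AvgAcc = AvgAcc(acc.sum + v, acc.count + 1)
def merge(a: AvgAcc, b: AvgAcc): AvgAcc = AvgAcc(a.sum + b.sum, a.count + b.count)
def getValue(acc: AvgAcc): Option[Double] =
  if (acc.count == 0) None else Some(acc.sum.toDouble / acc.count)

// Single stage: one accumulator sees every value.
def singleStage(values: Seq[Long]): Option[Double] =
  getValue(values.foldLeft(createAccumulator())(accumulate))

// Two stages: local pre-aggregation per partition, then merge() of the partial accumulators.
def twoStage(partitions: Seq[Seq[Long]]): Option[Double] = {
  val partials = partitions.map(_.foldLeft(createAccumulator())(accumulate))
  getValue(partials.foldLeft(createAccumulator())(merge))
}
```

Without a merge() method, only the single-stage path is available, which is why Fabian frames multi-staging as an optional optimization.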
>
> Best, Fabian

On Wed, Nov 7, 2018 at 8:01 AM Shaoxuan Wang <wshaox...@gmail.com> wrote:

> Hi Xiaowei,
>
> Yes, I agree with you that the semantics of TableAggregateFunction emit are
> much more complex than those of AggregateFunction. The fundamental
> difference is that a TableAggregateFunction emits a "table", while an
> AggregateFunction outputs (a column of) a "row". An AggregateFunction has
> only one mode, which is "replacing" (complete update). But a
> TableAggregateFunction could use either incremental update (emit only the
> newly updated results) or complete update (always emit the entire table
> when "emit" is triggered). From a performance perspective, we might want to
> use incremental update. But we need to review and design this carefully,
> especially taking into account failover (instead of just backing up the
> ACC, it may also need to remember the emit offset) and retractions, as the
> semantics of TableAggregateFunction emit differ from those of other UDFs.
> TableFunction also emits a table, but it does not need to worry about this
> because it is stateless.
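Shaoxuan's remark that an incremental emit has to remember the emit offset, not just the ACC, can be made concrete with a hypothetical model of a table aggregate whose result only grows. `State`, `emitIncremental` and `run` are illustrative names, not Flink API. Restoring the checkpointed offset avoids re-emitting rows, while resetting it to 0 after a failover duplicates them.

```scala
// Hypothetical state of an append-only table aggregate: accumulator plus emit offset.
final case class State(rows: Vector[String], emitOffset: Int)

def accumulate(s: State, v: String): State = s.copy(rows = s.rows :+ v)

// Incremental emit: output only rows not emitted yet, then advance the offset.
def emitIncremental(s: State): (Seq[String], State) =
  (s.rows.drop(s.emitOffset), s.copy(emitOffset = s.rows.size))

// Accumulates a batch of input and triggers one emit.
def run(start: State, input: Seq[String]): (Seq[String], State) =
  emitIncremental(input.foldLeft(start)(accumulate))
```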
>
> Regards,
> Shaoxuan

On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:

> Hi Jincheng,
>
> Thanks for adding the public interfaces! I think it's a very good start.
> There are a few points that need more discussion.
>
>    - TableAggregateFunction - this is a very complex beast, definitely the
>    most complex user-defined object we have introduced so far. I think there
>    are quite a few interesting questions here. For example, do we allow
>    multi-staged TableAggregate in this case? What are the semantics of
>    emit? Is it an amendment to the previous output, or does it replace it?
>    I think this subject itself is worth a discussion to make sure we get the
>    details right.
>    - GroupedTable.agg - do the group keys automatically appear in the
>    output? What about the case of windowed aggregation?
>
> Regards,
> Xiaowei

On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:

> Hi Xiaowei,
>
> Thanks for bringing up the discussion of the Table API Enhancement Outline!
>
> I quickly looked over the overall content; it captures our offline
> discussions well. But in my view, we should also describe the usage of the
> public interfaces that we will introduce in this proposal. So I added the
> following usage descriptions of the interfaces and operators to the Google
> doc:
>
> 1. Map Operator
>    Map is a new operator of Table. It applies a scalar function and can
>    return multiple columns. Usage:
>
>      val res = tab
>        .map(fun: ScalarFunction).as('a, 'b, 'c)
>        .select('a, 'c)
>
> 2. FlatMap Operator
>    FlatMap is a new operator of Table. It applies a table function and can
>    return multiple rows. Usage:
>
>      val res = tab
>        .flatMap(fun: TableFunction).as('a, 'b, 'c)
>        .select('a, 'c)
>
> 3. Agg Operator
>    Agg is a new operator of Table/GroupedTable. It applies an aggregate
>    function and can return multiple columns. Usage:
>
>      val res = tab
>        .groupBy('a) // leave the groupBy clause out to define global aggregates
>        .agg(fun: AggregateFunction).as('a, 'b, 'c)
>        .select('a, 'c)
>
> 4. FlatAgg Operator
>    FlatAgg is a new operator of Table/GroupedTable. It applies a table
>    aggregate function and can return multiple rows. Usage:
>
>      val res = tab
>        .groupBy('a) // leave the groupBy clause out to define global table aggregates
>        .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
>        .select('a, 'c)
>
> 5. TableAggregateFunction
>    The behavior of table aggregates is most similar to that of
>    GroupReduceFunction: it is computed over a group of elements and outputs
>    a group of elements. A TableAggregateFunction can be applied with
>    GroupedTable.flatAgg(). The interface of TableAggregateFunction has a lot
>    of content, so I won't copy it here; please see the details in the
>    Google doc:
>
>    https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
>
> I would really appreciate anyone reviewing and commenting.
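The row cardinalities of the four proposed operators can be summarized with in-memory stand-ins over a `Seq` of rows. `mapOp`, `flatMapOp`, `aggOp` and `flatAggOp` are hypothetical helpers, not the Table API. Grouped results are returned as a Map keyed by the group only to keep the model testable; this says nothing about whether keys appear in the actual result schema, a question the thread leaves open.

```scala
// map: 1 row in -> 1 row out (the output row may have several columns).
def mapOp[A, B](t: Seq[A])(f: A => B): Seq[B] = t.map(f)

// flatMap: 1 row in -> 0..n rows out.
def flatMapOp[A, B](t: Seq[A])(f: A => Seq[B]): Seq[B] = t.flatMap(f)

// agg: all rows of a group -> 1 result per group.
def aggOp[K, A, B](t: Seq[A])(key: A => K)(f: Seq[A] => B): Map[K, B] =
  t.groupBy(key).map { case (k, group) => k -> f(group) }

// flatAgg: all rows of a group -> 0..m results per group.
def flatAggOp[K, A, B](t: Seq[A])(key: A => K)(f: Seq[A] => Seq[B]): Map[K, Seq[B]] =
  t.groupBy(key).map { case (k, group) => k -> f(group) }
```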
>
> Best,
> Jincheng