Hi Jincheng,

> #1) No, the watermark solves the issue of late events. Here, the performance problem is caused by the update emit mode, i.e., when the current calculation result is output, the previous calculation result needs to be retracted.

Hmm, yes, I missed this. For time-windowed cases (some aggregate/flatAggregate cases), emitting only on watermarks should solve the problem. For non-time-windowed cases it would reduce the number of retractions, right? Or am I still missing something?

> #3) I still hope to keep the simplicity that select only supports projected scalars; we can hardly tell the semantics of tab.select(flatmap('a), 'b, flatmap('d)).

table.select(F('a).unnest(), 'b, G('c).unnest())

could be rejected during some validation phase. On the other hand:

table.select(F('a).unnest(), 'b, scalarG('c))
or
table.flatMap(F('a), 'b, scalarG('c))

could work and be more or less syntactic sugar for a cross apply.

Piotrek

> On 21 Nov 2018, at 12:16, jincheng sun <sunjincheng...@gmail.com> wrote:
>
> Hi shaoxuan & Hequn,
>
> Thanks for your suggestion. I'll file the JIRAs later. We can prepare PRs while continuing to move the ongoing discussion forward.
>
> Regards,
> Jincheng
>
> On Wed, Nov 21, 2018 at 7:07 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>
>> Hi Piotrek,
>> Thanks for your feedback, and thanks for sharing your thoughts!
>>
>> #1) No, the watermark solves the issue of late events. Here, the performance problem is caused by the update emit mode, i.e., when the current calculation result is output, the previous calculation result needs to be retracted.
>> #2) As I mentioned above, we should continue the discussion until we solve the problems raised by Xiaowei and Fabian.
>> #3) I still hope to keep the simplicity that select only supports projected scalars; we can hardly tell the semantics of tab.select(flatmap('a), 'b, flatmap('d)).
>>
>> Thanks,
>> Jincheng
>>
>> On Wed, Nov 21, 2018 at 5:24 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> 1.
>>>
>>>> In fact, in addition to the design of APIs, there will be various performance optimization details, such as: the table aggregate function emitValue will generate multiple calculation results; in extreme cases, each record will trigger a large number of retract messages, and this will have poor performance
>>>
>>> Can this be solved/mitigated by emitting the results only on watermarks? I think that was the path that we decided to take both for Temporal Joins and upsert stream conversion. I know that this increases the latency and there is a place for a future global setting/user preference "emit the data ASAP mode", but emitting only on watermarks seems to me a better/more sane default.
>>>
>>> 2.
>>>
>>> With respect to the API discussion and implicit columns: the problem for me so far is that I'm not sure if I like the additional complexity of the `append()` solution, while implicit columns are definitely not in the spirit of SQL. Neither joins nor aggregations add extra unexpected columns to the result without asking. This can definitely be confusing for users since it breaks the convention. Thus I would lean towards Fabian's proposal of a multi-argument `map(Expression*)` from those 3 options.
>>>
>>> 3.
>>>
>>> Another topic is that I'm not 100% convinced that we should be adding new API functions for `map`, `aggregate`, `flatMap` and `flatAggregate`. I think the same could be achieved by changing
>>>
>>> table.map(F('x))
>>>
>>> into
>>>
>>> table.select(F('x)).unnest()
>>> or
>>> table.select(F('x).unnest())
>>>
>>> where `unnest()` means unnesting a row/tuple type into a columnar table.
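The `unnest()` idea can be modeled with plain Scala collections, independent of the Flink Table API, as a minimal sketch: a row is an ordered list of named values, a row-valued function result is a nested value, and unnesting splices the nested fields into the row in place. `Scalar`, `Nested`, `Projection`, and `selectAndUnnest` are hypothetical names. The sketch flattens every row-valued column, which corresponds to `table.select(F('x)).unnest()`, and it ignores types and column-name clashes.

```scala
// Hypothetical model of the unnest() proposal; not the Flink Table API.
object UnnestSketch {
  sealed trait Value
  final case class Scalar(v: Any) extends Value
  // A row-valued result, e.g. what a multi-column function F('x) returns.
  final case class Nested(fields: List[(String, Value)]) extends Value

  // A row is an ordered list of named values.
  type Row = List[(String, Value)]
  // A projection computes one named output column from an input row.
  type Projection = Row => (String, Value)

  // Splice the fields of every row-valued column into the row, in place.
  def unnest(row: Row): Row =
    row.flatMap {
      case (_, Nested(children)) => children
      case column                => List(column)
    }

  // Models table.select(p1, p2, ...).unnest(): evaluate each projection,
  // then flatten any row-valued columns into ordinary columns.
  def selectAndUnnest(rows: List[Row], projections: List[Projection]): List[Row] =
    rows.map(row => unnest(projections.map(p => p(row))))
}
```

With a projection that returns a two-field row (c0, c1) followed by a projection forwarding column k, each output row has the columns c0, c1, k, which mirrors `table.groupBy('k).select(F('x).unnest(), 'k)` without the grouping.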
>>>
>>> table.flatMap(F('x))
>>>
>>> could, on the other hand, also be handled by
>>>
>>> table.select(F('x))
>>>
>>> by correctly deducing that F(x) is a multi-row output function.
>>>
>>> The same might apply to `aggregate(F('x))`, but this could maybe be replaced by:
>>>
>>> table.groupBy(…).select(F('x).unnest())
>>>
>>> Adding scalar functions should also be possible:
>>>
>>> table.groupBy('k).select(F('x).unnest(), 'k)
>>>
>>> Maybe such an approach would allow us to implement the same features in SQL as well?
>>>
>>> Piotrek
>>>
>>>> On 21 Nov 2018, at 09:43, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Thank you all for the great proposal and discussion! I also prefer to move on to the next step, so +1 for opening the JIRAs to start the work. We can have a more detailed discussion there. BTW, we can start with the JIRAs we have already agreed on.
>>>>
>>>> Best,
>>>> Hequn
>>>>
>>>> On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <wshaox...@gmail.com> wrote:
>>>>
>>>>> +1. I agree that we should open the JIRAs to start the work. We may have better ideas on the flavor of the interface when we implement/review the code.
>>>>>
>>>>> Regards,
>>>>> shaoxuan
>>>>>
>>>>>
>>>>> On 11/20/18, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>> Hi all,
>>>>>>
>>>>>> Thanks all for the feedback.
>>>>>>
>>>>>> @Piotr About not using abbreviated names: +1, I like your proposal! Currently both the DataSet and DataStream APIs use `aggregate`. BTW, I find that other languages, such as R, also don't use abbreviated names.
>>>>>>
>>>>>> Sometimes the interface of an API is really difficult to perfect: we need to spend a lot of time thinking, gather feedback from a large number of users, and constantly improve it. But because of backward compatibility, we have to adopt the most conservative approach when designing the API (of course, I am in favor of developing richer features once we have discussed them clearly). Therefore, I propose to split the implementation of map/flatMap/agg/flatAgg into JIRAs for the basic functionality and JIRAs that support time attributes and group keys. We can develop the features whose design we have already agreed on, and continue to discuss the uncertain designs.
>>>>>>
>>>>>> In fact, in addition to the design of APIs, there will be various performance optimization details, such as: the table aggregate function emitValue will generate multiple calculation results; in extreme cases, each record will trigger a large number of retract messages, and this will have poor performance. So we will also optimize the interface design, for example by adding the emitWithRetractValue interface (I have updated the Google doc) to allow the user to optionally perform incremental calculations, thus avoiding a large number of retracts. Details like this are difficult to discuss fully on the mailing list, so I recommend creating the JIRAs/FLIP first: we develop the designs that have been agreed upon and continue to discuss the undecided ones! What do you think? @Fabian & Piotr & XiaoWei
>>>>>>
>>>>>> Best,
>>>>>> Jincheng
>>>>>>
>>>>>> On Mon, Nov 19, 2018 at 12:07 AM Xiaowei Jiang <xiaow...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Fabian & Piotr, thanks for the feedback!
>>>>>>>
>>>>>>> I appreciate your concerns, both on timestamp attributes as well as on implicit group keys.
>>>>>>> At the same time, I'm also concerned about the proposed approach of allowing Expression* as parameters, especially for flatMap/flatAgg. So far, we have never allowed a scalar expression to appear together with table expressions. With the Expression* approach, this will happen for the parameters of flatMap/flatAgg. I'm a bit concerned about whether we fully understand the consequences when we try to extend our system in the future, so I would be extra cautious in doing this. To avoid it, I think an implicit group key for flatAgg is safer. For flatMap, if users want to keep the rowtime column, they can use crossApply/join instead. So we are not losing any real functionality here.
>>>>>>>
>>>>>>> Also, a clarification on the following example:
>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>   .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>> If we did not have the select clause in this example, we would have 'w as a regular column in the output. It should not magically disappear.
>>>>>>>
>>>>>>> The concern is not as strong for Table.map/Table.agg because we are not mixing scalar and table expressions. But we also want to be somewhat consistent across these methods. If we used implicit group keys for Table.flatAgg, we probably should do the same for Table.agg. Now we only have to choose what to do with Table.map. I can see good arguments on both sides. But starting with a single Expression seems safer, because we can always extend to Expression* in the future.
>>>>>>>
>>>>>>> While thinking about this problem, it appears that we may need more work on our handling of watermarks for SQL/Table API.
>>>>>>> Our current way of propagating the watermarks from the source all the way to the sink might not be optimal. For example, after a tumbling window, the watermark can actually be advanced to just before the expiration of the next window. I think that, in general, each operator may need to generate new watermarks instead of simply propagating them. Once we accept that watermarks may change during execution, it appears that the timestamp columns may also change, as long as we have some way to associate the watermark with them. My intuition is that once we have a thorough solution for the watermark issue, we may be able to solve the problem we encountered for Table.map in a cleaner way. But this is a complex issue which deserves a discussion of its own.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Xiaowei
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Isn't the problem of multiple expressions limited only to the `flat***` functions, and, to be more specific, only to having two (or more) different table functions passed as expressions? `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))` seems to be well defined (duplicate the result of every scalar function to every record). Or am I missing something?
>>>>>>>>
>>>>>>>> Another remark: I would be in favour of not using abbreviations and naming `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
>>>>>>>>
>>>>>>>> Piotrek
>>>>>>>>
>>>>>>>>> On 15 Nov 2018, at 14:15, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Jincheng,
>>>>>>>>>
>>>>>>>>> I said before that I think the append() method is better than implicitly forwarding keys, but still, I believe it adds unnecessary boilerplate code.
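To make the append() versus map(Expression*) comparison concrete, here is a toy model of how the result schema of map could be derived under both proposals. It is plain Scala, not Flink's Expression classes; `Col`, `Udf`, `Append`, `mapSchema`, and `mapAppendSchema` are hypothetical names, and time attributes, `.as(...)` renaming, and type checking are deliberately left out.

```scala
// Hypothetical model of result-schema derivation for map; not Flink's Expression classes.
object MapSchemaSketch {
  sealed trait Expr
  // A plain column reference that is forwarded unchanged.
  final case class Col(name: String) extends Expr
  // A scalar UDF returning a composite row with the given field names.
  final case class Udf(name: String, resultFields: List[String]) extends Expr
  // The proposed built-in append(...) that bundles several expressions into one.
  final case class Append(args: List[Expr]) extends Expr

  // Output column names contributed by a single expression.
  def fields(e: Expr): List[String] = e match {
    case Col(n)       => List(n)
    case Udf(_, fs)   => fs
    case Append(args) => args.flatMap(fields)
  }

  // map(e1, e2, ...): multi-argument variant, columns concatenated left to right.
  def mapSchema(args: List[Expr]): List[String] = args.flatMap(fields)

  // map(append(e1, e2, ...)): single-expression variant wrapping the same arguments.
  def mapAppendSchema(args: List[Expr]): List[String] = fields(Append(args))
}
```

Assuming udf('name) yields two fields named col1 and col2, the arguments (udf('name), 'address, 'rowtime) give the schema col1, col2, address, rowtime under both functions. In this simplified model the two spellings are equivalent, and append() only adds a wrapper.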
>>>>>>>>>
>>>>>>>>> Moreover, I haven't seen a convincing argument why map(Expression*) is worse than map(Expression). In either case we need to do all kinds of checks to prevent invalid use of functions. If the method is not used correctly, we can emit a good error message, and documenting map(Expression*) will be easier than map(append(Expression*)), in my opinion. I think we should not add unnecessary syntax unless there is a good reason, and to be honest, I haven't seen this reason yet.
>>>>>>>>>
>>>>>>>>> Regarding the groupBy.agg() method, I think it should behave just like any other method, i.e., not do any implicit forwarding. Let's take the example of the windowed group by that you posted before.
>>>>>>>>>
>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>   .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>
>>>>>>>>> What happens if 'w.rowtime is not selected? What is the data type of the field 'w in the resulting Table? Is it a regular field at all, or just a system field that disappears if it is not selected?
>>>>>>>>>
>>>>>>>>> IMO, the following syntax is shorter, more explicit, and better aligned with the regular window.groupBy.select aggregations that are supported today.
>>>>>>>>>
>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>   .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> On Wed, Nov 14, 2018 at 8:37 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Fabian/Xiaowei,
>>>>>>>>>>
>>>>>>>>>> I am very sorry for my late reply!
>>>>>>>>>> Glad to see your reply, and it sounds pretty good!
>>>>>>>>>> I agree with Fabian that the approach with append(), which clearly defines the result schema, is better. In addition, append() can also forward non-time attributes, e.g.:
>>>>>>>>>>
>>>>>>>>>> tab('name, 'age, 'address, 'rowtime)
>>>>>>>>>> tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2, 'address, 'rowtime)
>>>>>>>>>>   .window(Tumble over 5.millis on 'rowtime as 'w)
>>>>>>>>>>   .groupBy('w, 'address)
>>>>>>>>>>
>>>>>>>>>> In this way append() is very useful, and its behavior is very similar to withForwardedFields() in DataSet. So +1 to using the append() approach for map() & flatMap()!
>>>>>>>>>>
>>>>>>>>>> But how about agg() and flatAgg()? In the agg/flatAgg case I agree with Xiaowei's approach of defining the keys to be implied in the result table, appearing at the beginning, for example as follows:
>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>   .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>
>>>>>>>>>> What do you think? @Fabian @Xiaowei
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Jincheng
>>>>>>>>>>
>>>>>>>>>> On Fri, Nov 9, 2018 at 6:35 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the summary! I like the approach with append() better than the implicit forwarding, as it clearly indicates which fields are forwarded. However, I don't see much benefit over the flatMap(Expression*) variant, as we would still need to analyze the full expression tree to ensure that at most (or exactly?) one Scalar/TableFunction is used.
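Fabian's point that the full expression tree must be analyzed either way can be illustrated with a small validator over a hypothetical expression tree. This is plain Scala, not Flink code; `Col`, `ScalarCall`, `TableCall`, and `validateFlatMap` are illustrative names. The sketch assumes flatMap requires exactly one table function and that it must be a top-level argument, since the thread leaves "at most" versus "exactly" open.

```scala
// Hypothetical validation of flatMap(Expression*) arguments; not Flink's validator.
object FlatMapValidationSketch {
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class ScalarCall(fn: String, args: List[Expr]) extends Expr
  final case class TableCall(fn: String, args: List[Expr]) extends Expr

  // Walk the whole expression tree and count table-function calls at any depth.
  def tableCalls(e: Expr): Int = e match {
    case Col(_)              => 0
    case ScalarCall(_, args) => args.map(tableCalls).sum
    case TableCall(_, args)  => 1 + args.map(tableCalls).sum
  }

  // Assumed rule: exactly one table function, passed directly as an argument.
  def validateFlatMap(args: List[Expr]): Either[String, Unit] = {
    val total    = args.map(tableCalls).sum
    val topLevel = args.count(_.isInstanceOf[TableCall])
    if (total == 0) Left("flatMap requires a table function")
    else if (total > 1) Left(s"flatMap accepts at most one table function, found $total")
    else if (topLevel != 1) Left("the table function must be a top-level argument")
    else Right(())
  }
}
```

Under these rules, flatMap(F('a), 'b, g('c)) passes, while two table functions, a table function nested inside a scalar call, or no table function at all are rejected with an error message.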
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Fabian
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Nov 8, 2018 at 7:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> We are discussing very detailed aspects of this proposal. We are trying to design the API from many angles (functionality, compatibility, ease of use, etc.). I think this is a very good process: only with such a detailed discussion can we develop the PRs clearly and smoothly in the later stage. I am very grateful to @Fabian and @Xiaowei for sharing a lot of good ideas. About the definition of the method signatures, I want to share my points here, which I am discussing with Fabian in the Google doc (not yet completed), as follows:
>>>>>>>>>>>>
>>>>>>>>>>>> Assume we have a table:
>>>>>>>>>>>> val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)
>>>>>>>>>>>>
>>>>>>>>>>>> Approach 1:
>>>>>>>>>>>> case1: Map follows Source Table
>>>>>>>>>>>> val result =
>>>>>>>>>>>>   tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
>>>>>>>>>>>>   .window(Tumble over 5.millis on 'proctime as 'w)
>>>>>>>>>>>>
>>>>>>>>>>>> case2: FlatAgg follows Window (Fabian mentioned above)
>>>>>>>>>>>> val result =
>>>>>>>>>>>>   tab.window(Tumble ... as 'w)
>>>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>   .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
>>>>>>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>>>
>>>>>>>>>>>> Approach 2: Similar to Fabian's approach, in which the result schema would be clearly defined, but with a built-in append UDF added.
>>>>>>>>>>>> This makes the map/flatMap/agg/flatAgg interfaces accept only one Expression.
>>>>>>>>>>>> val result =
>>>>>>>>>>>>   tab.map(append(udf('string), 'long, 'proctime)).as('col1, 'col2, 'long, 'proctime)
>>>>>>>>>>>>   .window(Tumble over 5.millis on 'proctime as 'w)
>>>>>>>>>>>>
>>>>>>>>>>>> Note: append is a special built-in UDF that can pass through any column.
>>>>>>>>>>>>
>>>>>>>>>>>> So, maybe we can define table.map(Expression) first and, if necessary, extend it to table.map(Expression*) in the future? Of course, I also hope that we can further improve this proposal through discussion.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Nov 7, 2018 at 11:45 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Fabian,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think that the key question you raised is whether we allow extra parameters in the methods map/flatMap/agg/flatAgg. I can see why allowing that may appear more convenient in some cases. However, it might also cause some confusion. For example, do we allow multiple UDFs in these expressions? If we do, the semantics may be weird to define, e.g., what does table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Even though not allowing it may appear less powerful, it can make things more intuitive too. In the case of agg/flatAgg, we can define the keys to be implied in the result table, appearing at the beginning.
>>>>>>>>>>>>> You can use a select method if you want to modify this behavior. I think that eventually we will have some API that allows other expressions as additional parameters, but I think it's better to do that after we introduce the concept of nested tables. A lot of the things we suggested here can be considered special cases of that. But things are much simpler if we leave that for later.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Xiaowei
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * Re emit:
>>>>>>>>>>>>>> I think we should start with the well-understood semantics of full replacement. This is how the other agg functions work. As was said before, there are open questions regarding an append mode (checkpointing, whether to support retractions or not and, if yes, how to declare them, ...). Since this seems to be an optimization, I'd postpone it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * Re grouping keys:
>>>>>>>>>>>>>> I don't think we should add them automatically, because the result schema would not be intuitive. Would they be added at the beginning of the tuple or at the end? What metadata fields of windows would be added? In which order would they be added?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> However, we could support syntax like this:
>>>>>>>>>>>>>> val t: Table = ???
>>>>>>>>>>>>>> t
>>>>>>>>>>>>>>   .window(Tumble ... as 'w)
>>>>>>>>>>>>>>   .groupBy('a, 'b)
>>>>>>>>>>>>>>   .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The result schema would be clearly defined as [b, a, f1, f2, ..., fn, wend, rtime], where (f1, f2, ..., fn) are the result attributes of the UDF.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * Re multi-staged evaluation:
>>>>>>>>>>>>>> I think this should be an optimization that can be applied if the UDF implements the merge() method.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 8:01 AM Shaoxuan Wang <wshaox...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi xiaowei,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes, I agree with you that the semantics of TableAggregateFunction emit are much more complex than those of AggregateFunction. The fundamental difference is that TableAggregateFunction emits a "table", while AggregateFunction outputs (a column of) a "row". AggregateFunction has only one mode, which is "replacing" (complete update). But for TableAggregateFunction, the update could be incremental (only emit the newly updated results) or complete (always emit the entire table when "emit" is triggered). From the performance perspective, we might want to use incremental updates.
>>>>>>>>>>>>>>> But we need to review and design this carefully, especially taking into account failover (instead of just backing up the ACC, it may also need to remember the emit offset) and retractions, as the semantics of TableAggregateFunction emit are different from those of other UDFs. TableFunction also emits a table, but it does not need to worry about this because it is stateless.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Shaoxuan
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for adding the public interfaces! I think that it's a very good start. There are a few points that need more discussion.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> - TableAggregateFunction - this is a very complex beast, definitely the most complex user-defined object we have introduced so far. I think there are quite a few interesting questions here. For example, do we allow a multi-staged TableAggregate in this case? What are the semantics of emit? Is it an amendment to the previous output, or does it replace it? I think this subject itself is worth a discussion to make sure we get the details right.
>>>>>>>>>>>>>>>> - GroupedTable.agg - do the group keys automatically appear in the output? What about the case of windowed aggregation?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>> Xiaowei
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi, Xiaowei,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for bringing up the discussion of the Table API Enhancement Outline!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I quickly looked at the overall content; it is a good write-up of our offline discussions. But from my point of view, we should add the usage of the public interfaces that we will introduce in this proposal. So I added the following usage description of the interfaces and operators to the Google doc:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1. Map Operator
>>>>>>>>>>>>>>>>> The Map operator is a new operator of Table. It applies a scalar function and can return multiple columns. The usage is as follows:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>   .map(fun: ScalarFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>>   .select('a, 'c)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2. FlatMap Operator
>>>>>>>>>>>>>>>>> The FlatMap operator is a new operator of Table. It applies a table function and can return multiple rows. The usage is as follows:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>   .flatMap(fun: TableFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>>   .select('a, 'c)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 3. Agg Operator
>>>>>>>>>>>>>>>>> The Agg operator is a new operator of Table/GroupedTable. It applies an aggregate function and can return multiple columns. The usage is as follows:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>   .groupBy('a) // leave out the groupBy clause to define global aggregates
>>>>>>>>>>>>>>>>>   .agg(fun: AggregateFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>>   .select('a, 'c)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 4. FlatAgg Operator
>>>>>>>>>>>>>>>>> The FlatAgg operator is a new operator of Table/GroupedTable. It applies a table aggregate function and can return multiple rows. The usage is as follows:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>   .groupBy('a) // leave out the groupBy clause to define global table aggregates
>>>>>>>>>>>>>>>>>   .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>>   .select('a, 'c)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 5. TableAggregateFunction
>>>>>>>>>>>>>>>>> The behavior of table aggregates is most like that of GroupReduceFunction: it computes over a group of elements and outputs a group of elements. A TableAggregateFunction can be applied in GroupedTable.flatAgg().
>>>>>>>>>>>>>>>>> The interface of TableAggregateFunction has a lot of content, so I don't copy it here. Please look at the details in the Google doc:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I would really appreciate anyone reviewing and commenting.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Jincheng
>>>>>
>>>>> --
>>>>> *Rome was not built in one day*
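The emit-mode trade-off discussed in this thread (Fabian's full replacement versus Shaoxuan's incremental update, which Jincheng's proposed emitWithRetractValue targets) can be illustrated with a toy top-2 table aggregate in plain Scala. This sketch does not use Flink's TableAggregateFunction API. `Change`, `fullReplacement`, `incremental`, and `materialize` are hypothetical names, and failover/checkpointing (the emit offset Shaoxuan mentions) is not modeled.

```scala
// Hypothetical model of two emit strategies for a top-n table aggregate; not Flink's API.
object EmitModeSketch {
  type Ranked = (Int, Int) // (rank, value)

  // A message on the output changelog.
  sealed trait Change
  final case class Insert(row: Ranked) extends Change
  final case class Retract(row: Ranked) extends Change

  // The aggregate's current result table: the n largest values seen so far, ranked.
  def topN(values: List[Int], n: Int): List[Ranked] =
    values.sorted(Ordering[Int].reverse).take(n).zipWithIndex.map { case (v, i) => (i + 1, v) }

  // Full replacement: retract every previously emitted row, then emit the whole new table.
  def fullReplacement(prev: List[Ranked], next: List[Ranked]): List[Change] =
    prev.map(Retract(_)) ++ next.map(Insert(_))

  // Incremental: retract only rows that disappeared and insert only rows that are new.
  def incremental(prev: List[Ranked], next: List[Ranked]): List[Change] =
    prev.filterNot(next.contains).map(Retract(_)) ++ next.filterNot(prev.contains).map(Insert(_))

  // Feed records one by one; after each record, emit the changes between old and new result.
  def run(input: List[Int], n: Int, emit: (List[Ranked], List[Ranked]) => List[Change]): List[Change] = {
    val (_, out) = input.foldLeft((List.empty[Int], List.empty[Change])) {
      case ((seen, acc), v) =>
        val now = seen :+ v
        (now, acc ++ emit(topN(seen, n), topN(now, n)))
    }
    out
  }

  // Replay a changelog into a table; both strategies must end in the same state.
  def materialize(changes: List[Change]): Set[Ranked] =
    changes.foldLeft(Set.empty[Ranked]) {
      case (table, Insert(r))  => table + r
      case (table, Retract(r)) => table - r
    }
}
```

For the input 5, 3, 8, 1 with n = 2, full replacement emits 12 change messages and the incremental strategy emits 6, while replaying either changelog yields the same final table {(1, 8), (2, 5)}. Because the rank is part of each row here, a new maximum still forces the incremental strategy to retract and re-insert every shifted rank.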