Hi Jincheng,

#1) OK, got it.

#3)
> From my point of view, we can use `Expression` first and, if the discussion decides in favor of `Expression*`, improve it later. Starting with `Expression` leaves us the option to move to `Expression*` compatibly. If we use `Expression*` directly, it will be difficult to go back to `Expression` without breaking compatibility between versions. What do you think?

I don't think that's the case here. If we start with a single-parameter `flatMap(Expression)`, it will require implicit columns to be present in the result, which:

a) IMO breaks the SQL convention (that's why I'm against it), and
b) means we cannot later easily introduce `flatMap(Expression*)` without those implicit columns, without breaking compatibility, or at least without making `flatMap(Expression*)` and `flatMap(Expression)` terribly inconsistent.

To elaborate on (a): it's not nice if our own API is inconsistent and behaves one way in some cases and another way in others:

table.groupBy('k).select(scalarAggregateFunction('v)) => a single-column result, just the output of `scalarAggregateFunction`

vs

table.groupBy('k).flatAggregate(tableAggregateFunction('v)) => the result of `tableAggregateFunction` plus the key (and optionally the window context?)

Thus I think we have to decide now which way we want to jump, since later will be too late. Or again, am I missing something? :)

Piotrek

> On 22 Nov 2018, at 02:07, jincheng sun <sunjincheng...@gmail.com> wrote:
>
> Hi Piotrek,
>
> #1) We have both unbounded group aggregates and bounded group window aggregates. In the unbounded case we have to fire results early with retract messages; we cannot rely on the watermark, because an unbounded aggregate never finishes (as an improvement, we can introduce micro-batching in the future). For bounded windows we never support early firing, so we do not need retractions.
>
> #3) About the validation of `table.select(F('a).unnest(), 'b, G('c).unnest())` / `table.flatMap(F('a), 'b, scalarG('c))`: Fabian mentioned it above, please look at the prior mail.
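A toy Scala model can make the cross-apply reading of `table.flatMap(F('a), 'b, scalarG('c))` concrete. It uses plain collections rather than the Flink Table API. `In`, `Out`, `f` and `scalarG` are hypothetical stand-ins. Each input row is expanded by the table function, and the scalar results computed for that input row are repeated on every row the table function produces.

```scala
// Toy model of the proposed semantics; plain Scala collections, not the Flink Table API.
object CrossApplySketch {
  // Hypothetical input row with columns 'a, 'b, 'c.
  final case class In(a: String, b: Int, c: Int)

  // Output row: the columns produced by F, then 'b, then scalarG('c), in the order listed.
  final case class Out(word: String, length: Int, b: Int, g: Int)

  // Hypothetical table function F: one output row per space-separated token of 'a.
  def f(a: String): Seq[(String, Int)] =
    a.split(" ").toSeq.filter(_.nonEmpty).map(w => (w, w.length))

  // Hypothetical scalar function G.
  def scalarG(c: Int): Int = c * 10

  // flatMap(F('a), 'b, scalarG('c)) read as a cross apply: every row produced by F
  // is combined with the scalar values computed once for its input row.
  def flatMap(input: Seq[In]): Seq[Out] =
    for {
      row <- input
      g = scalarG(row.c)
      (word, len) <- f(row.a)
    } yield Out(word, len, row.b, g)
}
```

In the test input below, the second row yields no output at all, because `f` returns no rows for an empty string. This matches inner cross-apply behaviour; an outer variant would keep the row. The output columns are exactly the ones listed in the call, with no implicit key or time column added.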
>
> For `table.flatMap(F('a), 'b, scalarG('c))`, which is our concern here, we should discuss the question of `Expression*` vs `Expression`. From my point of view, we can use `Expression` first and, if the discussion decides in favor of `Expression*`, improve it later. Starting with `Expression` leaves us the option to move to `Expression*` compatibly. If we use `Expression*` directly, it will be difficult to go back to `Expression` without breaking compatibility between versions. What do you think?
>
> If anything is unclear, any feedback is welcome! Again, thanks for sharing your thoughts!
>
> Thanks,
> Jincheng
>
> On Wed, Nov 21, 2018 at 9:37 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>
>> Hi Jincheng,
>>
>>> #1) No, the watermark solves the issue of late events. Here, the performance problem is caused by the update emit mode, i.e., when the current result is emitted, the previous result needs to be retracted.
>>
>> Hmm, yes, I missed this. For time-windowed cases (some aggregate/flatAggregate cases), emitting only on watermarks should solve the problem. For non-time-windowed cases it would reduce the number of retractions, right? Or am I still missing something?
>>
>>> #3) I still hope to keep it simple, so that select only supports projected scalars; we can hardly tell the semantics of tab.select(flatmap('a), 'b, flatmap('d)).
>>
>> table.select(F('a).unnest(), 'b, G('c).unnest())
>>
>> could be rejected during some validation phase. On the other hand:
>>
>> table.select(F('a).unnest(), 'b, scalarG('c))
>> or
>> table.flatMap(F('a), 'b, scalarG('c))
>>
>> could work and be more or less syntactic sugar for a cross apply.
>>
>> Piotrek
>>
>>> On 21 Nov 2018, at 12:16, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>
>>> Hi Shaoxuan & Hequn,
>>>
>>> Thanks for your suggestion, I'll file the JIRAs later. We can prepare PRs while continuing to move the ongoing discussion forward.
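Piotr's suggestion to emit only on watermarks can be sized up with a toy Scala model. The model is a non-windowed running `COUNT(*)` per key. It counts the changelog messages produced when every record triggers an update, versus when updates are buffered until a watermark arrives. The event and message types are hypothetical and do not mirror Flink's runtime classes.

```scala
import scala.collection.mutable

// Toy changelog model; the event and message types are hypothetical, not Flink runtime classes.
object EmitOnWatermarkSketch {
  sealed trait Event
  final case class Record(key: String) extends Event
  case object Watermark extends Event

  sealed trait Msg
  final case class Insert(key: String, count: Long) extends Msg
  final case class Retract(key: String, count: Long) extends Msg

  /** Running COUNT(*) per key. With emitEveryRecord = false, changed keys are
    * buffered and only emitted when a watermark arrives. */
  def run(events: Seq[Event], emitEveryRecord: Boolean): Seq[Msg] = {
    val current = mutable.Map.empty[String, Long]     // latest count per key
    val emitted = mutable.Map.empty[String, Long]     // last count sent downstream
    val dirty   = mutable.LinkedHashSet.empty[String] // keys changed since the last emission
    val out     = mutable.ArrayBuffer.empty[Msg]

    def emit(key: String): Unit = {
      emitted.get(key).foreach(old => out += Retract(key, old)) // retract the previous result
      out += Insert(key, current(key))
      emitted(key) = current(key)
    }

    events.foreach {
      case Record(key) =>
        current(key) = current.getOrElse(key, 0L) + 1
        if (emitEveryRecord) emit(key) else dirty += key
      case Watermark =>
        dirty.foreach(emit)
        dirty.clear()
    }
    out.toSeq
  }
}
```

Take three records on the same key followed by one watermark. Per-record emission produces five messages, two of them retractions. Watermark-driven emission produces a single insert. The cost is latency: buffered results only become visible at the next watermark.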
>>>
>>> Regards,
>>> Jincheng
>>>
>>> On Wed, Nov 21, 2018 at 7:07 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>
>>>> Hi Piotrek,
>>>>
>>>> Thanks for your feedback, and thanks for sharing your thoughts!
>>>>
>>>> #1) No, the watermark solves the issue of late events. Here, the performance problem is caused by the update emit mode, i.e., when the current result is emitted, the previous result needs to be retracted.
>>>> #2) As I mentioned above, we should continue the discussion until we solve the problems raised by Xiaowei and Fabian.
>>>> #3) I still hope to keep it simple, so that select only supports projected scalars; we can hardly tell the semantics of tab.select(flatmap('a), 'b, flatmap('d)).
>>>>
>>>> Thanks,
>>>> Jincheng
>>>>
>>>> On Wed, Nov 21, 2018 at 5:24 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> 1.
>>>>>
>>>>>> In fact, in addition to the design of the APIs, there will be various performance optimization details. For example, the emitValue method of a table aggregate function generates multiple results; in extreme cases, each record will trigger a large number of retract messages, which will perform poorly.
>>>>>
>>>>> Can this be solved/mitigated by emitting the results only on watermarks? I think that was the path we decided to take both for temporal joins and for upsert stream conversion. I know that this increases the latency and there is room for a future global setting/user preference, an "emit the data ASAP" mode, but emitting only on watermarks seems to me a better/saner default.
>>>>>
>>>>> 2.
>>>>>
>>>>> With respect to the API discussion and implicit columns: the problem for me so far is that I'm not sure I like the additional complexity of the `append()` solution, while implicit columns are definitely not in the spirit of SQL.
>>>>> Neither joins nor aggregations add extra, unexpected columns to the result without asking. This can definitely confuse users, since it breaks the convention. Thus, of those three options, I would lean towards Fabian's proposal of a multi-argument `map(Expression*)`.
>>>>>
>>>>> 3.
>>>>>
>>>>> Another topic: I'm not 100% convinced that we should be adding new API functions for `map`, `aggregate`, `flatMap` and `flatAggregate`. I think the same could be achieved by changing
>>>>>
>>>>> table.map(F('x))
>>>>>
>>>>> into
>>>>>
>>>>> table.select(F('x)).unnest()
>>>>> or
>>>>> table.select(F('x).unnest())
>>>>>
>>>>> where `unnest()` means unnesting a row/tuple type into a columnar table.
>>>>>
>>>>> table.flatMap(F('x))
>>>>>
>>>>> could, on the other hand, also be handled by
>>>>>
>>>>> table.select(F('x))
>>>>>
>>>>> by correctly deducing that F('x) is a multi-row output function.
>>>>>
>>>>> The same might apply to `aggregate(F('x))`, which could maybe be replaced by:
>>>>>
>>>>> table.groupBy(…).select(F('x).unnest())
>>>>>
>>>>> Adding scalar functions should also be possible:
>>>>>
>>>>> table.groupBy('k).select(F('x).unnest(), 'k)
>>>>>
>>>>> Maybe such an approach would allow us to implement the same features in SQL as well?
>>>>>
>>>>> Piotrek
>>>>>
>>>>>> On 21 Nov 2018, at 09:43, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thank you all for the great proposal and discussion! I also prefer to move on to the next step, so +1 for opening the JIRAs to start the work. We can have a more detailed discussion there. Btw, we can start with the JIRAs we have already agreed on.
>>>>>>
>>>>>> Best,
>>>>>> Hequn
>>>>>>
>>>>>> On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <wshaox...@gmail.com> wrote:
>>>>>>
>>>>>>> +1. I agree that we should open the JIRAs to start the work.
>>>>>>> We may have better ideas about the flavor of the interface when we implement/review the code.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Shaoxuan
>>>>>>>
>>>>>>> On 11/20/18, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> Thanks all for the feedback.
>>>>>>>>
>>>>>>>> @Piotr About not using abbreviated names: +1, I like your proposal! Both the DataSet and DataStream APIs currently use `aggregate`. BTW, I find that other languages, such as R, also avoid abbreviated names.
>>>>>>>>
>>>>>>>> Sometimes an API interface is really difficult to perfect. We need to spend a lot of time thinking, gather feedback from a large number of users, and constantly improve. But because of backward compatibility, we have to adopt the most conservative approach when designing the API (of course, I am in favor of developing richer features once we have discussed them clearly). Therefore, I propose to split the implementation of map/flatMap/agg/flatAgg into JIRAs for the basic functionality and JIRAs that add support for time attributes and group keys. We can develop the features whose design we have already agreed on, and continue to discuss the parts of the design that are still uncertain.
>>>>>>>>
>>>>>>>> In fact, in addition to the design of the APIs, there will be various performance optimization details. For example, the emitValue method of a table aggregate function generates multiple results; in extreme cases, each record will trigger a large number of retract messages, which will perform poorly. So we will also optimize the interface design, e.g., by adding an emitWithRetractValue interface (I have updated the Google Doc) that lets the user optionally perform incremental calculations, thus avoiding a large number of retractions. Details like this are difficult to discuss fully on the mailing list, so I recommend creating the JIRAs/FLIP first, developing the designs that have been agreed upon, and continuing to discuss the undecided ones. What do you think? @Fabian & Piotr & Xiaowei
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jincheng
>>>>>>>>
>>>>>>>> On Mon, Nov 19, 2018 at 12:07 AM Xiaowei Jiang <xiaow...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Fabian & Piotr, thanks for the feedback!
>>>>>>>>>
>>>>>>>>> I appreciate your concerns, both about timestamp attributes and about implicit group keys. At the same time, I'm also concerned about the proposed approach of allowing Expression* as parameters, especially for flatMap/flatAgg. So far, we have never allowed a scalar expression to appear together with table expressions. With the Expression* approach, this will happen for the parameters of flatMap/flatAgg. I'm a bit concerned about whether we fully understand the consequences when we try to extend our system in the future, and I would be extra cautious here. To avoid this, I think an implicit group key for flatAgg is safer.
>>>>>>>>> For flatMap, if users want to keep the rowtime column, they can use crossApply/join instead. So we are not losing any real functionality here.
>>>>>>>>>
>>>>>>>>> Also, a clarification on the following example:
>>>>>>>>>
>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>   .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>
>>>>>>>>> If we did not have the select clause in this example, we would have 'w as a regular column in the output. It should not magically disappear.
>>>>>>>>>
>>>>>>>>> The concern is not as strong for Table.map/Table.agg, because there we are not mixing scalar and table expressions. But we also want to be somewhat consistent across these methods. If we use implicit group keys for Table.flatAgg, we should probably do the same for Table.agg. Then we only have to choose what to do with Table.map. I can see good arguments on both sides, but starting with a single Expression seems safer, because we can always extend it to Expression* in the future.
>>>>>>>>>
>>>>>>>>> While thinking about this problem, it appeared to me that we may need more work on our handling of watermarks in SQL/Table API. Our current way of propagating watermarks from the source all the way to the sink might not be optimal. For example, after a tumbling window, the watermark can actually be advanced to just before the next window expires. I think that, in general, each operator may need to generate new watermarks instead of simply propagating them.
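Xiaowei's remark that a tumbling-window operator could advance its output watermark can be illustrated with a small calculation. The sketch is a toy built on several assumptions, not Flink code:

- Windows are aligned to epoch 0, with non-negative millisecond timestamps.
- A window fires once the watermark reaches its end minus one.
- Each result row carries `rowtime = end - 1`.
- A watermark `t` promises no further rows with timestamp `<= t`.

`advancedWatermark` is a hypothetical helper.

```scala
// Toy calculation under the assumptions stated above; not a Flink API.
object TumblingWatermarkSketch {
  /** Output watermark a tumbling-window operator could emit, given its input
    * watermark (ms) and the window size (ms).
    *
    * Assumptions: windows are [k*size, (k+1)*size), a window fires once the
    * input watermark reaches end - 1, and each result row carries
    * rowtime = end - 1.
    */
  def advancedWatermark(inputWatermark: Long, windowSize: Long): Long = {
    require(inputWatermark >= 0 && windowSize > 0, "toy model: non-negative time only")
    // Earliest window that has not fired yet: the one containing inputWatermark + 1.
    val nextEnd = ((inputWatermark + 1) / windowSize + 1) * windowSize
    // Its result will carry rowtime nextEnd - 1, so no later output can be at or below nextEnd - 2.
    nextEnd - 2
  }
}
```

With 5-second windows, an input watermark of 12000 ms could be forwarded as 14998 ms. This is because the earliest pending result (window [10000, 15000)) will carry rowtime 14999. That rowtime is always greater than the input watermark, so the advanced value never falls below it.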
>>>>>>>>>
>>>>>>>>> Once we accept that watermarks may change during execution, it appears that the timestamp columns may also change, as long as we have some way to associate the watermark with them. My intuition is that once we have a thorough solution for the watermark issue, we may be able to solve the problem we encountered with Table.map in a cleaner way. But this is a complex issue that deserves a discussion of its own.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Xiaowei
>>>>>>>>>
>>>>>>>>> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Isn't the problem of multiple expressions limited only to the `flat***` functions, and, to be more specific, only to having two (or more) different table functions passed as expressions? `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))` seems to be well defined (the result of every scalar function is duplicated onto every record). Or am I missing something?
>>>>>>>>>>
>>>>>>>>>> Another remark: I would be in favour of not using abbreviations and renaming `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
>>>>>>>>>>
>>>>>>>>>> Piotrek
>>>>>>>>>>
>>>>>>>>>>> On 15 Nov 2018, at 14:15, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>
>>>>>>>>>>> I said before that I think the append() method is better than implicitly forwarding keys, but I still believe it adds unnecessary boilerplate code.
>>>>>>>>>>>
>>>>>>>>>>> Moreover, I haven't seen a convincing argument why map(Expression*) is worse than map(Expression). In either case we need to do all kinds of checks to prevent invalid use of functions.
>>>>>>>>>>> If the method is used incorrectly, we can emit a good error message, and documenting map(Expression*) will be easier than map(append(Expression*)), in my opinion. I think we should not add unnecessary syntax unless there is a good reason, and to be honest, I haven't seen such a reason yet.
>>>>>>>>>>>
>>>>>>>>>>> Regarding the groupBy.agg() method, I think it should behave just like any other method, i.e., not do any implicit forwarding. Let's take the example of the windowed group by that you posted before:
>>>>>>>>>>>
>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>   .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>>
>>>>>>>>>>> What happens if 'w.rowtime is not selected? What is the data type of the field 'w in the resulting Table? Is it a regular field at all, or just a system field that disappears if it is not selected?
>>>>>>>>>>>
>>>>>>>>>>> IMO, the following syntax is shorter, more explicit, and better aligned with the regular window.groupBy.select aggregations that are supported today:
>>>>>>>>>>>
>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>   .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
>>>>>>>>>>>
>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Nov 14, 2018 at 8:37 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Fabian/Xiaowei,
>>>>>>>>>>>>
>>>>>>>>>>>> I am very sorry for my late reply! Glad to see your replies; they sound pretty good!
>>>>>>>>>>>> I agree with Fabian that the append() approach, which clearly defines the result schema, is better. In addition, append() can also carry non-time attributes, e.g.:
>>>>>>>>>>>>
>>>>>>>>>>>> tab('name, 'age, 'address, 'rowtime)
>>>>>>>>>>>> tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2, 'address, 'rowtime)
>>>>>>>>>>>>   .window(Tumble over 5.millis on 'rowtime as 'w)
>>>>>>>>>>>>   .groupBy('w, 'address)
>>>>>>>>>>>>
>>>>>>>>>>>> In this way append() is very useful, and its behavior is very similar to withForwardedFields() in the DataSet API. So +1 to the append() approach for map() & flatMap()!
>>>>>>>>>>>>
>>>>>>>>>>>> But what about agg() and flatAgg()? For agg/flatAgg, I agree with Xiaowei's approach of making the keys implicit in the result table, placed at the beginning, for example:
>>>>>>>>>>>>
>>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>   .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>>>
>>>>>>>>>>>> What do you think? @Fabian @Xiaowei
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Nov 9, 2018 at 6:35 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the summary! I like the approach with append() better than implicit forwarding, as it clearly indicates which fields are forwarded. However, I don't see much benefit over the flatMap(Expression*) variant, as we would still need to analyze the full expression tree to ensure that at most (or exactly?)
>>>>>>>>>>>>> one Scalar / TableFunction is used.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Fabian
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Nov 8, 2018 at 7:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We are discussing this proposal in great detail, trying to design the API from many angles (functionality, compatibility, ease of use, etc.). I think this is a very good process: only such a detailed discussion will let us develop the PRs clearly and smoothly in the later stages. I am very grateful to @Fabian and @Xiaowei for sharing a lot of good ideas.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> About the definition of the method signatures, I want to share my thoughts here, which I am discussing with Fabian in the Google Doc (not yet completed):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Assume we have a table:
>>>>>>>>>>>>>> val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Approach 1:
>>>>>>>>>>>>>> case 1: map follows the source table
>>>>>>>>>>>>>> val result =
>>>>>>>>>>>>>>   tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
>>>>>>>>>>>>>>   .window(Tumble over 5.millis on 'proctime as 'w)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> case 2: flatAgg follows a window (Fabian mentioned above)
>>>>>>>>>>>>>> val result =
>>>>>>>>>>>>>>   tab.window(Tumble ... as 'w)
>>>>>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>>>   .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
>>>>>>>>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Approach 2: Similar to Fabian's approach, where the result schema is clearly defined, but with a built-in append UDF added. This makes the map/flatMap/agg/flatAgg interfaces accept only one Expression.
>>>>>>>>>>>>>> val result =
>>>>>>>>>>>>>>   tab.map(append(udf('string), 'long, 'proctime)).as('col1, 'col2, 'long, 'proctime)
>>>>>>>>>>>>>>   .window(Tumble over 5.millis on 'proctime as 'w)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Note: append is a special built-in UDF that can pass through any column.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So maybe we can define it as table.map(Expression) first and, if necessary, extend it to table.map(Expression*) in the future? Of course, I also hope that we can further refine this proposal through discussion.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 11:45 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Fabian,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think the key question you raised is whether we allow extra parameters in the methods map/flatMap/agg/flatAgg. I can see why allowing that may appear more convenient in some cases. However, it might also cause some confusion. For example, do we allow multiple UDFs in these expressions? If we do, the semantics may be weird to define, e.g., what does
what >> does >>>>>>>>>>>>>>> table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? >>>>>>>>>>>>>>> Even >>>>>>>>>>>>> though >>>>>>>>>>>>>>> not allowing it may appear less powerful, but it can make >>>>> things >>>>>>>>> more >>>>>>>>>>>>>>> intuitive too. In the case of agg/flatAgg, we can define the >>>>>>> keys >>>>>>>>> to >>>>>>>>>>>> be >>>>>>>>>>>>>>> implied in the result table and appears at the beginning. You >>>>>>> can >>>>>>>>>>>> use a >>>>>>>>>>>>>>> select method if you want to modify this behavior. I think >> that >>>>>>>>>>>>>> eventually >>>>>>>>>>>>>>> we will have some API which allows other expressions as >>>>>>>>>>>>>>> additional >>>>>>>>>>>>>>> parameters, but I think it's better to do that after we >>>>>>> introduce >>>>>>>>> the >>>>>>>>>>>>>>> concept of nested tables. A lot of things we suggested here >> can >>>>>>>>>>>>>>> be >>>>>>>>>>>>>>> considered as special cases of that. But things are much >>>>> simpler >>>>>>>>>>>>>>> if >>>>>>>>>>>> we >>>>>>>>>>>>>>> leave that to later. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>> Xiaowei >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske < >>>>> fhue...@gmail.com >>>>>>>> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> * Re emit: >>>>>>>>>>>>>>>> I think we should start with a well understood semantics of >>>>>>> full >>>>>>>>>>>>>>>> replacement. This is how the other agg functions work. >>>>>>>>>>>>>>>> As was said before, there are open questions regarding an >>>>>>> append >>>>>>>>>>>> mode >>>>>>>>>>>>>>>> (checkpointing, whether supporting retractions or not and if >>>>>>> yes >>>>>>>>>>>> how >>>>>>>>>>>>> to >>>>>>>>>>>>>>>> declare them, ...). >>>>>>>>>>>>>>>> Since this seems to be an optimization, I'd postpone it. 
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> * Re grouping keys:
>>>>>>>>>>>>>>>> I don't think we should add them automatically, because the result schema would not be intuitive. Would they be added at the beginning of the tuple or at the end? Which metadata fields of windows would be added? In which order would they be added?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> However, we could support syntax like this:
>>>>>>>>>>>>>>>> val t: Table = ???
>>>>>>>>>>>>>>>> t
>>>>>>>>>>>>>>>>   .window(Tumble ... as 'w)
>>>>>>>>>>>>>>>>   .groupBy('a, 'b)
>>>>>>>>>>>>>>>>   .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The result schema would be clearly defined as [b, a, f1, f2, ..., fn, wend, rtime], where (f1, f2, ..., fn) are the result attributes of the UDF.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> * Re multi-staged evaluation:
>>>>>>>>>>>>>>>> I think this should be an optimization that can be applied if the UDF implements the merge() method.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 8:01 AM Shaoxuan Wang <wshaox...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Xiaowei,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Yes, I agree with you that the emit semantics of TableAggregateFunction are much more complex than those of AggregateFunction. The fundamental difference is that a TableAggregateFunction emits a "table", while an AggregateFunction outputs (a column of) a "row".
>>>>>>>>>>>>>>>>> In the case of AggregateFunction, there is only one mode, which is "replacing" (complete update). But a TableAggregateFunction could use either incremental updates (only emit the newly updated results) or complete updates (always emit the entire table when "emit" is triggered). From a performance perspective, we might want to use incremental updates. But we need to review and design this carefully, especially taking into account failover (instead of just backing up the ACC, it may also need to remember the emit offset) and retractions, as the emit semantics of TableAggregateFunction are different from those of other UDFs. TableFunction also emits a table, but it does not need to worry about this because it is stateless.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>> Shaoxuan
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for adding the public interfaces! I think it's a very good start. There are a few points that need more discussion.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - TableAggregateFunction - this is a very complex beast, definitely the most complex user-defined object we have introduced so far.
>>>>>>>>>>>>>>>>>>   I think there are quite a few interesting questions here. For example, do we allow a multi-staged TableAggregate in this case? What are the semantics of emit? Is it an amendment to the previous output, or does it replace it? I think this subject is worth a discussion of its own to make sure we get the details right.
>>>>>>>>>>>>>>>>>> - GroupedTable.agg - do the group keys automatically appear in the output? What about windowed aggregations?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>> Xiaowei
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Xiaowei,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for bringing up the discussion of the Table API Enhancement Outline!
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I quickly looked over the overall content; it is a good expression of our offline discussions. But from my point of view, we should add the usage of the public interfaces that we will introduce in this proposal. So I added the following usage descriptions of the interfaces and operators to the Google Doc:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1. Map Operator
>>>>>>>>>>>>>>>>>>> Map is a new operator on Table. It applies a scalar function and can return multiple columns. Its usage is as follows:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>>>   .map(fun: ScalarFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>>>>   .select('a, 'c)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2. FlatMap Operator
>>>>>>>>>>>>>>>>>>> FlatMap is a new operator on Table. It applies a table function and can return multiple rows. Its usage is as follows:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>>>   .flatMap(fun: TableFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>>>>   .select('a, 'c)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 3. Agg Operator
>>>>>>>>>>>>>>>>>>> Agg is a new operator on Table/GroupedTable. It applies an aggregate function and can return multiple columns. Its usage is as follows:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>>>   .groupBy('a) // leave the groupBy clause out to define global aggregates
>>>>>>>>>>>>>>>>>>>   .agg(fun: AggregateFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>>>>   .select('a, 'c)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 4. FlatAgg Operator
>>>>>>>>>>>>>>>>>>> FlatAgg is a new operator on Table/GroupedTable. It applies a table aggregate function and can return multiple rows.
>>>>>>>>>>>>>>>>>>> Its usage is as follows:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>>>   .groupBy('a) // leave the groupBy clause out to define global table aggregates
>>>>>>>>>>>>>>>>>>>   .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>>>>   .select('a, 'c)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 5. TableAggregateFunction
>>>>>>>>>>>>>>>>>>> Table aggregates behave most like a GroupReduceFunction: they compute over a group of elements and output a group of elements. A TableAggregateFunction can be applied in GroupedTable.flatAgg(). The TableAggregateFunction interface has a lot of content, so I don't copy it here; please see the details in the Google Doc:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I would very much appreciate anyone reviewing and commenting.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>
>>>>>>> --
>>>>>>> *Rome was not built in one day*
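The four operators listed at the start of the thread differ mainly in how many rows go in and out per invocation. The toy Scala model below captures only that contract. It uses plain collections and hypothetical signatures, not the Flink Table API:

- `map`: one row in, one row out.
- `flatMap`: one row in, zero or more rows out.
- `agg`: one group in, one row out.
- `flatAgg`: one group in, zero or more rows out, much like a `GroupReduceFunction`.

The grouping key is kept as a `Map` key only to make group membership visible. Whether it should appear in the result schema is exactly the open question of this thread.

```scala
// Toy model of the four operators' input/output cardinalities; not the Flink Table API.
object OperatorShapesSketch {
  // map: ScalarFunction-like, exactly one (possibly multi-column) output row per input row.
  def map[A, B](rows: Seq[A])(f: A => B): Seq[B] = rows.map(f)

  // flatMap: TableFunction-like, zero or more output rows per input row.
  def flatMap[A, B](rows: Seq[A])(f: A => Seq[B]): Seq[B] = rows.flatMap(f)

  // agg: AggregateFunction-like, exactly one output row per group.
  def agg[A, K, B](rows: Seq[A])(key: A => K)(f: Seq[A] => B): Map[K, B] =
    rows.groupBy(key).map { case (k, group) => k -> f(group) }

  // flatAgg: TableAggregateFunction-like, zero or more output rows per group.
  def flatAgg[A, K, B](rows: Seq[A])(key: A => K)(f: Seq[A] => Seq[B]): Map[K, Seq[B]] =
    rows.groupBy(key).map { case (k, group) => k -> f(group) }
}
```

For example, grouping 1 to 5 by parity and keeping the two largest values per group with `flatAgg` yields `Map(1 -> Seq(5, 3), 0 -> Seq(4, 2))`.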