Hi Jincheng & Fabian,

+1 from my point of view.
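For reference, here is a minimal, self-contained sketch of the shape under discussion, as I understand it from the thread: `flatAggregate` returns an intermediate type that offers nothing but `select`. The types below (`Table`, `GroupedTable`, `FlatAggregatedTable`) and the string-based field names are toy stand-ins assumed for illustration. They are not Flink's actual classes or signatures.

```scala
// Toy model only: these are NOT Flink's classes. All names are hypothetical.
final case class Table(columns: Seq[String])

// Intermediate result of flatAggregate. It deliberately offers nothing but
// select, so the operation is not complete until the output columns are listed.
final class FlatAggregatedTable(groupKeys: Seq[String], aggColumns: Seq[String]) {
  def select(fields: String*): Table = {
    val available = groupKeys ++ aggColumns
    val unknown = fields.filterNot(f => available.contains(f))
    // "*" is not a known field here, so select("*") is rejected as well.
    require(unknown.isEmpty, s"unknown fields: ${unknown.mkString(", ")}")
    Table(fields)
  }
}

// Stand-in for the result of window(...).groupBy(...).
final class GroupedTable(groupKeys: Seq[String]) {
  def flatAggregate(aggColumns: String*): FlatAggregatedTable =
    new FlatAggregatedTable(groupKeys, aggColumns)
}

object SelectAfterFlatAggregate {
  def main(args: Array[String]): Unit = {
    val grouped = new GroupedTable(Seq("w.rowtime", "k1", "k2"))
    val result = grouped
      .flatAggregate("col1", "col2")
      .select("w.rowtime", "k1", "k2", "col1", "col2")
    println(result.columns.mkString(", "))
  }
}
```

In this sketch the keys and window properties only reach the result when they are named in `select`, and `select("*")` fails because `"*"` is not a known field, which mirrors the "not supported in the first version" position.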
I like the idea that you have to close `flatAggregate` with a `select` statement. In a way it is consistent with a normal `groupBy`, and it does solve the problem of mixing table and scalar functions.

I would be against supporting a `select('*)` that returns only `col1` and `col2`. To me, `select('*)` means "give me everything that you have", and I would find it very confusing that:

  tab.window(Tumble ... as 'w)
    .groupBy('w, 'k1, 'k2)
    .flatAggregate(tableAgg('a))
    .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)

would return more columns than:

  tab.window(Tumble ... as 'w)
    .groupBy('w, 'k1, 'k2)
    .flatAggregate(tableAgg('a))
    .select('*)

I would also be fine with never supporting `.select('*)`. After all, that would be consistent with regular aggregation, where it doesn't make sense and is not supported either:

  tab.window(Tumble ... as 'w)
    .groupBy('w, 'k1, 'k2)
    .select('*)

Piotrek

> On 29 Nov 2018, at 10:51, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi,
>
> OK, not supporting select('*) in the first version sounds like a good first step, +1 for this.
>
> However, I don't think that select('*) returning only the result columns of the agg function would be a significant break in semantics.
> Since aggregate()/flatAggregate() is the last command and (visibly) only forwards the result columns of the agg function, returning those for '* seems fine with me.
> The grouping columns (+window properties) are "side-passed" from the groupBy.
>
> So, IMO, both select('*) and select('k1, 'k2, 'w.rowtime, '*) have well defined semantics. But we can add these shortcuts later.
>
> Best,
> Fabian
>
> On Wed, 28 Nov 2018 at 11:13, jincheng sun <sunjincheng...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> Thank you for listing the detailed example of forcing the use of select.
>>
>> In case I didn't make it clear before, I would like to share my thoughts about the group keys here:
>>
>> 1. agg/flatagg(Expression) keeps a single Expression.
>> 2. The way to force users to use select is as follows (as you mentioned):
>>
>>   val tabX2 = tab.window(Tumble ... as 'w)
>>     .groupBy('w, 'k1, 'k2)
>>     .flatAgg(tableAgg('a))
>>     .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)
>>
>> 3. IMO, we should not support select "*" on a window-grouped table in the first version.
>>    But I am fine if you want to support the shortcut, i.e. `select('w.rowtime, 'k1, 'k2, '*)`, which means that "*" does not include the group keys.
>>    I am just a little worried that "*" represents all columns in a non-grouped table, but not all columns in a grouped table.
>>    The semantics of "*" are not uniform in this case.
>>
>> What do you think?
>>
>> Thanks,
>> Jincheng
>>
>> On Tue, 27 Nov 2018 at 7:27 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> I don't think we came to a conclusion yet.
>>> Are you suggesting that 'w would disappear if we do the following:
>>>
>>>   val tabX = tab.window(Tumble ... as 'w)
>>>     .groupBy('w, 'k1, 'k2)
>>>     .flatAgg(tableAgg('a))
>>>     .select('*)
>>>
>>> Given that tableAgg returns [col1, col2], what would the schema of tabX be?
>>>
>>> 1) [col1, col2]: fine with me
>>> 2) [k1, k2, col1, col2]: I'm very skeptical about this option because it is IMO inconsistent. Users will ask, where did 'w go.
>>> 3) [w, k1, k2, col1, col2]: back to our original problem. What's the data type of 'w?
>>>
>>> If we go for option 1, we can still allow to select keys.
>>>
>>>   val tabX2 = tab.window(Tumble ... as 'w)
>>>     .groupBy('w, 'k1, 'k2)
>>>     .flatAgg(tableAgg('a))
>>>     .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)
>>>
>>> The good thing about enforcing select is that the result of flatAgg is not a Table, i.e., the definition of the operation is not completed yet.
>>> On the other hand, we would need to list all result attributes of the table function if we want to select keys. We could do that with a shortcut:
>>>
>>>   val tabX3 = tab.window(Tumble ... as 'w)
>>>     .groupBy('w, 'k1, 'k2)
>>>     .flatAgg(tableAgg('a))
>>>     .select('w.rowtime, 'k1, 'k2, '*)
>>>
>>> On Tue, 27 Nov 2018 at 10:46, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>
>>>> Thanks Fabian. If we enforce select then, as I said before, users should use 'w.start, 'w.end, 'w.rowtime, 'w.proctime, 'w.XXX, etc. This way we do not need to define the type of 'w and can keep the current way of using 'w.
>>>> I'll file the JIRA and open the PR ASAP.
>>>>
>>>> Thanks,
>>>> Jincheng
>>>>
>>>> On Tue, 27 Nov 2018 at 4:50 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> I thought about it again.
>>>>> Enforcing select won't help us with the choice of how to represent 'w.
>>>>>
>>>>>   select('*) and
>>>>>   select('a, 'b, 'w)
>>>>>
>>>>> would still be valid expressions and we need to decide how 'w is represented.
>>>>> As I said before, Tuple, Row, and Map have disadvantages because the syntax 'w.rowtime or 'w.end would not be supported.
>>>>>
>>>>> On Tue, 27 Nov 2018 at 01:05, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>
>>>>>> Until we have good support for nested tables, forcing the use of select may be a good way; at least it does not cause compatibility issues.
>>>>>>
>>>>>> On Mon, 26 Nov 2018 at 6:48 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> I think the question is what is the data type of 'w.
>>>>>>>
>>>>>>> Until now, I assumed it would be a nested tuple (Row or Tuple).
>>>>>>> Accessing nested fields in Row, Tuple or Pojo is done with get, i.e., 'w.get("rowtime").
>>>>>>> Using a Map would not help because the access would be 'w.at("rowtime").
>>>>>>>
>>>>>>> We can of course also enforce the select() if aggregate / flatAggregate do not return a Table but some kind of AggregatedTable that does not provide any other function than select().
>>>>>>>
>>>>>>> On Mon, 26 Nov 2018 at 01:12, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Yes, I agree the problem needs attention. IMO, it depends on how we define the type of 'w. Your example above defines 'w as a tuple. If 'w is serialized to a Map, compatibility will be better. We could even define 'w as a special type that UDFs and sinks cannot use directly; they must use 'w.start, 'w.end, 'w.rowtime, 'w.proctime, 'w.XXX. I would be very grateful if you could share your solution to this problem, and we can also discuss it carefully in the PR to be opened. What do you think?
>>>>>>>>
>>>>>>>> On Fri, 23 Nov 2018 at 6:21 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Something like:
>>>>>>>>>
>>>>>>>>>   val x = tab.window(Tumble ... as 'w)
>>>>>>>>>     .groupBy('w, 'k1, 'k2)
>>>>>>>>>     .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>
>>>>>>>>>   // fails because the result schema has changed from
>>>>>>>>>   // ((start, end, rowtime), k1, k2, col1, col2) to
>>>>>>>>>   // ((start, end, rowtime, newProperty), k1, k2, col1, col2)
>>>>>>>>>   x.insertInto("sinkTable")
>>>>>>>>>
>>>>>>>>>   // fails because the UDF expects (start, end, rowtime) and
>>>>>>>>>   // not (start, end, rowtime, newProperty)
>>>>>>>>>   x.select(myUdf('w))
>>>>>>>>>
>>>>>>>>> Basically, every time the composite type 'w is used as a whole.
>>>>>>>>>
>>>>>>>>> On Fri, 23 Nov 2018 at 10:45, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Fabian,
>>>>>>>>>>
>>>>>>>>>> I don't fully understand the issue you mentioned:
>>>>>>>>>>
>>>>>>>>>>   Any query that relies on the composite type with three fields will fail after adding a fourth field.
>>>>>>>>>>
>>>>>>>>>> I would appreciate it if you could give some detailed examples.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Jincheng
>>>>>>>>>>
>>>>>>>>>> On Fri, 23 Nov 2018 at 4:41 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> My concerns are about the case when there is no additional select() method, i.e.,
>>>>>>>>>>>
>>>>>>>>>>>   tab.window(Tumble ... as 'w)
>>>>>>>>>>>     .groupBy('w, 'k1, 'k2)
>>>>>>>>>>>     .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>>>
>>>>>>>>>>> In this case, 'w is a composite field consisting of three fields (end, start, rowtime).
>>>>>>>>>>> Once we add a new property, it would need to be added to the composite type.
>>>>>>>>>>> Any query that relies on the composite type with three fields will fail after adding a fourth field.
>>>>>>>>>>>
>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>
>>>>>>>>>>> On Fri, 23 Nov 2018 at 02:01, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Fabian,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks a lot for your feedback and for the very important and necessary design reminders!
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, you are right! In Spark, grouping columns had to be specified explicitly in order to be displayed before 1.3, but they are passed implicitly in Spark 1.4 and later. The reason for changing this behavior was user feedback. Although implicit forwarding has the drawbacks you mentioned, this approach is really convenient for the user.
>>>>>>>>>>>> I agree that when grouping on windows we have to pay attention to the handling of the window's properties, because we may introduce new window properties.
>>>>>>>>>>>> So, from this point of view, we delay the processing of the window property, i.e., we pass the complex type 'w through the Table API and provide different property methods in the SELECT according to the type of 'w, such as 'w.start, 'w.end, 'w.xxx. The Table API will limit and verify the property operations that 'w supports. An example is as follows:
>>>>>>>>>>>>
>>>>>>>>>>>>   tab.window(Tumble ... as 'w)
>>>>>>>>>>>>     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>     .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2) // 'w is a composite field
>>>>>>>>>>>>     .select('k1, 'col1, 'w.rowtime as 'ts, 'w.xxx as 'xx) // in select we will limit and verify that 'w.xx is allowed
>>>>>>>>>>>>
>>>>>>>>>>>> I am not sure if I fully understand your concerns. If I have misunderstood anything, please correct me. Any feedback is appreciated!
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, 22 Nov 2018 at 10:13 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> First of all, it is correct that the flatMap(Expression*) and flatAggregate(Expression*) methods would mix scalar and table values.
>>>>>>>>>>>>> This would be a new concept that is not present in the current API.
>>>>>>>>>>>>> From my point of view, the semantics are quite clear, but I understand that others are more careful and worry about future extensions.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am fine with going for single expression arguments for map() and flatMap(). We can later expand them to Expression* if we feel the need and are more comfortable about the implications.
>>>>>>>>>>>>> Whenever a time attribute needs to be forwarded, users can fall back to join(TableFunction) as Xiaowei mentioned.
>>>>>>>>>>>>> So we restrict the usability of the new methods but don't lose functionality and don't prevent future extensions.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The aggregate() and flatAggregate() case is more difficult because implicit forwarding of grouping fields cannot be changed later without breaking the API.
>>>>>>>>>>>>> There are other APIs (e.g., Spark) that also implicitly forward the grouping columns. So this is not uncommon.
>>>>>>>>>>>>> However, I personally don't like that approach, because it is implicit and introduces a new behavior that is not present in the current API.
>>>>>>>>>>>>>
>>>>>>>>>>>>> One thing to consider here is the handling of grouping on windows.
>>>>>>>>>>>>> If I understood Xiaowei correctly, a composite field that is named like the window alias (e.g., 'w) would be implicitly added to the result of aggregate() or flatAggregate().
>>>>>>>>>>>>> The composite field would have fields like (start, end, rowtime) or (start, end, proctime) depending on the window type.
>>>>>>>>>>>>> If we ever introduced a fourth window property, we might break existing queries.
>>>>>>>>>>>>> Is this something that we should worry about?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Fabian
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, 22 Nov 2018 at 14:03, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> #1) ok, got it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> #3)
>>>>>>>>>>>>>>> From my point of view we can use `Expression` first and, if the discussion later decides in favor of Expression*, improve it then. In any case, we can use Expression and still have the opportunity to move to Expression* (compatibility). If we use Expression* directly, it is difficult for us to move back to Expression, which would break compatibility between versions. What do you think?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I don't think that's the case here. If we start with the single-parameter `flatMap(Expression)`, it will need implicit columns to be present in the result, which:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> a) IMO breaks SQL convention (that's why I'm against this)
>>>>>>>>>>>>>> b) means we cannot later easily introduce `flatMap(Expression*)` without those implicit columns, without breaking compatibility or at least without making `flatMap(Expression*)` and `flatMap(Expression)` terribly inconsistent.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> To elaborate on (a). It's not nice if our own API is inconsistent and sometimes behaves one way and sometimes another way:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   table.groupBy('k).select(scalarAggregateFunction('v))
>>>>>>>>>>>>>>   => single column result, just the output of `scalarAggregateFunction`
>>>>>>>>>>>>>> vs
>>>>>>>>>>>>>>   table.groupBy('k).flatAggregate(tableAggregateFunction('v))
>>>>>>>>>>>>>>   => both the result of `tableAggregateFunction` plus the key (and an optional window context?)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thus I think we have to decide now which way we want to jump, since later will be too late. Or again, am I missing something? :)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 22 Nov 2018, at 02:07, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Piotrek,
>>>>>>>>>>>>>>> #1) We have unbounded and bounded group window aggregates. For the unbounded case we should fire the result early with retract messages; we cannot use watermarks, because an unbounded aggregate never finishes (as an improvement we can introduce micro-batching in the future). For bounded windows we never support early firing, so we do not need retraction.
>>>>>>>>>>>>>>> #3) About validation of `table.select(F('a).unnest(), 'b, G('c).unnest())/table.flatMap(F('a), 'b, scalarG('c))`: Fabian mentioned this above, please look at the prior mail. For the `table.flatMap(F('a), 'b, scalarG('c))` case that concerns us, we should discuss the issue of `Expression*` vs `Expression`. From my point of view we can use `Expression` first and, if the discussion later decides in favor of Expression*, improve it then. In any case, we can use Expression and still have the opportunity to move to Expression* (compatibility). If we use Expression* directly, it is difficult for us to move back to Expression, which would break compatibility between versions. What do you think?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If anything is unclear, any feedback is welcome! Again, thanks for sharing your thoughts!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, 21 Nov 2018 at 9:37 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> #1) No, watermarks solve the issue of late events. Here, the performance problem is caused by the update emit mode, i.e., when the current calculation result is output, the previous calculation result needs to be retracted.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hmm, yes I missed this. For time-windowed cases (some aggregate/flatAggregate cases) emitting only on watermark should solve the problem.
>>>>>>>>>>>>>>>> For non-time-windowed cases it would reduce the amount of retractions, right? Or am I still missing something?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> #3) I still hope to keep the simplicity that select only supports projected scalars; we can hardly tell the semantics of tab.select(flatmap('a), 'b, flatmap('d)).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>   table.select(F('a).unnest(), 'b, G('c).unnest())
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> could be rejected during some validation phase. On the other hand:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>   table.select(F('a).unnest(), 'b, scalarG('c))
>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>   table.flatMap(F('a), 'b, scalarG('c))
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> could work and be more or less syntactic sugar for cross apply.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 21 Nov 2018, at 12:16, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi shaoxuan & Hequn,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for your suggestion, I'll file the JIRAs later.
>>>>>>>>>>>>>>>>> We can prepare PRs while continuing to move the ongoing discussion forward.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, 21 Nov 2018 at 7:07 PM, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Piotrek,
>>>>>>>>>>>>>>>>>> Thanks for your feedback, and thanks for sharing your thoughts!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> #1) No, watermarks solve the issue of late events. Here, the performance problem is caused by the update emit mode, i.e., when the current calculation result is output, the previous calculation result needs to be retracted.
>>>>>>>>>>>>>>>>>> #2) As I mentioned above, we should continue the discussion until we solve the problems raised by Xiaowei and Fabian.
>>>>>>>>>>>>>>>>>> #3) I still hope to keep the simplicity that select only supports projected scalars; we can hardly tell the semantics of tab.select(flatmap('a), 'b, flatmap('d)).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, 21 Nov 2018 at 5:24 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In fact, in addition to the design of APIs, there will be various performance optimization details, such as: table Aggregate function emitValue will generate multiple calculation results, in extreme cases, each record will trigger a large number of retract messages, this will have poor performance
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Can this be solved/mitigated by emitting the results only on watermarks? I think that was the path that we decided to take both for Temporal Joins and upsert stream conversion. I know that this increases the latency and that there is a place for a future global setting/user preference "emit the data ASAP mode", but emitting only on watermarks seems to me a better/more sane default.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> With respect to the API discussion and implicit columns: the problem for me so far is that I'm not sure I like the additional complexity of the `append()` solution, while implicit columns are definitely not in the spirit of SQL. Neither joins nor aggregations add extra unexpected columns to the result without asking. This can definitely be confusing for users since it breaks the convention. Thus I would lean towards Fabian's proposal of multi-argument `map(Expression*)` from those 3 options.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 3.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Another topic is that I'm not 100% convinced that we should be adding new API functions for `map`, `aggregate`, `flatMap` and `flatAggregate`. I think the same could be achieved by changing
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>   table.map(F('x))
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> into
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>   table.select(F('x)).unnest()
>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>   table.select(F('x).unnest())
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> where `unnest()` means unnesting a row/tuple type into a columnar table.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>   table.flatMap(F('x))
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> could on the other hand also be handled by
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>   table.select(F('x))
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> by correctly deducing that F(x) is a multi-row output function.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The same might apply to `aggregate(F('x))`, but this could maybe be replaced by:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>   table.groupBy(…).select(F('x).unnest())
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Adding scalar functions should also be possible:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>   table.groupBy('k).select(F('x).unnest(), 'k)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Maybe such an approach would allow us to implement the same features in SQL as well?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 21 Nov 2018, at 09:43, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thank you all for the great proposal and discussion!
>>>>>>>>>>>>>>>>>>>> I also prefer to move on to the next step, so +1 for opening the JIRAs to start the work.
>>>>>>>>>>>>>>>>>>>> We can have a more detailed discussion there. Btw, we can start with the JIRAs on which we have already agreed.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>> Hequn
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <wshaox...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> +1. I agree that we should open the JIRAs to start the work. We may have better ideas on the flavor of the interface when we implement/review the code.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>> shaoxuan
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On 11/20/18, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks all for the feedback.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> @Piotr About not using abbreviated names: +1, I like your proposal! Currently both the DataSet and DataStream APIs use `aggregate`. BTW, I find that other languages, such as R, also avoid abbreviated names.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Sometimes the interface of an API is really difficult to perfect. We need to spend a lot of time thinking, gather feedback from a large number of users, and constantly improve; but because of backward compatibility, we have to adopt the most conservative approach when designing the API (of course, I am in favor of developing richer features once we have discussed them clearly). Therefore, I propose to split the implementation of map/flatMap/agg/flatAgg into JIRAs for the basic functions and JIRAs that support time attributes and group keys. We can develop the features whose design we have already agreed on, and we will continue to discuss the uncertain design.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> In fact, in addition to the design of the APIs, there will be various performance optimization details. For example, the table aggregate function's emitValue will generate multiple calculation results; in extreme cases, each record will trigger a large number of retract messages, which will perform poorly. So we will also optimize the interface design, such as adding the emitWithRetractValue interface (I have updated the Google doc) to allow the user to optionally perform incremental calculations, thus avoiding a large number of retracts. Details like this are difficult to fully discuss on the mailing list, so I recommend creating JIRAs/FLIP first; we develop the designs that have been agreed upon and continue to discuss the designs that are not yet settled! What do you think? @Fabian & Piotr & XiaoWei
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Mon, 19 Nov 2018 at 12:07 AM, Xiaowei Jiang <xiaow...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi Fabian & Piotr, thanks for the feedback!
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I appreciate your concerns, both on timestamp attributes and on implicit group keys. At the same time, I'm also concerned about the proposed approach of allowing Expression* as parameters, especially for flatMap/flatAgg. So far, we have never allowed a scalar expression to appear together with table expressions. With the Expression* approach, this will happen for the parameters of flatMap/flatAgg. I'm a bit concerned about whether we fully understand the consequences when we try to extend our system in the future. I would be extra cautious in doing this. To avoid this, I think an implicit group key for flatAgg is safer. For flatMap, users who want to keep the rowtime column can use crossApply/join instead. So we are not losing any real functionality here.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Also a clarification on the following example:
>>>>>>>>>>>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>>>>>>>>>>>>   .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>>>>>>>>>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>>>>>>>>>>>>>> If we did not have the select clause in this example, we would have 'w as a regular column in the output. It should not magically disappear.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> The concern is not as strong for Table.map/Table.agg because we are not mixing scalar and table expressions. But we also want to be reasonably consistent with these methods. If we used implicit group keys for Table.flatAgg, we probably should do the same for Table.agg. Now we only have to choose what to do with Table.map. I can see good arguments on both sides. But starting with a single Expression seems safer because we can always extend to Expression* in the future.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> While thinking about this problem, it appears that we may need more work in our handling of watermarks for the SQL/Table API.
>>>>>>>>>>>>>>>>>>>>>>> Our current way of propagating the watermarks from source all the way to sink might not be optimal. For example, after a tumbling window, the watermark can actually be advanced to just before the expiry of the next window. I think that in general, each operator may need to generate new watermarks instead of simply propagating them. Once we accept that watermarks may change during the execution, it appears that the timestamp columns may also change, as long as we have some way to associate a watermark with them. My intuition is that once we have a thorough solution for the watermark issue, we may be able to solve the problem we encountered for Table.map in a cleaner way. But this is a complex issue which deserves a discussion of its own.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>> Xiaowei
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Isn't the problem of multiple expressions limited only to `flat***` functions and, to be more specific, only to having two (or more) different table functions passed as expressions? `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))` seems to be well defined (duplicate the result of every scalar function to every record). Or am I missing something?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Another remark: I would be in favour of not using abbreviations and naming `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On 15 Nov 2018, at 14:15, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> I said before that I think the append() method is better than implicitly forwarding keys, but still, I believe it adds unnecessary boilerplate code.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Moreover, I haven't seen a convincing argument why map(Expression*) is worse than map(Expression). In either case we need to do all kinds of checks to prevent invalid use of functions.
>>>>>>>>>>>>>>>>>>>>>>>>> If the method is not correctly used, we can emit a good error message, and documenting map(Expression*) will be easier than map(append(Expression*)), in my opinion.
>>>>>>>>>>>>>>>>>>>>>>>>> I think we should not add unnecessary syntax unless there is a good reason and, to be honest, I haven't seen this reason yet.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Regarding the groupBy.agg() method, I think it should behave just like any other method, i.e., not do any implicit forwarding.
>>>>>>>>>>>>>>>>>>>>>>>>> Let's take the example of the windowed group by that you posted before.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>>>>>>>>>>>>>>   .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>>>>>>>>>>>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> What happens if 'w.rowtime is not selected? What is the data type of the field 'w in the resulting Table? Is it a regular field at all or just a system field that disappears if it is not selected?
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> IMO, the following syntax is shorter, more explicit, and better aligned with the regular window.groupBy.select aggregations that are supported today.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>>>>>>>>>>>>>>   .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, 14 Nov 2018 at 08:37, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Fabian/Xiaowei,
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> I am very sorry for my late reply! Glad to see your replies, they sound pretty good!
>>>>>>>>>>>>>>>>>>>>>>>>>> I agree with Fabian that the append() approach, which clearly defines the result schema, is better.
>>>>>>>>>>>>>>>>>>>>>>>>>> In addition, append() can also forward non-time attributes, e.g.:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> // schema: tab('name, 'age, 'address, 'rowtime)
>>>>>>>>>>>>>>>>>>>>>>>>>> tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2, 'address, 'rowtime)
>>>>>>>>>>>>>>>>>>>>>>>>>>   .window(Tumble over 5.millis on 'rowtime as 'w)
>>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('w, 'address)
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> In this way append() is very useful, and the behavior is very similar to withForwardedFields() in DataSet.
>>>>>>>>>>>>>>>>>>>>>>>>>> So +1 to using the append() approach for map() and flatMap()!
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> But how about agg() and flatAgg()?
>>>>>>>>>>>>>>>>>>>>>>>>>> In the agg/flatAgg case I agree with Xiaowei's approach of defining the keys to be implied in the result table and to appear at the beginning, for example as follows:
>>>>>>>>>>>>>>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>>>>>>>>>>>>>>>   .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>>>>>>>>>>>>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> What do you think? @Fabian @Xiaowei
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 9, 2018 at 6:35 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the summary!
>>>>>>>>>>>>>>>>>>>>>>>>>>> I like the approach with append() better than the implicit forwarding as it clearly indicates which fields are forwarded.
>>>>>>>>>>>>>>>>>>>>>>>>>>> However, I don't see much benefit over the flatMap(Expression*) variant, as we would still need to analyze the full expression tree to ensure that at most (or exactly?) one Scalar / TableFunction is used.
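
The expression-tree check mentioned above can be sketched in plain Scala. This is only an illustration under stated assumptions: `Expr`, `Field`, `ScalarCall`, `TableCall`, `countTableCalls` and `validateFlatMapArgs` are hypothetical names invented here, not Flink classes, and the rule enforced ("at most one table function") is one reading of the open question in the thread.

```scala
// Hypothetical, simplified expression tree; these are not Flink's Expression classes.
sealed trait Expr
final case class Field(name: String) extends Expr
final case class ScalarCall(name: String, args: List[Expr]) extends Expr
final case class TableCall(name: String, args: List[Expr]) extends Expr

// Count table-function calls anywhere in the tree, including nested arguments.
def countTableCalls(e: Expr): Int = e match {
  case Field(_)            => 0
  case ScalarCall(_, args) => args.map(countTableCalls).sum
  case TableCall(_, args)  => 1 + args.map(countTableCalls).sum
}

// Accept an argument list only if it contains at most one table function
// (assumed rule; the thread leaves "at most" versus "exactly" open).
def validateFlatMapArgs(exprs: Seq[Expr]): Either[String, Seq[Expr]] = {
  val n = exprs.map(countTableCalls).sum
  if (n <= 1) Right(exprs)
  else Left(s"flatMap accepts at most one table function, found $n")
}
```

With such a check, an argument list like `TableCall("split", ...), Field("rowtime")` passes, while two table functions side by side are rejected with an error message, which is the kind of validation both the `Expression*` and the `append()` variants would need.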
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>> Fabian
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, 8 Nov 2018 at 19:25, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> We are discussing this proposal in great detail and trying to design the API with many aspects in mind (functionality, compatibility, ease of use, etc.). I think this is a very good process. Only with such a detailed discussion can we develop the PRs clearly and smoothly in the later stage. I am very grateful to @Fabian and @Xiaowei for sharing a lot of good ideas.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding the definition of the method signatures, I want to share my points here, which I am discussing with Fabian in the Google doc (not yet completed), as follows:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Assume we have a table:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Approach 1:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> case1: Map follows Source Table
>>>>>>>>>>>>>>>>>>>>>>>>>>>> val result =
>>>>>>>>>>>>>>>>>>>>>>>>>>>>   tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .window(Tumble over 5.millis on 'proctime as 'w)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> case2: FlatAgg follows Window (Fabian mentioned above)
>>>>>>>>>>>>>>>>>>>>>>>>>>>> val result =
>>>>>>>>>>>>>>>>>>>>>>>>>>>>   tab.window(Tumble ... as 'w)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Approach 2: Similar to Fabian's approach, in which the result schema is clearly defined, but with an added built-in append UDF. That makes the map/flatMap/agg/flatAgg interfaces accept only one Expression.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> val result =
>>>>>>>>>>>>>>>>>>>>>>>>>>>>   tab.map(append(udf('string), 'long, 'proctime)) as ('col1, 'col2, 'long, 'proctime)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .window(Tumble over 5.millis on 'proctime as 'w)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Note: append is a special built-in UDF that can pass through any column.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> So maybe we can define it as table.map(Expression) first and, if necessary, extend it to table.map(Expression*) in the future? Of course, I also hope that we can further refine this proposal through discussion.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 11:45 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Fabian,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think that the key question you raised is whether we allow extra parameters in the methods map/flatMap/agg/flatAgg. I can see why allowing that may appear more convenient in some cases. However, it might also cause some confusion if we do that. For example, do we allow multiple UDFs in these expressions? If we do, the semantics may be awkward to define, e.g. what does table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Even though not allowing it may appear less powerful, it can make things more intuitive too.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In the case of agg/flatAgg, we can define the keys to be implied in the result table and to appear at the beginning. You can use a select method if you want to modify this behavior. I think that eventually we will have some API which allows other expressions as additional parameters, but I think it's better to do that after we introduce the concept of nested tables. A lot of the things we suggested here can be considered special cases of that. But things are much simpler if we leave that for later.
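
To make the implicit-key rule concrete, here is a small plain-Scala sketch of the resulting schema. It models field names as strings only; `implicitSchema` and `select` are hypothetical helpers written for this illustration and do not exist in the Table API.

```scala
// Hypothetical helpers; field names are plain strings, not Flink expressions.
// Implicit variant: the group keys are placed in front of the UDF's result fields.
def implicitSchema(groupKeys: Seq[String], udfFields: Seq[String]): Seq[String] =
  groupKeys ++ udfFields

// A follow-up select keeps only the requested fields, in the requested order,
// and fails if a requested field is not part of the schema.
def select(schema: Seq[String], wanted: Seq[String]): Either[String, Seq[String]] =
  wanted.find(f => !schema.contains(f)) match {
    case Some(missing) => Left(s"unknown field: $missing")
    case None          => Right(wanted)
  }

val schema = implicitSchema(Seq("w", "k1", "k2"), Seq("col1", "col2"))
val projected = select(schema, Seq("k1", "col1"))
```

Under this reading the keys never disappear on their own; dropping or reordering them is always an explicit `select`.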
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Xiaowei
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * Re emit:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think we should start with the well-understood semantics of full replacement. This is how the other agg functions work.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As was said before, there are open questions regarding an append mode (checkpointing, whether to support retractions and, if so, how to declare them, ...).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Since this seems to be an optimization, I'd postpone it.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * Re grouping keys:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I don't think we should automatically add them because the result schema would not be intuitive.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would they be added at the beginning of the tuple or at the end? What metadata fields of windows would be added?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In which order would they be added?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> However, we could support syntax like this:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> val t: Table = ???
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> t
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .window(Tumble ... as 'w)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('a, 'b)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The result schema would be clearly defined as [b, a, f1, f2, ..., fn, wend, rtime], where (f1, f2, ..., fn) are the result attributes of the UDF.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * Re Multi-staged evaluation:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think this should be an optimization that can be applied if the UDF implements the merge() method.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, 7 Nov 2018 at 08:01, Shaoxuan Wang <wshaox...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Xiaowei,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I agree with you that the semantics of TableAggregateFunction emit are much more complex than those of AggregateFunction. The fundamental difference is that TableAggregateFunction emits a "table" while AggregateFunction outputs (a column of) a "row". AggregateFunction has only one mode, which is "replacing" (complete update). But for TableAggregateFunction, the update could be incremental (only emit the newly updated results) or complete (always emit the entire table when "emit" is triggered). From the performance perspective, we might want to use incremental update.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> But we need to review and design this carefully, especially taking into account failover (instead of just backing up the ACC, it may also need to remember the emit offset) and retractions, as the semantics of TableAggregateFunction emit are different from those of other UDFs. TableFunction also emits a table, but it does not need to worry about this because it is stateless.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Shaoxuan
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for adding the public interfaces! I think that it's a very good start. There are a few points that need more discussion.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - TableAggregateFunction - this is a very complex beast, definitely the most complex user-defined object we have introduced so far. I think there are quite a few interesting questions here. For example, do we allow multi-staged TableAggregate in this case? What are the semantics of emit? Is it an amendment to the previous output, or does it replace it? I think that this subject itself is worth a discussion to make sure we get the details right.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - GroupedTable.agg - do the group keys automatically appear in the output? How about the case of windowed aggregation?
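
The two emit semantics asked about above ("replacing" versus "amending" the previous output) can be contrasted with a plain-Scala sketch. It is an illustration only: `Change`, `Add`, `Retract`, `emitReplace` and `emitAmend` are hypothetical names, not Flink types, and rows are compared by simple equality.

```scala
// Hypothetical change-log messages; not Flink's internal types.
sealed trait Change[A]
final case class Add[A](row: A) extends Change[A]
final case class Retract[A](row: A) extends Change[A]

// "Replacing": retract every previously emitted row, then add every new row.
def emitReplace[A](prev: Seq[A], next: Seq[A]): Seq[Change[A]] =
  prev.map(r => Retract(r): Change[A]) ++ next.map(r => Add(r): Change[A])

// "Amending": retract only rows that disappeared and add only rows that are new.
def emitAmend[A](prev: Seq[A], next: Seq[A]): Seq[Change[A]] =
  prev.filterNot(r => next.contains(r)).map(r => Retract(r): Change[A]) ++
    next.filterNot(r => prev.contains(r)).map(r => Add(r): Change[A])
```

For a top-2 result that changes from (9, 7) to (9, 8), `emitReplace` produces four messages while `emitAmend` produces two, which is the kind of difference in retraction volume the performance discussion in this thread is about.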
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Xiaowei
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Xiaowei,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for bringing up the discussion of the Table API Enhancement Outline!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I quickly looked at the overall content; it is a good reflection of our offline discussions. But from my point of view, we should add the usage of the public interfaces that we will introduce in this proposal. So I added the following usage descriptions of the interfaces and operators to the Google doc:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. Map Operator
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The Map operator is a new operator of Table. It applies a scalar function and can return multiple columns.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The usage is as follows:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .map(fun: ScalarFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .select('a, 'c)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. FlatMap Operator
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The FlatMap operator is a new operator of Table. It applies a table function and can return multiple rows. The usage is as follows:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .flatMap(fun: TableFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .select('a, 'c)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. Agg Operator
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The Agg operator is a new operator of Table/GroupedTable. It applies an aggregate function and can return multiple columns.
>> The usage is as follows:
>>
>> val res = tab
>>   .groupBy('a) // leave the groupBy clause out to define global aggregates
>>   .agg(fun: AggregateFunction).as('a, 'b, 'c)
>>   .select('a, 'c)
>>
>> 4. FlatAgg Operator
>> The FlatAgg operator is a new operator of Table/GroupedTable. It can apply a table aggregate function and can return multiple rows. The usage is as follows:
>>
>> val res = tab
>>   .groupBy('a) // leave the groupBy clause out to define global table aggregates
>>   .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
>>   .select('a, 'c)
>>
>> 5. TableAggregateFunction
>> The behavior of table aggregates is most like that of GroupReduceFunction, which computes over a group of elements and outputs a group of elements. The TableAggregateFunction can be applied on GroupedTable.flatAgg(). The interface of TableAggregateFunction has a lot of content, so I don't copy it here. Please look at the details in the Google doc:
>>
>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
>>
>> I would very much appreciate anyone reviewing and commenting.
>>
>> Best,
>> Jincheng
>
> --
> -----------------------------------------------------------------------------------
> *Rome was not built in one day*
> -----------------------------------------------------------------------------------
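The four operators in the quoted proposal differ mainly in their input/output cardinality: Map is row to row, FlatMap is row to rows, Agg is group to row, and FlatAgg is group to rows. The following is a minimal sketch of those semantics on plain Scala collections, not the Flink Table API. Every name in it (`OperatorSketch`, `TableAgg`, `Top2`, `mapRows`, and so on) is hypothetical and chosen only for illustration, and the simplified `TableAgg` trait is an assumption about the general shape of a table aggregate (create an accumulator, accumulate, emit), not the interface from the Google doc.

```scala
object OperatorSketch {
  // A row is modeled as (key, value); a table is a Seq of rows.
  type Row = (String, Int)

  // Hypothetical, simplified stand-in for a table aggregate function:
  // fold a group of inputs into an accumulator, then emit zero or more rows.
  trait TableAgg[In, Acc, Out] {
    def createAccumulator(): Acc
    def accumulate(acc: Acc, in: In): Acc
    def emit(acc: Acc): Seq[Out]
  }

  // Example table aggregate: keep the two largest values of a group.
  object Top2 extends TableAgg[Int, List[Int], Int] {
    def createAccumulator(): List[Int] = Nil
    def accumulate(acc: List[Int], in: Int): List[Int] =
      (in :: acc).sorted(Ordering[Int].reverse).take(2)
    def emit(acc: List[Int]): Seq[Int] = acc
  }

  // Map: one row in, exactly one row out (here widened to three columns).
  def mapRows(tab: Seq[Row]): Seq[(String, Int, Int)] =
    tab.map { case (k, v) => (k, v, v * v) }

  // FlatMap: one row in, zero or more rows out (here one row per 1..v).
  def flatMapRows(tab: Seq[Row]): Seq[Row] =
    tab.flatMap { case (k, v) => (1 to v).map(i => (k, i)) }

  // Agg: one group in, exactly one row out (here two columns: sum and count).
  def aggRows(tab: Seq[Row]): Map[String, (Int, Int)] =
    tab.groupBy(_._1).map { case (k, rows) =>
      (k, (rows.map(_._2).sum, rows.size))
    }

  // FlatAgg: one group in, zero or more rows out, driven by a TableAgg.
  def flatAggRows[Acc, Out](tab: Seq[Row],
                            fn: TableAgg[Int, Acc, Out]): Map[String, Seq[Out]] =
    tab.groupBy(_._1).map { case (k, rows) =>
      val acc = rows.map(_._2).foldLeft(fn.createAccumulator())(fn.accumulate)
      (k, fn.emit(acc))
    }
}
```

For example, with rows `("a", 1)`, `("a", 5)`, `("a", 3)`, `("b", 2)`, `flatAggRows(tab, Top2)` yields two rows for key `a` and one for key `b`, whereas `aggRows` always yields exactly one row per key. The `.as(...)` and `.select(...)` steps from the proposal are not modeled here.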