Hi,

OK, not supporting select('*) in the first version sounds like a good first step. +1 for this.

However, I don't think that select('*) returning only the result columns of the agg function would be a significant break in semantics. Since aggregate()/flatAggregate() is the last command and (visibly) only forwards the result columns of the agg function, returning those for '* seems fine to me. The grouping columns (plus window properties) are "side-passed" from the groupBy.

So, IMO, both select('*) and select('k1, 'k2, 'w.rowtime, '*) have well-defined semantics. But we can add these shortcuts later.

Best, Fabian

On Wed, 28 Nov 2018 at 11:13, jincheng sun <sunjincheng...@gmail.com> wrote:

> Hi Fabian,
>
> Thank you for listing the detailed example of forcing the use of select.
>
> If I didn't make it clear before, I would like to share my thoughts about
> the group keys here:
>
> 1. agg/flatagg(Expression) keeps a single Expression;
> 2. The way to force users to use select is as follows (as you mentioned):
>
>    val tabX2 = tab.window(Tumble ... as 'w)
>      .groupBy('w, 'k1, 'k2)
>      .flatAgg(tableAgg('a))
>      .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)
>
> 3. IMO, we do not support select "*" on a window-grouped table in the first
>    version. But I am fine if you want to support the shortcut, i.e.
>    `select('w.rowtime, 'k1, 'k2, '*)`, which means "*" does not include the
>    group keys. I am just a little worried that "*" represents all columns in
>    a non-grouped table but does not represent all columns in a grouped table.
>    The semantics of "*" are not uniform in this case.
>
> What do you think?
>
> Thanks,
> Jincheng
>
> On Tue, 27 Nov 2018 at 7:27 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > I don't think we came to a conclusion yet.
> > Are you suggesting that 'w would disappear if we do the following:
> >
> > val tabX = tab.window(Tumble ... as 'w)
> >   .groupBy('w, 'k1, 'k2)
> >   .flatAgg(tableAgg('a))
> >   .select('*)
> >
> > Given that tableAgg returns [col1, col2], what would the schema of tabX be?
> >
> > 1) [col1, col2]: fine with me
> > 2) [k1, k2, col1, col2]: I'm very skeptical about this option because it is
> >    IMO inconsistent. Users will ask where 'w went.
> > 3) [w, k1, k2, col1, col2]: back to our original problem. What's the data
> >    type of 'w?
> >
> > If we go for option 1, we can still allow selecting keys.
> >
> > val tabX2 = tab.window(Tumble ... as 'w)
> >   .groupBy('w, 'k1, 'k2)
> >   .flatAgg(tableAgg('a))
> >   .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)
> >
> > The good thing about enforcing select is that the result of flatAgg is not
> > a Table, i.e., the definition of the operation is not completed yet.
> > On the other hand, we would need to list all result attributes of the table
> > function if we want to select keys. We could do that with a shortcut:
> >
> > val tabX3 = tab.window(Tumble ... as 'w)
> >   .groupBy('w, 'k1, 'k2)
> >   .flatAgg(tableAgg('a))
> >   .select('w.rowtime, 'k1, 'k2, '*)
> >
> > On Tue, 27 Nov 2018 at 10:46, jincheng sun <sunjincheng...@gmail.com> wrote:
> >
> > > Thanks Fabian. If we enforce select then, as I said before, users should
> > > use 'w.start, 'w.end, 'w.rowtime, 'w.proctime, 'w.XXX, etc. This way we
> > > do not need to define the type of 'w and can keep the current way of
> > > using 'w. I'll file the JIRA and open the PR ASAP.
> > >
> > > Thanks,
> > > Jincheng
> > >
> > > On Tue, 27 Nov 2018 at 4:50 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > > > I thought about it again.
> > > > Enforcing select won't help us with the choice of how to represent 'w.
> > > >
> > > > select('*) and
> > > > select('a, 'b, 'w)
> > > >
> > > > would still be valid expressions and we need to decide how 'w is
> > > > represented.
> > > > As I said before, Tuple, Row, and Map have disadvantages because the
> > > > syntax 'w.rowtime or 'w.end would not be supported.
> > > >
> > > > On Tue, 27 Nov 2018 at 01:05, jincheng sun <sunjincheng...@gmail.com> wrote:
> > > >
> > > > > Until we have good support for nested tables, forcing the use of
> > > > > select may be a good way; at least it does not cause compatibility issues.
> > > > >
> > > > > On Mon, 26 Nov 2018 at 6:48 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> > > > >
> > > > > > I think the question is what is the data type of 'w.
> > > > > >
> > > > > > Until now, I assumed it would be a nested tuple (Row or Tuple).
> > > > > > Accessing nested fields in Row, Tuple or Pojo is done with get, i.e.,
> > > > > > 'w.get("rowtime").
> > > > > > Using a Map would not help because the access would be 'w.at("rowtime").
> > > > > >
> > > > > > We can of course also enforce the select() if aggregate / flatAggregate
> > > > > > do not return a Table but some kind of AggregatedTable that does not
> > > > > > provide any other function than select().
> > > > > >
> > > > > > On Mon, 26 Nov 2018 at 01:12, jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > >
> > > > > > > Yes, I agree the problem needs attention. IMO, it depends on how we
> > > > > > > define the type of 'w. Your example above defines 'w as a tuple. If
> > > > > > > you serialize 'w to a Map, the compatibility will be better. Going
> > > > > > > further, we can define 'w as a special type that UDFs and sinks
> > > > > > > cannot use directly; users must use 'w.start, 'w.end, 'w.rowtime,
> > > > > > > 'w.proctime, 'w.XXX. I will be very grateful if you can share your
> > > > > > > solution to this problem, and we can also discuss it carefully in
> > > > > > > the PR to be opened. What do you think?
> > > > > > >
> > > > > > > On Fri, 23 Nov 2018 at 6:21 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Something like:
> > > > > > > >
> > > > > > > > val x = tab.window(Tumble ... as 'w)
> > > > > > > >   .groupBy('w, 'k1, 'k2)
> > > > > > > >   .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > > > > > > >
> > > > > > > > x.insertInto("sinkTable") // fails because result schema has changed from
> > > > > > > >   // ((start, end, rowtime), k1, k2, col1, col2) to
> > > > > > > >   // ((start, end, rowtime, newProperty), k1, k2, col1, col2)
> > > > > > > > x.select(myUdf('w)) // fails because UDF expects (start, end, rowtime)
> > > > > > > >   // and not (start, end, rowtime, newProperty)
> > > > > > > >
> > > > > > > > Basically, every time the composite type 'w is used as a whole.
> > > > > > > >
> > > > > > > > On Fri, 23 Nov 2018 at 10:45, jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Fabian,
> > > > > > > > >
> > > > > > > > > I don't fully understand the issue you mentioned:
> > > > > > > > >
> > > > > > > > > Any query that relies on the composite type with three fields will
> > > > > > > > > fail after adding a fourth field.
> > > > > > > > >
> > > > > > > > > I would appreciate it if you could give some detailed examples.
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Jincheng
> > > > > > > > >
> > > > > > > > > On Fri, 23 Nov 2018 at 4:41 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > > My concerns are about the case when there is no additional select()
> > > > > > > > > > method, i.e.,
> > > > > > > > > >
> > > > > > > > > > tab.window(Tumble ... as 'w)
> > > > > > > > > >   .groupBy('w, 'k1, 'k2)
> > > > > > > > > >   .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > > > > > > > > >
> > > > > > > > > > In this case, 'w is a composite field consisting of three fields
> > > > > > > > > > (end, start, rowtime).
> > > > > > > > > > Once we add a new property, it would need to be added to the
> > > > > > > > > > composite type.
> > > > > > > > > > Any query that relies on the composite type with three fields will
> > > > > > > > > > fail after adding a fourth field.
> > > > > > > > > >
> > > > > > > > > > Best, Fabian
> > > > > > > > > >
> > > > > > > > > > On Fri, 23 Nov 2018 at 02:01, jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks Fabian,
> > > > > > > > > > >
> > > > > > > > > > > Thanks a lot for your feedback, and for the very important and
> > > > > > > > > > > necessary design reminders!
> > > > > > > > > > >
> > > > > > > > > > > Yes, you are right! Before 1.3, Spark required the grouping columns
> > > > > > > > > > > to be specified explicitly, but they are passed implicitly in Spark
> > > > > > > > > > > 1.4 and later. The reason for changing this behavior was user
> > > > > > > > > > > feedback. Although implicit forwarding has the drawbacks you
> > > > > > > > > > > mentioned, this approach is really convenient for the user.
> > > > > > > > > > > I agree that when grouping on windows we have to pay attention to
> > > > > > > > > > > the handling of the window's properties, because we may introduce
> > > > > > > > > > > new window properties.
> > > > > > > > > > > So, from this point of view, we delay the processing of the window
> > > > > > > > > > > property, i.e., we pass the complex type 'w through the Table API and
> > > > > > > > > > > provide different property operations in select according to the type
> > > > > > > > > > > of 'w, such as 'w.start, 'w.end, 'w.xxx. The Table API will limit and
> > > > > > > > > > > verify the property operations that 'w has. An example is as follows:
> > > > > > > > > > >
> > > > > > > > > > > tab.window(Tumble ... as 'w)
> > > > > > > > > > >   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > > > > > > > > > >   .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2) // 'w is a composite field
> > > > > > > > > > >   .select('k1, 'col1, 'w.rowtime as 'ts, 'w.xxx as 'xx) // In select we will
> > > > > > > > > > >   // limit and verify that 'w.xxx is allowed
> > > > > > > > > > >
> > > > > > > > > > > I am not sure if I fully understand your concerns; if I have
> > > > > > > > > > > misunderstood anything, please correct me. Any feedback is appreciated!
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Jincheng
> > > > > > > > > > >
> > > > > > > > > > > On Thu, 22 Nov 2018 at 10:13 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi all,
> > > > > > > > > > > >
> > > > > > > > > > > > First of all, it is correct that the flatMap(Expression*) and
> > > > > > > > > > > > flatAggregate(Expression*) methods would mix scalar and table values.
> > > > > > > > > > > > This would be a new concept that is not present in the current API.
> > > > > > > > > > > > From my point of view, the semantics are quite clear, but I understand
> > > > > > > > > > > > that others are more careful and worry about future extensions.
> > > > > > > > > > > >
> > > > > > > > > > > > I am fine with going for single expression arguments for map() and
> > > > > > > > > > > > flatMap(). We can later expand them to Expression* if we feel the need
> > > > > > > > > > > > and are more comfortable about the implications.
> > > > > > > > > > > > Whenever a time attribute needs to be forwarded, users can fall back
> > > > > > > > > > > > to join(TableFunction) as Xiaowei mentioned.
> > > > > > > > > > > > So we restrict the usability of the new methods but don't lose
> > > > > > > > > > > > functionality and don't prevent future extensions.
> > > > > > > > > > > >
> > > > > > > > > > > > The aggregate() and flatAggregate() case is more difficult because
> > > > > > > > > > > > implicit forwarding of grouping fields cannot be changed later without
> > > > > > > > > > > > breaking the API.
> > > > > > > > > > > > There are other APIs (e.g., Spark) that also implicitly forward the
> > > > > > > > > > > > grouping columns. So this is not uncommon.
> > > > > > > > > > > > However, I personally don't like that approach, because it is implicit
> > > > > > > > > > > > and introduces a new behavior that is not present in the current API.
> > > > > > > > > > > >
> > > > > > > > > > > > One thing to consider here is the handling of grouping on windows.
> > > > > > > > > > > > If I understood Xiaowei correctly, a composite field that is named
> > > > > > > > > > > > like the window alias (e.g., 'w) would be implicitly added to the
> > > > > > > > > > > > result of aggregate() or flatAggregate().
> > > > > > > > > > > > The composite field would have fields like (start, end, rowtime) or
> > > > > > > > > > > > (start, end, proctime) depending on the window type.
> > > > > > > > > > > > If we would ever introduce a fourth window property, we might break
> > > > > > > > > > > > existing queries.
> > > > > > > > > > > > Is this something that we should worry about?
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Fabian
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, 22 Nov 2018 at 14:03, Piotr Nowojski <pi...@data-artisans.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Jincheng,
> > > > > > > > > > > > >
> > > > > > > > > > > > > #1) ok, got it.
> > > > > > > > > > > > >
> > > > > > > > > > > > > #3)
> > > > > > > > > > > > > > From my point of view, we can start with `Expression` and, if the
> > > > > > > > > > > > > > discussion later decides on Expression*, improve it then. In any case,
> > > > > > > > > > > > > > if we use Expression, there is still an opportunity to move to
> > > > > > > > > > > > > > Expression* (compatibility).
> > > > > > > > > > > > > > If we use Expression* directly, it is difficult for us to go back to
> > > > > > > > > > > > > > Expression, which will break the compatibility between versions. What
> > > > > > > > > > > > > > do you think?
> > > > > > > > > > > > >
> > > > > > > > > > > > > I don't think that's the case here. If we start with the single-parameter
> > > > > > > > > > > > > `flatMap(Expression)`, it will need implicit columns to be present in
> > > > > > > > > > > > > the result, which:
> > > > > > > > > > > > >
> > > > > > > > > > > > > a) IMO breaks SQL convention (that's why I'm against this)
> > > > > > > > > > > > > b) means we cannot later easily introduce `flatMap(Expression*)` without
> > > > > > > > > > > > > those implicit columns, without breaking the compatibility or at least
> > > > > > > > > > > > > without making `flatMap(Expression*)` and `flatMap(Expression)` terribly
> > > > > > > > > > > > > inconsistent.
> > > > > > > > > > > > >
> > > > > > > > > > > > > To elaborate on (a).
> > > > > > > > > > > > > It's not nice if our own API is inconsistent and sometimes behaves one
> > > > > > > > > > > > > way and sometimes another way:
> > > > > > > > > > > > >
> > > > > > > > > > > > > table.groupBy('k).select(scalarAggregateFunction('v)) => single column
> > > > > > > > > > > > > result, just the output of `scalarAggregateFunction`
> > > > > > > > > > > > > vs
> > > > > > > > > > > > > table.groupBy('k).flatAggregate(tableAggregateFunction('v)) => both the
> > > > > > > > > > > > > result of `tableAggregateFunction` plus the key (and an optional window
> > > > > > > > > > > > > context?)
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thus I think we have to decide now which way we want to jump, since
> > > > > > > > > > > > > later will be too late. Or again, am I missing something? :)
> > > > > > > > > > > > >
> > > > > > > > > > > > > Piotrek
> > > > > > > > > > > > >
> > > > > > > > > > > > > > On 22 Nov 2018, at 02:07, jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Piotrek,
> > > > > > > > > > > > > > #1) We have unbounded and bounded group window aggregates. For the
> > > > > > > > > > > > > > unbounded case we should fire results early with retract messages; we
> > > > > > > > > > > > > > cannot use watermarks, because an unbounded aggregate never finishes.
> > > > > > > > > > > > > > (As an improvement we can introduce micro-batching in the future.) For
> > > > > > > > > > > > > > bounded windows we never support early firing, so we do not need retraction.
> > > > > > > > > > > > > > #3) About the validation of
> > > > > > > > > > > > > > `table.select(F('a).unnest(), 'b, G('c).unnest())/table.flatMap(F('a), 'b, scalarG('c))`:
> > > > > > > > > > > > > > Fabian mentioned this above, please look at the prior mail. For the
> > > > > > > > > > > > > > `table.flatMap(F('a), 'b, scalarG('c))` case that concerns us, we should
> > > > > > > > > > > > > > discuss the issue of `Expression*` vs `Expression`. From my point of
> > > > > > > > > > > > > > view, we can start with `Expression` and, if the discussion later decides
> > > > > > > > > > > > > > on Expression*, improve it then. In any case, if we use Expression, there
> > > > > > > > > > > > > > is still an opportunity to move to Expression* (compatibility). If we use
> > > > > > > > > > > > > > Expression* directly, it is difficult for us to go back to Expression,
> > > > > > > > > > > > > > which will break the compatibility between versions. What do you think?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > If anything is not clear, any feedback is welcome! Again, thanks for
> > > > > > > > > > > > > > sharing your thoughts!
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Jincheng
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, 21 Nov 2018 at 9:37 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >> Hi Jincheng,
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>> #1) No, watermarks solve the issue of late events. Here, the performance
> > > > > > > > > > > > > >>> problem is caused by the update emit mode, i.e., when the current
> > > > > > > > > > > > > >>> calculation result is output, the previous calculation result needs to
> > > > > > > > > > > > > >>> be retracted.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Hmm, yes I missed this. For time-windowed cases (some
> > > > > > > > > > > > > >> aggregate/flatAggregate cases) emitting only on watermark should solve
> > > > > > > > > > > > > >> the problem. For non-time-windowed cases it would reduce the amount of
> > > > > > > > > > > > > >> retractions, right? Or am I still missing something?
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>> #3) I still hope to keep the simplicity that select only supports
> > > > > > > > > > > > > >>> projected scalars; we can hardly tell the semantics of
> > > > > > > > > > > > > >>> tab.select(flatmap('a), 'b, flatmap('d)).
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> table.select(F('a).unnest(), 'b, G('c).unnest())
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Could be rejected during some validation phase.
> > > > > > > > > > > > > >> On the other hand:
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> table.select(F('a).unnest(), 'b, scalarG('c))
> > > > > > > > > > > > > >> or
> > > > > > > > > > > > > >> table.flatMap(F('a), 'b, scalarG('c))
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Could work and be more or less syntactic sugar for cross apply.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Piotrek
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>> On 21 Nov 2018, at 12:16, jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> Hi shaoxuan & Hequn,
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> Thanks for your suggestion, I'll file the JIRAs later.
> > > > > > > > > > > > > >>> We can prepare PRs while continuing to move the ongoing discussion forward.
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> Regards,
> > > > > > > > > > > > > >>> Jincheng
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> On Wed, 21 Nov 2018 at 7:07 PM, jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>>> Hi Piotrek,
> > > > > > > > > > > > > >>>> Thanks for your feedback, and thanks for sharing your thoughts!
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>> #1) No, watermarks solve the issue of late events. Here, the performance
> > > > > > > > > > > > > >>>> problem is caused by the update emit mode.
> > > > > > > > > > > > > >>>> That is, when the current calculation result is output, the previous
> > > > > > > > > > > > > >>>> calculation result needs to be retracted.
> > > > > > > > > > > > > >>>> #2) As I mentioned above, we should continue the discussion until we
> > > > > > > > > > > > > >>>> solve the problems raised by Xiaowei and Fabian.
> > > > > > > > > > > > > >>>> #3) I still hope to keep the simplicity that select only supports
> > > > > > > > > > > > > >>>> projected scalars; we can hardly tell the semantics of
> > > > > > > > > > > > > >>>> tab.select(flatmap('a), 'b, flatmap('d)).
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>> Thanks,
> > > > > > > > > > > > > >>>> Jincheng
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>> On Wed, 21 Nov 2018 at 5:24 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>>> Hi,
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> 1.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>>> In fact, in addition to the design of APIs, there will be various
> > > > > > > > > > > > > >>>>>> performance optimization details. For example, the table aggregate
> > > > > > > > > > > > > >>>>>> function's emitValue will generate multiple calculation results; in
> > > > > > > > > > > > > >>>>>> extreme cases, each record will trigger a large number of retract
> > > > > > > > > > > > > >>>>>> messages, which will perform poorly.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> Can this be solved/mitigated by emitting the results only on watermarks?
> > > > > > > > > > > > > >>>>> I think that was the path that we decided to take both for Temporal Joins
> > > > > > > > > > > > > >>>>> and upsert stream conversion. I know that this increases the latency and
> > > > > > > > > > > > > >>>>> there is a place for a future global setting/user preference "emit the
> > > > > > > > > > > > > >>>>> data ASAP mode", but emitting only on watermarks seems to me a
> > > > > > > > > > > > > >>>>> better/more sane default.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> 2.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> With respect to the API discussion and implicit columns.
> > > > > > > > > > > > > >>>>> The problem for me so far is that I'm not sure if I like the additional
> > > > > > > > > > > > > >>>>> complexity of the `append()` solution, while implicit columns are
> > > > > > > > > > > > > >>>>> definitely not in the spirit of SQL. Neither joins nor aggregations add
> > > > > > > > > > > > > >>>>> extra unexpected columns to the result without asking. This definitely
> > > > > > > > > > > > > >>>>> can be confusing for users since it breaks the convention. Thus I would
> > > > > > > > > > > > > >>>>> lean towards Fabian's proposal of multi-argument `map(Expression*)` from
> > > > > > > > > > > > > >>>>> those 3 options.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> 3.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> Another topic is that I'm not 100% convinced that we should be adding new
> > > > > > > > > > > > > >>>>> API functions for `map`, `aggregate`, `flatMap` and `flatAggregate`.
> > > > > > > > > > > > > >>>>> I think the same could be achieved by changing
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> table.map(F('x))
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> into
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> table.select(F('x)).unnest()
> > > > > > > > > > > > > >>>>> or
> > > > > > > > > > > > > >>>>> table.select(F('x).unnest())
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> Where `unnest()` means unnest row/tuple type into a columnar table.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> table.flatMap(F('x))
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> Could be on the other hand also handled by
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> table.select(F('x))
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> By correctly deducing that F(x) is a multi row output function
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> Same might apply to `aggregate(F('x))`, but this maybe could be replaced
> > > > > > > > > > > > > >>>>> by:
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> table.groupBy(…).select(F('x).unnest())
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> Adding scalar functions should also be possible:
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> table.groupBy('k).select(F('x).unnest(), 'k)
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> Maybe such approach would allow us to implement the same features in the
> > > > > > > > > > > > > >>>>> SQL as well?
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> Piotrek
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>>> On 21 Nov 2018, at 09:43, Hequn Cheng <chenghe...@gmail.com> wrote:
> > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > >>>>>> Hi,
> > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > >>>>>> Thank you all for the great proposal and discussion!
> > > > > > > > > > > > > >>>>>> I also prefer to move on to the next step, so +1 for opening the JIRAs
> > > > > > > > > > > > > >>>>>> to start the work.
> > > > > > > > > > > > > >>>>>> We can have more detailed discussion there. Btw, we can start with the
> > > > > > > > > > > > > >>>>>> JIRAs which we have agreed on.
> > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > >>>>>> Best,
> > > > > > > > > > > > > >>>>>> Hequn
> > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > >>>>>> On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <wshaox...@gmail.com> wrote:
> > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > >>>>>>> +1. I agree that we should open the JIRAs to start the work. We may
> > > > > > > > > > > > > >>>>>>> have better ideas on the flavor of the interface when we
> > > > > > > > > > > > > >>>>>>> implement/review the code.
>
> Regards,
> shaoxuan
>
> On 11/20/18, jincheng sun <sunjincheng...@gmail.com> wrote:
>
> Hi all,
>
> Thanks all for the feedback.
>
> @Piotr: about not using abbreviated names, +1, I like your proposal!
> Currently both the DataSet and DataStream APIs use `aggregate`.
> BTW, I find that other languages, such as R, also do not use
> abbreviated names.
>
> Sometimes the interface of an API is really difficult to perfect: we
> need to spend a lot of time thinking, gather feedback from a large
> number of users, and improve constantly. But because of backward
> compatibility, we have to adopt the most conservative approach when
> designing the API (of course, I am in favor of developing richer
> features once we have discussed them clearly).
> Therefore, I propose to split the implementation of
> map/flatMap/agg/flatAgg into JIRAs for the basic functionality and
> JIRAs that add support for time attributes and group keys. We can
> develop the features whose design we have already agreed on, and
> continue to discuss the parts of the design that are still uncertain.
>
> In fact, in addition to the API design, there will be various
> performance optimization details. For example, the table aggregate
> function's emitValue generates multiple result rows, so in extreme
> cases each record triggers a large number of retract messages, which
> performs poorly. We will therefore also optimize the interface design,
> for example by adding the emitWithRetractValue interface (I have
> updated the Google doc), which lets the user optionally perform
> incremental calculations and thus avoid a large number of retractions.
> Details like this are difficult to discuss fully on the mailing list,
> so I recommend creating the JIRAs/FLIP first: we develop the designs
> that have been agreed upon and continue to discuss the unsettled ones!
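The cost difference described above can be sketched with a plain Scala model of a Top-2 table aggregate. This is not the Flink interface and not the signatures from the Google doc: `Msg`, `update`, `emitFull`, and `emitIncremental` are hypothetical names. `emitFull` models an emitValue-style full replacement (retract every previously emitted row, then emit the whole result again), while `emitIncremental` models an emitWithRetractValue-style update that only touches rows that actually changed.

```scala
object Top2EmitSketch {
  // A changelog message: add = true inserts a row, add = false retracts it.
  final case class Msg(add: Boolean, value: Int)

  // Accumulator update: keep the two largest values seen so far, largest first.
  def update(top: List[Int], x: Int): List[Int] =
    (x :: top).sorted(Ordering[Int].reverse).take(2)

  // Full replacement: retract everything emitted before, then emit everything again.
  def emitFull(before: List[Int], after: List[Int]): List[Msg] =
    before.map(Msg(false, _)) ++ after.map(Msg(true, _))

  // Incremental: retract only rows that left the result, add only rows that entered it.
  def emitIncremental(before: List[Int], after: List[Int]): List[Msg] =
    before.diff(after).map(Msg(false, _)) ++ after.diff(before).map(Msg(true, _))

  // Feed all inputs through the accumulator and count the messages a strategy produces.
  def countMessages(inputs: List[Int], emit: (List[Int], List[Int]) => List[Msg]): Int = {
    val (_, total) = inputs.foldLeft((List.empty[Int], 0)) { case ((top, n), x) =>
      val next = update(top, x)
      (next, n + emit(top, next).size)
    }
    total
  }

  def main(args: Array[String]): Unit = {
    val inputs = List(5, 9, 1, 2, 3)
    println(s"full replacement: ${countMessages(inputs, emitFull)} messages")
    println(s"incremental:      ${countMessages(inputs, emitIncremental)} messages")
  }
}
```

In this model, records that do not change the Top-2 result still cost a full retract-and-emit cycle under full replacement, but produce no messages at all under the incremental strategy.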
> What do you think? @Fabian & Piotr & Xiaowei
>
> Best,
> Jincheng
>
> On Mon, Nov 19, 2018 at 12:07 AM Xiaowei Jiang <xiaow...@gmail.com>
> wrote:
>
> Hi Fabian & Piotr, thanks for the feedback!
>
> I appreciate your concerns, both on timestamp attributes and on
> implicit group keys. At the same time, I'm also concerned about the
> proposed approach of allowing Expression* as parameters, especially for
> flatMap/flatAgg. So far, we have never allowed a scalar expression to
> appear together with table expressions. With the Expression* approach,
> this will happen for the parameters to flatMap/flatAgg.
> I'm a bit concerned about whether we fully understand the consequences
> when we try to extend our system in the future. I would be extra
> cautious in doing this. To avoid this, I think an implicit group key
> for flatAgg is safer. For flatMap, if users want to keep the rowtime
> column, they can use crossApply/join instead. So we are not losing any
> real functionality here.
>
> Also a clarification on the following example:
> tab.window(Tumble ... as 'w)
> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> .select('k1, 'col1, 'w.rowtime as 'rtime)
> If we did not have the select clause in this example, we would have 'w
> as a regular column in the output. It should not magically disappear.
>
> The concern is not as strong for Table.map/Table.agg because we are
> not mixing scalar and table expressions. But we also want to be
> reasonably consistent across these methods. If we used implicit group
> keys for Table.flatAgg, we probably should do the same for Table.agg.
> Now we only have to choose what to do with Table.map. I can see good
> arguments from both sides. But starting with a single Expression seems
> safer because we can always extend to Expression* in the future.
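As an illustration of the implicit-group-key behavior argued for above, here is a minimal sketch in plain Scala collections rather than the Table API. The names `top2`, `flatAggWithKeys`, and `selectCol1` are hypothetical, and window handling is left out: the grouped flat aggregate places the group key first in every output row, and the key only goes away if a later projection drops it explicitly.

```scala
object ImplicitKeysSketch {
  // Hypothetical table aggregate: emits up to two rows (the two largest values).
  def top2(values: Seq[Int]): Seq[Int] =
    values.sorted(Ordering[Int].reverse).take(2)

  // Models groupBy('k1).flatAgg(top2('a)) with implicit key forwarding:
  // each output row is (k1, col1), i.e. the group key comes first.
  def flatAggWithKeys(rows: Seq[(String, Int)]): Seq[(String, Int)] =
    rows.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (k, group) =>
      top2(group.map(_._2)).map(v => (k, v))
    }

  // Models a following select('col1): the key disappears only when projected away.
  def selectCol1(result: Seq[(String, Int)]): Seq[Int] = result.map(_._2)

  def main(args: Array[String]): Unit = {
    val rows = Seq(("a", 1), ("a", 5), ("a", 3), ("b", 2))
    val result = flatAggWithKeys(rows)
    println(result)             // rows still carry the group key
    println(selectCol1(result)) // key removed by the explicit projection
  }
}
```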
>
> While thinking about this problem, it appears that we may need more
> work on our handling of watermarks for the SQL/Table API. Our current
> way of propagating the watermarks from source all the way to sink might
> not be optimal. For example, after a tumbling window, the watermark can
> actually be advanced to just before the expiry of the next window. I
> think that in general, each operator may need to generate new
> watermarks instead of simply propagating them. Once we accept that
> watermarks may change during the execution, it appears that the
> timestamp columns may also change, as long as we have some way to
> associate a watermark with them.
> My intuition is that once we have a thorough solution for the
> watermark issue, we may be able to solve the problem we encountered for
> Table.map in a cleaner way. But this is a complex issue which deserves
> a discussion of its own.
>
> Regards,
> Xiaowei
>
> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski
> <pi...@data-artisans.com> wrote:
>
> Hi,
>
> Isn't the problem of multiple expressions limited only to `flat***`
> functions and, to be more specific, only to having two (or more)
> different table functions passed as expressions?
> `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))`
> seems to be well defined (duplicate the result of every scalar function
> to every record). Or am I missing something?
>
> Another remark: I would be in favour of not using abbreviations and
> naming `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
>
> Piotrek
>
> On 15 Nov 2018, at 14:15, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Jincheng,
>
> I said before that I think the append() method is better than
> implicitly forwarding keys, but still, I believe it adds unnecessary
> boilerplate code.
>
> Moreover, I haven't seen a convincing argument why map(Expression*) is
> worse than map(Expression). In either case we need to do all kinds of
> checks to prevent invalid use of functions.
> If the method is not used correctly, we can emit a good error message,
> and documenting map(Expression*) will be easier than
> map(append(Expression*)), in my opinion.
> I think we should not add unnecessary syntax unless there is a good
> reason and, to be honest, I haven't seen this reason yet.
>
> Regarding the groupBy.agg() method, I think it should behave just like
> any other method, i.e., not do any implicit forwarding.
> Let's take the example of the windowed group by that you posted
> before.
>
> tab.window(Tumble ... as 'w)
> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> .select('k1, 'col1, 'w.rowtime as 'rtime)
>
> What happens if 'w.rowtime is not selected? What is the data type of
> the field 'w in the resulting Table? Is it a regular field at all or
> just a system field that disappears if it is not selected?
>
> IMO, the following syntax is shorter, more explicit, and better aligned
> with the regular window.groupBy.select aggregations that are supported
> today.
>
> tab.window(Tumble ... as 'w)
> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
>
> Best, Fabian
>
> On Wed, Nov 14, 2018 at 08:37 jincheng sun <sunjincheng...@gmail.com>
> wrote:
>
> Hi Fabian/Xiaowei,
>
> I am very sorry for my late reply! Glad to see your replies, which
> sound pretty good!
> I agree with Fabian that the append() approach, which clearly defines
> the result schema, is better.
> In addition, append() can also carry non-time attributes, e.g.:
>
> tab('name, 'age, 'address, 'rowtime)
> tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2,
> 'address, 'rowtime)
> .window(Tumble over 5.millis on 'rowtime as 'w)
> .groupBy('w, 'address)
>
> In this way append() is very useful, and the behavior is very similar
> to withForwardedFields() in DataSet.
> So +1 to using the append() approach for map() & flatMap()!
>
> But how about agg() and flatAgg()? In the agg/flatAgg case I agree with
> Xiaowei's approach of defining the keys to be implied in the result
> table and to appear at the beginning, for example as follows:
> tab.window(Tumble ... as 'w)
> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> .select('k1, 'col1, 'w.rowtime as 'rtime)
>
> What do you think? @Fabian @Xiaowei
>
> Thanks,
> Jincheng
>
> On Fri, Nov 9, 2018 at 6:35 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Jincheng,
>
> Thanks for the summary!
> I like the approach with append() better than the implicit forwarding,
> as it clearly indicates which fields are forwarded.
> However, I don't see much benefit over the flatMap(Expression*)
> variant, as we would still need to analyze the full expression tree to
> ensure that at most (or exactly?) one Scalar / TableFunction is used.
>
> Best,
> Fabian
>
> On Thu, Nov 8, 2018 at 19:25 jincheng sun <sunjincheng...@gmail.com>
> wrote:
>
> Hi all,
>
> We are discussing this proposal in great detail. We are trying to
> design the API with many aspects in mind (functionality, compatibility,
> ease of use, etc.). I think this is a very good process. Only with such
> a detailed discussion can we develop the PRs more clearly and smoothly
> at a later stage. I am very grateful to @Fabian and @Xiaowei for
> sharing a lot of good ideas.
> About the definition of the method signatures, I want to share my
> points here, which I am discussing with Fabian in the Google doc (not
> yet completed), as follows:
>
> Assume we have a table:
> val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string,
> 'proctime.proctime)
>
> Approach 1:
> case1: Map follows Source Table
> val result =
> tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied
> in the output
> .window(Tumble over 5.millis on 'proctime as 'w)
>
> case2: FlatAgg follows Window (Fabian mentioned above)
> val result =
> tab.window(Tumble ... as 'w)
> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
> .select('k1, 'col1, 'w.rowtime as 'rtime)
>
> Approach 2: Similar to Fabian's approach, in which the result schema is
> clearly defined, but adding a built-in append UDF. That makes the
> map/flatMap/agg/flatAgg interfaces accept only one Expression.
> val result =
> tab.map(append(udf('string), 'long, 'proctime)) as ('col1, 'col2,
> 'long, 'proctime)
> .window(Tumble over 5.millis on 'proctime as 'w)
>
> Note: append is a special built-in UDF that can pass through any
> column.
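The pass-through behavior of Approach 2 can be sketched as follows. This is a plain Scala model under stated assumptions, not the proposed built-in: `udf` and `mapWithAppend` are hypothetical names, a row is modeled as a map from column name to value, and the UDF is assumed to produce two columns, `col1` and `col2`, from the `string` column. The output schema is the UDF's columns followed by the explicitly forwarded input columns, which matches the `as ('col1, 'col2, 'long, 'proctime)` clause in the example.

```scala
object AppendSketch {
  // A row maps column names to values.
  type Row = Map[String, Any]

  // Hypothetical UDF: splits the 'string column into two output columns.
  def udf(s: String): Seq[(String, Any)] = {
    val parts = s.split("-", 2)
    Seq("col1" -> parts(0), "col2" -> (if (parts.length > 1) parts(1) else ""))
  }

  // Models map(append(udf('string), forwarded...)): UDF columns first,
  // then the explicitly listed input columns, passed through unchanged.
  def mapWithAppend(table: Seq[Row], forwarded: Seq[String]): Seq[Seq[(String, Any)]] =
    table.map { row =>
      udf(row("string").toString) ++ forwarded.map(name => name -> row(name))
    }

  def main(args: Array[String]): Unit = {
    val table: Seq[Row] = Seq(Map("long" -> 1L, "string" -> "a-b", "proctime" -> 100L))
    // Columns not listed in `forwarded` (here: 'string) do not appear in the output.
    println(mapWithAppend(table, Seq("long", "proctime")))
  }
}
```

The design point this illustrates is that nothing is forwarded implicitly: a column survives the `map` only if it is named in the `append` call.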
>
> So maybe we can define it as table.map(Expression) first and, if
> necessary, extend it to table.map(Expression*) in the future? Of
> course, I also hope that we can further refine this proposal through
> discussion.
>
> Thanks,
> Jincheng
>
> On Wed, Nov 7, 2018 at 11:45 PM Xiaowei Jiang <xiaow...@gmail.com>
> wrote:
>
> Hi Fabian,
>
> I think that the key question you raised is whether we allow extra
> parameters in the methods map/flatMap/agg/flatAgg.
> I can see why allowing that may appear more convenient in some cases.
> However, it might also cause some confusion if we do that. For example,
> do we allow multiple UDFs in these expressions? If we do, the semantics
> may be awkward to define, e.g. what does
> table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Even
> though not allowing it may appear less powerful, it can make things
> more intuitive too. In the case of agg/flatAgg, we can define the keys
> to be implied in the result table and to appear at the beginning.
> > You can use a select method if you want to modify this behavior. I
> > think that eventually we will have some API which allows other
> > expressions as additional parameters, but I think it's better to do
> > that after we introduce the concept of nested tables. A lot of things
> > we suggested here can be considered as special cases of that. But
> > things are much simpler if we leave that to later.
> >
> > Regards,
> > Xiaowei
> >
> > On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > * Re emit:
> > > I think we should start with a well understood semantics of full
> > > replacement. This is how the other agg functions work.
> > > As was said before, there are open questions regarding an append mode
> > > (checkpointing, whether supporting retractions or not and if yes how
> > > to declare them, ...).
> > > Since this seems to be an optimization, I'd postpone it.
> > >
> > > * Re grouping keys:
> > > I don't think we should automatically add them because the result
> > > schema would not be intuitive.
> > > Would they be added at the beginning of the tuple or at the end? What
> > > metadata fields of windows would be added? In which order would they
> > > be added?
> > >
> > > However, we could support syntax like this:
> > > val t: Table = ???
> > > t
> > >   .window(Tumble ... as 'w)
> > >   .groupBy('a, 'b)
> > >   .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
> > >
> > > The result schema would be clearly defined as
> > > [b, a, f1, f2, ..., fn, wend, rtime]. (f1, f2, ..., fn) are the
> > > result attributes of the UDF.
> > >
> > > * Re Multi-staged evaluation:
> > > I think this should be an optimization that can be applied if the UDF
> > > implements the merge() method.
> > >
> > > Best, Fabian
> > >
> > > On Wed, Nov 7, 2018 at 8:01 AM Shaoxuan Wang <wshaox...@gmail.com> wrote:
> > >
> > > > Hi xiaowei,
> > > >
> > > > Yes, I agree with you that the semantics of TableAggregateFunction
> > > > emit is much more complex than AggregateFunction. The fundamental
> > > > difference is that TableAggregateFunction emits a "table" while
> > > > AggregateFunction outputs (a column of) a "row". In the case of
> > > > AggregateFunction it only has one mode, which is "replacing"
> > > > (complete update).
> > > > But for TableAggregateFunction, it could be an incremental update
> > > > (only emit the newly updated results) or a complete update (always
> > > > emit the entire table when "emit" is triggered). From the
> > > > performance perspective, we might want to use incremental update.
> > > > But we need to review and design this carefully, especially taking
> > > > into account failover (instead of just backing up the ACC, it may
> > > > also need to remember the emit offset) and retractions, as the
> > > > semantics of TableAggregateFunction emit are different from other
> > > > UDFs.
> > > > TableFunction also emits a table, but it does not need to worry
> > > > about this because it is stateless.
> > > >
> > > > Regards,
> > > > Shaoxuan
> > > >
> > > > On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> > > >
> > > > > Hi Jincheng,
> > > > >
> > > > > Thanks for adding the public interfaces! I think that it's a very
> > > > > good start. There are a few points that we need to discuss
> > > > > further.
> > > > >
> > > > > - TableAggregateFunction - this is a very complex beast,
> > > > >   definitely the most complex user-defined object we have
> > > > >   introduced so far. I think there are quite some interesting
> > > > >   questions here. For example, do we allow multi-staged
> > > > >   TableAggregate in this case? What are the semantics of emit?
> > > > >   Does it amend the previous output, or replace it? I think that
> > > > >   this subject itself is worth a discussion to make sure we get
> > > > >   the details right.
> > > > > - GroupedTable.agg - do the group keys automatically appear in
> > > > >   the output? How about the case of windowed aggregation?
> > > > >
> > > > > Regards,
> > > > > Xiaowei
> > > > >
> > > > > On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > >
> > > > > > Hi, Xiaowei,
> > > > > >
> > > > > > Thanks for bringing up the discussion of the Table API
> > > > > > Enhancement Outline!
> > > > > >
> > > > > > I quickly looked at the overall content; these are good
> > > > > > expressions of our offline discussions.
> > > > > > But from my point of view, we should add the usage of the
> > > > > > public interfaces that we will introduce in this proposal. So I
> > > > > > added the following usage description of the interface and
> > > > > > operators to the Google doc:
> > > > > >
> > > > > > 1. Map Operator
> > > > > > The Map operator is a new operator of Table. It can apply a
> > > > > > scalar function and can return multiple columns. The usage is
> > > > > > as follows:
> > > > > >
> > > > > > val res = tab
> > > > > >   .map(fun: ScalarFunction).as('a, 'b, 'c)
> > > > > >   .select('a, 'c)
> > > > > >
> > > > > > 2. FlatMap Operator
> > > > > > The FlatMap operator is a new operator of Table. It can apply a
> > > > > > table function and can return multiple rows. The usage is as
> > > > > > follows:
> > > > > >
> > > > > > val res = tab
> > > > > >   .flatMap(fun: TableFunction).as('a, 'b, 'c)
> > > > > >   .select('a, 'c)
> > > > > >
> > > > > > 3. Agg Operator
> > > > > > The Agg operator is a new operator of Table/GroupedTable. It
> > > > > > can apply an aggregate function and can return multiple
> > > > > > columns.
> > > > > > The usage is as follows:
> > > > > >
> > > > > > val res = tab
> > > > > >   .groupBy('a) // leave groupBy-Clause out to define global aggregates
> > > > > >   .agg(fun: AggregateFunction).as('a, 'b, 'c)
> > > > > >   .select('a, 'c)
> > > > > >
> > > > > > 4. FlatAgg Operator
> > > > > > The FlatAgg operator is a new operator of Table/GroupedTable.
> > > > > > It can apply a table aggregate function and can return multiple
> > > > > > rows.
> > > > > > The usage is as follows:
> > > > > >
> > > > > > val res = tab
> > > > > >   .groupBy('a) // leave groupBy-Clause out to define global table aggregates
> > > > > >   .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
> > > > > >   .select('a, 'c)
> > > > > >
> > > > > > 5. TableAggregateFunction
> > > > > > The behavior of table aggregates is most like that of
> > > > > > GroupReduceFunction, which computes over a group of elements and
> > > > > > outputs a group of elements. The TableAggregateFunction can be
> > > > > > applied on GroupedTable.flatAgg().
> > > > > > The interface of TableAggregateFunction has a lot of content,
> > > > > > so I won't copy it here. Please look at the details in the
> > > > > > Google doc:
> > > > > > https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
> > > > > >
> > > > > > I would very much appreciate anyone reviewing and commenting.
> > > > > >
> > > > > > Best,
> > > > > > Jincheng