Thanks Fabian & Piotrek, your feedback sounds very good! So far we are on the same page about how to handle group keys. I will update the Google doc according to our discussion, and I'd like to convert it into a FLIP. It would be great if either of you could grant me write access to Confluence. My Confluence ID is sunjincheng121.
Cheers,
Jincheng

On Thu, 29 Nov 2018 at 18:48, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi Jincheng & Fabian,
>
> +1 from my point of view.
>
> I like the idea that you have to close `flatAggregate` with a `select`
> statement. In a way it will be consistent with normal `groupBy`, and indeed
> it solves the problem of mixing table and scalar functions.
>
> I would be against supporting `select('*)` that returns only `col1` and
> `col2`. `select('*)` for me would mean "give me everything that you have",
> and it would be very confusing for me that:
>
>   tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2)
>     .flatAggregate(tableAgg('a))
>     .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)
>
> would return more columns than:
>
>   tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2)
>     .flatAggregate(tableAgg('a))
>     .select('*)
>
> I would also be fine with never supporting `.select('*)`. After all, it
> would be consistent with regular aggregation, where this doesn't make sense
> and is not supported either:
>
>   tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2)
>     .select('*)
>
> Piotrek

On 29 Nov 2018, at 10:51, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
>
> OK, not supporting select('*) in the first version sounds like a good
> first step, +1 for this.
>
> However, I don't think that select('*) returning only the result columns of
> the agg function would be a significant break in semantics.
> Since aggregate()/flatAggregate() is the last command and (visibly) only
> forwards the result columns of the agg function, returning those for '*
> seems fine to me.
> The grouping columns (+ window properties) are "side-passed" from the
> groupBy.
>
> So, IMO, both select('*) and select('k1, 'k2, 'w.rowtime, '*) have
> well-defined semantics. But we can add these shortcuts later.
>
> Best,
> Fabian

On Wed, 28 Nov 2018 at 11:13, jincheng sun <sunjincheng...@gmail.com> wrote:
> Hi Fabian,
>
> Thank you for listing the detailed example of forcing the use of select.
>
> In case I didn't make it clear before, I would like to share my thoughts
> about the group keys here:
>
> 1. agg/flatAgg(Expression) keeps a single Expression;
> 2. The way to force users to use select is as follows (as you mentioned):
>
>   val tabX2 = tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2)
>     .flatAgg(tableAgg('a))
>     .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)
>
> 3. IMO, we should not support select('*) on a window-grouped table in the
>    first version. But I am fine if you want to support the shortcut, i.e.
>    `select('w.rowtime, 'k1, 'k2, '*)`, where '*' does not include the group
>    keys. I am just a little worried that '*' represents all columns of a
>    non-grouped table, while for a grouped table it does not represent all
>    columns, so the semantics of '*' would not be uniform.
>
> What do you think?
>
> Thanks,
> Jincheng

On Tue, 27 Nov 2018 at 19:27, Fabian Hueske <fhue...@gmail.com> wrote:
> I don't think we came to a conclusion yet.
> Are you suggesting that 'w would disappear if we do the following:
>
>   val tabX = tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2)
>     .flatAgg(tableAgg('a))
>     .select('*)
>
> Given that tableAgg returns [col1, col2], what would the schema of tabX be?
>
> 1) [col1, col2]: fine with me
> 2) [k1, k2, col1, col2]: I'm very skeptical about this option because it is
>    IMO inconsistent. Users will ask where 'w went.
> 3) [w, k1, k2, col1, col2]: back to our original problem. What's the data
>    type of 'w?
>
> If we go for option 1, we can still allow selecting keys:
>
>   val tabX2 = tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2)
>     .flatAgg(tableAgg('a))
>     .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)
>
> The good thing about enforcing select is that the result of flatAgg is not
> a Table, i.e., the definition of the operation is not completed yet.
> On the other hand, we would need to list all result attributes of the table
> function if we want to select keys. We could do that with a shortcut:
>
>   val tabX3 = tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2)
>     .flatAgg(tableAgg('a))
>     .select('w.rowtime, 'k1, 'k2, '*)

On Tue, 27 Nov 2018 at 10:46, jincheng sun <sunjincheng...@gmail.com> wrote:
> Thanks Fabian. If we enforce select, as I said before, users should use
> 'w.start, 'w.end, 'w.rowtime, 'w.proctime, 'w.XXX etc. In this way we do
> not need to define the type of 'w, and we can keep the current way of
> using 'w.
> I'll file the JIRA and open the PR ASAP.
>
> Thanks,
> Jincheng

On Tue, 27 Nov 2018 at 16:50, Fabian Hueske <fhue...@gmail.com> wrote:
> I thought about it again.
> Enforcing select won't help us with the choice of how to represent 'w.
>
>   select('*) and
>   select('a, 'b, 'w)
>
> would still be valid expressions, and we need to decide how 'w is
> represented.
> As I said before, Tuple, Row, and Map have disadvantages because the syntax
> 'w.rowtime or 'w.end would not be supported.

On Tue, 27 Nov 2018 at 01:05, jincheng sun <sunjincheng...@gmail.com> wrote:
> Until we have good support for nested tables, forcing the use of select may
> be a good way; at least it does not cause compatibility issues.

On Mon, 26 Nov 2018 at 18:48, Fabian Hueske <fhue...@gmail.com> wrote:
> I think the question is what the data type of 'w is.
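To make the trade-off above concrete, here is a minimal Scala sketch. It is a toy model, not Flink's type system: the window reference 'w is represented as an ordered list of property names, and `newProperty` is the hypothetical fourth property from Fabian's example in this thread. It contrasts consuming 'w as a whole (as a sink or UDF would) with accessing a single property such as 'w.rowtime.

```scala
// Toy model only: 'w is represented as an ordered list of property names.
// It is NOT Flink's type system; `newProperty` is a hypothetical addition.
object WindowTypeSketch {

  final case class CompositeType(fields: List[String]) {
    override def toString: String = fields.mkString("(", ", ", ")")
  }

  // Layout of 'w today, and after a hypothetical fourth property is added.
  val v1: CompositeType = CompositeType(List("start", "end", "rowtime"))
  val v2: CompositeType = CompositeType(List("start", "end", "rowtime", "newProperty"))

  // A sink or UDF that takes 'w "as a whole" is bound to one exact layout.
  def consumeWhole(expected: CompositeType, actual: CompositeType): Either[String, Unit] =
    if (expected == actual) Right(())
    else Left(s"type mismatch: expected $expected, got $actual")

  // Accessing a single property ('w.rowtime) only needs that property to exist.
  def accessProperty(actual: CompositeType, property: String): Either[String, Unit] =
    if (actual.fields.contains(property)) Right(())
    else Left(s"unknown window property: $property")

  def main(args: Array[String]): Unit = {
    println(consumeWhole(v1, v1))          // written against v1, runs on v1
    println(consumeWhole(v1, v2))          // written against v1, runs on v2: mismatch
    println(accessProperty(v2, "rowtime")) // still fine after the layout grows
  }
}
```

In this model, only the whole-composite consumer breaks when the layout grows; that is the motivation for limiting 'w to property accessors such as 'w.start and 'w.rowtime.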
>
> Until now, I assumed it would be a nested tuple (Row or Tuple).
> Accessing nested fields in Row, Tuple or Pojo is done with get, i.e.,
> 'w.get("rowtime").
> Using a Map would not help because the access would be 'w.at("rowtime").
>
> We can of course also enforce the select() if aggregate / flatAggregate do
> not return a Table but some kind of AggregatedTable that does not provide
> any other function than select().

On Mon, 26 Nov 2018 at 01:12, jincheng sun <sunjincheng...@gmail.com> wrote:
> Yes, I agree the problem needs attention. IMO, it depends on how we define
> the type of 'w. The way you describe above defines 'w as a tuple. If 'w is
> serialized to a Map, compatibility will be better. We could even define 'w
> as a special type that cannot be used directly in UDFs and sinks; users
> must use 'w.start, 'w.end, 'w.rowtime, 'w.proctime, 'w.XXX. I would be very
> grateful if you could share your solution to this problem, and we can also
> discuss it carefully in the PR to be opened. What do you think?

On Fri, 23 Nov 2018 at 18:21, Fabian Hueske <fhue...@gmail.com> wrote:
> Something like:
>
>   val x = tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2)
>     .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>
>   x.insertInto("sinkTable") // fails because the result schema has changed
>                             // from ((start, end, rowtime), k1, k2, col1, col2)
>                             // to ((start, end, rowtime, newProperty), k1, k2, col1, col2)
>   x.select(myUdf('w))       // fails because the UDF expects (start, end, rowtime)
>                             // and not (start, end, rowtime, newProperty)
>
> Basically, every time the composite type 'w is used as a whole.

On Fri, 23 Nov 2018 at 10:45, jincheng sun <sunjincheng...@gmail.com> wrote:
> Hi Fabian,
>
> I don't fully understand the issue you mentioned:
>
>> Any query that relies on the composite type with three fields will fail
>> after adding a fourth field.
>
> I would appreciate it if you could give some detailed examples.
>
> Regards,
> Jincheng

On Fri, 23 Nov 2018 at 16:41, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
>
> My concerns are about the case when there is no additional select()
> method, i.e.,
>
>   tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2)
>     .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>
> In this case, 'w is a composite field consisting of three fields (end,
> start, rowtime).
> Once we add a new property, it would need to be added to the composite
> type.
> Any query that relies on the composite type with three fields will fail
> after adding a fourth field.
>
> Best, Fabian

On Fri, 23 Nov 2018 at 02:01, jincheng sun <sunjincheng...@gmail.com> wrote:
> Thanks Fabian,
>
> Thanks a lot for your feedback and for the very important and necessary
> design reminders!
>
> Yes, you are right! Spark required the grouping columns to be specified
> explicitly before 1.3, but the grouping columns are passed implicitly in
> Spark 1.4 and later. The behavior was changed due to user feedback.
> Although implicit passing has the drawbacks you mentioned, this approach is
> really convenient for the user.
> I agree that when grouping on windows we have to pay attention to the
> handling of the window's properties, because we may introduce new window
> properties. So, from this point of view, we delay the processing of the
> window properties, i.e., we pass the complex type 'w through the Table API
> and provide different property operations in the select according to the
> type of 'w, such as 'w.start, 'w.end, 'w.xxx. The Table API will limit and
> verify the property operations that 'w supports. An example is as follows:
>
>   tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>     .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2) // 'w is a composite field
>     .select('k1, 'col1, 'w.rowtime as 'ts, 'w.xxx as 'xx) // in select we limit and verify that 'w.xx is allowed
>
> I am not sure if I fully understand your concerns; if I have misunderstood
> anything, please correct me. Any feedback is appreciated!
>
> Best,
> Jincheng

On Thu, 22 Nov 2018 at 22:13, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi all,
>
> First of all, it is correct that the flatMap(Expression*) and
> flatAggregate(Expression*) methods would mix scalar and table values.
> This would be a new concept that is not present in the current API.
> From my point of view, the semantics are quite clear, but I understand that
> others are more careful and worry about future extensions.
>
> I am fine with going for single expression arguments for map() and
> flatMap(). We can later expand them to Expression* if we feel the need and
> are more comfortable about the implications.
> Whenever a time attribute needs to be forwarded, users can fall back to
> join(TableFunction) as Xiaowei mentioned.
> So we restrict the usability of the new methods but don't lose
> functionality and don't prevent future extensions.
>
> The aggregate() and flatAggregate() case is more difficult because implicit
> forwarding of grouping fields cannot be changed later without breaking the
> API.
> There are other APIs (e.g., Spark) that also implicitly forward the
> grouping columns, so this is not uncommon.
> However, I personally don't like that approach, because it is implicit and
> introduces a new behavior that is not present in the current API.
>
> One thing to consider here is the handling of grouping on windows.
> If I understood Xiaowei correctly, a composite field that is named like the
> window alias (e.g., 'w) would be implicitly added to the result of
> aggregate() or flatAggregate().
> The composite field would have fields like (start, end, rowtime) or (start,
> end, proctime) depending on the window type.
> If we ever introduce a fourth window property, we might break existing
> queries.
> Is this something that we should worry about?
>
> Best,
> Fabian

On Thu, 22 Nov 2018 at 14:03, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi Jincheng,
>
> #1) OK, got it.
>
> #3)
>> From my point of view, we can use `Expression`, and if after the
>> discussion we decide to use Expression*, then improve it. In any case, we
>> can start with Expression and still have the opportunity to move to
>> Expression* (compatibly). If we use Expression* directly, it is difficult
>> for us to move back to Expression, which would break compatibility between
>> versions. What do you think?
>
> I don't think that's the case here. If we start with a single-parameter
> `flatMap(Expression)`, it will need implicit columns to be present in the
> result, which:
>
> a) IMO breaks the SQL convention (that's why I'm against this);
> b) means we cannot easily introduce `flatMap(Expression*)` later without
>    those implicit columns, without breaking compatibility or at least
>    without making `flatMap(Expression*)` and `flatMap(Expression)` terribly
>    inconsistent.
>
> To elaborate on (a).
> It's not nice if our own API is inconsistent and sometimes behaves one way
> and sometimes another way:
>
>   table.groupBy('k).select(scalarAggregateFunction('v))
>     => single-column result, just the output of `scalarAggregateFunction`
>   vs
>   table.groupBy('k).flatAggregate(tableAggregateFunction('v))
>     => both the result of `tableAggregateFunction` plus the key (and an
>        optional window context?)
>
> Thus I think we have to decide now which way we want to jump, since later
> will be too late. Or again, am I missing something? :)
>
> Piotrek

On 22 Nov 2018, at 02:07, jincheng sun <sunjincheng...@gmail.com> wrote:
> Hi Piotrek,
> #1) We have unbounded and bounded group window aggregates. For the
> unbounded case we should fire results early with retract messages; we
> cannot use the watermark, because an unbounded aggregate never finishes
> (as an improvement we can introduce micro-batching in the future). For
> bounded windows we never support early firing, so we do not need retraction.
> #3) Regarding the validation of
> `table.select(F('a).unnest(), 'b, G('c).unnest())` /
> `table.flatMap(F('a), 'b, scalarG('c))`: Fabian discussed this above;
> please see the earlier mail. For `table.flatMap(F('a), 'b, scalarG('c))`,
> what concerns us is the question of `Expression*` vs `Expression`. From my
> point of view, we can use `Expression`, and if after the discussion we
> decide to use Expression*, then improve it. In any case, we can start with
> Expression and still have the opportunity to move to Expression*
> (compatibly). If we use Expression* directly, it is difficult for us to
> move back to Expression, which would break compatibility between versions.
> What do you think?
>
> If anything is unclear, any feedback is welcome! Again, thanks for sharing
> your thoughts!
>
> Thanks,
> Jincheng

On Wed, 21 Nov 2018 at 21:37, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi Jincheng,
>
>> #1) No, the watermark solves the issue of late events. Here, the
>> performance problem is caused by the update emit mode.
>> I.e., when the current calculation result is output, the previous
>> calculation result needs to be retracted.
>
> Hmm, yes, I missed this. For time-windowed cases (some
> aggregate/flatAggregate cases), emitting only on watermarks should solve
> the problem. For non-time-windowed cases it would reduce the amount of
> retractions, right? Or am I still missing something?
>
>> #3) I still hope to keep the simplicity that select only supports
>> projected scalars; we can hardly tell the semantics of
>> tab.select(flatmap('a), 'b, flatmap('d)).
>
>   table.select(F('a).unnest(), 'b, G('c).unnest())
>
> could be rejected during some validation phase. On the other hand:
>
>   table.select(F('a).unnest(), 'b, scalarG('c))
>   or
>   table.flatMap(F('a), 'b, scalarG('c))
>
> could work and be more or less syntactic sugar for cross apply.
>
> Piotrek

On 21 Nov 2018, at 12:16, jincheng sun <sunjincheng...@gmail.com> wrote:
> Hi Shaoxuan & Hequn,
>
> Thanks for your suggestions, I'll file the JIRAs later.
> We can prepare PRs while continuing to move the ongoing discussion forward.
>
> Regards,
> Jincheng

On Wed, 21 Nov 2018 at 19:07, jincheng sun <sunjincheng...@gmail.com> wrote:
> Hi Piotrek,
> Thanks for your feedback, and thanks for sharing your thoughts!
>
> #1) No, the watermark solves the issue of late events. Here, the
> performance problem is caused by the update emit mode, i.e., when the
> current calculation result is output, the previous calculation result needs
> to be retracted.
> #2) As I mentioned above, we should continue the discussion until we solve
> the problems raised by Xiaowei and Fabian.
> #3) I still hope to keep the simplicity that select only supports projected
> scalars; we can hardly tell the semantics of
> tab.select(flatmap('a), 'b, flatmap('d)).
>
> Thanks,
> Jincheng

On Wed, 21 Nov 2018 at 17:24, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
>
> 1.
>
>> In fact, in addition to the design of APIs, there will be various
>> performance optimization details, such as: the table aggregate function's
>> emitValue will generate multiple calculation results; in extreme cases,
>> each record will trigger a large number of retract messages, which will
>> have poor performance
>
> Can this be solved/mitigated by emitting the results only on watermarks?
> I think that was the path that we decided to take both for Temporal Joins
> and upsert stream conversion. I know that this increases the latency and
> there is a place for a future global setting/user preference "emit the data
> ASAP mode", but emitting only on watermarks seems to me a better/more sane
> default.
>
> 2.
>
> With respect to the API discussion and implicit columns.
> The problem for me so far is that I'm not sure if I like the additional
> complexity of the `append()` solution, while implicit columns are
> definitely not in the spirit of SQL. Neither joins nor aggregations add
> extra unexpected columns to the result without asking. This can definitely
> be confusing for users since it breaks the convention. Thus, of those 3
> options, I would lean towards Fabian's proposal of a multi-argument
> `map(Expression*)`.
>
> 3.
>
> Another topic is that I'm not 100% convinced that we should be adding new
> API functions for `map`, `aggregate`, `flatMap` and `flatAggregate`. I think
> the same could be achieved by changing
>
>   table.map(F('x))
>
> into
>
>   table.select(F('x)).unnest()
>   or
>   table.select(F('x).unnest())
>
> where `unnest()` means unnesting a row/tuple type into a columnar table.
>
>   table.flatMap(F('x))
>
> could, on the other hand, also be handled by
>
>   table.select(F('x))
>
> by correctly deducing that F(x) is a multi-row output function.
>
> The same might apply to `aggregate(F('x))`, but this could maybe be
> replaced by:
>
>   table.groupBy(…).select(F('x).unnest())
>
> Adding scalar functions should also be possible:
>
>   table.groupBy('k).select(F('x).unnest(), 'k)
>
> Maybe such an approach would allow us to implement the same features in
> SQL as well?
>
> Piotrek

On 21 Nov 2018, at 09:43, Hequn Cheng <chenghe...@gmail.com> wrote:
> Hi,
>
> Thank you all for the great proposal and discussion!
> I also prefer to move on to the next step, so +1 for opening the JIRAs to
> start the work.
> We can have a more detailed discussion there. Btw, we can start with the
> JIRAs we have agreed on.
>
> Best,
> Hequn

On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <wshaox...@gmail.com> wrote:
> +1. I agree that we should open the JIRAs to start the work. We may have
> better ideas on the flavor of the interface when we implement/review the
> code.
>
> Regards,
> shaoxuan

On 11/20/18, jincheng sun <sunjincheng...@gmail.com> wrote:
> Hi all,
>
> Thanks all for the feedback.
>
> @Piotr About not using abbreviated names: +1, I like your proposal!
> Currently both the DataSet and DataStream APIs use `aggregate`.
> BTW, I find that other languages, such as R, also don't use abbreviated
> names.
>
> Sometimes the interface of an API is really difficult to perfect; we need
> to spend a lot of time thinking, get feedback from a large number of users,
> and constantly improve. But because of backward compatibility, we have to
> adopt the most conservative approach when designing the API (of course, I
> am in favor of developing richer features once we have discussed them
> clearly). Therefore, I propose to split the implementation of
> map/flatMap/agg/flatAgg into JIRAs for the basic functionality and JIRAs
> that support time attributes and group keys. We can develop the features
> whose design we have already agreed on, and continue to discuss the
> uncertain parts of the design.

In fact, besides the API design, there will be various performance optimization details. For example, the emitValue method of a table aggregate function generates multiple results; in extreme cases, every record triggers a large number of retract messages, which leads to poor performance. So we will also optimize the interface design, e.g., by adding an emitWithRetractValue interface (I have updated the Google doc) that lets the user optionally perform incremental calculations and thus avoid a large number of retractions. Details like this are difficult to discuss fully on the mailing list, so I recommend creating the JIRAs/FLIP first, developing the designs that have been agreed upon, and continuing to discuss the undecided ones. What do you think?
@Fabian & Piotr & XiaoWei

Best,
Jincheng

On Mon, Nov 19, 2018 at 12:07 AM Xiaowei Jiang <xiaow...@gmail.com> wrote:

Hi Fabian & Piotr, thanks for the feedback!

I appreciate your concerns, both on timestamp attributes and on implicit group keys. At the same time, I'm also concerned about the proposed approach of allowing Expression* as parameters, especially for flatMap/flatAgg. So far, we have never allowed a scalar expression to appear together with table expressions. With the Expression* approach, this will happen for the parameters of flatMap/flatAgg. I'm a bit concerned about whether we fully understand the consequences when we try to extend our system in the future, and I would be extra cautious here. To avoid this, I think an implicit group key for flatAgg is safer.
For flatMap, if users want to keep the rowtime column, they can use crossApply/join instead. So we are not losing any real functionality here.

Also a clarification on the following example:

tab.window(Tumble ... as 'w)
  .groupBy('w, 'k1, 'k2) // 'w should be a group key.
  .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
  .select('k1, 'col1, 'w.rowtime as 'rtime)

If we did not have the select clause in this example, we would have 'w as a regular column in the output. It should not magically disappear.

The concern is not as strong for Table.map/Table.agg because there we are not mixing scalar and table expressions. But we also want to be somewhat consistent across these methods. If we used implicit group keys for Table.flatAgg, we should probably do the same for Table.agg. Now we only have to choose what to do with Table.map.
I can see good arguments on both sides. But starting with a single Expression seems safer, because we can always extend it to Expression* in the future.

While thinking about this problem, it appears that we may need more work on our handling of watermarks in the SQL/Table API. Our current way of propagating watermarks from the source all the way to the sink might not be optimal. For example, after a tumbling window, the watermark can actually be advanced to just before the next window expires. I think that, in general, each operator may need to generate new watermarks instead of simply propagating them. Once we accept that watermarks may change during execution, it appears that the timestamp columns may also change, as long as we have some way to associate a watermark with them.
My intuition is that once we have a thorough solution for the watermark issue, we may be able to solve the problem we encountered with Table.map in a cleaner way. But this is a complex issue which deserves a discussion of its own.

Regards,
Xiaowei


On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi,

Isn't the problem of multiple expressions limited only to the `flat***` functions, and more specifically only to having two (or more) different table functions passed as expressions? `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))` seems to be well defined (the result of every scalar function is duplicated to every record). Or am I missing something?
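Piotr's distinction (and the two-table-aggregate case Xiaowei raises elsewhere in the thread) can be made concrete with a toy model in plain Scala. This is a sketch under stated assumptions, not Flink code: rows are modelled as `Vector[Any]`, `top2` stands in for a table aggregate, and `flatAggWithScalars`, `pairByZip` and `pairByCross` are hypothetical helpers. One table aggregate plus scalar results has a single natural reading; two table aggregates emitting different numbers of rows do not.

```scala
object FlatAggSemantics {
  // Hypothetical row model: a vector of untyped values (not Flink's Row).
  type Row = Vector[Any]

  // Stand-in for a table aggregate such as TableAggA('a): emits the two largest values.
  def top2(values: Seq[Int]): Seq[Row] =
    values.sorted(Ordering[Int].reverse).take(2).map(v => Vector[Any](v))

  // One table aggregate plus scalar functions: each scalar result is computed once
  // per group and appended (duplicated) to every row the table aggregate emits.
  def flatAggWithScalars(tableRows: Seq[Row], scalarResults: Row): Seq[Row] =
    tableRows.map(_ ++ scalarResults)

  // Two table aggregates: zipping and crossing are both plausible readings,
  // and they produce different results, so the combination is ambiguous.
  def pairByZip(a: Seq[Row], b: Seq[Row]): Seq[Row] =
    a.zip(b).map { case (x, y) => x ++ y }

  def pairByCross(a: Seq[Row], b: Seq[Row]): Seq[Row] =
    for (x <- a; y <- b) yield x ++ y
}
```

For a group with values 5, 1, 9, 3, `top2` emits two rows and every scalar result is copied onto both; pairing those two rows with three rows from a second table aggregate gives 2 rows by zipping but 6 by crossing.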

Another remark: I would be in favour of not using abbreviations, i.e., renaming `agg` -> `aggregate` and `flatAgg` -> `flatAggregate`.

Piotrek

On 15 Nov 2018, at 14:15, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Jincheng,

I said before that I think the append() method is better than implicitly forwarding keys, but I still believe it adds unnecessary boilerplate code.

Moreover, I haven't seen a convincing argument why map(Expression*) is worse than map(Expression). In either case we need to do all kinds of checks to prevent invalid use of functions. If the method is not used correctly, we can emit a good error message, and documenting map(Expression*) will be easier than map(append(Expression*)), in my opinion.
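The kind of check Fabian mentions can be sketched as a recursive walk over the argument expressions. The expression classes below are a hypothetical, simplified tree (not Flink's `Expression` hierarchy), and the rule enforced, at most one table function anywhere in the arguments, is the one discussed in this thread; requiring exactly one would only change the condition.

```scala
object ExprCheck {
  // Hypothetical, simplified expression tree for illustration only.
  sealed trait Expr
  final case class Field(name: String) extends Expr
  final case class ScalarCall(fn: String, args: List[Expr]) extends Expr
  final case class TableCall(fn: String, args: List[Expr]) extends Expr

  // Walk the whole tree and count table-valued function calls, including nested ones.
  def countTableCalls(e: Expr): Int = e match {
    case Field(_)            => 0
    case ScalarCall(_, args) => args.map(countTableCalls).sum
    case TableCall(_, args)  => 1 + args.map(countTableCalls).sum
  }

  // Validate the arguments of a variadic map/flatMap(Expression*):
  // at most one table function may appear anywhere in the argument list.
  def validate(exprs: Seq[Expr]): Either[String, Seq[Expr]] = {
    val n = exprs.map(countTableCalls).sum
    if (n <= 1) Right(exprs)
    else Left(s"Only one table function is allowed, but found $n.")
  }
}
```

Because the walk also descends into scalar calls, a table function hidden inside a scalar expression is counted too, which is what makes a precise error message possible.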

I think we should not add unnecessary syntax unless there is a good reason, and to be honest, I haven't seen this reason yet.

Regarding the groupBy.agg() method, I think it should behave just like any other method, i.e., not do any implicit forwarding. Let's take the example of the windowed group by that you posted before:

tab.window(Tumble ... as 'w)
  .groupBy('w, 'k1, 'k2) // 'w should be a group key.
  .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
  .select('k1, 'col1, 'w.rowtime as 'rtime)

What happens if 'w.rowtime is not selected? What is the data type of the field 'w in the resulting Table? Is it a regular field at all, or just a system field that disappears if it is not selected?

IMO, the following syntax is shorter, more explicit, and better aligned with the regular window.groupBy.select aggregations that are supported today:

tab.window(Tumble ... as 'w)
  .groupBy('w, 'k1, 'k2) // 'w should be a group key.
  .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))


Best, Fabian

On Wed, Nov 14, 2018 at 8:37 AM jincheng sun <sunjincheng...@gmail.com> wrote:

Hi Fabian/Xiaowei,

I am very sorry for my late reply! Glad to see your replies; they sound pretty good!
I agree with Fabian that the append() approach is better, because it clearly defines the result schema.
In addition, append() can also carry non-time attributes, e.g.:

tab('name, 'age, 'address, 'rowtime)
tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2, 'address, 'rowtime)
  .window(Tumble over 5.millis on 'rowtime as 'w)
  .groupBy('w, 'address)

This makes append() very useful, and its behavior is very similar to withForwardedFields() in the DataSet API.
So +1 to using the append() approach for map() & flatMap()!

But what about agg() and flatAgg()? For agg/flatAgg, I agree with Xiaowei's approach of defining the keys to be implied in the result table, appearing at the beginning, for example:

tab.window(Tumble ... as 'w)
  .groupBy('w, 'k1, 'k2) // 'w should be a group key.
  .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
  .select('k1, 'col1, 'w.rowtime as 'rtime)

What do you think?
@Fabian @Xiaowei

Thanks,
Jincheng

On Fri, Nov 9, 2018 at 6:35 PM Fabian Hueske <fhue...@gmail.com> wrote:

Hi Jincheng,

Thanks for the summary!
I like the approach with append() better than the implicit forwarding, as it clearly indicates which fields are forwarded.
However, I don't see much benefit over the flatMap(Expression*) variant, as we would still need to analyze the full expression tree to ensure that at most (or exactly?) one Scalar / TableFunction is used.

Best,
Fabian

On Thu, Nov 8, 2018 at 7:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:

Hi all,

We are discussing this proposal in great detail.
We are trying to design the API from many angles (functionality, compatibility, ease of use, etc.). I think this is a very good process: only such a detailed discussion lets us develop the PRs clearly and smoothly in the later stages. I am very grateful to @Fabian and @Xiaowei for sharing a lot of good ideas.
About the definition of the method signatures, I want to share my thoughts here, which I am discussing with Fabian in the Google doc (not yet completed), as follows:

Assume we have a table:
val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)

Approach 1:
case1: Map follows Source Table
val result =
  tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
    .window(Tumble over 5.millis on 'proctime as 'w)

case2: FlatAgg follows Window (Fabian mentioned above)
val result =
  tab.window(Tumble ... as 'w)
    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
    .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
    .select('k1, 'col1, 'w.rowtime as 'rtime)

Approach 2: Similar to Fabian's approach, in which the result schema is clearly defined, but with a built-in append UDF added. This makes the map/flatMap/agg/flatAgg interfaces accept only one Expression.
val result =
  tab.map(append(udf('string), 'long, 'proctime)).as('col1, 'col2, 'long, 'proctime)
    .window(Tumble over 5.millis on 'proctime as 'w)

Note: append is a special built-in UDF that can pass through any column.
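To make Approach 2 concrete, here is a plain-Scala model of what a built-in append could do at the row level: the UDF results come first, then the forwarded columns in the listed order, and `.as(...)` names the output positionally. The `Row` class, the string-splitting `udf`, and `mapWithAppend` are hypothetical illustrations, not Flink classes, and the time attribute is modelled as an ordinary `Long`.

```scala
object AppendSketch {
  // Hypothetical row model: an ordered list of (field name, value) pairs.
  final case class Row(fields: Vector[(String, Any)]) {
    def get(name: String): Any =
      fields.collectFirst { case (n, v) if n == name => v }
        .getOrElse(throw new NoSuchElementException(s"no field $name"))
  }

  // Stand-in for udf('string): splits the string into two halves (two output columns).
  def udf(s: String): Vector[Any] = {
    val (left, right) = s.splitAt(s.length / 2)
    Vector(left, right)
  }

  // Models map(append(udf('string), forwarded...)).as(names...):
  // UDF results first, then the forwarded input columns in the listed order,
  // with the output columns named positionally.
  def mapWithAppend(in: Row, forwarded: Seq[String], names: Seq[String]): Row = {
    val values = udf(in.get("string").toString) ++ forwarded.map(in.get)
    require(values.size == names.size, "as(...) must name every output column")
    Row(names.toVector.zip(values))
  }
}
```

With input fields long = 1, string = "abcd", proctime = 100 and forwarded columns 'long and 'proctime, the output is [col1 = "ab", col2 = "cd", long = 1, proctime = 100], which is the schema written in the `.as(...)` call of Approach 2.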

So maybe we can define it as table.map(Expression) first and, if necessary, extend it to table.map(Expression*) in the future? Of course, I also hope that we can refine this proposal further through discussion.

Thanks,
Jincheng


On Wed, Nov 7, 2018 at 11:45 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:

Hi Fabian,

I think that the key question you raised is whether we allow extra parameters in the methods map/flatMap/agg/flatAgg. I can see why allowing that may appear more convenient in some cases. However, it might also cause some confusion if we do that.
For example, do we allow multiple UDFs in these expressions? If we do, the semantics may be weird to define, e.g., what does table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Even though not allowing it may appear less powerful, it can also make things more intuitive. In the case of agg/flatAgg, we can define the keys to be implied in the result table, appearing at the beginning. You can use a select method if you want to modify this behavior. I think that eventually we will have some API which allows other expressions as additional parameters, but I think it's better to do that after we introduce the concept of nested tables.
A lot of the things we suggested here can be considered special cases of that. But things are much simpler if we leave that for later.

Regards,
Xiaowei

On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com> wrote:

Hi,

* Re emit:
I think we should start with the well-understood semantics of full replacement. This is how the other agg functions work.
As was said before, there are open questions regarding an append mode (checkpointing, whether to support retractions or not and, if yes, how to declare them, ...).
Since this seems to be an optimization, I'd postpone it.

* Re grouping keys:
I don't think we should add them automatically, because the result schema would not be intuitive.
Would they be added at the beginning of the tuple or at the end? Which metadata fields of windows would be added? In which order would they be added?

However, we could support syntax like this:
val t: Table = ???
t
  .window(Tumble ... as 'w)
  .groupBy('a, 'b)
  .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)

The result schema would be clearly defined as [b, a, f1, f2, ..., fn, wend, rtime], where (f1, f2, ..., fn) are the result attributes of the UDF.
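The two options for the result schema can be contrasted with a small plain-Scala sketch. `ResultSchema` and its item classes are hypothetical and only compute lists of field names. In the explicit variant the schema is exactly the argument list in order, reproducing [b, a, f1, ..., fn, wend, rtime]; in the implicit variant the group keys (including the window alias 'w as a plain column) are prepended in groupBy order, so where window properties end up is fixed by convention rather than by the call.

```scala
object ResultSchema {
  // Hypothetical items of an explicit flatAgg(...) argument list.
  sealed trait Item
  final case class Key(name: String) extends Item                       // e.g. 'b
  final case class AggResult(fields: Seq[String]) extends Item          // e.g. myAgg(row('*))
  final case class WindowProp(prop: String, alias: String) extends Item // e.g. 'w.end as 'wend

  // Explicit variant: the result schema is exactly the listed items, in order.
  def explicitSchema(items: Seq[Item]): Seq[String] = items.flatMap {
    case Key(name)            => Seq(name)
    case AggResult(fields)    => fields
    case WindowProp(_, alias) => Seq(alias)
  }

  // Implicit variant: group keys (including a window alias such as 'w) are
  // prepended in groupBy order, followed by the aggregate's result fields.
  def implicitSchema(groupKeys: Seq[String], aggFields: Seq[String]): Seq[String] =
    groupKeys ++ aggFields
}
```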

* Re multi-staged evaluation:
I think this should be an optimization that can be applied if the UDF implements the merge() method.

Best, Fabian

On Wed, Nov 7, 2018 at 8:01 AM Shaoxuan Wang <wshaox...@gmail.com> wrote:

Hi xiaowei,

Yes, I agree with you that the semantics of TableAggregateFunction emit are much more complex than those of AggregateFunction. The fundamental difference is that a TableAggregateFunction emits a "table" while an AggregateFunction outputs (a column of) a "row". An AggregateFunction has only one mode, which is "replacing" (complete update).
But a TableAggregateFunction could do either an incremental update (only emit the newly updated results) or a complete update (always emit the entire table when "emit" is triggered). From the performance perspective, we might want to use incremental updates. But we need to review and design this carefully, especially taking into account failover (instead of just backing up the ACC, it may also need to remember the emit offset) and retractions, as the semantics of TableAggregateFunction emit are different from those of other UDFs. TableFunction also emits a table, but it does not need to worry about this because it is stateless.
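The two emit modes contrasted here (and the retraction cost mentioned for emitValue) can be seen with a toy top-2 table aggregate. The sketch is plain Scala under stated assumptions: the `Add`/`Retract` messages, `top2`, and the emit functions are hypothetical stand-ins rather than Flink's TableAggregateFunction API, and full replacement is modelled as "retract everything previously emitted, then emit the whole result".

```scala
object EmitModes {
  // Hypothetical changelog messages; not Flink's retraction types.
  sealed trait Msg
  final case class Add(v: Int) extends Msg
  final case class Retract(v: Int) extends Msg

  // Toy table aggregate: keeps the two largest values seen so far.
  def top2(current: List[Int], v: Int): List[Int] =
    (v :: current).sorted(Ordering[Int].reverse).take(2)

  // Full replacement: on every emit, retract all previously emitted rows,
  // then emit the complete current result.
  def fullReplacement(input: Seq[Int]): Vector[Msg] =
    input.foldLeft((List.empty[Int], Vector.empty[Msg])) { case ((prev, out), v) =>
      val next = top2(prev, v)
      (next, out ++ prev.map(Retract(_)) ++ next.map(Add(_)))
    }._2

  // Incremental: emit only the difference between the previous and the new result.
  // The previously emitted rows are carried as state; this is the extra
  // information (besides the accumulator) that failover would need to restore.
  def incremental(input: Seq[Int]): Vector[Msg] =
    input.foldLeft((List.empty[Int], Vector.empty[Msg])) { case ((emitted, out), v) =>
      val next = top2(emitted, v)
      (next, out ++ emitted.diff(next).map(Retract(_)) ++ next.diff(emitted).map(Add(_)))
    }._2

  // Apply a changelog to obtain the resulting table (sorted for comparison).
  def materialize(msgs: Seq[Msg]): List[Int] =
    msgs.foldLeft(List.empty[Int]) {
      case (table, Add(v))     => v :: table
      case (table, Retract(v)) => table.diff(List(v))
    }.sorted
}
```

For the input 3, 1, 4 the full-replacement changelog has 8 messages and the incremental one has 4, yet both materialize to the same table [3, 4]. The incremental version keeps the emitted rows in its fold state, which illustrates why it needs more than the accumulator to recover after a failure.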
Regards,
Shaoxuan

On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:

Hi Jincheng,

Thanks for adding the public interfaces! I think it's a very good start. There are a few points that need more discussion.

- TableAggregateFunction: this is a very complex beast, definitely the most complex user-defined object we have introduced so far. I think there are quite a few interesting questions here. For example, do we allow a multi-staged TableAggregate in this case? What are the semantics of emit?
Is > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it amendments to the previous > >>>> output, > >>>>> or > >>>>>>>>>> replacing > >>>>>>>>>>>>> it? I > >>>>>>>>>>>>>>>>>>>>>>>>>> think > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> subject itself is worth a > >>> discussion > >>>>> to > >>>>>>> make > >>>>>>>>>> sure > >>>>>>>>>>> we > >>>>>>>>>>>>> get > >>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> details > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> right. > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - GroupedTable.agg - does the > >>> group > >>>>> keys > >>>>>>>>>>>> automatically > >>>>>>>>>>>>>>>>>>>>>>>>>> appear > >>>>>>>>>>>>>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> output? how about the case of > >>>>> windowing > >>>>>>>>>>> aggregation? > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Xiaowei > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM > >>>>> jincheng > >>>>>>> sun > >>>>>>>> < > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> sunjincheng...@gmail.com> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi, Xiaowei, > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for bring up the discuss > >>> of > >>>>>> Table > >>>>>>>> API > >>>>>>>>>>>>>> Enhancement > >>>>>>>>>>>>>>>>>>>>>>>>>>> Outline > >>>>>>>>>>>>>>>>>>>>>>>>>>>> ! > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I quickly looked at the overall > >>>>>> content, > >>>>>>>>> these > >>>>>>>>>>> are > >>>>>>>>>>>>> good > >>>>>>>>>>>>>>>>>>>>>>>>>>>> expressions > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> our > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> offline discussions. 
But from > >> the > >>>>>> points > >>>>>>> of > >>>>>>>>> my > >>>>>>>>>>>> view, > >>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>>>>>>>>>> should > >>>>>>>>>>>>>>>>>>>>>>>>>>>> add > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> usage of public interfaces that > >>> we > >>>>> will > >>>>>>>>>> introduce > >>>>>>>>>>>> in > >>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>>>>>>>>>>> propose. > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So, I > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> added the following usage > >>>> description > >>>>>> of > >>>>>>>>>>> interface > >>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>>>>>>> operators > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> google doc: > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. Map Operator > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Map operator is a new operator > >> of > >>>>>> Table, > >>>>>>>> Map > >>>>>>>>>>>> operator > >>>>>>>>>>>>>>>> can > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> apply a > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> scalar function, and can return > >>>>>>>> multi-column. > >>>>>>>>>> The > >>>>>>>>>>>>> usage > >>>>>>>>>>>>>>>> as > >>>>>>>>>>>>>>>>>>>>>>>>>>>> follows: > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> val res = tab > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> .map(fun: > >> ScalarFunction).as(‘a, > >>>> ‘b, > >>>>>> ‘c) > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> .select(‘a, ‘c) > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. FlatMap Operator > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> FaltMap operator is a new > >>> operator > >>>> of > >>>>>>>> Table, > >>>>>>>>>>>> FlatMap > >>>>>>>>>>>>>>>>>>>>>>>>>>> operator > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> can > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> apply > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a table function, and can > >> return > >>>>>>> multi-row. 
Usage:

    val res = tab
      .flatMap(fun: TableFunction).as('a, 'b, 'c)
      .select('a, 'c)

3. Agg Operator
Agg is a new operator on Table/GroupedTable. It applies an aggregate function and can return multiple columns. Usage:

    val res = tab
      .groupBy('a) // leave out the groupBy clause to define global aggregates
      .agg(fun: AggregateFunction).as('a, 'b, 'c)
      .select('a, 'c)

4. FlatAgg Operator
FlatAgg is a new operator on Table/GroupedTable. It applies a table aggregate function and can return multiple rows.
Usage:

    val res = tab
      .groupBy('a) // leave out the groupBy clause to define global table aggregates
      .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
      .select('a, 'c)

5. TableAggregateFunction
Table aggregates behave much like a GroupReduceFunction: they compute over a group of elements and output a group of elements. A TableAggregateFunction can be applied via GroupedTable.flatAgg().
The TableAggregateFunction interface is fairly large, so I won't copy it here. Please see the details in the Google doc:
https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit

I would greatly appreciate any reviews and comments.
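The difference between the four proposed operators is mostly about output cardinality. Here is a minimal sketch that mimics it with plain Scala collections standing in for a table; it is not the Flink Table API. `OperatorShapes`, `Row`, and the example functions (a x10 scalar function, a row-expanding table function, a sum/count aggregate, and a top-2 table aggregate) are hypothetical stand-ins chosen only for illustration.

```scala
// Plain Scala collections as a stand-in for a table; not the Flink Table API.
object OperatorShapes {
  final case class Row(key: String, value: Int)

  val tab: List[Row] = List(Row("x", 1), Row("x", 4), Row("y", 2), Row("x", 3))

  // map + scalar function: each input row yields exactly one output row,
  // which may have several columns.
  def mapOp(rows: List[Row]): List[(String, Int, Int)] =
    rows.map(r => (r.key, r.value, r.value * 10))

  // flatMap + table function: each input row yields zero or more rows
  // (here: one row per unit of `value`).
  def flatMapOp(rows: List[Row]): List[(String, Int)] =
    rows.flatMap(r => (1 to r.value).map(i => (r.key, i)))

  // groupBy + agg + aggregate function: each group yields exactly one row
  // (here with two result columns: sum and count).
  def aggOp(rows: List[Row]): Map[String, (Int, Int)] =
    rows.groupBy(_.key).map { case (k, group) =>
      (k, (group.map(_.value).sum, group.size))
    }

  // groupBy + flatAgg + table aggregate function: each group yields zero or
  // more rows, similar in spirit to a GroupReduceFunction (here: top-2 values).
  def flatAggOp(rows: List[Row]): Map[String, List[Int]] =
    rows.groupBy(_.key).map { case (k, group) =>
      (k, group.map(_.value).sorted(Ordering[Int].reverse).take(2))
    }
}
```

Map and agg always produce exactly one row per input row or per group, while flatMap and flatAgg may produce any number of rows, which is the distinction the operator names encode.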
Best,
Jincheng