Hi Jincheng,

Sounds good!
You should have Wiki permissions now.

Thanks, Fabian

On Thu, Nov 29, 2018 at 2:46 PM jincheng sun <sunjincheng...@gmail.com> wrote:

> Thanks Fabian & Piotrek,
>
> Your feedback sounds very good!
> So far we are on the same page about how to handle group keys. I will update
> the Google doc according to our discussion, and I'd like to convert it into a
> FLIP. Thus, it would be great if one of you could grant me write access
> to Confluence. My Confluence ID is sunjincheng121.
>
> Cheers,
> Jincheng
>
> On Thu, Nov 29, 2018 at 6:48 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>
> > Hi Jincheng & Fabian,
> >
> > +1 from my point of view.
> >
> > I like the idea that you have to close `flatAggregate` with a `select`
> > statement. In a way, it will be consistent with a normal `groupBy`, and
> > it indeed solves the problem of mixing table and scalar functions.
> >
> > I would be against supporting a `select('*)` that returns only `col1` and
> > `col2`. For me, `select('*)` would mean "give me everything that you have",
> > and it would be very confusing for me that:
> >
> >   tab.window(Tumble ... as 'w)
> >    .groupBy('w, 'k1, 'k2)
> >    .flatAggregate(tableAgg('a))
> >    .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)
> >
> > would return more columns compared to:
> >
> >   tab.window(Tumble ... as 'w)
> >    .groupBy('w, 'k1, 'k2)
> >    .flatAggregate(tableAgg('a))
> >    .select('*)
> >
> > Also, I would be fine with never supporting `.select('*)`. After all, it
> > would be consistent with regular aggregation, where this doesn't make sense
> > and is not supported either:
> >
> >   tab.window(Tumble ... as 'w)
> >    .groupBy('w, 'k1, 'k2)
> >    .select('*)
> >
> > Piotrek
> >
> > > On 29 Nov 2018, at 10:51, Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > > Hi,
> > >
> > > OK, not supporting select('*) in the first version sounds like a good
> > > first step, +1 for this.
> > >
> > > However, I don't think that select('*) returning only the result columns
> > > of the agg function would be a significant break in semantics.
> > > Since aggregate()/flatAggregate() is the last command and (visibly) only
> > > forwards the result columns of the agg function, returning those for '*
> > > seems fine to me.
> > > The grouping columns (+window properties) are "side-passed" from the
> > > groupBy.
> > >
> > > So, IMO, both select('*) and select('k1, 'k2, 'w.rowtime, '*) have
> > > well-defined semantics. But we can add these shortcuts later.
> > >
> > > Best,
> > > Fabian
> > >
> > >
> > > On Wed, Nov 28, 2018 at 11:13 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> > >
> > >> Hi Fabian,
> > >>
> > >> Thank you for listing the detailed example of forcing the use of select.
> > >>
> > >> In case I didn't make it clear before, I would like to share my thoughts
> > >> about the group keys here:
> > >>
> > >>  1. agg/flatAgg(Expression) keeps a single Expression;
> > >>  2. The way to force users to use select is as follows (as you mentioned):
> > >>
> > >>   val tabX2 = tab.window(Tumble ... as 'w)
> > >>    .groupBy('w, 'k1, 'k2)
> > >>    .flatAgg(tableAgg('a))
> > >>    .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)
> > >>
> > >>
> > >>  3. IMO, we should not support select('*) on a window-grouped table in
> > >>     the first version. But I am fine if you want to support the shortcut,
> > >>     i.e., `select('w.rowtime, 'k1, 'k2, '*)`, which means that '* does not
> > >>     include the group keys. I'm just a little worried that "*" represents
> > >>     all columns of a non-grouped table, while on a grouped table it does
> > >>     not. The semantics of "*" are not uniform in this case.
> > >>
> > >> What do you think?
> > >>
> > >> Thanks,
> > >> Jincheng
> > >>
> > >>
> > >> On Tue, Nov 27, 2018 at 7:27 PM Fabian Hueske <fhue...@gmail.com> wrote:
> > >>
> > >>> I don't think we came to a conclusion yet.
> > >>> Are you suggesting that 'w would disappear if we do the following:
> > >>>
> > >>> val tabX = tab.window(Tumble ... as 'w)
> > >>>    .groupBy('w, 'k1, 'k2)
> > >>>    .flatAgg(tableAgg('a))
> > >>>    .select('*)
> > >>>
> > >>> Given that tableAgg returns [col1, col2], what would the schema of tabX
> > >>> be?
> > >>>
> > >>> 1) [col1, col2]: fine with me
> > >>> 2) [k1, k2, col1, col2]: I'm very skeptical about this option because it
> > >>> is IMO inconsistent. Users will ask where 'w went.
> > >>> 3) [w, k1, k2, col1, col2]: back to our original problem. What's the data
> > >>> type of 'w?
> > >>>
> > >>> If we go for option 1, we can still allow selecting the keys.
> > >>>
> > >>> val tabX2 = tab.window(Tumble ... as 'w)
> > >>>    .groupBy('w, 'k1, 'k2)
> > >>>    .flatAgg(tableAgg('a))
> > >>>    .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)
> > >>>
> > >>> The good thing about enforcing select is that the result of flatAgg is
> > >>> not a Table, i.e., the definition of the operation is not complete yet.
> > >>> On the other hand, we would need to list all result attributes of the
> > >>> table function if we want to select keys. We could do that with a
> > >>> shortcut:
> > >>>
> > >>> val tabX3 = tab.window(Tumble ... as 'w)
> > >>>    .groupBy('w, 'k1, 'k2)
> > >>>    .flatAgg(tableAgg('a))
> > >>>    .select('w.rowtime, 'k1, 'k2, '*)
> > >>>
> > >>>
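To make option 1 concrete, here is a minimal Scala sketch that resolves a select list against the result of a windowed flatAgg. It is a plain model, not Flink code: `resolveSelect` and its parameters are hypothetical names. It assumes that '* expands to the table aggregate's result columns only, that group keys and window properties such as `w.rowtime` must be named explicitly, and that a bare reference to the window alias is rejected.

```scala
// Hypothetical model of "option 1": '* expands to the table aggregate's
// result columns only; group keys and window properties must be named.
// Plain Scala, not Flink's Table API.
def resolveSelect(
    selectList: List[String],
    groupKeys: List[String],   // grouping keys other than the window alias
    windowAlias: String,       // e.g. "w"
    windowProps: List[String], // e.g. List("start", "end", "rowtime")
    aggResult: List[String]    // result columns of the table aggregate
): List[String] =
  selectList.flatMap {
    case "*"                        => aggResult
    case k if groupKeys.contains(k) => List(k)
    case c if aggResult.contains(c) => List(c)
    case p if windowProps.exists(wp => p == s"$windowAlias.$wp") => List(p)
    // A bare "w" (or any unknown name) is rejected, so 'w itself needs no type.
    case other =>
      throw new IllegalArgumentException(s"Cannot resolve '$other' in select")
  }
```

Under these assumptions, select('*) resolves to [col1, col2], the shortcut select('w.rowtime, 'k1, 'k2, '*) resolves to [w.rowtime, k1, k2, col1, col2], and selecting 'w on its own fails.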
> > >>> On Tue, Nov 27, 2018 at 10:46 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> > >>>
> > >>>> Thanks Fabian. If we enforce select, as I said before, users should use
> > >>>> 'w.start, 'w.end, 'w.rowtime, 'w.proctime, 'w.XXX, etc. This way we do
> > >>>> not need to define the type of 'w, and we can keep the current way of
> > >>>> using 'w. I'll file the JIRA and open the PR ASAP.
> > >>>>
> > >>>> Thanks,
> > >>>> Jincheng
> > >>>>
> > >>>> On Tue, Nov 27, 2018 at 4:50 PM Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>
> > >>>>> I thought about it again.
> > >>>>> Enforcing select won't help us with the choice of how to represent 'w.
> > >>>>>
> > >>>>> select('*) and
> > >>>>> select('a, 'b, 'w)
> > >>>>>
> > >>>>> would still be valid expressions, and we need to decide how 'w is
> > >>>>> represented.
> > >>>>> As I said before, Tuple, Row, and Map have disadvantages because the
> > >>>>> syntax 'w.rowtime or 'w.end would not be supported.
> > >>>>>
> > >>>>> On Tue, Nov 27, 2018 at 1:05 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> > >>>>>
> > >>>>>> Until we have good support for nested tables, forcing the use of select
> > >>>>>> may be a good way; at least it does not cause compatibility issues.
> > >>>>>>
> > >>>>>> On Mon, Nov 26, 2018 at 6:48 PM Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>>>
> > >>>>>>> I think the question is what the data type of 'w is.
> > >>>>>>>
> > >>>>>>> Until now, I assumed it would be a nested tuple (Row or Tuple).
> > >>>>>>> Accessing nested fields in Row, Tuple or Pojo is done with get, i.e.,
> > >>>>>>> 'w.get("rowtime").
> > >>>>>>> Using a Map would not help because the access would be 'w.at("rowtime").
> > >>>>>>>
> > >>>>>>> We can of course also enforce the select() if aggregate / flatAggregate
> > >>>>>>> do not return a Table but some kind of AggregatedTable that does not
> > >>>>>>> provide any other function than select().
> > >>>>>>>
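Fabian's idea of returning a dedicated type from aggregate()/flatAggregate() can be sketched with a few plain Scala classes. All names here (`Table`, `GroupedTable`, `FlatAggregateTable`) are hypothetical stand-ins rather than Flink's actual classes, and columns are modelled as strings. The only point is that the intermediate type exposes nothing but select(), so the compiler forces users to finish the operation by listing the columns they want.

```scala
// Hypothetical stand-ins for the Table API types; columns are plain strings.
final case class Table(columns: List[String]) {
  def groupBy(keys: String*): GroupedTable = {
    require(keys.forall(k => columns.contains(k)), "Unknown grouping key")
    GroupedTable(keys.toList)
  }
}

final case class GroupedTable(keys: List[String]) {
  // `resultColumns` stands in for the output schema of a table aggregate.
  def flatAggregate(resultColumns: String*): FlatAggregateTable =
    FlatAggregateTable(keys, resultColumns.toList)
}

// Offers only select(): no insertInto(), no further operators, so the
// operation is incomplete (and not a Table) until the user picks columns.
final case class FlatAggregateTable(keys: List[String], aggResult: List[String]) {
  def select(fields: String*): Table = {
    val available = keys ++ aggResult
    val unknown = fields.filterNot(f => available.contains(f))
    require(unknown.isEmpty, s"Unknown fields: ${unknown.mkString(", ")}")
    Table(fields.toList)
  }
}
```

In this model, a call such as `input.groupBy("k1").flatAggregate("col1").insertInto(...)` would not compile, because `FlatAggregateTable` has no such method.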
> > >>>>>>> On Mon, Nov 26, 2018 at 1:12 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> > >>>>>>>
> > >>>>>>>> Yes, I agree the problem needs attention. IMO, it depends on how we
> > >>>>>>>> define the type of 'w. Your example above defines 'w as a tuple. If
> > >>>>>>>> we serialize 'w to a Map, the compatibility will be better. Going even
> > >>>>>>>> further, we can define 'w as a special type that UDFs and sinks cannot
> > >>>>>>>> use directly; users must use 'w.start, 'w.end, 'w.rowtime,
> > >>>>>>>> 'w.proctime, 'w.XXX. I would be very grateful if you could share your
> > >>>>>>>> solution to this problem, and we can also discuss it carefully in the
> > >>>>>>>> PR to be opened. What do you think?
> > >>>>>>>>
> > >>>>>>>> On Fri, Nov 23, 2018 at 6:21 PM Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>>>>>
> > >>>>>>>>> Something like:
> > >>>>>>>>>
> > >>>>>>>>> val x = tab.window(Tumble ... as 'w)
> > >>>>>>>>>    .groupBy('w, 'k1, 'k2)
> > >>>>>>>>>    .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > >>>>>>>>>
> > >>>>>>>>> // fails because the result schema has changed from
> > >>>>>>>>> // ((start, end, rowtime), k1, k2, col1, col2) to
> > >>>>>>>>> // ((start, end, rowtime, newProperty), k1, k2, col1, col2)
> > >>>>>>>>> x.insertInto("sinkTable")
> > >>>>>>>>> // fails because the UDF expects (start, end, rowtime)
> > >>>>>>>>> // and not (start, end, rowtime, newProperty)
> > >>>>>>>>> x.select(myUdf('w))
> > >>>>>>>>>
> > >>>>>>>>> Basically, every time the composite type 'w is used as a whole.
> > >>>>>>>>>
> > >>>>>>>>>
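Fabian's two failure cases can be reproduced with a tiny model of the composite type. The names below (`CompositeType`, `udfAccepts`, `propertyIndex`) and the fourth property `newProperty` are hypothetical. A consumer that depends on the whole three-field layout stops accepting 'w once a fourth property is added, while access to a single property by name, as in 'w.rowtime, keeps working.

```scala
// Hypothetical model of the window composite 'w; not Flink's type system.
final case class CompositeType(fieldNames: List[String])

// Layout today, and after a (hypothetical) fourth window property is added.
val windowV1 = CompositeType(List("start", "end", "rowtime"))
val windowV2 = CompositeType(List("start", "end", "rowtime", "newProperty"))

// A UDF or sink that was written against exactly (start, end, rowtime).
def udfAccepts(arg: CompositeType): Boolean =
  arg.fieldNames == List("start", "end", "rowtime")

// Field access by name, as in 'w.rowtime or 'w.end.
def propertyIndex(arg: CompositeType, name: String): Option[Int] =
  arg.fieldNames.indexOf(name) match {
    case -1  => None
    case idx => Some(idx)
  }
```

Restricting 'w to by-name property access, as Jincheng proposes in this thread, avoids the first kind of failure.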
> > >>>>>>>>> On Fri, Nov 23, 2018 at 10:45 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Fabian,
> > >>>>>>>>>>
> > >>>>>>>>>> I don't fully understand the concern you mentioned:
> > >>>>>>>>>>
> > >>>>>>>>>> Any query that relies on the composite type with three fields will
> > >>>>>>>>>> fail after adding a fourth field.
> > >>>>>>>>>>
> > >>>>>>>>>> I would appreciate it if you could give some detailed examples.
> > >>>>>>>>>>
> > >>>>>>>>>> Regards,
> > >>>>>>>>>> Jincheng
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Fri, Nov 23, 2018 at 4:41 PM Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi,
> > >>>>>>>>>>>
> > >>>>>>>>>>> My concerns are about the case when there is no additional select()
> > >>>>>>>>>>> method, i.e.,
> > >>>>>>>>>>>
> > >>>>>>>>>>> tab.window(Tumble ... as 'w)
> > >>>>>>>>>>>    .groupBy('w, 'k1, 'k2)
> > >>>>>>>>>>>    .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > >>>>>>>>>>>
> > >>>>>>>>>>> In this case, 'w is a composite field consisting of three fields
> > >>>>>>>>>>> (end, start, rowtime).
> > >>>>>>>>>>> Once we add a new property, it would need to be added to the
> > >>>>>>>>>>> composite type.
> > >>>>>>>>>>> Any query that relies on the composite type with three fields will
> > >>>>>>>>>>> fail after adding a fourth field.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Best, Fabian
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Fri, Nov 23, 2018 at 2:01 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Thanks Fabian,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks a lot for your feedback, and for the very important and
> > >>>>>>>>>>>> necessary design reminders!
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Yes, you are right! In Spark 1.3 and earlier, the grouping columns
> > >>>>>>>>>>>> had to be specified explicitly, but since Spark 1.4 they are passed
> > >>>>>>>>>>>> implicitly. This behavior was changed due to user feedback.
> > >>>>>>>>>>>> Although implicit forwarding has the drawbacks you mentioned, this
> > >>>>>>>>>>>> approach is really convenient for users.
> > >>>>>>>>>>>> I agree that, when grouping on windows, we have to pay attention to
> > >>>>>>>>>>>> the handling of the window's properties, because we may introduce
> > >>>>>>>>>>>> new window properties.
> > >>>>>>>>>>>> So, from this point of view, we delay the processing of the window
> > >>>>>>>>>>>> properties, i.e., we pass the composite type 'w through the Table
> > >>>>>>>>>>>> API and provide different property accessors in the SELECT
> > >>>>>>>>>>>> according to the type of 'w, such as 'w.start, 'w.end, 'w.xxx. The
> > >>>>>>>>>>>> Table API will limit and verify the property accesses that 'w
> > >>>>>>>>>>>> supports. An example is as follows:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> tab.window(Tumble ... as 'w)
> > >>>>>>>>>>>>    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > >>>>>>>>>>>>    .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2) // 'w is a composite field
> > >>>>>>>>>>>>    .select('k1, 'col1, 'w.rowtime as 'ts, 'w.xxx as 'xx) // in select, we will limit and verify that 'w.xxx is allowed
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I am not sure if I fully understand your concerns; if I have
> > >>>>>>>>>>>> misunderstood anything, please correct me. Any feedback is
> > >>>>>>>>>>>> appreciated!
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Bests,
> > >>>>>>>>>>>> Jincheng
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Thu, Nov 22, 2018 at 10:13 PM Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> First of all, it is correct that the flatMap(Expression*) and
> > >>>>>>>>>>>>> flatAggregate(Expression*) methods would mix scalar and table values.
> > >>>>>>>>>>>>> This would be a new concept that is not present in the current API.
> > >>>>>>>>>>>>> From my point of view, the semantics are quite clear, but I understand
> > >>>>>>>>>>>>> that others are more careful and worry about future extensions.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I am fine with going for single expression arguments for map() and
> > >>>>>>>>>>>>> flatMap(). We can later expand them to Expression* if we feel the need
> > >>>>>>>>>>>>> and are more comfortable about the implications.
> > >>>>>>>>>>>>> Whenever a time attribute needs to be forwarded, users can fall back
> > >>>>>>>>>>>>> to join(TableFunction) as Xiaowei mentioned.
> > >>>>>>>>>>>>> So we restrict the usability of the new methods but don't lose
> > >>>>>>>>>>>>> functionality and don't prevent future extensions.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The aggregate() and flatAggregate() case is more difficult because
> > >>>>>>>>>>>>> implicit forwarding of grouping fields cannot be changed later without
> > >>>>>>>>>>>>> breaking the API.
> > >>>>>>>>>>>>> There are other APIs (e.g., Spark) that also implicitly forward the
> > >>>>>>>>>>>>> grouping columns. So this is not uncommon.
> > >>>>>>>>>>>>> However, I personally don't like that approach, because it is implicit
> > >>>>>>>>>>>>> and introduces a new behavior that is not present in the current API.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> One thing to consider here is the handling of grouping on windows.
> > >>>>>>>>>>>>> If I understood Xiaowei correctly, a composite field that is named
> > >>>>>>>>>>>>> like the window alias (e.g., 'w) would be implicitly added to the
> > >>>>>>>>>>>>> result of aggregate() or flatAggregate().
> > >>>>>>>>>>>>> The composite field would have fields like (start, end, rowtime) or
> > >>>>>>>>>>>>> (start, end, proctime) depending on the window type.
> > >>>>>>>>>>>>> If we ever introduced a fourth window property, we might break
> > >>>>>>>>>>>>> existing queries.
> > >>>>>>>>>>>>> Is this something that we should worry about?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>> Fabian
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Thu, Nov 22, 2018 at 2:03 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi Jincheng,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> #1) ok, got it.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> #3)
> > >>>>>>>>>>>>>>> From my point of view, we can use `Expression` first, and if the
> > >>>>>>>>>>>>>>> discussion decides to use Expression*, improve it then. In any case,
> > >>>>>>>>>>>>>>> we can use Expression, and there is an opportunity to move to
> > >>>>>>>>>>>>>>> Expression* later (compatibly). If we use Expression* directly, it
> > >>>>>>>>>>>>>>> is difficult for us to move to Expression, which will break the
> > >>>>>>>>>>>>>>> compatibility between versions. What do you think?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I don't think that's the case here. If we start with the
> > >>>>>>>>>>>>>> single-param `flatMap(Expression)`, it will need implicit columns to
> > >>>>>>>>>>>>>> be present in the result, which:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> a) IMO breaks the SQL convention (that's why I'm against this)
> > >>>>>>>>>>>>>> b) means we cannot later easily introduce `flatMap(Expression*)`
> > >>>>>>>>>>>>>> without those implicit columns, without breaking compatibility, or
> > >>>>>>>>>>>>>> at least without making `flatMap(Expression*)` and
> > >>>>>>>>>>>>>> `flatMap(Expression)` terribly inconsistent.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> To elaborate on (a): it's not nice if our own API is inconsistent
> > >>>>>>>>>>>>>> and sometimes behaves one way and sometimes another way:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> table.groupBy('k).select(scalarAggregateFunction('v)) => single
> > >>>>>>>>>>>>>> column result, just the output of `scalarAggregateFunction`
> > >>>>>>>>>>>>>> vs
> > >>>>>>>>>>>>>> table.groupBy('k).flatAggregate(tableAggregateFunction('v)) => both
> > >>>>>>>>>>>>>> the result of `tableAggregateFunction` plus the key (and an optional
> > >>>>>>>>>>>>>> window context?)
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thus I think we have to decide now which way we want to go, since
> > >>>>>>>>>>>>>> later will be too late. Or again, am I missing something? :)
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Piotrek
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On 22 Nov 2018, at 02:07, jincheng sun <sunjincheng...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hi Piotrek,
> > >>>>>>>>>>>>>>> #1) We have unbounded and bounded group window aggregates. For the
> > >>>>>>>>>>>>>>> unbounded case, we should fire early with retract messages; we cannot
> > >>>>>>>>>>>>>>> use the watermark, because an unbounded aggregate never finishes (as
> > >>>>>>>>>>>>>>> an improvement, we can introduce micro-batching in the future). For
> > >>>>>>>>>>>>>>> bounded windows we never support early firing, so we do not need
> > >>>>>>>>>>>>>>> retraction.
> > >>>>>>>>>>>>>>> #3) About the validation of `table.select(F('a).unnest(), 'b,
> > >>>>>>>>>>>>>>> G('c).unnest())/table.flatMap(F('a), 'b, scalarG('c))`: Fabian
> > >>>>>>>>>>>>>>> mentioned it above, please look at the prior mail. For
> > >>>>>>>>>>>>>>> `table.flatMap(F('a), 'b, scalarG('c))`, which we are concerned
> > >>>>>>>>>>>>>>> about, we should discuss the issue of `Expression*` vs
> > >>>>>>>>>>>>>>> `Expression`. From my point of view, we can use `Expression` first,
> > >>>>>>>>>>>>>>> and if the discussion decides to use Expression*, improve it then.
> > >>>>>>>>>>>>>>> In any case, we can use Expression, and there is an opportunity to
> > >>>>>>>>>>>>>>> move to Expression* later (compatibly). If we use Expression*
> > >>>>>>>>>>>>>>> directly, it is difficult for us to move to Expression, which will
> > >>>>>>>>>>>>>>> break the compatibility between versions. What do you think?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> If anything is unclear, any feedback is welcome! Again, thanks for
> > >>>>>>>>>>>>>>> sharing your thoughts!
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>> Jincheng
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Wed, Nov 21, 2018 at 9:37 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hi Jincheng,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> #1) No, the watermark solves the issue of late events. Here, the
> > >>>>>>>>>>>>>>>>> performance problem is caused by the update emit mode, i.e., when
> > >>>>>>>>>>>>>>>>> the current calculation result is output, the previous calculation
> > >>>>>>>>>>>>>>>>> result needs to be retracted.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hmm, yes, I missed this. For time-windowed cases (some
> > >>>>>>>>>>>>>>>> aggregate/flatAggregate cases), emitting only on watermark should
> > >>>>>>>>>>>>>>>> solve the problem. For non-time-windowed cases it would reduce the
> > >>>>>>>>>>>>>>>> amount of retractions, right? Or am I still missing something?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> #3) I still hope to keep the simplicity that select only supports
> > >>>>>>>>>>>>>>>>> projected scalars; we can hardly tell the semantics of
> > >>>>>>>>>>>>>>>>> tab.select(flatmap('a), 'b, flatmap('d)).
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> table.select(F('a).unnest(), 'b, G('c).unnest())
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> could be rejected during some validation phase. On the other hand:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> table.select(F('a).unnest(), 'b, scalarG('c))
> > >>>>>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>> table.flatMap(F('a), 'b, scalarG('c))
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> could work and be more or less syntactic sugar for cross apply.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Piotrek
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On 21 Nov 2018, at 12:16, jincheng sun <sunjincheng...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hi Shaoxuan & Hequn,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Thanks for your suggestion, I'll file the JIRAs later.
> > >>>>>>>>>>>>>>>>> We can prepare PRs while continuing to move the ongoing discussion
> > >>>>>>>>>>>>>>>>> forward.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>> Jincheng
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Wed, Nov 21, 2018 at 7:07 PM jincheng sun <sunjincheng...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Hi Piotrek,
> > >>>>>>>>>>>>>>>>>> Thanks for your feedback, and thanks for sharing your thoughts!
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> #1) No, the watermark solves the issue of late events. Here, the
> > >>>>>>>>>>>>>>>>>> performance problem is caused by the update emit mode, i.e., when
> > >>>>>>>>>>>>>>>>>> the current calculation result is output, the previous calculation
> > >>>>>>>>>>>>>>>>>> result needs to be retracted.
> > >>>>>>>>>>>>>>>>>> #2) As I mentioned above, we should continue the discussion until we
> > >>>>>>>>>>>>>>>>>> solve the problems raised by Xiaowei and Fabian.
> > >>>>>>>>>>>>>>>>>> #3) I still hope to keep the simplicity that select only supports
> > >>>>>>>>>>>>>>>>>> projected scalars; we can hardly tell the semantics of
> > >>>>>>>>>>>>>>>>>> tab.select(flatmap('a), 'b, flatmap('d)).
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>> Jincheng
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On Wed, Nov 21, 2018 at 5:24 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> 1.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> In fact, in addition to the design of the APIs, there will be
> > >>>>>>>>>>>>>>>>>>>> various performance optimization details, such as: the table
> > >>>>>>>>>>>>>>>>>>>> aggregate function's emitValue will generate multiple calculation
> > >>>>>>>>>>>>>>>>>>>> results; in extreme cases, each record will trigger a large number
> > >>>>>>>>>>>>>>>>>>>> of retract messages, which will have poor performance.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Can this be solved/mitigated by emitting the results only on
> > >>>>>>>>>>>>>>>>>>> watermarks? I think that was the path that we decided to take both
> > >>>>>>>>>>>>>>>>>>> for Temporal Joins and upsert stream conversion. I know that this
> > >>>>>>>>>>>>>>>>>>> increases the latency and there is a place for a future global
> > >>>>>>>>>>>>>>>>>>> setting/user preference "emit the data ASAP mode", but emitting
> > >>>>>>>>>>>>>>>>>>> only on watermarks seems to me a better/more sane default.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> 2.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> With respect to the API discussion and implicit columns: the problem
> > >>>>>>>>>>>>>>>>>>> for me so far is that I'm not sure if I like the additional
> > >>>>>>>>>>>>>>>>>>> complexity of the `append()` solution, while implicit columns are
> > >>>>>>>>>>>>>>>>>>> definitely not in the spirit of SQL. Neither joins nor aggregations
> > >>>>>>>>>>>>>>>>>>> add extra unexpected columns to the result without asking. This can
> > >>>>>>>>>>>>>>>>>>> definitely be confusing for the users since it breaks the
> > >>>>>>>>>>>>>>>>>>> convention. Thus I would lean towards Fabian's proposal of
> > >>>>>>>>>>>>>>>>>>> multi-argument `map(Expression*)` from those 3 options.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> 3.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Another topic is that I'm not 100% convinced that we should be
> > >>>>>>>>>>>>>>>>>>> adding new API functions for `map`, `aggregate`, `flatMap` and
> > >>>>>>>>>>>>>>>>>>> `flatAggregate`. I think the same could be achieved by changing
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> table.map(F('x))
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> into
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> table.select(F('x)).unnest()
> > >>>>>>>>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>>>>> table.select(F('x).unnest())
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> where `unnest()` means unnesting a row/tuple type into a columnar
> > >>>>>>>>>>>>>>>>>>> table.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> table.flatMap(F('x))
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> could, on the other hand, also be handled by
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> table.select(F('x))
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> by correctly deducing that F(x) is a multi-row output function.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> The same might apply to `aggregate(F('x))`, but this could maybe be
> > >>>>>>>>>>>>>>>>>>> replaced by:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> table.groupBy(…).select(F('x).unnest())
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Adding scalar functions should also be possible:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> table.groupBy('k).select(F('x).unnest(), 'k)
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Maybe such an approach would allow us to implement the same features
> > >>>>>>>>>>>>>>>>>>> in SQL as well?
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Piotrek
> > >>>>>>>>>>>>>>>>>>>
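The retraction cost Jincheng describes, and Piotr's suggestion of emitting only on watermarks, can be illustrated with a toy table aggregate that emits the current top-2 values of one group. This is a hypothetical model, not Flink's runtime: `top2`, `emitOnEveryRecord` and `emitOnWatermark` are illustrative names, and the watermark variant assumes a bounded, time-windowed group whose final result is emitted once.

```scala
// Toy model, not Flink's runtime: a table aggregate emitting the top-2 values.
sealed trait Msg
final case class Add(value: Int) extends Msg
final case class Retract(value: Int) extends Msg

def top2(values: Seq[Int]): List[Int] =
  values.sorted(Ordering[Int].reverse).take(2).toList

// Update mode: whenever a record changes the result, the previously emitted
// rows are retracted and the new rows are emitted.
def emitOnEveryRecord(input: Seq[Int]): List[Msg] = {
  val out = scala.collection.mutable.ListBuffer.empty[Msg]
  var previous = List.empty[Int]
  for (i <- input.indices) {
    val current = top2(input.take(i + 1))
    if (current != previous) {
      out ++= previous.map(v => Retract(v))
      out ++= current.map(v => Add(v))
      previous = current
    }
  }
  out.toList
}

// Emit on watermark: a bounded window emits its final result exactly once.
def emitOnWatermark(input: Seq[Int]): List[Msg] =
  top2(input).map(v => Add(v))
```

For the input 1, 3, 2, 5, 4, the update mode produces 16 messages (7 of them retractions), while emitting on the watermark produces only the two final rows. A non-windowed aggregate never reaches a final watermark, which matches the point made in this thread that retraction is still needed there.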
> > >>>>>>>>>>>>>>>>>>>> On 21 Nov 2018, at 09:43, Hequn Cheng <chenghe...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Thank you all for the great proposal and discussion!
> > >>>>>>>>>>>>>>>>>>>> I also prefer to move on to the next step, so +1 for opening the
> > >>>>>>>>>>>>>>>>>>>> JIRAs to start the work.
> > >>>>>>>>>>>>>>>>>>>> We can have more detailed discussions there. Btw, we can start with
> > >>>>>>>>>>>>>>>>>>>> the JIRAs which we have agreed on.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>> Hequn
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <wshaox...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> +1. I agree that we should open the JIRAs to start the work. We may
> > >>>>>>>>>>>>>>>>>>>>> come up with better ideas on the flavor of the interface when we
> > >>>>>>>>>>>>>>>>>>>>> implement/review the code.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>> shaoxuan
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> On 11/20/18, jincheng sun <sunjincheng...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Thanks all for the feedback.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> @Piotr About not using abbreviations in naming: +1, I like your
> > >>>>>>>>>>>>>>>>>>>>>> proposal! Currently both the DataSet and DataStream APIs use
> > >>>>>>>>>>>>>>>>>>>>>> `aggregate`. BTW, I find that other languages, such as R, also avoid
> > >>>>>>>>>>>>>>>>>>>>>> abbreviated names.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Sometimes an API interface is really difficult to perfect: we need to
> > >>>>>>>>>>>>>>>>>>>>>> spend a lot of time thinking, collect feedback from a large number of
> > >>>>>>>>>>>>>>>>>>>>>> users, and keep improving it. But because of backward compatibility,
> > >>>>>>>>>>>>>>>>>>>>>> we have to take the most conservative approach when designing the API
> > >>>>>>>>>>>>>>>>>>>>>> (of course, I am in favor of developing richer features once we have
> > >>>>>>>>>>>>>>>>>>>>>> discussed them clearly).
> > >>>>>>>>>>>>>>>>>>>>>> Therefore, I propose to split the implementation of
> > >>>>>>>>>>>>>>>>>>>>>> map/flatMap/agg/flatAgg into JIRAs for the basic functionality and
> > >>>>>>>>>>>>>>>>>>>>>> JIRAs that add support for time attributes and group keys. We can
> > >>>>>>>>>>>>>>>>>>>>>> develop the features whose design we have already agreed on, and
> > >>>>>>>>>>>>>>>>>>>>>> continue to discuss the designs that are still uncertain.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> In fact, in addition to the API design, there will be various
> > >>>>>>>>>>>>>>>>>>>>>> performance optimization details. For example, the emitValue method of
> > >>>>>>>>>>>>>>>>>>>>>> a table aggregate function emits multiple results; in extreme cases,
> > >>>>>>>>>>>>>>>>>>>>>> each record will trigger a large number of retract messages, which
> > >>>>>>>>>>>>>>>>>>>>>> leads to poor performance. So we will also optimize the interface
> > >>>>>>>>>>>>>>>>>>>>>> design, e.g. by adding an emitWithRetractValue interface (I have
> > >>>>>>>>>>>>>>>>>>>>>> updated the google doc) that lets the user optionally perform
> > >>>>>>>>>>>>>>>>>>>>>> incremental calculations, thus avoiding a large number of retracts.
> > >>>>>>>>>>>>>>>>>>>>>> Details like this are difficult to discuss fully on the mailing list,
> > >>>>>>>>>>>>>>>>>>>>>> so I recommend creating the JIRAs/FLIP first: we develop the designs
> > >>>>>>>>>>>>>>>>>>>>>> that have been agreed upon and continue to discuss the undecided
> > >>>>>>>>>>>>>>>>>>>>>> designs! What do you think? @Fabian & Piotr & XiaoWei
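The retraction overhead described above can be illustrated with a small plain-Scala simulation. It is a sketch under stated assumptions, not Flink code: `Add`, `Retract`, `EmitSimulation`, `fullReplacement` and `incremental` are hypothetical names, and a Top-2 aggregate stands in for a table aggregate whose emitValue re-emits its complete result after every input.

```scala
// Hypothetical change messages for this sketch (not Flink classes).
sealed trait Msg
final case class Add(v: Int) extends Msg
final case class Retract(v: Int) extends Msg

object EmitSimulation {
  // Current Top-2 of the accumulator (here simply all values seen so far).
  def top2(acc: List[Int]): List[Int] =
    acc.sorted(Ordering[Int].reverse).take(2)

  // Full replacement: after each input, retract every previously emitted
  // row and then emit the complete new result.
  def fullReplacement(inputs: List[Int]): List[Msg] = {
    var acc = List.empty[Int]
    var emitted = List.empty[Int]
    val out = List.newBuilder[Msg]
    for (x <- inputs) {
      acc = x :: acc
      val now = top2(acc)
      emitted.foreach(v => out += Retract(v))
      now.foreach(v => out += Add(v))
      emitted = now
    }
    out.result()
  }

  // Incremental: retract only rows that left the result and add only rows
  // that entered it (the idea behind an emitWithRetractValue-style method).
  def incremental(inputs: List[Int]): List[Msg] = {
    var acc = List.empty[Int]
    var emitted = List.empty[Int]
    val out = List.newBuilder[Msg]
    for (x <- inputs) {
      acc = x :: acc
      val now = top2(acc)
      emitted.diff(now).foreach(v => out += Retract(v))
      now.diff(emitted).foreach(v => out += Add(v))
      emitted = now
    }
    out.result()
  }
}
```

For inputs 5, 3, 1, 2 the full-replacement variant produces 12 messages, while the incremental one produces only `Add(5)` and `Add(3)`, because the last two inputs never change the Top-2.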
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>>>> Jincheng
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> On Mon, Nov 19, 2018 at 12:07 AM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Hi Fabian & Piotr, thanks for the feedback!
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> I appreciate your concerns, both on timestamp attributes and on
> > >>>>>>>>>>>>>>>>>>>>>>> implicit group keys. At the same time, I'm also concerned about the
> > >>>>>>>>>>>>>>>>>>>>>>> proposed approach of allowing Expression* as parameters, especially
> > >>>>>>>>>>>>>>>>>>>>>>> for flatMap/flatAgg. So far, we have never allowed a scalar expression
> > >>>>>>>>>>>>>>>>>>>>>>> to appear together with table expressions. With the Expression*
> > >>>>>>>>>>>>>>>>>>>>>>> approach, this will happen for the parameters of flatMap/flatAgg. I'm
> > >>>>>>>>>>>>>>>>>>>>>>> a bit concerned about whether we fully understand the consequences
> > >>>>>>>>>>>>>>>>>>>>>>> when we try to extend our system in the future, and I would be extra
> > >>>>>>>>>>>>>>>>>>>>>>> cautious here. To avoid this, I think an implicit group key for
> > >>>>>>>>>>>>>>>>>>>>>>> flatAgg is safer. For flatMap, if users want to keep the rowtime
> > >>>>>>>>>>>>>>>>>>>>>>> column, they can use crossApply/join instead. So we are not losing
> > >>>>>>>>>>>>>>>>>>>>>>> any real functionality here.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Also, a clarification on the following example:
> > >>>>>>>>>>>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
> > >>>>>>>>>>>>>>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > >>>>>>>>>>>>>>>>>>>>>>> .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > >>>>>>>>>>>>>>>>>>>>>>> .select('k1, 'col1, 'w.rowtime as 'rtime)
> > >>>>>>>>>>>>>>>>>>>>>>> If we did not have the select clause in this example, we would have
> > >>>>>>>>>>>>>>>>>>>>>>> 'w as a regular column in the output. It should not magically
> > >>>>>>>>>>>>>>>>>>>>>>> disappear.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> The concern is not as strong for Table.map/Table.agg because we are
> > >>>>>>>>>>>>>>>>>>>>>>> not mixing scalar and table expressions there. But we also want to be
> > >>>>>>>>>>>>>>>>>>>>>>> consistent with these methods: if we used implicit group keys for
> > >>>>>>>>>>>>>>>>>>>>>>> Table.flatAgg, we should probably do the same for Table.agg. That
> > >>>>>>>>>>>>>>>>>>>>>>> leaves only the choice of what to do with Table.map. I can see good
> > >>>>>>>>>>>>>>>>>>>>>>> arguments on both sides, but starting with a single Expression seems
> > >>>>>>>>>>>>>>>>>>>>>>> safer because we can always extend it to Expression* in the future.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> While thinking about this problem, it appears that we may need more
> > >>>>>>>>>>>>>>>>>>>>>>> work on our handling of watermarks in the SQL/Table API. Our current
> > >>>>>>>>>>>>>>>>>>>>>>> way of propagating watermarks from the source all the way to the sink
> > >>>>>>>>>>>>>>>>>>>>>>> might not be optimal. For example, after a tumbling window, the
> > >>>>>>>>>>>>>>>>>>>>>>> watermark can actually be advanced to just before the expiration of
> > >>>>>>>>>>>>>>>>>>>>>>> the next window. I think that, in general, each operator may need to
> > >>>>>>>>>>>>>>>>>>>>>>> generate new watermarks instead of simply propagating them. Once we
> > >>>>>>>>>>>>>>>>>>>>>>> accept that watermarks may change during execution, it appears that
> > >>>>>>>>>>>>>>>>>>>>>>> the timestamp columns may also change, as long as we have some way to
> > >>>>>>>>>>>>>>>>>>>>>>> associate a watermark with them. My intuition is that once we have a
> > >>>>>>>>>>>>>>>>>>>>>>> thorough solution for the watermark issue, we may be able to solve
> > >>>>>>>>>>>>>>>>>>>>>>> the problem we encountered for Table.map in a cleaner way. But this is
> > >>>>>>>>>>>>>>>>>>>>>>> a complex issue which deserves a discussion of its own.
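The observation about advancing the watermark after a tumbling window can be made concrete with a small arithmetic sketch. It rests on assumptions that are not stated in the thread: windows aligned to 0 with no offset and no allowed lateness, a window `[start, end)` firing once the input watermark reaches `end - 1`, its result row carrying rowtime `end - 1`, and the usual contract that no later row has a timestamp at or below the watermark. `WindowWatermark` and its methods are hypothetical names.

```scala
object WindowWatermark {
  // Assumptions: tumbling windows [k * size, (k + 1) * size) with no offset
  // and no allowed lateness; a window fires once the input watermark
  // reaches end - 1, and its result row is stamped with rowtime end - 1.

  // End of the earliest window that has not fired yet for this watermark,
  // i.e. the smallest multiple of size that is greater than watermark + 1.
  def nextUnfiredWindowEnd(inputWatermark: Long, size: Long): Long =
    (Math.floorDiv(inputWatermark + 1, size) + 1) * size

  // The earliest rowtime any future result can carry is nextEnd - 1, so the
  // operator may emit a watermark one below it. Since nextEnd >= watermark + 2,
  // this is never below the input watermark.
  def outputWatermark(inputWatermark: Long, size: Long): Long =
    nextUnfiredWindowEnd(inputWatermark, size) - 2
}
```

With 5 ms windows, an input watermark of 4 fires the window `[0, 5)`. The operator could then forward 8 instead of 4, because the next result (window `[5, 10)`) will carry rowtime 9.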
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>>> Xiaowei
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Isn't the problem of multiple expressions limited only to `flat***`
> > >>>>>>>>>>>>>>>>>>>>>>>> functions and, more specifically, only to having two (or more)
> > >>>>>>>>>>>>>>>>>>>>>>>> different table functions passed as expressions?
> > >>>>>>>>>>>>>>>>>>>>>>>> `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))`
> > >>>>>>>>>>>>>>>>>>>>>>>> seems to be well defined (the result of every scalar function is
> > >>>>>>>>>>>>>>>>>>>>>>>> duplicated to every record). Or am I missing something?
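Piotr's reading of `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))` can be modelled in plain Scala. This is a hypothetical illustration: `FlatAggWithScalars`, `OutRow` and `evaluate` are invented names, and it assumes the scalar functions depend only on the grouping key, so that each has a single value per group.

```scala
object FlatAggWithScalars {
  // One output row: a value emitted by the table aggregate, followed by the
  // results of the scalar functions.
  final case class OutRow(aggValue: Int, scalarValues: List[String])

  // For every group, the table aggregate may emit several rows. Each scalar
  // function is evaluated once per group (on the grouping key here) and its
  // result is copied onto every row emitted for that group.
  def evaluate(
      groups: Map[String, List[Int]],
      tableAgg: List[Int] => List[Int],
      scalars: List[String => String]): List[OutRow] =
    groups.toList.sortBy(_._1).flatMap { case (key, values) =>
      val scalarResults = scalars.map(f => f(key))
      tableAgg(values).map(v => OutRow(v, scalarResults))
    }
}
```

The design choice being modelled is that scalar results never multiply the row count; only the table aggregate decides how many rows a group produces.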
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Another remark: I would be in favour of not using abbreviations and
> > >>>>>>>>>>>>>>>>>>>>>>>> renaming `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Piotrek
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> On 15 Nov 2018, at 14:15, Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> As I said before, I think the append() method is better than
> > >>>>>>>>>>>>>>>>>>>>>>>>> implicitly forwarding keys, but I still believe it adds unnecessary
> > >>>>>>>>>>>>>>>>>>>>>>>>> boilerplate code.
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Moreover, I haven't seen a convincing argument why map(Expression*)
> > >>>>>>>>>>>>>>>>>>>>>>>>> is worse than map(Expression). In either case we need to do all kinds
> > >>>>>>>>>>>>>>>>>>>>>>>>> of checks to prevent invalid use of functions.
> > >>>>>>>>>>>>>>>>>>>>>>>>> If the method is not used correctly, we can emit a good error message,
> > >>>>>>>>>>>>>>>>>>>>>>>>> and documenting map(Expression*) will be easier than
> > >>>>>>>>>>>>>>>>>>>>>>>>> map(append(Expression*)), in my opinion.
> > >>>>>>>>>>>>>>>>>>>>>>>>> I think we should not add unnecessary syntax unless there is a good
> > >>>>>>>>>>>>>>>>>>>>>>>>> reason, and to be honest, I haven't seen such a reason yet.
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Regarding the groupBy.agg() method, I think it should behave just like
> > >>>>>>>>>>>>>>>>>>>>>>>>> any other method, i.e., not do any implicit forwarding.
> > >>>>>>>>>>>>>>>>>>>>>>>>> Let's take the example of the windowed group by that you posted
> > >>>>>>>>>>>>>>>>>>>>>>>>> before.
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
> > >>>>>>>>>>>>>>>>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > >>>>>>>>>>>>>>>>>>>>>>>>> .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > >>>>>>>>>>>>>>>>>>>>>>>>> .select('k1, 'col1, 'w.rowtime as 'rtime)
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> What happens if 'w.rowtime is not selected? What is the data type of
> > >>>>>>>>>>>>>>>>>>>>>>>>> the field 'w in the resulting Table? Is it a regular field at all, or
> > >>>>>>>>>>>>>>>>>>>>>>>>> just a system field that disappears if it is not selected?
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> IMO, the following syntax is shorter, more explicit, and better
> > >>>>>>>>>>>>>>>>>>>>>>>>> aligned with the regular window.groupBy.select aggregations that are
> > >>>>>>>>>>>>>>>>>>>>>>>>> supported today.
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
> > >>>>>>>>>>>>>>>>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > >>>>>>>>>>>>>>>>>>>>>>>>> .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Best, Fabian
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Nov 14, 2018 at 08:37 jincheng sun <sunjincheng...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Hi Fabian/Xiaowei,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> I am very sorry for my late reply! I'm glad to see your reply, and it
> > >>>>>>>>>>>>>>>>>>>>>>>>>> sounds pretty good!
> > >>>>>>>>>>>>>>>>>>>>>>>>>> I agree with Fabian that the append() approach is better because it
> > >>>>>>>>>>>>>>>>>>>>>>>>>> clearly defines the result schema.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> In addition, append() can also carry non-time attributes, e.g.:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> tab('name, 'age, 'address, 'rowtime)
> > >>>>>>>>>>>>>>>>>>>>>>>>>> tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2, 'address, 'rowtime)
> > >>>>>>>>>>>>>>>>>>>>>>>>>> .window(Tumble over 5.millis on 'rowtime as 'w)
> > >>>>>>>>>>>>>>>>>>>>>>>>>> .groupBy('w, 'address)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> In this way append() is very useful, and its behavior is very similar
> > >>>>>>>>>>>>>>>>>>>>>>>>>> to withForwardedFields() in DataSet.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> So +1 for using the append() approach for map() & flatMap()!
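As a rough model of the proposed built-in `append()`, the plain-Scala sketch below shows which columns the mapped table would contain. All names (`AppendSketch`, `Person`, `mapWithAppend`) are hypothetical and no Flink API is used; the point is only that the listed columns are forwarded unchanged after the UDF's own output.

```scala
object AppendSketch {
  // Stand-in for the input row tab('name, 'age, 'address, 'rowtime).
  final case class Person(name: String, age: Int, address: String, rowtime: Long)

  // Models tab.map(append(udf('name), 'address, 'rowtime)): the UDF's output
  // columns come first, followed by the forwarded input columns, unchanged.
  // Columns that are not listed ('age here) are dropped.
  def mapWithAppend(
      rows: List[Person],
      udf: String => (String, Int)): List[(String, Int, String, Long)] =
    rows.map { p =>
      val (col1, col2) = udf(p.name)
      (col1, col2, p.address, p.rowtime)
    }
}
```

Because `rowtime` is copied through untouched, the following `.window(Tumble over 5.millis on 'rowtime as 'w)` still has the column it needs, which is the point of the analogy to DataSet's `withForwardedFields()`.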
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> But what about agg() and flatAgg()? In the agg/flatAgg case, I agree
> > >>>>>>>>>>>>>>>>>>>>>>>>>> with Xiaowei's approach of defining the keys to be implied in the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> result table and to appear at the beginning, for example as follows:
> > >>>>>>>>>>>>>>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
> > >>>>>>>>>>>>>>>>>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > >>>>>>>>>>>>>>>>>>>>>>>>>> .select('k1, 'col1, 'w.rowtime as 'rtime)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> What do you think? @Fabian @Xiaowei
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 9, 2018 at 6:35 PM Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the summary!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> I like the approach with append() better than the implicit forwarding
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> as it clearly indicates which fields are forwarded.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> However, I don't see much benefit over the flatMap(Expression*)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> variant, as we would still need to analyze the full expression tree
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> to ensure that at most (or exactly?) one Scalar / TableFunction is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> used.
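Fabian's point, that either signature still requires analyzing the full expression tree, can be sketched with a tiny hypothetical expression ADT (`Field`, `ScalarCall`, `TableCall`; these are not Flink's actual expression classes). This version requires exactly one table function across all arguments, which is one possible answer to the "at most (or exactly?)" question; the same check also rejects two table aggregates passed in a single call.

```scala
object ExprValidation {
  // Hypothetical, minimal expression tree for illustration only.
  sealed trait Expr
  final case class Field(name: String) extends Expr
  final case class ScalarCall(fn: String, args: List[Expr]) extends Expr
  final case class TableCall(fn: String, args: List[Expr]) extends Expr

  // Count table-function calls anywhere in the tree, including nested ones.
  def countTableCalls(e: Expr): Int = e match {
    case Field(_)            => 0
    case ScalarCall(_, args) => args.map(countTableCalls).sum
    case TableCall(_, args)  => 1 + args.map(countTableCalls).sum
  }

  // Validate the arguments of a flatMap/flatAgg-style call: exactly one
  // table function overall, otherwise a readable error message.
  def validate(args: List[Expr]): Either[String, Unit] = {
    val n = args.map(countTableCalls).sum
    if (n == 1) Right(())
    else Left(s"expected exactly one table function, found $n")
  }
}
```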
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Fabian
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Nov 8, 2018 at 19:25 jincheng sun <sunjincheng...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> We are discussing this proposal in great detail, trying to design the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> API from many angles (functionality, compatibility, ease of use,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> etc.). I think this is a very good process: only such a detailed
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> discussion lets us develop the PRs clearly and smoothly at a later
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> stage. I am very grateful to @Fabian and @Xiaowei for sharing a lot of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> good ideas.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> About the definition of the method signatures, I want to share my
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> points here, which I am discussing with Fabian in the google doc (not
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> yet completed), as follows:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Assume we have a table:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Approach 1:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> case1: Map follows Source Table
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> val result =
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> .window(Tumble over 5.millis on 'proctime as 'w)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> case2: FlatAgg follows Window (Fabian mentioned above)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> val result =
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>   .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Approach 2: Similar to Fabian's approach, in which the result schema
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> is clearly defined, but with an added built-in append UDF. This makes
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> the map/flatMap/agg/flatAgg interfaces accept only one Expression.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> val result =
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> tab.map(append(udf('string), 'long, 'proctime)).as('col1, 'col2, 'long, 'proctime)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> .window(Tumble over 5.millis on 'proctime as 'w)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Note: append is a special built-in UDF that can pass through any
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> column.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> So, maybe we can define it as table.map(Expression) first and, if
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> necessary, extend it to table.map(Expression*) in the future?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Of course, I also hope that we can further refine this proposal
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> through discussion.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 11:45 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Fabian,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think that the key question you raised is whether we allow extra
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> parameters in the methods map/flatMap/agg/flatAgg. I can see why
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> allowing that may appear more convenient in some cases. However, it
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> might also cause some confusion if we do that. For example, do we
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> allow multiple UDFs in these expressions? If we do, the semantics may
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> be weird to define, e.g. what does
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Even
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> though not allowing it may appear less powerful, it can also make
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> things more intuitive. In the case of agg/flatAgg, we can define the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> keys to be implied in the result table and to appear at the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> beginning. You can use a select method if you want to modify this
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> behavior. I think that eventually we will have some API which allows
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> other expressions as additional parameters, but I think it's better
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> to do that after we introduce the concept of nested tables. A lot of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the things we suggested here can be considered special cases of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> that. But things are much simpler if we leave that for later.
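Under the rule proposed above for agg/flatAgg, the result schema is simply the grouping keys followed by the aggregate's own output. The sketch below models field names only and is hypothetical (`ImplicitKeySchema` and `resultSchema` are invented); it treats the window alias 'w like any other key, and the duplicate-name check is an added assumption, not something proposed in the thread.

```scala
object ImplicitKeySchema {
  // Field names of an agg/flatAgg result when the grouping keys are implied
  // at the front, followed by the aggregate's own output fields. A later
  // select() could still project or reorder them.
  def resultSchema(groupKeys: List[String], aggOutput: List[String]): List[String] = {
    val clashes = groupKeys.intersect(aggOutput)
    // Reject duplicate names, since the combined schema would be ambiguous.
    require(clashes.isEmpty, s"ambiguous field names: ${clashes.mkString(", ")}")
    groupKeys ++ aggOutput
  }
}
```

For `.groupBy('w, 'k1, 'k2)` with a two-column aggregate, this yields the five names that the `.as('w, 'k1, 'k2, 'col1, 'col2)` renaming in the examples expects.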
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Xiaowei
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * Re emit:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think we should start with the well-understood semantics of full
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> replacement. This is how the other agg functions work.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As was said before, there are open questions regarding an append mode
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (checkpointing, whether to support retractions or not and, if yes,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> how to declare them, ...).
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Since this seems to be an optimization, I'd postpone it.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * Re grouping keys:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I don't think we should automatically add them because the result
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> schema would not be intuitive.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would they be added at the beginning of the tuple or at the end? What
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metadata fields of windows would be added? In which order would they be
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> added?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> However, we could support syntax like this:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> val t: Table = ???
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> t
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .window(Tumble ... as 'w)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('w, 'a, 'b)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The result schema would be clearly defined as [b, a, f1, f2, ..., fn,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wend, rtime]. (f1, f2, ... fn) are the result attributes of the UDF.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * Re Multi-staged evaluation:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think this should be an optimization that can be applied if the UDF
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implements the merge() method.
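The first and last points above (full-replacement emit, and multi-staged evaluation via merge()) can be illustrated together with a minimal plain-Scala sketch that does not use Flink. It assumes a hypothetical top-2 table aggregate; `Top2Acc`, `Insert`, `Retract`, and the helper functions are illustrative names only, not the proposed interface. Under full replacement, each emit retracts every row of the previous result and emits the complete new result, just as a regular aggregate replaces its single value. A `merge()` method lets partial accumulators from a pre-aggregation stage be combined into the same result that single-stage evaluation produces.

```scala
object Top2TableAggSketch {
  // A change to the result table: add a row, or retract a previously emitted one.
  sealed trait Change
  final case class Insert(value: Int) extends Change
  final case class Retract(value: Int) extends Change

  // Hypothetical accumulator of a top-2 table aggregate (not Flink's interface).
  final case class Top2Acc(values: List[Int] = Nil) {
    def accumulate(v: Int): Top2Acc = Top2Acc(top2(v :: values))
    // merge() combines partial accumulators, which is what enables multi-staged evaluation.
    def merge(other: Top2Acc): Top2Acc = Top2Acc(top2(values ++ other.values))
    def result: List[Int] = values
  }

  // Keeps the two largest values in descending order.
  private def top2(vs: List[Int]): List[Int] = vs.sorted(Ordering[Int].reverse).take(2)

  // Full-replacement emit: retract everything emitted before, then emit the complete result.
  def emitFullReplacement(previous: List[Int], acc: Top2Acc): List[Change] =
    previous.map(Retract(_)) ++ acc.result.map(Insert(_))

  // Single-stage evaluation over all values of one group.
  def singleStage(values: Seq[Int]): Top2Acc = values.foldLeft(Top2Acc())(_ accumulate _)

  // Two-stage evaluation: pre-aggregate each partition, then merge the partial accumulators.
  def twoStage(partitions: Seq[Seq[Int]]): Top2Acc =
    partitions.map(singleStage).foldLeft(Top2Acc())(_ merge _)
}
```

With incremental emission, the second emit in a sequence like 3, 7, then 5 would only need `Retract(3)` and `Insert(5)`; that saving is the optimization proposed to be postponed. The two-stage path works only because top-N results can be merged, so a UDF without `merge()` would simply be evaluated in a single stage.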
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best, Fabian
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 08:01, Shaoxuan Wang <wshaox...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Xiaowei,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I agree with you that the semantics of TableAggregateFunction emit
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> are much more complex than those of AggregateFunction. The fundamental
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> difference is that TableAggregateFunction emits a "table" while
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> AggregateFunction outputs (a column of) a "row". AggregateFunction has
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> only one mode, which is "replacing" (complete update). But for
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> TableAggregateFunction, it could be an incremental update (only emit the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> newly updated results) or a complete update (always emit the entire
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table when "emit" is triggered). From the performance perspective, we
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> might want to use incremental update. But we need to review and design
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this carefully, especially taking into account the cases of failover
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (instead of just backing up the ACC, it may also need to remember the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> emit offset) and retractions, as the semantics of TableAggregateFunction
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> emit are different from those of other UDFs. TableFunction also emits a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table, but it does not need to worry about this because it is stateless.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Shaoxuan
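The difference between incremental and complete update, and why failover needs more than the accumulator, can be sketched in plain Scala. This is a minimal illustration under stated assumptions: `IncrementalEmitAcc` and `State` are hypothetical and model an append-only table aggregate, not a Flink class, and retractions are deliberately left out. Incremental emit outputs only the rows added since the last emit, so the emit offset has to be checkpointed together with the accumulated rows.

```scala
object IncrementalEmitSketch {
  // State that would have to be checkpointed: the accumulated rows AND the emit offset.
  final case class State(rows: Vector[String], emitOffset: Int)

  // Hypothetical append-only table aggregate (illustrative, not a Flink class).
  final class IncrementalEmitAcc(initial: State = State(Vector.empty, 0)) {
    private var state: State = initial

    def accumulate(row: String): Unit = {
      state = state.copy(rows = state.rows :+ row)
    }

    // Incremental update: emit only the rows added since the last emit, then advance the offset.
    def emitIncremental(): Vector[String] = {
      val newRows = state.rows.drop(state.emitOffset)
      state = state.copy(emitOffset = state.rows.size)
      newRows
    }

    // Complete update: always emit the entire table, so no offset is needed.
    def emitComplete(): Vector[String] = state.rows

    // What a checkpoint would store; restoring uses new IncrementalEmitAcc(snapshot).
    def snapshot(): State = state
  }
}
```

If only `rows` were checkpointed and the offset reset to 0 on restore, the first incremental emit after recovery would re-send every row that had already been emitted before the failure.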
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for adding the public interfaces! I think that it's a very good
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> start. There are a few points that we need to discuss further.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - TableAggregateFunction - this is a very complex beast, definitely the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> most complex user-defined object we have introduced so far. I think there
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> are quite a few interesting questions here. For example, do we allow
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> multi-staged TableAggregate in this case? What are the semantics of emit?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Is it an amendment to the previous output, or a replacement of it? I
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> think that this subject itself is worth a discussion to make sure we get
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the details right.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - GroupedTable.agg - do the group keys automatically appear in the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> output? How about the case of windowed aggregation?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Xiaowei
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Xiaowei,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for bringing up the discussion of the Table API Enhancement Outline!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I quickly looked at the overall content; these are good expressions of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> our offline discussions. But from my point of view, we should add the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> usage of the public interfaces that we will introduce in this proposal.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So, I added the following usage descriptions of the interfaces and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> operators in the Google doc:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. Map Operator
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Map operator is a new operator of Table. Map operator can apply a scalar
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> function and can return multiple columns. The usage is as follows:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> val res = tab
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .map(fun: ScalarFunction).as('a, 'b, 'c)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .select('a, 'c)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. FlatMap Operator
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> FlatMap operator is a new operator of Table. FlatMap operator can apply a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table function and can return multiple rows. The usage is as follows:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> val res = tab
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .flatMap(fun: TableFunction).as('a, 'b, 'c)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .select('a, 'c)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. Agg Operator
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Agg operator is a new operator of Table/GroupedTable. Agg operator can
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> apply an aggregate function and can return multiple columns. The usage is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> as follows:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> val res = tab
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('a) // leave groupBy-Clause out to define global aggregates
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .agg(fun: AggregateFunction).as('a, 'b, 'c)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .select('a, 'c)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 4. FlatAgg Operator
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> FlatAgg operator is a new operator of Table/GroupedTable. FlatAgg
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> operator can apply a table aggregate function and can return multiple
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> rows. The usage is as follows:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> val res = tab
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('a) // leave groupBy-Clause out to define global table aggregates
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .select('a, 'c)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 5. TableAggregateFunction
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The behavior of table aggregates is most like that of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> GroupReduceFunction: it computes over a group of elements and outputs a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> group of elements. The TableAggregateFunction can be applied on
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> GroupedTable.flatAgg(). The interface of TableAggregateFunction has a lot
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of content, so I don't copy it here. Please look at the details in the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Google doc:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
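The four operators listed above differ mainly in how many output rows they produce. The following plain-Scala sketch mimics them with ordinary collections so the difference is easy to see and test. It does not use Flink: a row is modeled as a `Map[String, Any]`, and `mapOp`, `flatMapOp`, `aggOp`, `flatAggOp`, and `select` are hypothetical helpers, not the proposed Table API. The `names` parameter plays the role of `.as('a, 'b, 'c)`.

```scala
object OperatorCardinalitySketch {
  type Row = Map[String, Any]

  // Attaches column names to positional values, like .as('a, 'b, 'c).
  private def named(names: Seq[String], values: Seq[Any]): Row = names.zip(values).toMap

  // map: exactly one output row per input row (a scalar function may return several columns).
  def mapOp[T](in: Seq[T], names: Seq[String])(f: T => Seq[Any]): Seq[Row] =
    in.map(t => named(names, f(t)))

  // flatMap: zero or more output rows per input row (table function).
  def flatMapOp[T](in: Seq[T], names: Seq[String])(f: T => Seq[Seq[Any]]): Seq[Row] =
    in.flatMap(t => f(t).map(named(names, _)))

  // agg: exactly one output row per group (aggregate function); groups are sorted by key.
  def aggOp[K: Ordering, T](in: Seq[T], key: T => K, names: Seq[String])(f: Seq[T] => Seq[Any]): Seq[Row] =
    in.groupBy(key).toSeq.sortBy(_._1).map { case (_, group) => named(names, f(group)) }

  // flatAgg: zero or more output rows per group (table aggregate function).
  def flatAggOp[K: Ordering, T](in: Seq[T], key: T => K, names: Seq[String])(f: Seq[T] => Seq[Seq[Any]]): Seq[Row] =
    in.groupBy(key).toSeq.sortBy(_._1).flatMap { case (_, group) => f(group).map(named(names, _)) }

  // select: keep only the requested columns.
  def select(rows: Seq[Row], cols: String*): Seq[Row] =
    rows.map(r => cols.map(c => c -> r(c)).toMap)
}
```

For example, `mapOp` always returns as many rows as it receives, while `flatMapOp` can drop an input entirely (an empty string split into words yields no rows), and `flatAggOp` can return several rows for one group, such as a top-2 per key. Note that the grouping key only appears in the output if the function puts it there, which mirrors the open question about group keys discussed in this thread.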
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I would very much appreciate anyone reviewing and commenting.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
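The comparison with GroupReduceFunction in item 5 can be made concrete with a hypothetical, heavily simplified contract in Scala. This is not the TableAggregateFunction interface from the Google doc, which is not reproduced in this mail. It only assumes an accumulator-based contract with `createAccumulator`, `accumulate`, and `emit`, where `emit` may produce any number of rows for a group, plus a driver that runs it once per group, much like a group reduce. `SimpleTableAgg`, `DistinctSorted`, and `runPerGroup` are illustrative names.

```scala
object TableAggContractSketch {
  // Hypothetical, simplified contract: consume a group row by row, then emit many rows.
  trait SimpleTableAgg[IN, ACC, OUT] {
    def createAccumulator(): ACC
    def accumulate(acc: ACC, in: IN): ACC
    def emit(acc: ACC): Seq[OUT]
  }

  // Example implementation: emit every distinct value of the group once, in sorted order.
  object DistinctSorted extends SimpleTableAgg[Int, Set[Int], Int] {
    def createAccumulator(): Set[Int] = Set.empty
    def accumulate(acc: Set[Int], in: Int): Set[Int] = acc + in
    def emit(acc: Set[Int]): Seq[Int] = acc.toSeq.sorted
  }

  // Driver: like a group reduce, run the function once per group and collect its output rows.
  def runPerGroup[K, IN, ACC, OUT](input: Seq[(K, IN)], agg: SimpleTableAgg[IN, ACC, OUT]): Map[K, Seq[OUT]] =
    input.groupBy(_._1).map { case (k, kvs) =>
      k -> agg.emit(kvs.map(_._2).foldLeft(agg.createAccumulator())(agg.accumulate))
    }
}
```

Unlike a batch group reduce, `accumulate` sees one row at a time, so the same contract could also be driven continuously in a streaming job; that is exactly where the emit-semantics questions raised in this thread arise.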
> > >>>>>>>>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>>>>>>> *Rome was not built in one day*