Hi,

My concerns are about the case when there is no additional select() method,
i.e.,

tab.window(Tumble ... as 'w)
    .groupBy('w, 'k1, 'k2)
    .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)

In this case, 'w is a composite field consisting of three fields (end,
start, rowtime).
Once we add a new property, it would need to be added to the composite
type.
Any query that relies on the composite type with three fields will fail
after adding a fourth field.
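To make the concern concrete, here is a minimal plain-Scala sketch with no Flink dependency. It models the composite field 'w as an ordered list of property names. `CompositeType`, `readWindowProps`, and the extra property `newProperty` are hypothetical. They only illustrate how a consumer that relies on exactly three fields breaks once a fourth one is added.

```scala
// Plain-Scala model, not Flink's type system. All names here are hypothetical.
object CompositeWindowField {
  // The composite field 'w, modeled as its ordered list of property names.
  final case class CompositeType(fields: Vector[String])

  // A downstream consumer that assumes 'w has exactly three properties,
  // e.g. because it maps them positionally to three output columns.
  def readWindowProps(w: CompositeType): Either[String, (String, String, String)] =
    w.fields match {
      case Vector(a, b, c) => Right((a, b, c))
      case other =>
        Left(s"expected 3 window properties but found ${other.size}: ${other.mkString(", ")}")
    }

  // Today's composite type, in the order given above.
  val today: CompositeType = CompositeType(Vector("end", "start", "rowtime"))
  // The same type after a hypothetical fourth window property is introduced.
  val later: CompositeType = CompositeType(today.fields :+ "newProperty")

  def main(args: Array[String]): Unit = {
    println(readWindowProps(today)) // Right((end,start,rowtime))
    println(readWindowProps(later)) // Left(expected 3 window properties but found 4: end, start, rowtime, newProperty)
  }
}
```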

Best, Fabian

On Fri, Nov 23, 2018 at 2:01 AM jincheng sun <sunjincheng...@gmail.com> wrote:

> Thanks Fabian,
>
> Thanks a lot for your feedback and for these very important and necessary
> design reminders!
>
> Yes, you are right! In Spark 1.3 and earlier, the grouping columns had to
> be specified explicitly to appear in the result, but since Spark 1.4 they
> are passed implicitly. This behavior was changed because of user feedback.
> Although implicit forwarding has the drawbacks you mentioned, this approach
> is really convenient for users.
> I agree that when grouping on windows we have to pay attention to the
> handling of the window's properties, because we may introduce new window
> properties. So, from this point of view, we delay the processing of the
> window properties, i.e., we pass the composite type 'w through the Table
> API and provide different property accessors in select() depending on the
> type of 'w, such as 'w.start, 'w.end, 'w.xxx. The Table API will restrict
> and validate the property accesses that 'w supports. An example is as
> follows:
>
> tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>     .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2) // 'w is a composite field
>     .select('k1, 'col1, 'w.rowtime as 'ts, 'w.xxx as 'xx) // select() restricts and validates that 'w.xxx is allowed
>
> I am not sure if I fully understand your concerns; if I have misunderstood
> anything, please correct me. Any feedback is appreciated!
>
> Bests,
> Jincheng
>
>
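Here is a minimal plain-Scala sketch of the validation idea. It assumes that the set of allowed properties depends only on whether the window is defined on an event-time (rowtime) or a processing-time (proctime) attribute, as described for the composite field 'w in this thread. `TimeKind`, `allowedProperties`, and `validateAccess` are hypothetical names, not Flink API.

```scala
object WindowPropertyValidation {
  // Hypothetical model of the time attribute a group window is defined on.
  sealed trait TimeKind
  case object RowTime extends TimeKind
  case object ProcTime extends TimeKind

  // Properties that 'w exposes per time kind: start, end, plus the time attribute.
  def allowedProperties(kind: TimeKind): Set[String] = kind match {
    case RowTime  => Set("start", "end", "rowtime")
    case ProcTime => Set("start", "end", "proctime")
  }

  // The check select() would perform when it sees an access like 'w.<prop>.
  def validateAccess(kind: TimeKind, prop: String): Either[String, String] =
    if (allowedProperties(kind).contains(prop)) Right(prop)
    else Left(s"'w.$prop is not a valid property of a $kind window")

  def main(args: Array[String]): Unit = {
    println(validateAccess(RowTime, "rowtime"))  // Right(rowtime)
    println(validateAccess(ProcTime, "rowtime")) // rejected with an error message
  }
}
```

Because accesses are resolved by name, adding a property in this model only extends the allowed set. Queries that never reference the new property are unaffected. The case Fabian raises, where there is no select() at all, is not covered by this check.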
> On Thu, Nov 22, 2018 at 10:13 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi all,
> >
> > First of all, it is correct that the flatMap(Expression*) and
> > flatAggregate(Expression*) methods would mix scalar and table values.
> > This would be a new concept that is not present in the current API.
> > From my point of view, the semantics are quite clear, but I understand
> > that others are more careful and worry about future extensions.
> >
> > I am fine with going for single expression arguments for map() and
> > flatMap(). We can later expand them to Expression* if we feel the need
> > and are more comfortable about the implications.
> > Whenever a time attribute needs to be forwarded, users can fall back to
> > join(TableFunction) as Xiaowei mentioned.
> > So we restrict the usability of the new methods but don't lose
> > functionality and don't prevent future extensions.
> >
> > The aggregate() and flatAggregate() case is more difficult because
> > implicit forwarding of grouping fields cannot be changed later without
> > breaking the API.
> > There are other APIs (e.g., Spark) that also implicitly forward the
> > grouping columns, so this is not uncommon.
> > However, I personally don't like that approach, because it is implicit
> > and introduces a new behavior that is not present in the current API.
> >
> > One thing to consider here is the handling of grouping on windows.
> > If I understood Xiaowei correctly, a composite field that is named like
> > the window alias (e.g., 'w) would be implicitly added to the result of
> > aggregate() or flatAggregate().
> > The composite field would have fields like (start, end, rowtime) or
> > (start, end, proctime), depending on the window type.
> > If we ever introduced a fourth window property, we might break
> > existing queries.
> > Is this something that we should worry about?
> >
> > Best,
> > Fabian
> >
> > On Thu, Nov 22, 2018 at 2:03 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
> >
> > > Hi Jincheng,
> > >
> > > #1) ok, got it.
> > >
> > > #3)
> > > > From my point of view, we can use `Expression` first and, if the
> > > > discussion decides on Expression*, improve it later. In any case, if
> > > > we use Expression, there is an opportunity to move to Expression*
> > > > (compatibly). If we use Expression* directly, it is difficult for us
> > > > to go back to Expression, which would break compatibility between
> > > > versions. What do you think?
> > >
> > > I don’t think that’s the case here. If we start with a single-parameter
> > > `flatMap(Expression)`, it will need implicit columns to be present in
> > > the result, which:
> > >
> > > a) IMO breaks the SQL convention (that’s why I’m against this)
> > > b) means we cannot later easily introduce `flatMap(Expression*)` without
> > > those implicit columns, without breaking compatibility or at least
> > > without making `flatMap(Expression*)` and `flatMap(Expression)` terribly
> > > inconsistent.
> > >
> > > To elaborate on (a): it’s not nice if our own API is inconsistent and
> > > sometimes behaves one way and sometimes another:
> > >
> > > table.groupBy('k).select(scalarAggregateFunction('v)) => single-column
> > > result, just the output of `scalarAggregateFunction`
> > > vs
> > > table.groupBy('k).flatAggregate(tableAggregateFunction('v)) => both the
> > > result of `tableAggregateFunction` plus the key (and an optional window
> > > context?)
> > >
> > > Thus I think we have to decide now which way we want to jump, since
> > > later will be too late. Or again, am I missing something? :)
> > >
> > > Piotrek
> > >
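Point (a) can be illustrated with plain Scala collections. The sketch assumes rows are ('k, 'v) pairs, that `max` stands in for `scalarAggregateFunction`, and that a hypothetical top-2 function stands in for `tableAggregateFunction`. The first method mirrors today's select() result, which contains the aggregate output only. The second models the proposed implicit forwarding of the grouping key. Neither is Flink code.

```scala
object ImplicitKeyForwarding {
  type Row = (String, Int) // ('k, 'v)

  // Groups rows by key, ordered by key so that results are deterministic.
  private def groups(rows: Seq[Row]): Seq[(String, Seq[Int])] =
    rows.groupBy(_._1).toSeq.sortBy(_._1).map { case (k, g) => (k, g.map(_._2)) }

  // groupBy('k).select(scalarAgg('v)): one column, just the aggregate's output.
  def selectScalarAgg(rows: Seq[Row])(agg: Seq[Int] => Int): Seq[Int] =
    groups(rows).map { case (_, vs) => agg(vs) }

  // groupBy('k).flatAggregate(tableAgg('v)) with implicit key forwarding:
  // every emitted row additionally carries the grouping key.
  def flatAggregateWithImplicitKey(rows: Seq[Row])(tableAgg: Seq[Int] => Seq[Int]): Seq[(String, Int)] =
    groups(rows).flatMap { case (k, vs) => tableAgg(vs).map(out => (k, out)) }

  // Hypothetical table aggregate: the two largest values of a group.
  def top2(vs: Seq[Int]): Seq[Int] = vs.sorted(Ordering[Int].reverse).take(2)

  def main(args: Array[String]): Unit = {
    val rows = Seq(("a", 1), ("a", 5), ("b", 3))
    println(selectScalarAgg(rows)(_.max))
    println(flatAggregateWithImplicitKey(rows)(top2))
  }
}
```

The first call yields one value per group. The second yields (key, value) pairs. That difference in result shape is the inconsistency described in (a).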
> > > > On 22 Nov 2018, at 02:07, jincheng sun <sunjincheng...@gmail.com> wrote:
> > > >
> > > > Hi Piotrek,
> > > > #1) We have unbounded group aggregates and bounded group window
> > > > aggregates. In the unbounded case we should fire results early with
> > > > retract messages; we cannot use the watermark, because an unbounded
> > > > aggregate never finishes (as an improvement, we can introduce
> > > > micro-batching in the future). For bounded windows we never support
> > > > early firing, so we do not need retractions.
> > > > #3) Regarding the validation of `table.select(F('a).unnest(), 'b,
> > > > G('c).unnest())` / `table.flatMap(F('a), 'b, scalarG('c))`, Fabian
> > > > already addressed this above; please see the earlier mail. For
> > > > `table.flatMap(F('a), 'b, scalarG('c))`, our concern is the question
> > > > of `Expression*` vs `Expression`. From my point of view, we can use
> > > > `Expression` first and, if the discussion decides on Expression*,
> > > > improve it later. In any case, if we use Expression, there is an
> > > > opportunity to move to Expression* (compatibly). If we use
> > > > Expression* directly, it is difficult for us to go back to
> > > > Expression, which would break compatibility between versions. What
> > > > do you think?
> > > >
> > > > If anything is unclear, any feedback is welcome! Again, thanks for
> > > > sharing your thoughts!
> > > >
> > > > Thanks,
> > > > Jincheng
> > > >
> > > > On Wed, Nov 21, 2018 at 9:37 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
> > > >
> > > >> Hi Jincheng,
> > > >>
> > > >>> #1) No, the watermark solves the issue of late events. Here, the
> > > >>> performance problem is caused by the update emit mode, i.e., when the
> > > >>> current calculation result is output, the previous calculation
> > > >>> result needs to be retracted.
> > > >>
> > > >> Hmm, yes, I missed this. For time-windowed cases (some
> > > >> aggregate/flatAggregate cases) emitting only on watermarks should
> > > >> solve the problem. For non-time-windowed cases it would reduce the
> > > >> amount of retractions, right? Or am I still missing something?
> > > >>
> > > >>> #3) I still hope to keep the simplicity that select only supports
> > > >>> projected scalars; we can hardly tell the semantics of
> > > >>> tab.select(flatmap('a), 'b, flatmap('d)).
> > > >>
> > > >> table.select(F('a).unnest(), 'b, G('c).unnest())
> > > >>
> > > >> could be rejected during some validation phase. On the other hand:
> > > >>
> > > >> table.select(F('a).unnest(), 'b, scalarG('c))
> > > >> or
> > > >> table.flatMap(F('a), 'b, scalarG('c))
> > > >>
> > > >> could work and be more or less syntactic sugar for a cross apply.
> > > >>
> > > >> Piotrek
> > > >>
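Here is a minimal plain-Scala sketch of reading `table.flatMap(F('a), 'b, scalarG('c))` as a cross apply. It assumes a hypothetical table function `F` that splits a string into words and a hypothetical scalar function `scalarG`. Each row produced by `F` is paired with the input row's 'b and with `scalarG('c)`, which is evaluated once per input row. An input row for which `F` produces nothing contributes no output, as with a cross (not outer) apply.

```scala
object CrossApplySketch {
  final case class Row(a: String, b: Int, c: Int)

  // Hypothetical table function F: one output row per whitespace-separated word.
  def F(a: String): Seq[String] = a.split("\\s+").toSeq.filter(_.nonEmpty)

  // Hypothetical scalar function scalarG.
  def scalarG(c: Int): Int = c * 10

  // flatMap(F('a), 'b, scalarG('c)) read as a cross apply.
  def flatMapAsCrossApply(rows: Seq[Row]): Seq[(String, Int, Int)] =
    rows.flatMap { r =>
      val g = scalarG(r.c)               // scalar result, computed once per input row
      F(r.a).map(word => (word, r.b, g)) // and duplicated to every row produced by F
    }

  def main(args: Array[String]): Unit = {
    val input = Seq(Row("hello world", 1, 2), Row("", 3, 4))
    flatMapAsCrossApply(input).foreach(println) // prints (hello,1,20) and (world,1,20)
  }
}
```

The same duplication of scalar results to every emitted record is what makes `flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))` well defined in Piotr's later remark.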
> > > >>> On 21 Nov 2018, at 12:16, jincheng sun <sunjincheng...@gmail.com> wrote:
> > > >>>
> > > >>> Hi shaoxuan & Hequn,
> > > >>>
> > > >>> Thanks for your suggestions, I'll file the JIRAs later.
> > > >>> We can prepare PRs while continuing to move the ongoing discussion
> > > >>> forward.
> > > >>>
> > > >>> Regards,
> > > >>> Jincheng
> > > >>>
> > > >>> On Wed, Nov 21, 2018 at 7:07 PM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > >>>
> > > >>>> Hi Piotrek,
> > > >>>> Thanks for your feedback, and thanks for sharing your thoughts!
> > > >>>>
> > > >>>> #1) No, the watermark solves the issue of late events. Here, the
> > > >>>> performance problem is caused by the update emit mode, i.e., when
> > > >>>> the current calculation result is output, the previous calculation
> > > >>>> result needs to be retracted.
> > > >>>> #2) As I mentioned above, we should continue the discussion until
> > > >>>> we solve the problems raised by Xiaowei and Fabian.
> > > >>>> #3) I still hope to keep the simplicity that select only supports
> > > >>>> projected scalars; we can hardly tell the semantics of
> > > >>>> tab.select(flatmap('a), 'b, flatmap('d)).
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Jincheng
> > > >>>>
> > > >>>> On Wed, Nov 21, 2018 at 5:24 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
> > > >>>>
> > > >>>>> Hi,
> > > >>>>>
> > > >>>>> 1.
> > > >>>>>
> > > >>>>>> In fact, in addition to the design of the APIs, there will be
> > > >>>>>> various performance optimization details, such as: the table
> > > >>>>>> aggregate function's emitValue will generate multiple calculation
> > > >>>>>> results; in extreme cases, each record will trigger a large number
> > > >>>>>> of retract messages, which will result in poor performance
> > > >>>>>
> > > >>>>> Can this be solved/mitigated by emitting the results only on
> > > >>>>> watermarks? I think that was the path that we decided to take both
> > > >>>>> for Temporal Joins and upsert stream conversion. I know that this
> > > >>>>> increases the latency and there is a place for a future global
> > > >>>>> setting/user preference “emit the data ASAP mode”, but emitting
> > > >>>>> only on watermarks seems to me a better/saner default.
> > > >>>>>
> > > >>>>> 2.
> > > >>>>>
> > > >>>>> With respect to the API discussion and implicit columns: the
> > > >>>>> problem for me so far is that I’m not sure I like the additional
> > > >>>>> complexity of the `append()` solution, while implicit columns are
> > > >>>>> definitely not in the spirit of SQL. Neither joins nor aggregations
> > > >>>>> add extra unexpected columns to the result without asking. This
> > > >>>>> can definitely be confusing for users since it breaks the
> > > >>>>> convention. Thus I would lean towards Fabian’s proposal of
> > > >>>>> multi-argument `map(Expression*)` from those 3 options.
> > > >>>>>
> > > >>>>> 3.
> > > >>>>>
> > > >>>>> Another topic is that I’m not 100% convinced that we should be
> > > >>>>> adding new API functions for `map`, `aggregate`, `flatMap` and
> > > >>>>> `flatAggregate`. I think the same could be achieved by changing
> > > >>>>>
> > > >>>>> table.map(F('x))
> > > >>>>>
> > > >>>>> into
> > > >>>>>
> > > >>>>> table.select(F('x)).unnest()
> > > >>>>> or
> > > >>>>> table.select(F('x).unnest())
> > > >>>>>
> > > >>>>> where `unnest()` means unnesting a row/tuple type into a columnar
> > > >>>>> table.
> > > >>>>>
> > > >>>>> table.flatMap(F('x))
> > > >>>>>
> > > >>>>> could, on the other hand, also be handled by
> > > >>>>>
> > > >>>>> table.select(F('x))
> > > >>>>>
> > > >>>>> by correctly deducing that F('x) is a multi-row output function.
> > > >>>>>
> > > >>>>> The same might apply to `aggregate(F('x))`, but this could maybe be
> > > >>>>> replaced by:
> > > >>>>>
> > > >>>>> table.groupBy(…).select(F('x).unnest())
> > > >>>>>
> > > >>>>> Adding scalar functions should also be possible:
> > > >>>>>
> > > >>>>> table.groupBy('k).select(F('x).unnest(), 'k)
> > > >>>>>
> > > >>>>> Maybe such an approach would allow us to implement the same
> > > >>>>> features in SQL as well?
> > > >>>>>
> > > >>>>> Piotrek
> > > >>>>>
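Here is a minimal sketch of the `unnest()` idea at the schema level. It uses a simple hypothetical type model (`FieldType`, `RowT`) rather than Flink's actual type classes. Unnesting replaces one row-typed column with its fields and keeps all other columns in place.

```scala
object UnnestSketch {
  // Hypothetical, simplified column type model.
  sealed trait FieldType
  case object IntT extends FieldType
  case object StringT extends FieldType
  final case class RowT(fields: Vector[(String, FieldType)]) extends FieldType

  type Schema = Vector[(String, FieldType)]

  // Replaces the row-typed column `column` by its nested fields; other columns are kept.
  def unnest(schema: Schema, column: String): Schema =
    schema.flatMap {
      case (`column`, RowT(inner)) => inner
      case other                   => Vector(other)
    }

  def main(args: Array[String]): Unit = {
    // Schema of select(F('x), 'k), where F returns a row with two fields.
    val before: Schema = Vector("f" -> RowT(Vector("col1" -> IntT, "col2" -> StringT)), "k" -> StringT)
    println(unnest(before, "f")) // Vector((col1,IntT), (col2,StringT), (k,StringT))
  }
}
```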
> > > >>>>>> On 21 Nov 2018, at 09:43, Hequn Cheng <chenghe...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>> Hi,
> > > >>>>>>
> > > >>>>>> Thank you all for the great proposal and discussion!
> > > >>>>>> I also prefer to move on to the next step, so +1 for opening the
> > > >>>>>> JIRAs to start the work.
> > > >>>>>> We can have more detailed discussions there. Btw, we can start with
> > > >>>>>> the JIRAs which we have already agreed on.
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Hequn
> > > >>>>>>
> > > >>>>>> On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <wshaox...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> +1. I agree that we should open the JIRAs to start the work. We may
> > > >>>>>>> have better ideas on the flavor of the interface when we
> > > >>>>>>> implement/review the code.
> > > >>>>>>>
> > > >>>>>>> Regards,
> > > >>>>>>> shaoxuan
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On 11/20/18, jincheng sun <sunjincheng...@gmail.com> wrote:
> > > >>>>>>>> Hi all,
> > > >>>>>>>>
> > > >>>>>>>> Thanks all for the feedback.
> > > >>>>>>>>
> > > >>>>>>>> @Piotr About not using abbreviations in naming: +1, I like your
> > > >>>>>>>> proposal! Currently both the DataSet and DataStream APIs use
> > > >>>>>>>> `aggregate`. BTW, I find that other languages, such as R, also
> > > >>>>>>>> avoid abbreviations in naming.
> > > >>>>>>>>
> > > >>>>>>>> Sometimes the interface of an API is really difficult to perfect:
> > > >>>>>>>> we need to spend a lot of time thinking, gather feedback from a
> > > >>>>>>>> large number of users, and constantly improve. But for backward
> > > >>>>>>>> compatibility reasons, we have to adopt the most conservative
> > > >>>>>>>> approach when designing the API (of course, I am more in favor of
> > > >>>>>>>> developing richer features once we have discussed them clearly).
> > > >>>>>>>> Therefore, I propose to split the implementation of
> > > >>>>>>>> map/flatMap/agg/flatAgg into JIRAs for the basic functionality
> > > >>>>>>>> and JIRAs that support time attributes and groupKeys. We can
> > > >>>>>>>> develop the features whose design we have already agreed on, and
> > > >>>>>>>> we will continue to discuss the uncertain parts of the design.
> > > >>>>>>>>
> > > >>>>>>>> In fact, in addition to the design of the APIs, there will be
> > > >>>>>>>> various performance optimization details, such as: the table
> > > >>>>>>>> aggregate function's emitValue will generate multiple calculation
> > > >>>>>>>> results; in extreme cases, each record will trigger a large number
> > > >>>>>>>> of retract messages, which will result in poor performance. So we
> > > >>>>>>>> will also optimize the interface design, e.g., by adding the
> > > >>>>>>>> emitWithRetractValue interface (I have updated the Google doc) to
> > > >>>>>>>> allow the user to optionally perform incremental calculations,
> > > >>>>>>>> thus avoiding a large number of retractions. Details like this
> > > >>>>>>>> are difficult to discuss fully on the mailing list, so I
> > > >>>>>>>> recommend creating JIRAs/a FLIP first: we develop the designs
> > > >>>>>>>> that have been agreed upon and continue to discuss the undecided
> > > >>>>>>>> ones! What do you think? @Fabian & Piotr & XiaoWei
> Piotr &
> > > >>>>> XiaoWei
> > > >>>>>>>>
> > > >>>>>>>> Best,
> > > >>>>>>>> Jincheng
> > > >>>>>>>>
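To quantify the retraction concern, here is a minimal plain-Scala sketch. It counts the messages that a hypothetical top-2 table aggregate would send downstream for one group under three strategies:

- Full replacement on every record: retract everything previously emitted, then emit the new result.
- An incremental variant that retracts and emits only the rows that changed. This is in the spirit of the proposed emitWithRetractValue, but its actual signature is not assumed here.
- Emitting only once when the watermark closes the window, as suggested elsewhere in this thread.

The message model and all names are assumptions.

```scala
object RetractionCost {
  sealed trait Mode
  case object FullReplacementPerRecord extends Mode
  case object IncrementalPerRecord extends Mode
  case object OnWatermarkOnly extends Mode

  // Hypothetical top-n table aggregate over a single group.
  private def topN(values: Vector[Int], n: Int): Vector[Int] =
    values.sorted(Ordering[Int].reverse).take(n)

  // Number of messages (retractions + emitted rows) sent downstream for `input`.
  def messageCount(input: Seq[Int], n: Int, mode: Mode): Int = {
    var values = Vector.empty[Int]
    var emitted = Vector.empty[Int] // rows currently visible downstream
    var messages = 0
    for (x <- input) {
      values :+= x
      val result = topN(values, n)
      mode match {
        case FullReplacementPerRecord =>
          messages += emitted.size + result.size // retract all old rows, emit all new rows
          emitted = result
        case IncrementalPerRecord =>
          messages += emitted.diff(result).size + result.diff(emitted).size // changed rows only
          emitted = result
        case OnWatermarkOnly =>
          () // nothing is sent until the watermark passes the end of the window
      }
    }
    if (mode == OnWatermarkOnly) messages += topN(values, n).size // single final emission
    messages
  }

  def main(args: Array[String]): Unit =
    for (mode <- Seq(FullReplacementPerRecord, IncrementalPerRecord, OnWatermarkOnly))
      println(s"$mode: ${messageCount(1 to 4, 2, mode)} messages") // 12, 6 and 2 messages
}
```

The watermark-only count applies only to windowed aggregates. An unbounded group aggregate never receives a final watermark, which is why early firing with retractions comes up for that case in this thread.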
> > > >>>>>>>> On Mon, Nov 19, 2018 at 12:07 AM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Hi Fabian & Piotr, thanks for the feedback!
> > > >>>>>>>>>
> > > >>>>>>>>> I appreciate your concerns, both on timestamp attributes and on
> > > >>>>>>>>> implicit group keys. At the same time, I'm also concerned about
> > > >>>>>>>>> the proposed approach of allowing Expression* as parameters,
> > > >>>>>>>>> especially for flatMap/flatAgg. So far, we have never allowed a
> > > >>>>>>>>> scalar expression to appear together with table expressions. With
> > > >>>>>>>>> the Expression* approach, this will happen for the parameters of
> > > >>>>>>>>> flatMap/flatAgg. I'm a bit concerned whether we fully understand
> > > >>>>>>>>> the consequences when we try to extend our system in the future.
> > > >>>>>>>>> I would be extra cautious in doing this. To avoid this, I think
> > > >>>>>>>>> an implicit group key for flatAgg is safer. For flatMap, if users
> > > >>>>>>>>> want to keep the rowtime column, they can use crossApply/join
> > > >>>>>>>>> instead. So we are not losing any real functionality here.
> > > >>>>>>>>>
> > > >>>>>>>>> Also, a clarification on the following example:
> > > >>>>>>>>> tab.window(Tumble ... as 'w)
> > > >>>>>>>>>  .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > > >>>>>>>>>  .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > > >>>>>>>>>  .select('k1, 'col1, 'w.rowtime as 'rtime)
> > > >>>>>>>>> If we did not have the select clause in this example, we would
> > > >>>>>>>>> have 'w as a regular column in the output. It should not
> > > >>>>>>>>> magically disappear.
> > > >>>>>>>>>
> > > >>>>>>>>> The concern is not as strong for Table.map/Table.agg because we
> > > >>>>>>>>> are not mixing scalar and table expressions. But we also want to
> > > >>>>>>>>> be somewhat consistent across these methods. If we used implicit
> > > >>>>>>>>> group keys for Table.flatAgg, we probably should do the same for
> > > >>>>>>>>> Table.agg. Now we only have to choose what to do with Table.map.
> > > >>>>>>>>> I can see good arguments on both sides. But starting with a
> > > >>>>>>>>> single Expression seems safer, because we can always extend to
> > > >>>>>>>>> Expression* in the future.
> > > >>>>>>>>>
> > > >>>>>>>>> While thinking about this problem, it appears that we may need
> > > >>>>>>>>> more work on our handling of watermarks for the SQL/Table API. Our
> > > >>>>>>>>> current way of propagating the watermarks from source all the way
> > > >>>>>>>>> to sink might not be optimal. For example, after a tumbling
> > > >>>>>>>>> window, the watermark can actually be advanced to just before the
> > > >>>>>>>>> expiration of the next window. I think that, in general, each
> > > >>>>>>>>> operator may need to generate new watermarks instead of simply
> > > >>>>>>>>> propagating them. Once we accept that watermarks may change
> > > >>>>>>>>> during the execution, it appears that the timestamp columns may
> > > >>>>>>>>> also change, as long as we have some way to associate the
> > > >>>>>>>>> watermark with them. My intuition is that once we have a thorough
> > > >>>>>>>>> solution for the watermark issue, we may be able to solve the
> > > >>>>>>>>> problem we encountered for Table.map in a cleaner way. But this is
> > > >>>>>>>>> a complex issue which deserves a discussion of its own.
> > > >>>>>>>>>
> > > >>>>>>>>> Regards,
> > > >>>>>>>>> Xiaowei
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hi,
> > > >>>>>>>>>>
> > > >>>>>>>>>> Isn’t the problem of multiple expressions limited only to the
> > > >>>>>>>>>> `flat***` functions, and more specifically only to having two (or
> > > >>>>>>>>>> more) different table functions passed as expressions?
> > > >>>>>>>>>> `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))`
> > > >>>>>>>>>> seems to be well defined (duplicate the result of every scalar
> > > >>>>>>>>>> function to every record). Or am I missing something?
> > > >>>>>>>>>>
> > > >>>>>>>>>> Another remark: I would be in favour of not using abbreviations
> > > >>>>>>>>>> and of naming `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Piotrek
> > > >>>>>>>>>>
> > > >>>>>>>>>>> On 15 Nov 2018, at 14:15, Fabian Hueske <fhue...@gmail.com> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Hi Jincheng,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> I said before that I think the append() method is better than
> > > >>>>>>>>>>> implicitly forwarding keys, but still, I believe it adds
> > > >>>>>>>>>>> unnecessary boilerplate code.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Moreover, I haven't seen a convincing argument why
> > > >>>>>>>>>>> map(Expression*) is worse than map(Expression). In either case
> > > >>>>>>>>>>> we need to do all kinds of checks to prevent invalid use of
> > > >>>>>>>>>>> functions.
> > > >>>>>>>>>>> If the method is not used correctly, we can emit a good error
> > > >>>>>>>>>>> message, and documenting map(Expression*) will be easier than
> > > >>>>>>>>>>> documenting map(append(Expression*)), in my opinion.
> > > >>>>>>>>>>> I think we should not add unnecessary syntax unless there is a
> > > >>>>>>>>>>> good reason, and to be honest, I haven't seen this reason yet.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Regarding the groupBy.agg() method, I think it should behave
> > > >>>>>>>>>>> just like any other method, i.e., not do any implicit forwarding.
> > > >>>>>>>>>>> Let's take the example of the windowed group by that you posted
> > > >>>>>>>>>>> before.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> tab.window(Tumble ... as 'w)
> > > >>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > > >>>>>>>>>>> .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > > >>>>>>>>>>> .select('k1, 'col1, 'w.rowtime as 'rtime)
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> What happens if 'w.rowtime is not selected? What is the data
> > > >>>>>>>>>>> type of the field 'w in the resulting Table? Is it a regular
> > > >>>>>>>>>>> field at all or just a system field that disappears if it is not
> > > >>>>>>>>>>> selected?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> IMO, the following syntax is shorter, more explicit, and better
> > > >>>>>>>>>>> aligned with the regular window.groupBy.select aggregations that
> > > >>>>>>>>>>> are supported today.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> tab.window(Tumble ... as 'w)
> > > >>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > > >>>>>>>>>>> .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Best, Fabian
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Wed, Nov 14, 2018 at 8:37 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Hi Fabian/Xiaowei,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I am very sorry for my late reply! Glad to see your replies; they
> > > >>>>>>>>>>>> sound pretty good!
> > > >>>>>>>>>>>> I agree with Fabian that the append() approach, which clearly
> > > >>>>>>>>>>>> defines the result schema, is better.
> > > >>>>>>>>>>>> In addition, append() can also forward non-time attributes, e.g.:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> tab('name, 'age, 'address, 'rowtime)
> > > >>>>>>>>>>>> tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2, 'address, 'rowtime)
> > > >>>>>>>>>>>> .window(Tumble over 5.millis on 'rowtime as 'w)
> > > >>>>>>>>>>>> .groupBy('w, 'address)
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> In this way append() is very useful, and its behavior is very
> > > >>>>>>>>>>>> similar to withForwardedFields() in DataSet.
> > > >>>>>>>>>>>> So +1 to using the append() approach for map() & flatMap()!
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> But how about agg() and flatAgg()? In the agg/flatAgg case, I
> > > >>>>>>>>>>>> agree with Xiaowei's approach of defining the keys to be implied
> > > >>>>>>>>>>>> in the result table, appearing at the beginning, for example:
> > > >>>>>>>>>>>> tab.window(Tumble ... as 'w)
> > > >>>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > > >>>>>>>>>>>> .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > > >>>>>>>>>>>> .select('k1, 'col1, 'w.rowtime as 'rtime)
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> What do you think? @Fabian @Xiaowei
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>> Jincheng
> > > >>>>>>>>>>>>
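Here is a minimal plain-Scala sketch of the pass-through behavior described for append(). It assumes a hypothetical two-field `udf` on 'name. The UDF's output fields come first, followed by the forwarded columns, matching `.as('col1, 'col2, 'address, 'rowtime)`. `Person`, `udf`, and `mapWithAppend` are illustrative only. append() was a proposed built-in in this thread, not existing API.

```scala
object AppendSketch {
  // Input schema ('name, 'age, 'address, 'rowtime).
  final case class Person(name: String, age: Int, address: String, rowtime: Long)

  // Hypothetical UDF on 'name that returns two fields.
  def udf(name: String): (String, Int) = (name.toUpperCase, name.length)

  // tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2, 'address, 'rowtime)
  def mapWithAppend(rows: Seq[Person]): Seq[(String, Int, String, Long)] =
    rows.map { p =>
      val (col1, col2) = udf(p.name)
      (col1, col2, p.address, p.rowtime) // 'address and 'rowtime are forwarded unchanged; 'age is dropped
    }

  def main(args: Array[String]): Unit =
    mapWithAppend(Seq(Person("alice", 30, "Berlin", 1000L))).foreach(println) // (ALICE,5,Berlin,1000)
}
```

Because 'rowtime passes through untouched, the window on 'rowtime in the example above can still refer to it, which is the motivation given for append().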
> > > >>>>>>>>>>>> On Fri, Nov 9, 2018 at 6:35 PM Fabian Hueske <fhue...@gmail.com> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi Jincheng,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks for the summary!
> > > >>>>>>>>>>>>> I like the approach with append() better than the implicit
> > > >>>>>>>>>>>>> forwarding, as it clearly indicates which fields are forwarded.
> > > >>>>>>>>>>>>> However, I don't see much benefit over the flatMap(Expression*)
> > > >>>>>>>>>>>>> variant, as we would still need to analyze the full expression
> > > >>>>>>>>>>>>> tree to ensure that at most (or exactly?) one Scalar /
> > > >>>>>>>>>>>>> TableFunction is used.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>> Fabian
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Thu, Nov 8, 2018 at 7:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> We are discussing very detailed aspects of this proposal. We are
> > > >>>>>>>>>>>>>> trying to design the API from many angles (functionality,
> > > >>>>>>>>>>>>>> compatibility, ease of use, etc.). I think this is a very good
> > > >>>>>>>>>>>>>> process: only such a detailed discussion allows us to develop the
> > > >>>>>>>>>>>>>> PRs more clearly and smoothly at a later stage. I am very grateful
> > > >>>>>>>>>>>>>> to @Fabian and @Xiaowei for sharing a lot of good ideas.
> > > >>>>>>>>>>>>>> About the definition of the method signatures, I want to share my
> > > >>>>>>>>>>>>>> points here, which I am discussing with Fabian in the Google doc
> > > >>>>>>>>>>>>>> (not yet completed), as follows:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Assume we have a table:
> > > >>>>>>>>>>>>>> val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Approach 1:
> > > >>>>>>>>>>>>>> case1: Map follows Source Table
> > > >>>>>>>>>>>>>> val result =
> > > >>>>>>>>>>>>>> tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
> > > >>>>>>>>>>>>>> .window(Tumble over 5.millis on 'proctime as 'w)
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> case2: FlatAgg follows Window (Fabian mentioned above)
> > > >>>>>>>>>>>>>> val result =
> > > >>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
> > > >>>>>>>>>>>>>>    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > > >>>>>>>>>>>>>>    .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
> > > >>>>>>>>>>>>>>    .select('k1, 'col1, 'w.rowtime as 'rtime)
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Approach 2: Similar to Fabian's approach, in which the result
> > > >>>>>>>>>>>>>> schema would be clearly defined, but with an added built-in append
> > > >>>>>>>>>>>>>> UDF. That makes the map/flatMap/agg/flatAgg interfaces accept only
> > > >>>>>>>>>>>>>> one Expression.
> > > >>>>>>>>>>>>>> val result =
> > > >>>>>>>>>>>>>> tab.map(append(udf('string), 'long, 'proctime)).as('col1, 'col2, 'long, 'proctime)
> > > >>>>>>>>>>>>>>  .window(Tumble over 5.millis on 'proctime as 'w)
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Note: append is a special built-in UDF that can pass through any
> > > >>>>>>>>>>>>>> column.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> So maybe we can define table.map(Expression) first and, if
> > > >>>>>>>>>>>>>> necessary, extend it to table.map(Expression*) in the future? Of
> > > >>>>>>>>>>>>>> course, I also hope that we can further improve this proposal
> > > >>>>>>>>>>>>>> through discussion.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>> Jincheng
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 11:45 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Hi Fabian,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> I think that the key question you raised is whether we allow extra
> > > >>>>>>>>>>>>>>> parameters in the methods map/flatMap/agg/flatAgg. I can see why
> > > >>>>>>>>>>>>>>> allowing that may appear more convenient in some cases. However,
> > > >>>>>>>>>>>>>>> it might also cause some confusion if we do that. For example, do
> > > >>>>>>>>>>>>>>> we allow multiple UDFs in these expressions? If we do, the
> > > >>>>>>>>>>>>>>> semantics may be weird to define, e.g., what does
> > > >>>>>>>>>>>>>>> table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean?
> > > >>>>>>>>>>>>>>> Even though not allowing it may appear less powerful, it can also
> > > >>>>>>>>>>>>>>> make things more intuitive. In the case of agg/flatAgg, we can
> > > >>>>>>>>>>>>>>> define the keys to be implied in the result table, appearing at
> > > >>>>>>>>>>>>>>> the beginning. You can use a select method if you want to modify
> > > >>>>>>>>>>>>>>> this behavior. I think that eventually we will have some API which
> > > >>>>>>>>>>>>>>> allows other expressions as additional parameters, but I think
> > > >>>>>>>>>>>>>>> it's better to do that after we introduce the concept of nested
> > > >>>>>>>>>>>>>>> tables. A lot of things we suggested here can be considered
> > > >>>>>>>>>>>>>>> special cases of that. But things are much simpler if we leave
> > > >>>>>>>>>>>>>>> that for later.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>>>> Xiaowei
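Xiaowei's rule (grouping keys implied at the beginning of the result, with a select() afterwards to change that) can be illustrated with a minimal Scala sketch over plain in-memory collections. This is not Flink's Table API: `ImplicitKeysModel`, `groupByFlatAgg`, `select` and the example aggregate `top2` are hypothetical names, and a row is assumed to be a `Vector[Any]`.

```scala
// Toy in-memory model, not Flink's Table API. A row is a Vector[Any].
object ImplicitKeysModel {
  type Row = Vector[Any]

  // Hypothetical groupBy(keys).flatAgg(tableAgg): every row emitted by the
  // table aggregate is prefixed with the values of the grouping keys.
  def groupByFlatAgg(rows: Seq[Row], keyIdx: Seq[Int])(tableAgg: Seq[Row] => Seq[Row]): Seq[Row] =
    rows
      .groupBy(r => keyIdx.map(i => r(i)).toVector)
      .toSeq
      .sortBy(_._1.mkString(","))  // deterministic group order for the demo
      .flatMap { case (key, group) => tableAgg(group).map(out => key ++ out) }

  // Hypothetical select(...): reorders or drops columns by position.
  def select(rows: Seq[Row], idx: Int*): Seq[Row] =
    rows.map(r => idx.toVector.map(i => r(i)))

  // Example table aggregate: the two largest values of column 1 in a group.
  def top2(group: Seq[Row]): Seq[Row] =
    group
      .map(r => r(1).asInstanceOf[Int])
      .sorted(Ordering[Int].reverse)
      .take(2)
      .map(v => Vector[Any](v))
}
```

For input rows (x,3), (x,7), (x,5), (y,1) grouped on column 0, `groupByFlatAgg(rows, Seq(0))(top2)` yields (x,7), (x,5), (y,1), and `select(out, 1, 0)` then moves the key to the end.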
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com>
> > > >>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Hi,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> * Re emit:
> > > >>>>>>>>>>>>>>>> I think we should start with the well-understood semantics of full
> > > >>>>>>>>>>>>>>>> replacement. This is how the other agg functions work.
> > > >>>>>>>>>>>>>>>> As was said before, there are open questions regarding an append
> > > >>>>>>>>>>>>>>>> mode (checkpointing, whether to support retractions and, if so, how
> > > >>>>>>>>>>>>>>>> to declare them, ...).
> > > >>>>>>>>>>>>>>>> Since this seems to be an optimization, I'd postpone it.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> * Re grouping keys:
> > > >>>>>>>>>>>>>>>> I don't think we should add them automatically because the result
> > > >>>>>>>>>>>>>>>> schema would not be intuitive.
> > > >>>>>>>>>>>>>>>> Would they be added at the beginning of the tuple or at the end?
> > > >>>>>>>>>>>>>>>> Which metadata fields of windows would be added? In which order
> > > >>>>>>>>>>>>>>>> would they be added?
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> However, we could support syntax like this:
> > > >>>>>>>>>>>>>>>> val t: Table = ???
> > > >>>>>>>>>>>>>>>> t
> > > >>>>>>>>>>>>>>>>   .window(Tumble ... as 'w)
> > > >>>>>>>>>>>>>>>>   .groupBy('w, 'a, 'b) // the window alias must be a grouping key
> > > >>>>>>>>>>>>>>>>   .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> The result schema would be clearly defined as [b, a, f1, f2, ...,
> > > >>>>>>>>>>>>>>>> fn, wend, rtime], where (f1, f2, ..., fn) are the result attributes
> > > >>>>>>>>>>>>>>>> of the UDF.
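Fabian's alternative, where the user lists keys, the table aggregate and window properties explicitly, makes the result schema a plain function of argument order. A small Scala sketch of that rule, under stated assumptions: the case classes `Key`, `TableAggCall` and `WindowProp` are hypothetical stand-ins for Table API expressions, and the "exactly one table aggregate" check reflects Xiaowei's concern about multiple UDFs rather than anything the thread decided.

```scala
// Toy model of the explicit flatAgg argument list; all names are hypothetical.
object ExplicitSchemaModel {
  sealed trait Expr
  // A grouping key passed through under its own name, e.g. 'b.
  final case class Key(name: String) extends Expr
  // The table aggregate call; it contributes its result fields f1..fn.
  final case class TableAggCall(resultFields: Seq[String]) extends Expr
  // A window property with an alias, e.g. 'w.end as 'wend.
  final case class WindowProp(property: String, alias: String) extends Expr

  // The result schema is the expressions in exactly the order the user wrote them.
  def resultSchema(exprs: Seq[Expr]): Seq[String] = {
    // Assumption: exactly one table aggregate per flatAgg call.
    require(exprs.count(_.isInstanceOf[TableAggCall]) == 1,
      "expected exactly one table aggregate call")
    exprs.flatMap {
      case Key(n)               => Seq(n)
      case TableAggCall(fields) => fields
      case WindowProp(_, alias) => Seq(alias)
    }
  }
}
```

With `Key("b"), Key("a"), TableAggCall(Seq("f1", "f2")), WindowProp("end", "wend"), WindowProp("rowtime", "rtime")` the sketch returns `Seq("b", "a", "f1", "f2", "wend", "rtime")`, matching the schema described above for n = 2.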
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> * Re multi-staged evaluation:
> > > >>>>>>>>>>>>>>>> I think this should be an optimization that can be applied if the
> > > >>>>>>>>>>>>>>>> UDF implements the merge() method.
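Why merge() is the enabler for multi-staged evaluation can be shown with a toy Scala sketch. It is not Flink code: `Top2Acc` is a hypothetical accumulator for a top-2 table aggregate, and the "partitions" stand in for the local pre-aggregation stage.

```scala
// Toy model, not Flink code: a hypothetical accumulator for a "top-2" table aggregate.
object TwoPhaseModel {
  final case class Top2Acc(values: List[Int] = Nil) {
    private def top2(vs: List[Int]): List[Int] = vs.sorted(Ordering[Int].reverse).take(2)

    // Per-row update, in the role of accumulate().
    def accumulate(v: Int): Top2Acc = Top2Acc(top2(v :: values))

    // Combining two partial accumulators, in the role of merge(); this is what
    // makes a local/global (multi-staged) plan possible.
    def merge(other: Top2Acc): Top2Acc = Top2Acc(top2(values ++ other.values))

    // The rows that would be emitted (here, one value per row).
    def emit: List[Int] = values
  }

  // Single stage: one accumulator sees every row of the group.
  def singleStage(input: Seq[Int]): List[Int] =
    input.foldLeft(Top2Acc())((acc, v) => acc.accumulate(v)).emit

  // Two stages: each partition pre-aggregates locally, then the partial
  // accumulators are merged globally before emitting.
  def twoStage(partitions: Seq[Seq[Int]]): List[Int] =
    partitions
      .map(p => p.foldLeft(Top2Acc())((acc, v) => acc.accumulate(v)))
      .foldLeft(Top2Acc())((a, b) => a.merge(b))
      .emit
}
```

Both plans agree here because the top two of a union equal the top two of the partial top twos; a UDF without a correct merge() could not be split this way, which is why the optimization is conditional on it.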
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Best, Fabian
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 8:01 AM Shaoxuan Wang <wshaox...@gmail.com>
> > > >>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Hi Xiaowei,
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Yes, I agree with you that the semantics of TableAggregateFunction
> > > >>>>>>>>>>>>>>>>> emit are much more complex than those of AggregateFunction. The
> > > >>>>>>>>>>>>>>>>> fundamental difference is that TableAggregateFunction emits a
> > > >>>>>>>>>>>>>>>>> "table" while AggregateFunction outputs (a column of) a "row".
> > > >>>>>>>>>>>>>>>>> AggregateFunction has only one mode, which is "replacing" (complete
> > > >>>>>>>>>>>>>>>>> update). But for TableAggregateFunction, the update could be
> > > >>>>>>>>>>>>>>>>> incremental (only emit the newly updated results) or complete
> > > >>>>>>>>>>>>>>>>> (always emit the entire table when "emit" is triggered). From the
> > > >>>>>>>>>>>>>>>>> performance perspective, we might want to use incremental update.
> > > >>>>>>>>>>>>>>>>> But we need to review and design this carefully, especially taking
> > > >>>>>>>>>>>>>>>>> into account failover (instead of just backing up the ACC, it may
> > > >>>>>>>>>>>>>>>>> also need to remember the emit offset) and retractions, as the
> > > >>>>>>>>>>>>>>>>> semantics of TableAggregateFunction emit are different from those
> > > >>>>>>>>>>>>>>>>> of other UDFs. TableFunction also emits a table, but it does not
> > > >>>>>>>>>>>>>>>>> need to worry about this because it is stateless.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>>>>>> Shaoxuan
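The two emit modes Shaoxuan contrasts can be modeled with a hypothetical stateful top-2 aggregate in plain Scala (not Flink's TableAggregateFunction interface). `emitFull` returns the entire current result, which downstream uses to replace the previous one. `emitIncremental` returns only retractions and insertions relative to the last emit, which is why it needs the extra `lastEmitted` state; in a real system that state would have to be checkpointed together with the accumulator to survive failover.

```scala
// Toy model, not Flink's TableAggregateFunction interface.
object EmitModesModel {
  sealed trait Change
  final case class Insert(v: Int) extends Change
  final case class Retract(v: Int) extends Change

  // Hypothetical stateful top-2 table aggregate for a single group.
  final class Top2 {
    // The accumulator (ACC); it would be checkpointed in either mode.
    private var acc: List[Int] = Nil
    // Extra state needed only for incremental emit: what was emitted last time.
    // It would also have to be checkpointed to survive a failover.
    private var lastEmitted: List[Int] = Nil

    def accumulate(v: Int): Unit =
      acc = (v :: acc).sorted(Ordering[Int].reverse).take(2)

    // Complete update: emit the whole current table; downstream replaces
    // its previous result with it.
    def emitFull(): List[Int] = acc

    // Incremental update: retract rows that dropped out and insert new rows,
    // relative to the previous emit.
    def emitIncremental(): List[Change] = {
      val retractions: List[Change] = lastEmitted.diff(acc).map(v => Retract(v))
      val inserts: List[Change] = acc.diff(lastEmitted).map(v => Insert(v))
      lastEmitted = acc
      retractions ++ inserts
    }
  }
}
```

After accumulating 5 and 3, an incremental emit produces Insert(5), Insert(3); after then accumulating 9, it produces Retract(3), Insert(9), while a full emit simply returns List(9, 5).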
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com>
> > > >>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Hi Jincheng,
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Thanks for adding the public interfaces! I think that it's a very
> > > >>>>>>>>>>>>>>>>>> good start. There are a few points that we need to discuss further.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> - TableAggregateFunction - this is a very complex beast, definitely
> > > >>>>>>>>>>>>>>>>>>   the most complex user-defined object we have introduced so far. I
> > > >>>>>>>>>>>>>>>>>>   think there are quite a few interesting questions here. For
> > > >>>>>>>>>>>>>>>>>>   example, do we allow multi-staged TableAggregate in this case?
> > > >>>>>>>>>>>>>>>>>>   What are the semantics of emit? Is it an amendment to the previous
> > > >>>>>>>>>>>>>>>>>>   output, or does it replace it? I think that this subject itself is
> > > >>>>>>>>>>>>>>>>>>   worth a discussion to make sure we get the details right.
> > > >>>>>>>>>>>>>>>>>> - GroupedTable.agg - do the group keys automatically appear in the
> > > >>>>>>>>>>>>>>>>>>   output? How about the case of windowing aggregation?
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>>>>>>> Xiaowei
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com>
> > > >>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Hi, Xiaowei,
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Thanks for bringing up the discussion of the Table API Enhancement
> > > >>>>>>>>>>>>>>>>>>> Outline!
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> I quickly looked at the overall content; it is a good expression of
> > > >>>>>>>>>>>>>>>>>>> our offline discussions. But from my point of view, we should add
> > > >>>>>>>>>>>>>>>>>>> the usage of the public interfaces that we will introduce in this
> > > >>>>>>>>>>>>>>>>>>> proposal. So, I added the following usage descriptions of the
> > > >>>>>>>>>>>>>>>>>>> interfaces and operators to the google doc:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> 1. Map Operator
> > > >>>>>>>>>>>>>>>>>>> The Map operator is a new operator of Table. It can apply a scalar
> > > >>>>>>>>>>>>>>>>>>> function and can return multiple columns. The usage is as follows:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> val res = tab
> > > >>>>>>>>>>>>>>>>>>>   .map(fun: ScalarFunction).as('a, 'b, 'c)
> > > >>>>>>>>>>>>>>>>>>>   .select('a, 'c)
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> 2. FlatMap Operator
> > > >>>>>>>>>>>>>>>>>>> The FlatMap operator is a new operator of Table. It can apply a
> > > >>>>>>>>>>>>>>>>>>> table function and can return multiple rows. The usage is as
> > > >>>>>>>>>>>>>>>>>>> follows:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> val res = tab
> > > >>>>>>>>>>>>>>>>>>>   .flatMap(fun: TableFunction).as('a, 'b, 'c)
> > > >>>>>>>>>>>>>>>>>>>   .select('a, 'c)
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> 3. Agg Operator
> > > >>>>>>>>>>>>>>>>>>> The Agg operator is a new operator of Table/GroupedTable. It can
> > > >>>>>>>>>>>>>>>>>>> apply an aggregate function and can return multiple columns. The
> > > >>>>>>>>>>>>>>>>>>> usage is as follows:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> val res = tab
> > > >>>>>>>>>>>>>>>>>>>   .groupBy('a) // leave out the groupBy clause to define global aggregates
> > > >>>>>>>>>>>>>>>>>>>   .agg(fun: AggregateFunction).as('a, 'b, 'c)
> > > >>>>>>>>>>>>>>>>>>>   .select('a, 'c)
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> 4. FlatAgg Operator
> > > >>>>>>>>>>>>>>>>>>> The FlatAgg operator is a new operator of Table/GroupedTable. It can
> > > >>>>>>>>>>>>>>>>>>> apply a table aggregate function and can return multiple rows. The
> > > >>>>>>>>>>>>>>>>>>> usage is as follows:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> val res = tab
> > > >>>>>>>>>>>>>>>>>>>   .groupBy('a) // leave out the groupBy clause to define global table aggregates
> > > >>>>>>>>>>>>>>>>>>>   .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
> > > >>>>>>>>>>>>>>>>>>>   .select('a, 'c)
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> 5. TableAggregateFunction
> > > >>>>>>>>>>>>>>>>>>> The behavior of table aggregates is most like that of
> > > >>>>>>>>>>>>>>>>>>> GroupReduceFunction: it computes over a group of elements and
> > > >>>>>>>>>>>>>>>>>>> outputs a group of elements. A TableAggregateFunction can be applied
> > > >>>>>>>>>>>>>>>>>>> with GroupedTable.flatAgg(). The interface of TableAggregateFunction
> > > >>>>>>>>>>>>>>>>>>> has a lot of content, so I don't copy it here. Please look at the
> > > >>>>>>>>>>>>>>>>>>> details in the google doc:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> I would greatly appreciate it if anyone could review and comment.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>>> Jincheng
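The four proposed operators differ mainly in their input/output cardinality. A minimal Scala model over in-memory rows makes that explicit; the names (`OperatorShapesModel`, `mapOp`, `flatMapOp`, `aggOp`, `flatAggOp`, `Row` as `Vector[Any]`, groups passed in as already-formed sequences) are hypothetical and are not Flink's actual signatures.

```scala
// Toy model of the four proposed operators over in-memory rows (not Flink's API).
object OperatorShapesModel {
  type Row = Vector[Any]

  // map: exactly one output row (possibly with several columns) per input row.
  def mapOp(rows: Seq[Row])(f: Row => Row): Seq[Row] = rows.map(f)

  // flatMap: zero or more output rows per input row.
  def flatMapOp(rows: Seq[Row])(f: Row => Seq[Row]): Seq[Row] = rows.flatMap(f)

  // agg: exactly one output row per group; without groupBy the whole
  // table is a single group (a global aggregate).
  def aggOp(groups: Seq[Seq[Row]])(f: Seq[Row] => Row): Seq[Row] = groups.map(f)

  // flatAgg: zero or more output rows per group, like a GroupReduceFunction.
  def flatAggOp(groups: Seq[Seq[Row]])(f: Seq[Row] => Seq[Row]): Seq[Row] = groups.flatMap(f)
}
```

For two input rows, mapOp always yields two rows, flatMapOp may yield any number, and aggOp over the whole table as one group yields exactly one row, while flatAggOp over that group may yield several.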
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>
> > > >>>>>>> --
> > > >>>>>>>
> > > >>>>>>> -----------------------------------------------------------------------------------
> > > >>>>>>>
> > > >>>>>>> *Rome was not built in one day*
> > > >>>>>>>
> > > >>>>>>> -----------------------------------------------------------------------------------
> > > >>>>>>>
>
