Hi Fabian,

I think that the key question you raised is whether we allow extra
parameters in the methods map/flatMap/agg/flatAgg. I can see why allowing
that may appear more convenient in some cases. However, it might also cause
some confusion. For example, do we allow multiple UDFs in these
expressions? If we do, the semantics may be hard to define, e.g. what does
table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Even though
not allowing it may appear less powerful, it also makes things more
intuitive. In the case of agg/flatAgg, we can define the keys to be implied
in the result table and to appear at the beginning. You can use a select
method if you want to change this behavior. I think that eventually we will
have an API that allows other expressions as additional parameters, but I
think it's better to do that after we introduce the concept of nested
tables. A lot of the things we suggested here can be considered special
cases of that. But things are much simpler if we leave that until later.
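
To make the "keys first" convention concrete, here is a minimal sketch in plain Scala, not the Flink Table API. It assumes rows can be modeled as Seq[Any]; distinctCounts, groupByFlatAgg and selectCols are hypothetical helpers written only for illustration.

```scala
// Plain-Scala model (not the Flink API) of the proposed convention that
// groupBy(key).flatAgg(udf) emits the grouping key first, followed by the
// UDF's result columns; a select can then reorder or drop columns.

// Hypothetical table aggregate: one row (value, frequency) per distinct value.
def distinctCounts(values: Seq[Int]): Seq[Seq[Any]] =
  values.groupBy(identity).toSeq.sortBy(_._1).map { case (v, vs) => Seq(v, vs.size) }

// Hypothetical groupBy(key).flatAgg(udf): the key is implied at the beginning.
def groupByFlatAgg(rows: Seq[(String, Int)],
                   udf: Seq[Int] => Seq[Seq[Any]]): Seq[Seq[Any]] =
  rows.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (k, group) =>
    udf(group.map(_._2)).map(out => k +: out)
  }

// Hypothetical select by column position, used to change the default layout.
def selectCols(rows: Seq[Seq[Any]], cols: Int*): Seq[Seq[Any]] =
  rows.map(r => cols.map(r))

val input = Seq(("x", 3), ("y", 7), ("x", 9), ("x", 3))
val grouped = groupByFlatAgg(input, distinctCounts) // layout [k, f1, f2]
val reordered = selectCols(grouped, 1, 2, 0)        // layout [f1, f2, k]
```

Because the key position is fixed, any other layout is just a projection over the result, which is what the select call is for.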

Regards,
Xiaowei

On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> * Re emit:
> I think we should start with the well-understood semantics of full
> replacement. This is how the other agg functions work.
> As was said before, there are open questions regarding an append mode
> (checkpointing, whether to support retractions and, if so, how to
> declare them, ...).
> Since this seems to be an optimization, I'd postpone it.
>
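A rough plain-Scala illustration of full-replacement emit (not Flink code; Top2Acc, accumulate, emitAll and process are hypothetical names): after each record the group's complete result is re-emitted, and the downstream view overwrites whatever it held for that key.

```scala
import scala.collection.mutable

// Hypothetical accumulator of a top-2 table aggregate (plain Scala, not Flink).
final case class Top2Acc(var values: List[Int] = Nil)

def accumulate(acc: Top2Acc, v: Int): Unit =
  acc.values = (v :: acc.values).sorted(Ordering[Int].reverse).take(2)

// Full replacement: emit the group's entire current result every time.
def emitAll(acc: Top2Acc): List[Int] = acc.values

val accs = mutable.Map.empty[String, Top2Acc]
val downstream = mutable.Map.empty[String, List[Int]] // result table per key

def process(key: String, v: Int): Unit = {
  val acc = accs.getOrElseUpdate(key, Top2Acc())
  accumulate(acc, v)
  downstream(key) = emitAll(acc) // previous rows for this key are replaced, not amended
}

Seq(("a", 1), ("a", 5), ("b", 2), ("a", 3)).foreach { case (k, v) => process(k, v) }
```

In this model only the accumulator is state; nothing about previous emits has to be remembered, since each emit carries the complete result.
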
> * Re grouping keys:
> I don't think we should automatically add them because the result schema
> would not be intuitive.
> Would they be added at the beginning of the tuple or at the end? Which
> window metadata fields would be added? In which order would they be
> added?
>
> However, we could support syntax like this:
> val t: Table = ???
> t
>   .window(Tumble ... as 'w)
>   .groupBy('a, 'b)
>   .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
>
> The result schema would be clearly defined as [b, a, f1, f2, ..., fn, wend,
> rtime], where (f1, f2, ..., fn) are the result attributes of the UDF.
>
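The schema rule above can be modeled in a few lines of plain Scala: each projection item contributes its name or alias, and the table aggregate call expands in place to the UDF's result fields. The Item types are hypothetical, and myAgg is assumed, for illustration only, to return three fields.

```scala
// Plain-Scala model (not the Flink API) of how an explicit flatAgg
// projection list determines the result schema, left to right.
sealed trait Item
final case class Field(name: String) extends Item                     // e.g. 'b
final case class Aliased(expr: String, alias: String) extends Item    // e.g. 'w.end as 'wend
final case class TableAggCall(resultFields: Seq[String]) extends Item // expands to f1..fn

def resultSchema(items: Seq[Item]): Seq[String] = items.flatMap {
  case Field(name)          => Seq(name)
  case Aliased(_, alias)    => Seq(alias)
  case TableAggCall(fields) => fields
}

// Mirrors .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime),
// assuming (hypothetically) that myAgg returns three fields.
val schema = resultSchema(Seq(
  Field("b"), Field("a"),
  TableAggCall(Seq("f1", "f2", "f3")),
  Aliased("w.end", "wend"), Aliased("w.rowtime", "rtime")))
```
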
> * Re Multi-staged evaluation:
> I think this should be an optimization that can be applied if the UDF
> implements the merge() method.
>
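A minimal sketch of what merge() enables, using a hypothetical top-2 table aggregate modeled in plain Scala (the names are illustrative, not Flink's interface): partial accumulators built per partition are merged before the final emit, and the result matches single-stage evaluation.

```scala
// Hypothetical top-2 table aggregate in plain Scala (not Flink's interface),
// with a merge() that combines two partial accumulators.
final case class Top2Acc(values: List[Int])

val emptyAcc = Top2Acc(Nil)

def accumulate(acc: Top2Acc, v: Int): Top2Acc =
  Top2Acc((v :: acc.values).sorted(Ordering[Int].reverse).take(2))

// Keeping the top 2 of two partial top-2 lists equals the top 2 overall.
def merge(a: Top2Acc, b: Top2Acc): Top2Acc =
  Top2Acc((a.values ++ b.values).sorted(Ordering[Int].reverse).take(2))

val partitions = Seq(Seq(4, 9, 1), Seq(7, 2), Seq(8))

// Single stage: one accumulator over all input.
val single = partitions.flatten.foldLeft(emptyAcc)(accumulate)

// Two stages: a local accumulator per partition, then a global merge.
val local = partitions.map(_.foldLeft(emptyAcc)(accumulate))
val merged = local.reduce(merge(_, _))
```

This works only because merging two partial top-2 lists and truncating again loses nothing; a UDF without a correct merge() would have to be evaluated in a single stage.
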
> Best, Fabian
>
> On Wed, Nov 7, 2018 at 8:01 AM Shaoxuan Wang <wshaox...@gmail.com> wrote:
>
> > Hi Xiaowei,
> >
> > Yes, I agree with you that the semantics of TableAggregateFunction emit
> > are much more complex than those of AggregateFunction. The fundamental
> > difference is that TableAggregateFunction emits a "table" while
> > AggregateFunction outputs (a column of) a "row". AggregateFunction has
> > only one mode, which is "replacing" (complete update). But for
> > TableAggregateFunction, the update could be incremental (only emit the
> > newly updated results) or complete (always emit the entire table when
> > "emit" is triggered). From the performance perspective, we might want to
> > use incremental update. But we need to review and design this carefully,
> > especially taking into account failover (instead of just backing up the
> > ACC, it may also need to remember the emit offset) and retractions, as
> > the semantics of TableAggregateFunction emit are different from those
> > of other UDFs. TableFunction also emits a table, but it does not need to
> > worry about this because it is stateless.
> >
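For contrast with complete update, here is a hypothetical plain-Scala sketch of incremental emit with retractions for a top-2 aggregate. The operator must remember what it emitted last; this sketch keeps the previously emitted rows rather than an offset, which is a simplification. That extra state would have to be checkpointed with the ACC, which is the failover concern above.

```scala
// Hypothetical incremental emit for a top-2 table aggregate (plain Scala,
// not Flink). Each emit outputs only the changes since the previous emit.
sealed trait Change
final case class Insert(v: Int) extends Change
final case class Retract(v: Int) extends Change

// acc: current top 2; lastEmitted: what downstream has already received.
// Both would have to be checkpointed for correct recovery.
final case class State(acc: List[Int], lastEmitted: List[Int])

def accumulate(s: State, v: Int): State =
  s.copy(acc = (v :: s.acc).sorted(Ordering[Int].reverse).take(2))

// Called when emit is triggered: retract stale rows, then insert new ones.
def emitIncremental(s: State): (State, List[Change]) = {
  val retracts: List[Change] = s.lastEmitted.diff(s.acc).map(Retract(_))
  val inserts: List[Change] = s.acc.diff(s.lastEmitted).map(Insert(_))
  (s.copy(lastEmitted = s.acc), retracts ++ inserts)
}

val s0 = accumulate(accumulate(State(Nil, Nil), 3), 8)
val (s1, firstEmit) = emitIncremental(s0)
val (s2, secondEmit) = emitIncremental(accumulate(s1, 5))
```

Each emit is smaller than a full replacement, but correctness now depends on lastEmitted surviving failover exactly: if it were reset after a restore, the next emit would insert rows that downstream already holds.
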
> > Regards,
> > Shaoxuan
> >
> >
> > On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> >
> > > Hi Jincheng,
> > >
> > > Thanks for adding the public interfaces! I think that it's a very good
> > > start. There are a few points that need more discussion.
> > >
> > >    - TableAggregateFunction - this is a very complex beast, definitely
> > >    the most complex user-defined object we have introduced so far. I
> > >    think there are quite a few interesting questions here. For example,
> > >    do we allow multi-staged TableAggregate in this case? What are the
> > >    semantics of emit? Does it amend the previous output, or replace it?
> > >    I think that this subject itself is worth a discussion to make sure
> > >    we get the details right.
> > >    - GroupedTable.agg - do the group keys automatically appear in the
> > >    output? What about the case of windowed aggregation?
> > >
> > > Regards,
> > > Xiaowei
> > >
> > > On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
> > >
> > > > Hi, Xiaowei,
> > > >
> > > > Thanks for bringing up the discussion of the Table API Enhancement
> > > > Outline!
> > > >
> > > > I quickly looked at the overall content; it is a good summary of our
> > > > offline discussions. But from my point of view, we should add the
> > > > usage of the public interfaces that we will introduce in this
> > > > proposal. So, I added the following usage descriptions of the
> > > > interfaces and operators to the Google doc:
> > > >
> > > > 1. Map Operator
> > > >     The Map operator is a new operator on Table. It applies a scalar
> > > >     function and can return multiple columns. Usage:
> > > >
> > > >   val res = tab
> > > >      .map(fun: ScalarFunction).as('a, 'b, 'c)
> > > >      .select('a, 'c)
> > > >
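For readers new to the proposal, here is a plain-Scala model of what map(...).as('a, 'b, 'c).select('a, 'c) computes: exactly one output row per input row, with the scalar function's result split into named columns. The function powers and the helpers mapAs and selectCols are hypothetical; this is not the Flink API.

```scala
// Plain-Scala model (not the Flink API): a row maps column names to values.
type Row = Map[String, Any]

// Hypothetical scalar function returning three columns per input value.
def powers(x: Int): (Int, Int, Int) = (x, x * x, x * x * x)

// map(f).as(n1, n2, n3): exactly one output row per input row.
def mapAs(input: Seq[Int], f: Int => (Int, Int, Int),
          names: (String, String, String)): Seq[Row] =
  input.map { x =>
    val (c1, c2, c3) = f(x)
    Map(names._1 -> c1, names._2 -> c2, names._3 -> c3)
  }

// select(cols*): project the named columns in the given order.
def selectCols(rows: Seq[Row], cols: String*): Seq[Seq[Any]] =
  rows.map(r => cols.map(r))

val res = selectCols(mapAs(Seq(2, 3), powers, ("a", "b", "c")), "a", "c")
```
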
> > > > 2. FlatMap Operator
> > > >     The FlatMap operator is a new operator on Table. It applies a
> > > >     table function and can return multiple rows. Usage:
> > > >
> > > >   val res = tab
> > > >       .flatMap(fun: TableFunction).as('a, 'b, 'c)
> > > >       .select('a, 'c)
> > > >
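The difference from map is the row cardinality: a table function may return zero, one or many rows per input row. Here is a plain-Scala model of that behavior, with a hypothetical table function words and hypothetical helpers flatMapAs and selectCols (not the Flink API).

```scala
// Plain-Scala model (not the Flink API): flatMap concatenates the rows a
// table function returns for each input row.
type Row = Map[String, Any]

// Hypothetical table function: one (word, length, position) row per word.
def words(s: String): Seq[(String, Int, Int)] =
  s.split(" ").toSeq.filter(_.nonEmpty).zipWithIndex.map {
    case (w, i) => (w, w.length, i)
  }

// flatMap(f).as(n1, n2, n3): any number of output rows per input row.
def flatMapAs(input: Seq[String], f: String => Seq[(String, Int, Int)],
              names: (String, String, String)): Seq[Row] =
  input.flatMap { x =>
    f(x).map { case (c1, c2, c3) => Map(names._1 -> c1, names._2 -> c2, names._3 -> c3) }
  }

def selectCols(rows: Seq[Row], cols: String*): Seq[Seq[Any]] =
  rows.map(r => cols.map(r))

// "to be" yields two rows, "or" one row, and "" none.
val res = selectCols(flatMapAs(Seq("to be", "or", ""), words, ("a", "b", "c")), "a", "c")
```
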
> > > > 3. Agg Operator
> > > >     The Agg operator is a new operator on Table/GroupedTable. It
> > > >     applies an aggregate function and can return multiple columns.
> > > >     Usage:
> > > >
> > > >    val res = tab
> > > >       .groupBy('a) // leave out the groupBy clause to define global aggregates
> > > >       .agg(fun: AggregateFunction).as('a, 'b, 'c)
> > > >       .select('a, 'c)
> > > >
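As a plain-Scala model of agg with a multi-column aggregate function (hypothetical names, not the Flink API): the result is exactly one row per group, or a single row when groupBy is left out. Whether the grouping key also belongs in the output row is one of the open questions in this thread, so the sketch keeps it outside the row, as the map key.

```scala
// Plain-Scala model (not the Flink API): an aggregate function whose
// result has three columns, expressed as a fold over an accumulator.
final case class MinMaxCount(min: Int, max: Int, count: Long)

// Hypothetical initial accumulator value.
val zero = MinMaxCount(Int.MaxValue, Int.MinValue, 0L)

def accumulate(acc: MinMaxCount, v: Int): MinMaxCount =
  MinMaxCount(math.min(acc.min, v), math.max(acc.max, v), acc.count + 1)

def aggregate(values: Seq[Int]): MinMaxCount = values.foldLeft(zero)(accumulate)

// groupBy(key).agg(f): one result row per key.
def groupByAgg(rows: Seq[(String, Int)]): Map[String, MinMaxCount] =
  rows.groupBy(_._1).map { case (k, g) => k -> aggregate(g.map(_._2)) }

val input = Seq(("x", 3), ("y", 7), ("x", 9))
val grouped = groupByAgg(input)         // one row per group
val global = aggregate(input.map(_._2)) // no groupBy: one row overall
```
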
> > > > 4. FlatAgg Operator
> > > >     The FlatAgg operator is a new operator on Table/GroupedTable. It
> > > >     applies a table aggregate function and can return multiple rows.
> > > >     Usage:
> > > >
> > > >     val res = tab
> > > >        .groupBy('a) // leave out the groupBy clause to define global table aggregates
> > > >        .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
> > > >        .select('a, 'c)
> > > >
> > > > 5. TableAggregateFunction
> > > >     A table aggregate behaves much like a GroupReduceFunction: it is
> > > >     computed over a group of elements and outputs a group of elements.
> > > >     A TableAggregateFunction can be applied with GroupedTable.flatAgg().
> > > >     The interface of TableAggregateFunction is fairly long, so I won't
> > > >     copy it here; please see the details in the Google doc:
> > > >
> > > > https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
> > > >
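To illustrate the GroupReduceFunction-like behavior, here is a hypothetical shape for a table aggregate in plain Scala. The trait TableAgg, its methods, the runFlatAgg driver and the Top2 example are illustrative assumptions, not the interface in the linked doc: the rows of a group are folded into an accumulator, and emitValue pushes zero or more result rows into a collector, modeled here as a function.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical table aggregate shape (not Flink's actual interface).
trait TableAgg[In, Acc, Out] {
  def createAccumulator(): Acc
  def accumulate(acc: Acc, in: In): Unit
  def emitValue(acc: Acc, out: Out => Unit): Unit // `out` stands in for a collector
}

// Example: emit the two largest values of a group as (value, rank) rows.
final class Top2 extends TableAgg[Int, ListBuffer[Int], (Int, Int)] {
  def createAccumulator(): ListBuffer[Int] = ListBuffer.empty[Int]
  def accumulate(acc: ListBuffer[Int], in: Int): Unit = {
    val top = (acc += in).sorted(Ordering[Int].reverse).take(2)
    acc.clear()
    acc ++= top
  }
  def emitValue(acc: ListBuffer[Int], out: ((Int, Int)) => Unit): Unit =
    acc.zipWithIndex.foreach { case (v, i) => out((v, i + 1)) }
}

// Hypothetical driver: accumulate one group, then collect everything emitted.
def runFlatAgg[In, Acc, Out](f: TableAgg[In, Acc, Out], group: Seq[In]): List[Out] = {
  val acc = f.createAccumulator()
  group.foreach(in => f.accumulate(acc, in))
  val result = ListBuffer.empty[Out]
  f.emitValue(acc, o => { result += o; () })
  result.toList
}

val rows = Seq(("x", 3), ("y", 7), ("x", 9), ("x", 5))
// groupBy(key).flatAgg(...): one emit per group.
val perGroup = rows.groupBy(_._1).map { case (k, g) => k -> runFlatAgg(new Top2, g.map(_._2)) }
// No groupBy: the whole table is a single group.
val global = runFlatAgg(new Top2, rows.map(_._2))
```
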
> > > > I would appreciate any reviews and comments.
> > > >
> > > > Best,
> > > > Jincheng
> > > >
> > >
> >
>
