Hi,

Isn’t the problem of multiple expressions limited only to `flat***` functions 
and to be more specific only to having two (or more) different table functions 
passed as an expressions? `.flatAgg(TableAggA('a), scalarFunction1(‘b), 
scalarFunction2(‘c))` seems to be well defined (duplicate result of every 
scalar function to every record. Or am I missing something?

Another remark, I would be in favour of not using abbreviations and naming 
`agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.

Piotrek

> On 15 Nov 2018, at 14:15, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Jincheng,
> 
> I said before, that I think that the append() method is better than
> implicitly forwarding keys, but still, I believe it adds unnecessary boiler
> plate code.
> 
> Moreover, I haven't seen a convincing argument why map(Expression*) is
> worse than map(Expression). In either case we need to do all kinds of
> checks to prevent invalid use of functions.
> If the method is not correctly used, we can emit a good error message and
> documenting map(Expression*) will be easier than map(append(Expression*)),
> in my opinion.
> I think we should not add unnessary syntax unless there is a good reason
> and to be honest, I haven't seen this reason yet.
> 
> Regarding the groupBy.agg() method, I think it should behave just like any
> other method, i.e., not do any implicit forwarding.
> Let's take the example of the windowed group by, that you posted before.
> 
> tab.window(Tumble ... as 'w)
>    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>    .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>    .select('k1, 'col1, 'w.rowtime as 'rtime)
> 
> What happens if 'w.rowtime is not selected? What is the data type of the
> field 'w in the resulting Table? Is it a regular field at all or just a
> system field that disappears if it is not selected?
> 
> IMO, the following syntax is shorter, more explicit, and better aligned
> with the regular window.groupBy.select aggregations that are supported
> today.
> 
> tab.window(Tumble ... as 'w)
>    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>    .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
> 
> 
> Best, Fabian
> 
> Am Mi., 14. Nov. 2018 um 08:37 Uhr schrieb jincheng sun <
> sunjincheng...@gmail.com>:
> 
>> Hi Fabian/Xiaowei,
>> 
>> I am very sorry for my late reply! Glad to see your reply, and sounds
>> pretty good!
>> I agree that the approach with append() which can clearly defined the
>> result schema is better which Fabian mentioned.
>> In addition and append() and also contains non-time attributes, e.g.:
>> 
>>    tab('name, 'age, 'address, 'rowtime)
>>    tab.map(append(udf('name), 'address, 'rowtime).as('col1, 'col2,
>> 'address, 'rowtime)
>>    .window(Tumble over 5.millis on 'rowtime as 'w)
>>    .groupBy('w, 'address)
>> 
>> In this way the append() is very useful, and the behavior is very similar
>> to withForwardedFields() in DataSet.
>> So +1 to using append() approach for the map()&flatmap()!
>> 
>> But how about the agg() and flatAgg()? In agg/flatAgg case I agree
>> Xiaowei's approach that define the keys to be implied in the result table
>> and appears at the beginning, for example as follows:
>>  tab.window(Tumble ... as 'w)
>>    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>    .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>    .select('k1, 'col1, 'w.rowtime as 'rtime)
>> 
>> What to you think? @Fabian @Xiaowei
>> 
>> Thanks,
>> Jincheng
>> 
>> Fabian Hueske <fhue...@gmail.com> 于2018年11月9日周五 下午6:35写道:
>> 
>>> Hi Jincheng,
>>> 
>>> Thanks for the summary!
>>> I like the approach with append() better than the implicit forwarding as
>> it
>>> clearly indicates which fields are forwarded.
>>> However, I don't see much benefit over the flatMap(Expression*) variant,
>> as
>>> we would still need to analyze the full expression tree to ensure that at
>>> most (or exactly?) one Scalar / TableFunction is used.
>>> 
>>> Best,
>>> Fabian
>>> 
>>> Am Do., 8. Nov. 2018 um 19:25 Uhr schrieb jincheng sun <
>>> sunjincheng...@gmail.com>:
>>> 
>>>> Hi all,
>>>> 
>>>> We are discussing very detailed content about this proposal. We are
>>> trying
>>>> to design the API in many aspects (functionality, compatibility, ease
>> of
>>>> use, etc.). I think this is a very good process. Only such a detailed
>>>> discussion, In order to develop PR more clearly and smoothly in the
>> later
>>>> stage. I am very grateful to @Fabian and  @Xiaowei for sharing a lot of
>>>> good ideas.
>>>> About the definition of method signatures I want to share my points
>> here
>>>> which I am discussing with fabian in google doc (not yet completed), as
>>>> follows:
>>>> 
>>>> Assume we have a table:
>>>> val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string,
>>>> 'proctime.proctime)
>>>> 
>>>> Approach 1:
>>>> case1: Map follows Source Table
>>>> val result =
>>>>  tab.map(udf('string)).as('proctime, 'col1, 'col2)// proctime implied
>> in
>>>> the output
>>>>  .window(Tumble over 5.millis on 'proctime as 'w)
>>>> 
>>>> case2: FatAgg follows Window (Fabian mentioned above)
>>>> val result =
>>>>    tab.window(Tumble ... as 'w)
>>>>       .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>       .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
>>>>       .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>> 
>>>> Approach 2: Similar to Fabian‘s approach, which the result schema would
>>> be
>>>> clearly defined, but add a built-in append UDF. That make
>>>> map/flatmap/agg/flatAgg interface only accept one Expression.
>>>> val result =
>>>>    tab.map(append(udf('string), 'long, 'proctime)) as ('col1, 'col2,
>>>> 'long, 'proctime)
>>>>     .window(Tumble over 5.millis on 'proctime as 'w)
>>>> 
>>>> Note: Append is a special UDF for built-in that can pass through any
>>>> column.
>>>> 
>>>> So, May be we can defined the as  table.map(Expression)  first, If
>>>> necessary, we can extend to table.map(Expression*)  in the future ?  Of
>>>> course, I also hope that we can do more perfection in this proposal
>>> through
>>>> discussion.
>>>> 
>>>> Thanks,
>>>> Jincheng
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> Xiaowei Jiang <xiaow...@gmail.com> 于2018年11月7日周三 下午11:45写道:
>>>> 
>>>>> Hi Fabian,
>>>>> 
>>>>> I think that the key question you raised is if we allow extra
>>> parameters
>>>> in
>>>>> the methods map/flatMap/agg/flatAgg. I can see why allowing that may
>>>> appear
>>>>> more convenient in some cases. However, it might also cause some
>>>> confusions
>>>>> if we do that. For example, do we allow multiple UDFs in these
>>>> expressions?
>>>>> If we do, the semantics may be weird to define, e.g. what does
>>>>> table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Even
>>> though
>>>>> not allowing it may appear less powerful, but it can make things more
>>>>> intuitive too. In the case of agg/flatAgg, we can define the keys to
>> be
>>>>> implied in the result table and appears at the beginning. You can
>> use a
>>>>> select method if you want to modify this behavior. I think that
>>>> eventually
>>>>> we will have some API which allows other expressions as additional
>>>>> parameters, but I think it's better to do that after we introduce the
>>>>> concept of nested tables. A lot of things we suggested here can be
>>>>> considered as special cases of that. But things are much simpler if
>> we
>>>>> leave that to later.
>>>>> 
>>>>> Regards,
>>>>> Xiaowei
>>>>> 
>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> * Re emit:
>>>>>> I think we should start with a well understood semantics of full
>>>>>> replacement. This is how the other agg functions work.
>>>>>> As was said before, there are open questions regarding an append
>> mode
>>>>>> (checkpointing, whether supporting retractions or not and if yes
>> how
>>> to
>>>>>> declare them, ...).
>>>>>> Since this seems to be an optimization, I'd postpone it.
>>>>>> 
>>>>>> * Re grouping keys:
>>>>>> I don't think we should automatically add them because the result
>>>> schema
>>>>>> would not be intuitive.
>>>>>> Would they be added at the beginning of the tuple or at the end?
>> What
>>>>>> metadata fields of windows would be added? In which order would
>> they
>>> be
>>>>>> added?
>>>>>> 
>>>>>> However, we could support syntax like this:
>>>>>> val t: Table = ???
>>>>>> t
>>>>>>  .window(Tumble ... as 'w)
>>>>>>  .groupBy('a, 'b)
>>>>>>  .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as
>>>> 'rtime)
>>>>>> 
>>>>>> The result schema would be clearly defined as [b, a, f1, f2, ...,
>> fn,
>>>>> wend,
>>>>>> rtime]. (f1, f2, ...fn) are the result attributes of the UDF.
>>>>>> 
>>>>>> * Re Multi-staged evaluation:
>>>>>> I think this should be an optimization that can be applied if the
>> UDF
>>>>>> implements the merge() method.
>>>>>> 
>>>>>> Best, Fabian
>>>>>> 
>>>>>> Am Mi., 7. Nov. 2018 um 08:01 Uhr schrieb Shaoxuan Wang <
>>>>>> wshaox...@gmail.com
>>>>>>> :
>>>>>> 
>>>>>>> Hi xiaowei,
>>>>>>> 
>>>>>>> Yes, I agree with you that the semantics of
>> TableAggregateFunction
>>>> emit
>>>>>> is
>>>>>>> much more complex than AggregateFunction. The fundamental
>>> difference
>>>> is
>>>>>>> that TableAggregateFunction emits a "table" while
>> AggregateFunction
>>>>>> outputs
>>>>>>> (a column of) a "row". In the case of AggregateFunction it only
>> has
>>>> one
>>>>>>> mode which is “replacing” (complete update). But for
>>>>>>> TableAggregateFunction, it could be incremental (only emit the
>> new
>>>>>> updated
>>>>>>> results) update or complete update (always emit the entire table
>>> when
>>>>>>> “emit" is triggered).  From the performance perspective, we might
>>>> want
>>>>> to
>>>>>>> use incremental update. But we need review and design this
>>> carefully,
>>>>>>> especially taking into account the cases of the failover (instead
>>> of
>>>>> just
>>>>>>> back-up the ACC it may also needs to remember the emit offset)
>> and
>>>>>>> retractions, as the semantics of TableAggregateFunction emit are
>>>>>> different
>>>>>>> than other UDFs. TableFunction also emits a table, but it does
>> not
>>>> need
>>>>>> to
>>>>>>> worry this due to the nature of stateless.
>>>>>>> 
>>>>>>> Regards,
>>>>>>> Shaoxuan
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com
>>> 
>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Jincheng,
>>>>>>>> 
>>>>>>>> Thanks for adding the public interfaces! I think that it's a
>> very
>>>>> good
>>>>>>>> start. There are a few points that we need to have more
>>>> discussions.
>>>>>>>> 
>>>>>>>>   - TableAggregateFunction - this is a very complex beast,
>>>>> definitely
>>>>>>> the
>>>>>>>>   most complex user defined objects we introduced so far. I
>>> think
>>>>>> there
>>>>>>>> are
>>>>>>>>   quite some interesting questions here. For example, do we
>>> allow
>>>>>>>>   multi-staged TableAggregate in this case? What is the
>>> semantics
>>>> of
>>>>>>>> emit? Is
>>>>>>>>   it amendments to the previous output, or replacing it? I
>> think
>>>>> that
>>>>>>> this
>>>>>>>>   subject itself is worth a discussion to make sure we get the
>>>>> details
>>>>>>>> right.
>>>>>>>>   - GroupedTable.agg - does the group keys automatically
>> appear
>>> in
>>>>> the
>>>>>>>>   output? how about the case of windowing aggregation?
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Xiaowei
>>>>>>>> 
>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <
>>>>> sunjincheng...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi, Xiaowei,
>>>>>>>>> 
>>>>>>>>> Thanks for bring up the discuss of Table API Enhancement
>>> Outline
>>>> !
>>>>>>>>> 
>>>>>>>>> I quickly looked at the overall content, these are good
>>>> expressions
>>>>>> of
>>>>>>>> our
>>>>>>>>> offline discussions. But from the points of my view, we
>> should
>>>> add
>>>>>> the
>>>>>>>>> usage of public interfaces that we will introduce in this
>>>> propose.
>>>>>>> So, I
>>>>>>>>> added the following usage description of  interface and
>>> operators
>>>>> in
>>>>>>>>> google doc:
>>>>>>>>> 
>>>>>>>>> 1. Map Operator
>>>>>>>>>    Map operator is a new operator of Table, Map operator can
>>>>> apply a
>>>>>>>>> scalar function, and can return multi-column. The usage as
>>>> follows:
>>>>>>>>> 
>>>>>>>>>  val res = tab
>>>>>>>>>     .map(fun: ScalarFunction).as(‘a, ‘b, ‘c)
>>>>>>>>>     .select(‘a, ‘c)
>>>>>>>>> 
>>>>>>>>> 2. FlatMap Operator
>>>>>>>>>    FaltMap operator is a new operator of Table, FlatMap
>>> operator
>>>>> can
>>>>>>>> apply
>>>>>>>>> a table function, and can return multi-row. The usage as
>>> follows:
>>>>>>>>> 
>>>>>>>>>  val res = tab
>>>>>>>>>      .flatMap(fun: TableFunction).as(‘a, ‘b, ‘c)
>>>>>>>>>      .select(‘a, ‘c)
>>>>>>>>> 
>>>>>>>>> 3. Agg Operator
>>>>>>>>>    Agg operator is a new operator of Table/GroupedTable, Agg
>>>>>> operator
>>>>>>>> can
>>>>>>>>> apply a aggregate function, and can return multi-column. The
>>>> usage
>>>>> as
>>>>>>>>> follows:
>>>>>>>>> 
>>>>>>>>>   val res = tab
>>>>>>>>>      .groupBy(‘a) // leave groupBy-Clause out to define
>> global
>>>>>>>> aggregates
>>>>>>>>>      .agg(fun: AggregateFunction).as(‘a, ‘b, ‘c)
>>>>>>>>>      .select(‘a, ‘c)
>>>>>>>>> 
>>>>>>>>> 4.  FlatAgg Operator
>>>>>>>>>    FlatAgg operator is a new operator of Table/GroupedTable,
>>>>> FaltAgg
>>>>>>>>> operator can apply a table aggregate function, and can return
>>>>>>> multi-row.
>>>>>>>>> The usage as follows:
>>>>>>>>> 
>>>>>>>>>    val res = tab
>>>>>>>>>       .groupBy(‘a) // leave groupBy-Clause out to define
>>> global
>>>>>> table
>>>>>>>>> aggregates
>>>>>>>>>       .flatAgg(fun: TableAggregateFunction).as(‘a, ‘b, ‘c)
>>>>>>>>>       .select(‘a, ‘c)
>>>>>>>>> 
>>>>>>>>>  5. TableAggregateFunction
>>>>>>>>>     The behavior of table aggregates is most like
>>>>>> GroupReduceFunction
>>>>>>>> did,
>>>>>>>>> which computed for a group of elements, and output  a group
>> of
>>>>>>> elements.
>>>>>>>>> The TableAggregateFunction can be applied on
>>>>> GroupedTable.flatAgg() .
>>>>>>> The
>>>>>>>>> interface of TableAggregateFunction has a lot of content, so
>> I
>>>>> don't
>>>>>>> copy
>>>>>>>>> it here, Please look at the detail in google doc:
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
>>>>>>>>> 
>>>>>>>>> I will be very appreciate to anyone for reviewing and
>>> commenting.
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> Jincheng
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 

Reply via email to