Hi Jincheng,

> #1) No,watermark solves the issue of the late event. Here, the performance
> problem is caused by the update emit mode. i.e.: When current calculation
> result is output, the previous calculation result needs to be retracted.

Hmm, yes I missed this. For time-windowed cases (some aggregate/flatAggregate 
cases) emitting only on watermark should solve the problem. For non time 
windowed cases it would reduce the amount of retractions, right? Or am I still 
missing something?

> #3)I still hope to keep the simplicity that select only support projected
> scalar, we can hardly tell the semantics of tab.select(flatmap('a), 'b,
> flatmap('d)).

table.select(F(‘a).unnest(), ‘b, G(‘c).unnest())

Could be rejected during some validation phase. On the other hand:

table.select(F(‘a).unnest(), ‘b, scalarG(‘c))
or
table.flatMap(F(‘a), ‘b, scalarG(‘c))

Could work and be more or less a syntax sugar for cross apply.

Piotrek

> On 21 Nov 2018, at 12:16, jincheng sun <sunjincheng...@gmail.com> wrote:
> 
> Hi shaoxuan & Hequn,
> 
> Thanks for your suggestion,I'll file the JIRAs later.
> We can prepare PRs while continuing to move forward the ongoing discussion.
> 
> Regards,
> Jincheng
> 
> jincheng sun <sunjincheng...@gmail.com> 于2018年11月21日周三 下午7:07写道:
> 
>> Hi Piotrek,
>> Thanks for your feedback, and thanks for  share your thoughts!
>> 
>> #1) No,watermark solves the issue of the late event. Here, the performance
>> problem is caused by the update emit mode. i.e.: When current calculation
>> result is output, the previous calculation result needs to be retracted.
>> #2) As I mentioned above we should continue the discussion until we solve
>> the problems raised by Xiaowei and Fabian.
>> #3)I still hope to keep the simplicity that select only support projected
>> scalar, we can hardly tell the semantics of tab.select(flatmap('a), 'b,
>> flatmap('d)).
>> 
>> Thanks,
>> Jincheng
>> 
>> Piotr Nowojski <pi...@data-artisans.com> 于2018年11月21日周三 下午5:24写道:
>> 
>>> Hi,
>>> 
>>> 1.
>>> 
>>>> In fact, in addition to the design of APIs, there will be various
>>>> performance optimization details, such as: table Aggregate function
>>>> emitValue will generate multiple calculation results, in extreme cases,
>>>> each record will trigger a large number of retract messages, this will
>>> have
>>>> poor performance
>>> 
>>> Can this be solved/mitigated by emitting the results only on watermarks?
>>> I think that was the path that we decided to take both for Temporal Joins
>>> and upsert stream conversion. I know that this increases the latency and
>>> there is a place for a future global setting/user preference “emit the data
>>> ASAP mode”, but emitting only on watermarks seems to me as a better/more
>>> sane default.
>>> 
>>> 2.
>>> 
>>> With respect to the API discussion and implicit columns. The problem for
>>> me so far is I’m not sure if I like the additionally complexity of
>>> `append()` solution, while implicit columns are definitely not in the
>>> spirit of SQL. Neither joins nor aggregations add extra unexpected columns
>>> to the result without asking. This definitely can be confusing for the
>>> users since it brakes the convention. Thus I would lean towards Fabian’s
>>> proposal of multi-argument `map(Expression*)` from those 3 options.
>>> 
>>> 3.
>>> 
>>> Another topic is that I’m not 100% convinced that we should be adding new
>>> api functions for `map`,`aggregate`,`flatMap` and `flatAggregate`. I think
>>> the same could be achieved by changing
>>> 
>>> table.map(F('x))
>>> 
>>> into
>>> 
>>> table.select(F('x)).unnest()
>>> or
>>> table.select(F('x).unnest())
>>> 
>>> Where `unnest()` means unnest row/tuple type into a columnar table.
>>> 
>>> table.flatMap(F('x))
>>> 
>>> Could be on the other hand also handled by
>>> 
>>> table.select(F('x))
>>> 
>>> By correctly deducing that F(x) is a multi row output function
>>> 
>>> Same might apply to `aggregate(F('x))`, but this maybe could be replaced
>>> by:
>>> 
>>> table.groupBy(…).select(F('x).unnest())
>>> 
>>> Adding scalar functions should also be possible:
>>> 
>>> table.groupBy('k).select(F('x).unnest(), ‘k)
>>> 
>>> Maybe such approach would allow us to implement the same features in the
>>> SQL as well?
>>> 
>>> Piotrek
>>> 
>>>> On 21 Nov 2018, at 09:43, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> Thank you all for the great proposal and discussion!
>>>> I also prefer to move on to the next step, so +1 for opening the JIRAs
>>> to
>>>> start the work.
>>>> We can have more detailed discussion there. Btw, we can start with JIRAs
>>>> which we have agreed on.
>>>> 
>>>> Best,
>>>> Hequn
>>>> 
>>>> On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <wshaox...@gmail.com>
>>> wrote:
>>>> 
>>>>> +1. I agree that we should open the JIRAs to start the work. We may
>>>>> have better ideas on the flavor of the interface when implement/review
>>>>> the code.
>>>>> 
>>>>> Regards,
>>>>> shaoxuan
>>>>> 
>>>>> 
>>>>> On 11/20/18, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>> Hi all,
>>>>>> 
>>>>>> Thanks all for the feedback.
>>>>>> 
>>>>>> @Piotr About not using abbreviations naming,  +1,I like
>>>>>> your proposal!Currently both DataSet and DataStream API are using
>>>>>> `aggregate`,
>>>>>> BTW,I find other language also not using abbreviations naming,such as
>>> R.
>>>>>> 
>>>>>> Sometimes the interface of the API is really difficult to perfect, we
>>>>> need
>>>>>> to spend a lot of time thinking and feedback from a large number of
>>>>> users,
>>>>>> and constantly improve, but for backward compatibility issues, we
>>> have to
>>>>>> adopt the most conservative approach when designing the API(Of
>>> course, I
>>>>> am
>>>>>> more in favor of developing more rich features, when we discuss
>>> clearly).
>>>>>> Therefore, I propose to divide the function implementation of
>>>>>> map/faltMap/agg/flatAgg into basic functions of JIRAs and JIRAs that
>>>>>> support time attributes and groupKeys. We can develop the features
>>> which
>>>>>> we  have already agreed on the design. And we will continue to discuss
>>>>> the
>>>>>> uncertain design.
>>>>>> 
>>>>>> In fact, in addition to the design of APIs, there will be various
>>>>>> performance optimization details, such as: table Aggregate function
>>>>>> emitValue will generate multiple calculation results, in extreme
>>> cases,
>>>>>> each record will trigger a large number of retract messages, this will
>>>>> have
>>>>>> poor performance,so we will also optimize the interface design, such
>>> as
>>>>>> adding the emitWithRetractValue interface (I have updated the google
>>> doc)
>>>>>> to allow the user to optionally perform incremental calculations, thus
>>>>>> avoiding a large number of retracts. Details like this are difficult
>>> to
>>>>>> fully discuss in the mail list, so I recommend creating JIRAs/FLIP
>>> first,
>>>>>> we develop designs that have been agreed upon and continue to discuss
>>>>>> non-deterministic designs!  What do you think? @Fabian & Piotr &
>>> XiaoWei
>>>>>> 
>>>>>> Best,
>>>>>> Jincheng
>>>>>> 
>>>>>> Xiaowei Jiang <xiaow...@gmail.com> 于2018年11月19日周一 上午12:07写道:
>>>>>> 
>>>>>>> Hi Fabian & Piotr, thanks for the feedback!
>>>>>>> 
>>>>>>> I appreciate your concerns, both on timestamp attributes as well as
>>> on
>>>>>>> implicit group keys. At the same time, I'm also concerned with the
>>>>>>> proposed
>>>>>>> approach of allowing Expression* as parameters, especially for
>>>>>>> flatMap/flatAgg. So far, we never allowed a scalar expression to
>>> appear
>>>>>>> together with table expressions. With the Expression* approach, this
>>>>> will
>>>>>>> happen for the parameters to flatMap/flatAgg. I'm a bit concerned on
>>> if
>>>>>>> we
>>>>>>> fully understand the consequences when we try to extend our system in
>>>>> the
>>>>>>> future. I would be extra cautious in doing this. To avoid this, I
>>> think
>>>>>>> an
>>>>>>> implicit group key for flatAgg is safer. For flatMap, if users want
>>> to
>>>>>>> keep
>>>>>>> the rowtime column, he can use crossApply/join instead. So we are not
>>>>>>> losing any real functionality here.
>>>>>>> 
>>>>>>> Also a clarification on the following example:
>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>   .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>> If we did not have the select clause in this example, we will have
>>> 'w as
>>>>>>> a
>>>>>>> regular column in the output. It should not magically disappear.
>>>>>>> 
>>>>>>> The concern is not as strong for Table.map/Table.agg because we are
>>> not
>>>>>>> mixing scalar and table expressions. But we also want to be a bit
>>>>>>> consistent with these methods. If we used implicit group keys for
>>>>>>> Table.flatAgg, we probably should do the same for Table.agg. Now we
>>> only
>>>>>>> have to choose what to do with Table.map. I can see good arguments
>>> from
>>>>>>> both sides. But starting with a single Expression seems safer because
>>>>>>> that
>>>>>>> we can always extend to Expression* in the future.
>>>>>>> 
>>>>>>> While thinking about this problem, it appears that we may need more
>>> work
>>>>>>> in
>>>>>>> our handling of watermarks for SQL/Table API. Our current way of
>>>>>>> propagating the watermarks from source all the way to sink might not
>>> be
>>>>>>> optimal. For example, after a tumbling window, the watermark can
>>>>> actually
>>>>>>> be advanced to just before the expiring of next window. I think that
>>> in
>>>>>>> general, each operator may need to generate new watermarks instead of
>>>>>>> simply propagating them. Once we accept that watermarks may change
>>>>> during
>>>>>>> the execution, it appears that the timestamp columns may also
>>> change, as
>>>>>>> long as we have some way to associate watermark with it. My
>>> intuition is
>>>>>>> that once we have a through solution for the watermark issue, we may
>>> be
>>>>>>> able to solve the problem we encountered for Table.map in a cleaner
>>> way.
>>>>>>> But this is a complex issue which deserves a discussion on its own.
>>>>>>> 
>>>>>>> Regards,
>>>>>>> Xiaowei
>>>>>>> 
>>>>>>> 
>>>>>>> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <
>>>>> pi...@data-artisans.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>> Isn’t the problem of multiple expressions limited only to `flat***`
>>>>>>>> functions and to be more specific only to having two (or more)
>>>>>>>> different
>>>>>>>> table functions passed as an expressions? `.flatAgg(TableAggA('a),
>>>>>>>> scalarFunction1(‘b), scalarFunction2(‘c))` seems to be well defined
>>>>>>>> (duplicate result of every scalar function to every record. Or am I
>>>>>>> missing
>>>>>>>> something?
>>>>>>>> 
>>>>>>>> Another remark, I would be in favour of not using abbreviations and
>>>>>>> naming
>>>>>>>> `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
>>>>>>>> 
>>>>>>>> Piotrek
>>>>>>>> 
>>>>>>>>> On 15 Nov 2018, at 14:15, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Jincheng,
>>>>>>>>> 
>>>>>>>>> I said before, that I think that the append() method is better than
>>>>>>>>> implicitly forwarding keys, but still, I believe it adds
>>> unnecessary
>>>>>>>> boiler
>>>>>>>>> plate code.
>>>>>>>>> 
>>>>>>>>> Moreover, I haven't seen a convincing argument why map(Expression*)
>>>>>>>>> is
>>>>>>>>> worse than map(Expression). In either case we need to do all kinds
>>>>> of
>>>>>>>>> checks to prevent invalid use of functions.
>>>>>>>>> If the method is not correctly used, we can emit a good error
>>>>> message
>>>>>>> and
>>>>>>>>> documenting map(Expression*) will be easier than
>>>>>>>> map(append(Expression*)),
>>>>>>>>> in my opinion.
>>>>>>>>> I think we should not add unnessary syntax unless there is a good
>>>>>>> reason
>>>>>>>>> and to be honest, I haven't seen this reason yet.
>>>>>>>>> 
>>>>>>>>> Regarding the groupBy.agg() method, I think it should behave just
>>>>>>>>> like
>>>>>>>> any
>>>>>>>>> other method, i.e., not do any implicit forwarding.
>>>>>>>>> Let's take the example of the windowed group by, that you posted
>>>>>>> before.
>>>>>>>>> 
>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>  .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>  .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>  .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>> 
>>>>>>>>> What happens if 'w.rowtime is not selected? What is the data type
>>> of
>>>>>>> the
>>>>>>>>> field 'w in the resulting Table? Is it a regular field at all or
>>>>> just
>>>>>>>>> a
>>>>>>>>> system field that disappears if it is not selected?
>>>>>>>>> 
>>>>>>>>> IMO, the following syntax is shorter, more explicit, and better
>>>>>>>>> aligned
>>>>>>>>> with the regular window.groupBy.select aggregations that are
>>>>>>>>> supported
>>>>>>>>> today.
>>>>>>>>> 
>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>  .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>  .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Best, Fabian
>>>>>>>>> 
>>>>>>>>> Am Mi., 14. Nov. 2018 um 08:37 Uhr schrieb jincheng sun <
>>>>>>>>> sunjincheng...@gmail.com>:
>>>>>>>>> 
>>>>>>>>>> Hi Fabian/Xiaowei,
>>>>>>>>>> 
>>>>>>>>>> I am very sorry for my late reply! Glad to see your reply, and
>>>>>>>>>> sounds
>>>>>>>>>> pretty good!
>>>>>>>>>> I agree that the approach with append() which can clearly defined
>>>>>>>>>> the
>>>>>>>>>> result schema is better which Fabian mentioned.
>>>>>>>>>> In addition and append() and also contains non-time attributes,
>>>>>>>>>> e.g.:
>>>>>>>>>> 
>>>>>>>>>>  tab('name, 'age, 'address, 'rowtime)
>>>>>>>>>>  tab.map(append(udf('name), 'address, 'rowtime).as('col1, 'col2,
>>>>>>>>>> 'address, 'rowtime)
>>>>>>>>>>  .window(Tumble over 5.millis on 'rowtime as 'w)
>>>>>>>>>>  .groupBy('w, 'address)
>>>>>>>>>> 
>>>>>>>>>> In this way the append() is very useful, and the behavior is very
>>>>>>>> similar
>>>>>>>>>> to withForwardedFields() in DataSet.
>>>>>>>>>> So +1 to using append() approach for the map()&flatmap()!
>>>>>>>>>> 
>>>>>>>>>> But how about the agg() and flatAgg()? In agg/flatAgg case I agree
>>>>>>>>>> Xiaowei's approach that define the keys to be implied in the
>>> result
>>>>>>>> table
>>>>>>>>>> and appears at the beginning, for example as follows:
>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>  .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>  .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>>  .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>> 
>>>>>>>>>> What to you think? @Fabian @Xiaowei
>>>>>>>>>> 
>>>>>>>>>> Thanks,
>>>>>>>>>> Jincheng
>>>>>>>>>> 
>>>>>>>>>> Fabian Hueske <fhue...@gmail.com> 于2018年11月9日周五 下午6:35写道:
>>>>>>>>>> 
>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for the summary!
>>>>>>>>>>> I like the approach with append() better than the implicit
>>>>>>>>>>> forwarding
>>>>>>>> as
>>>>>>>>>> it
>>>>>>>>>>> clearly indicates which fields are forwarded.
>>>>>>>>>>> However, I don't see much benefit over the flatMap(Expression*)
>>>>>>>> variant,
>>>>>>>>>> as
>>>>>>>>>>> we would still need to analyze the full expression tree to ensure
>>>>>>> that
>>>>>>>> at
>>>>>>>>>>> most (or exactly?) one Scalar / TableFunction is used.
>>>>>>>>>>> 
>>>>>>>>>>> Best,
>>>>>>>>>>> Fabian
>>>>>>>>>>> 
>>>>>>>>>>> Am Do., 8. Nov. 2018 um 19:25 Uhr schrieb jincheng sun <
>>>>>>>>>>> sunjincheng...@gmail.com>:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>> 
>>>>>>>>>>>> We are discussing very detailed content about this proposal. We
>>>>>>>>>>>> are
>>>>>>>>>>> trying
>>>>>>>>>>>> to design the API in many aspects (functionality, compatibility,
>>>>>>> ease
>>>>>>>>>> of
>>>>>>>>>>>> use, etc.). I think this is a very good process. Only such a
>>>>>>> detailed
>>>>>>>>>>>> discussion, In order to develop PR more clearly and smoothly in
>>>>>>>>>>>> the
>>>>>>>>>> later
>>>>>>>>>>>> stage. I am very grateful to @Fabian and  @Xiaowei for sharing a
>>>>>>>>>>>> lot
>>>>>>>> of
>>>>>>>>>>>> good ideas.
>>>>>>>>>>>> About the definition of method signatures I want to share my
>>>>>>>>>>>> points
>>>>>>>>>> here
>>>>>>>>>>>> which I am discussing with fabian in google doc (not yet
>>>>>>>>>>>> completed),
>>>>>>>> as
>>>>>>>>>>>> follows:
>>>>>>>>>>>> 
>>>>>>>>>>>> Assume we have a table:
>>>>>>>>>>>> val tab = util.addTable[(Long, String)]("MyTable", 'long,
>>>>> 'string,
>>>>>>>>>>>> 'proctime.proctime)
>>>>>>>>>>>> 
>>>>>>>>>>>> Approach 1:
>>>>>>>>>>>> case1: Map follows Source Table
>>>>>>>>>>>> val result =
>>>>>>>>>>>> tab.map(udf('string)).as('proctime, 'col1, 'col2)// proctime
>>>>>>> implied
>>>>>>>>>> in
>>>>>>>>>>>> the output
>>>>>>>>>>>> .window(Tumble over 5.millis on 'proctime as 'w)
>>>>>>>>>>>> 
>>>>>>>>>>>> case2: FatAgg follows Window (Fabian mentioned above)
>>>>>>>>>>>> val result =
>>>>>>>>>>>>  tab.window(Tumble ... as 'w)
>>>>>>>>>>>>     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>     .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
>>>>>>>>>>>>     .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>>> 
>>>>>>>>>>>> Approach 2: Similar to Fabian‘s approach, which the result
>>> schema
>>>>>>>> would
>>>>>>>>>>> be
>>>>>>>>>>>> clearly defined, but add a built-in append UDF. That make
>>>>>>>>>>>> map/flatmap/agg/flatAgg interface only accept one Expression.
>>>>>>>>>>>> val result =
>>>>>>>>>>>>  tab.map(append(udf('string), 'long, 'proctime)) as ('col1,
>>>>>>>>>>>> 'col2,
>>>>>>>>>>>> 'long, 'proctime)
>>>>>>>>>>>>   .window(Tumble over 5.millis on 'proctime as 'w)
>>>>>>>>>>>> 
>>>>>>>>>>>> Note: Append is a special UDF for built-in that can pass through
>>>>>>>>>>>> any
>>>>>>>>>>>> column.
>>>>>>>>>>>> 
>>>>>>>>>>>> So, May be we can defined the as  table.map(Expression)  first,
>>>>> If
>>>>>>>>>>>> necessary, we can extend to table.map(Expression*)  in the
>>> future
>>>>>>>>>>>> ?
>>>>>>>> Of
>>>>>>>>>>>> course, I also hope that we can do more perfection in this
>>>>>>>>>>>> proposal
>>>>>>>>>>> through
>>>>>>>>>>>> discussion.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Jincheng
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> Xiaowei Jiang <xiaow...@gmail.com> 于2018年11月7日周三 下午11:45写道:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Fabian,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I think that the key question you raised is if we allow extra
>>>>>>>>>>> parameters
>>>>>>>>>>>> in
>>>>>>>>>>>>> the methods map/flatMap/agg/flatAgg. I can see why allowing
>>> that
>>>>>>> may
>>>>>>>>>>>> appear
>>>>>>>>>>>>> more convenient in some cases. However, it might also cause
>>> some
>>>>>>>>>>>> confusions
>>>>>>>>>>>>> if we do that. For example, do we allow multiple UDFs in these
>>>>>>>>>>>> expressions?
>>>>>>>>>>>>> If we do, the semantics may be weird to define, e.g. what does
>>>>>>>>>>>>> table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean?
>>>>>>>>>>>>> Even
>>>>>>>>>>> though
>>>>>>>>>>>>> not allowing it may appear less powerful, but it can make
>>> things
>>>>>>> more
>>>>>>>>>>>>> intuitive too. In the case of agg/flatAgg, we can define the
>>>>> keys
>>>>>>> to
>>>>>>>>>> be
>>>>>>>>>>>>> implied in the result table and appears at the beginning. You
>>>>> can
>>>>>>>>>> use a
>>>>>>>>>>>>> select method if you want to modify this behavior. I think that
>>>>>>>>>>>> eventually
>>>>>>>>>>>>> we will have some API which allows other expressions as
>>>>>>>>>>>>> additional
>>>>>>>>>>>>> parameters, but I think it's better to do that after we
>>>>> introduce
>>>>>>> the
>>>>>>>>>>>>> concept of nested tables. A lot of things we suggested here can
>>>>>>>>>>>>> be
>>>>>>>>>>>>> considered as special cases of that. But things are much
>>> simpler
>>>>>>>>>>>>> if
>>>>>>>>>> we
>>>>>>>>>>>>> leave that to later.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Xiaowei
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <
>>> fhue...@gmail.com
>>>>>> 
>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> * Re emit:
>>>>>>>>>>>>>> I think we should start with a well understood semantics of
>>>>> full
>>>>>>>>>>>>>> replacement. This is how the other agg functions work.
>>>>>>>>>>>>>> As was said before, there are open questions regarding an
>>>>> append
>>>>>>>>>> mode
>>>>>>>>>>>>>> (checkpointing, whether supporting retractions or not and if
>>>>> yes
>>>>>>>>>> how
>>>>>>>>>>> to
>>>>>>>>>>>>>> declare them, ...).
>>>>>>>>>>>>>> Since this seems to be an optimization, I'd postpone it.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> * Re grouping keys:
>>>>>>>>>>>>>> I don't think we should automatically add them because the
>>>>>>>>>>>>>> result
>>>>>>>>>>>> schema
>>>>>>>>>>>>>> would not be intuitive.
>>>>>>>>>>>>>> Would they be added at the beginning of the tuple or at the
>>>>> end?
>>>>>>>>>> What
>>>>>>>>>>>>>> metadata fields of windows would be added? In which order
>>> would
>>>>>>>>>> they
>>>>>>>>>>> be
>>>>>>>>>>>>>> added?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> However, we could support syntax like this:
>>>>>>>>>>>>>> val t: Table = ???
>>>>>>>>>>>>>> t
>>>>>>>>>>>>>> .window(Tumble ... as 'w)
>>>>>>>>>>>>>> .groupBy('a, 'b)
>>>>>>>>>>>>>> .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime
>>>>> as
>>>>>>>>>>>> 'rtime)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> The result schema would be clearly defined as [b, a, f1, f2,
>>>>>>>>>>>>>> ...,
>>>>>>>>>> fn,
>>>>>>>>>>>>> wend,
>>>>>>>>>>>>>> rtime]. (f1, f2, ...fn) are the result attributes of the UDF.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> * Re Multi-staged evaluation:
>>>>>>>>>>>>>> I think this should be an optimization that can be applied if
>>>>>>>>>>>>>> the
>>>>>>>>>> UDF
>>>>>>>>>>>>>> implements the merge() method.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Am Mi., 7. Nov. 2018 um 08:01 Uhr schrieb Shaoxuan Wang <
>>>>>>>>>>>>>> wshaox...@gmail.com
>>>>>>>>>>>>>>> :
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi xiaowei,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Yes, I agree with you that the semantics of
>>>>>>>>>> TableAggregateFunction
>>>>>>>>>>>> emit
>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>> much more complex than AggregateFunction. The fundamental
>>>>>>>>>>> difference
>>>>>>>>>>>> is
>>>>>>>>>>>>>>> that TableAggregateFunction emits a "table" while
>>>>>>>>>> AggregateFunction
>>>>>>>>>>>>>> outputs
>>>>>>>>>>>>>>> (a column of) a "row". In the case of AggregateFunction it
>>>>> only
>>>>>>>>>> has
>>>>>>>>>>>> one
>>>>>>>>>>>>>>> mode which is “replacing” (complete update). But for
>>>>>>>>>>>>>>> TableAggregateFunction, it could be incremental (only emit
>>> the
>>>>>>>>>> new
>>>>>>>>>>>>>> updated
>>>>>>>>>>>>>>> results) update or complete update (always emit the entire
>>>>>>>>>>>>>>> table
>>>>>>>>>>> when
>>>>>>>>>>>>>>> “emit" is triggered).  From the performance perspective, we
>>>>>>>>>>>>>>> might
>>>>>>>>>>>> want
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> use incremental update. But we need review and design this
>>>>>>>>>>> carefully,
>>>>>>>>>>>>>>> especially taking into account the cases of the failover
>>>>>>>>>>>>>>> (instead
>>>>>>>>>>> of
>>>>>>>>>>>>> just
>>>>>>>>>>>>>>> back-up the ACC it may also needs to remember the emit
>>> offset)
>>>>>>>>>> and
>>>>>>>>>>>>>>> retractions, as the semantics of TableAggregateFunction emit
>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>> different
>>>>>>>>>>>>>>> than other UDFs. TableFunction also emits a table, but it
>>> does
>>>>>>>>>> not
>>>>>>>>>>>> need
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> worry this due to the nature of stateless.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Shaoxuan
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang
>>>>>>>>>>>>>>> <xiaow...@gmail.com
>>>>>>>>>>> 
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thanks for adding the public interfaces! I think that it's a
>>>>>>>>>> very
>>>>>>>>>>>>> good
>>>>>>>>>>>>>>>> start. There are a few points that we need to have more
>>>>>>>>>>>> discussions.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> - TableAggregateFunction - this is a very complex beast,
>>>>>>>>>>>>> definitely
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> most complex user defined objects we introduced so far. I
>>>>>>>>>>> think
>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>> quite some interesting questions here. For example, do we
>>>>>>>>>>> allow
>>>>>>>>>>>>>>>> multi-staged TableAggregate in this case? What is the
>>>>>>>>>>> semantics
>>>>>>>>>>>> of
>>>>>>>>>>>>>>>> emit? Is
>>>>>>>>>>>>>>>> it amendments to the previous output, or replacing it? I
>>>>>>>>>> think
>>>>>>>>>>>>> that
>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>> subject itself is worth a discussion to make sure we get
>>>>> the
>>>>>>>>>>>>> details
>>>>>>>>>>>>>>>> right.
>>>>>>>>>>>>>>>> - GroupedTable.agg - does the group keys automatically
>>>>>>>>>> appear
>>>>>>>>>>> in
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> output? how about the case of windowing aggregation?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>> Xiaowei
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <
>>>>>>>>>>>>> sunjincheng...@gmail.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Hi, Xiaowei,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks for bring up the discuss of Table API Enhancement
>>>>>>>>>>> Outline
>>>>>>>>>>>> !
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I quickly looked at the overall content, these are good
>>>>>>>>>>>> expressions
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>> our
>>>>>>>>>>>>>>>>> offline discussions. But from the points of my view, we
>>>>>>>>>> should
>>>>>>>>>>>> add
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> usage of public interfaces that we will introduce in this
>>>>>>>>>>>> propose.
>>>>>>>>>>>>>>> So, I
>>>>>>>>>>>>>>>>> added the following usage description of  interface and
>>>>>>>>>>> operators
>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>> google doc:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 1. Map Operator
>>>>>>>>>>>>>>>>>  Map operator is a new operator of Table, Map operator can
>>>>>>>>>>>>> apply a
>>>>>>>>>>>>>>>>> scalar function, and can return multi-column. The usage as
>>>>>>>>>>>> follows:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>   .map(fun: ScalarFunction).as(‘a, ‘b, ‘c)
>>>>>>>>>>>>>>>>>   .select(‘a, ‘c)
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 2. FlatMap Operator
>>>>>>>>>>>>>>>>>  FaltMap operator is a new operator of Table, FlatMap
>>>>>>>>>>> operator
>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>> apply
>>>>>>>>>>>>>>>>> a table function, and can return multi-row. The usage as
>>>>>>>>>>> follows:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>    .flatMap(fun: TableFunction).as(‘a, ‘b, ‘c)
>>>>>>>>>>>>>>>>>    .select(‘a, ‘c)
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 3. Agg Operator
>>>>>>>>>>>>>>>>>  Agg operator is a new operator of Table/GroupedTable, Agg
>>>>>>>>>>>>>> operator
>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>> apply a aggregate function, and can return multi-column.
>>> The
>>>>>>>>>>>> usage
>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>> follows:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>    .groupBy(‘a) // leave groupBy-Clause out to define
>>>>>>>>>> global
>>>>>>>>>>>>>>>> aggregates
>>>>>>>>>>>>>>>>>    .agg(fun: AggregateFunction).as(‘a, ‘b, ‘c)
>>>>>>>>>>>>>>>>>    .select(‘a, ‘c)
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 4.  FlatAgg Operator
>>>>>>>>>>>>>>>>>  FlatAgg operator is a new operator of Table/GroupedTable,
>>>>>>>>>>>>> FaltAgg
>>>>>>>>>>>>>>>>> operator can apply a table aggregate function, and can
>>>>> return
>>>>>>>>>>>>>>> multi-row.
>>>>>>>>>>>>>>>>> The usage as follows:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>  val res = tab
>>>>>>>>>>>>>>>>>     .groupBy(‘a) // leave groupBy-Clause out to define
>>>>>>>>>>> global
>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>> aggregates
>>>>>>>>>>>>>>>>>     .flatAgg(fun: TableAggregateFunction).as(‘a, ‘b, ‘c)
>>>>>>>>>>>>>>>>>     .select(‘a, ‘c)
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 5. TableAggregateFunction
>>>>>>>>>>>>>>>>>   The behavior of table aggregates is most like
>>>>>>>>>>>>>> GroupReduceFunction
>>>>>>>>>>>>>>>> did,
>>>>>>>>>>>>>>>>> which computed for a group of elements, and output  a group
>>>>>>>>>> of
>>>>>>>>>>>>>>> elements.
>>>>>>>>>>>>>>>>> The TableAggregateFunction can be applied on
>>>>>>>>>>>>> GroupedTable.flatAgg() .
>>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>> interface of TableAggregateFunction has a lot of content,
>>> so
>>>>>>>>>> I
>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>> copy
>>>>>>>>>>>>>>>>> it here, Please look at the detail in google doc:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I will be very appreciate to anyone for reviewing and
>>>>>>>>>>> commenting.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> 
>>>>> 
>>> -----------------------------------------------------------------------------------
>>>>> 
>>>>> *Rome was not built in one day*
>>>>> 
>>>>> 
>>>>> 
>>> -----------------------------------------------------------------------------------
>>>>> 
>>> 
>>> 

Reply via email to