Hi Jincheng,

#1) ok, got it.

#3)
> From points of my view I we can using
> `Expression`, and after the discussion decided to use Expression*, then
> improve it. In any case, we can use Expression, and there is an opportunity
> to become Expression* (compatibility). If we use Expression* directly, it
> is difficult for us to become Expression, which will break the
> compatibility between versions.  What do you think?

I don’t think that’s the case here. If we start with single param 
`flatMap(Expression)`, it will need implicit columns to be present in the 
result, which:

a) IMO it brakes SQL convention (that’s why I’m against this)
b) we can not later easily introduce `flatMap(Expression*)` without those 
implicit columns, without braking the compatibility or at least without making 
`flatMap(Expression*)` and `flatMap(Expression)` terribly inconsistent.

To elaborate on (a). It’s not nice if our own API is inconsistent and it 
sometimes behaves one way and sometimes another way:

table.groupBy(‘k).select(scalarAggregateFunction(‘v)) => single column result, 
just the output of `scalarAggregateFunction`
vs
table.groupBy(‘k).flatAggregate(tableAggregateFunction(‘v)) => both result of 
`tableAggregateFunction` plus key (and an optional window context ?)

Thus I think we have to now decide which way we want to jump, since later will 
be too late. Or again, am I missing something? :)

Piotrek

> On 22 Nov 2018, at 02:07, jincheng sun <sunjincheng...@gmail.com> wrote:
> 
> Hi Piotrek,
> #1)We have unbounded and bounded group window aggregate, for unbounded case
> we should early fire the result with retract message, we can not using
> watermark, because unbounded aggregate never finished. (for improvement we
> can introduce micro-batch in feature),  for bounded window we never support
> early fire, so we do not need retract.
> #3)  About validation of `table.select(F(‘a).unnest(), ‘b,
> G(‘c).unnest())/table.flatMap(F(‘a), ‘b, scalarG(‘c))` Fabian had mentioned
> above, please look at the prior mail.  For `table.flatMap(F(‘a), ‘b,
> scalarG(‘c))` that we concerned, i.e.:  we should discuss the issue of
> `Expression*` vs `Expression`. From points of my view I we can using
> `Expression`, and after the discussion decided to use Expression*, then
> improve it. In any case, we can use Expression, and there is an opportunity
> to become Expression* (compatibility). If we use Expression* directly, it
> is difficult for us to become Expression, which will break the
> compatibility between versions.  What do you think?
> 
> If there anything not clearly, welcome any feedback!Agains,thanks for share
> your thoughts!
> 
> Thanks,
> Jincheng
> 
> Piotr Nowojski <pi...@data-artisans.com> 于2018年11月21日周三 下午9:37写道:
> 
>> Hi Jincheng,
>> 
>>> #1) No,watermark solves the issue of the late event. Here, the
>> performance
>>> problem is caused by the update emit mode. i.e.: When current calculation
>>> result is output, the previous calculation result needs to be retracted.
>> 
>> Hmm, yes I missed this. For time-windowed cases (some
>> aggregate/flatAggregate cases) emitting only on watermark should solve the
>> problem. For non time windowed cases it would reduce the amount of
>> retractions, right? Or am I still missing something?
>> 
>>> #3)I still hope to keep the simplicity that select only support projected
>>> scalar, we can hardly tell the semantics of tab.select(flatmap('a), 'b,
>>> flatmap('d)).
>> 
>> table.select(F(‘a).unnest(), ‘b, G(‘c).unnest())
>> 
>> Could be rejected during some validation phase. On the other hand:
>> 
>> table.select(F(‘a).unnest(), ‘b, scalarG(‘c))
>> or
>> table.flatMap(F(‘a), ‘b, scalarG(‘c))
>> 
>> Could work and be more or less a syntax sugar for cross apply.
>> 
>> Piotrek
>> 
>>> On 21 Nov 2018, at 12:16, jincheng sun <sunjincheng...@gmail.com> wrote:
>>> 
>>> Hi shaoxuan & Hequn,
>>> 
>>> Thanks for your suggestion,I'll file the JIRAs later.
>>> We can prepare PRs while continuing to move forward the ongoing
>> discussion.
>>> 
>>> Regards,
>>> Jincheng
>>> 
>>> jincheng sun <sunjincheng...@gmail.com> 于2018年11月21日周三 下午7:07写道:
>>> 
>>>> Hi Piotrek,
>>>> Thanks for your feedback, and thanks for  share your thoughts!
>>>> 
>>>> #1) No,watermark solves the issue of the late event. Here, the
>> performance
>>>> problem is caused by the update emit mode. i.e.: When current
>> calculation
>>>> result is output, the previous calculation result needs to be retracted.
>>>> #2) As I mentioned above we should continue the discussion until we
>> solve
>>>> the problems raised by Xiaowei and Fabian.
>>>> #3)I still hope to keep the simplicity that select only support
>> projected
>>>> scalar, we can hardly tell the semantics of tab.select(flatmap('a), 'b,
>>>> flatmap('d)).
>>>> 
>>>> Thanks,
>>>> Jincheng
>>>> 
>>>> Piotr Nowojski <pi...@data-artisans.com> 于2018年11月21日周三 下午5:24写道:
>>>> 
>>>>> Hi,
>>>>> 
>>>>> 1.
>>>>> 
>>>>>> In fact, in addition to the design of APIs, there will be various
>>>>>> performance optimization details, such as: table Aggregate function
>>>>>> emitValue will generate multiple calculation results, in extreme
>> cases,
>>>>>> each record will trigger a large number of retract messages, this will
>>>>> have
>>>>>> poor performance
>>>>> 
>>>>> Can this be solved/mitigated by emitting the results only on
>> watermarks?
>>>>> I think that was the path that we decided to take both for Temporal
>> Joins
>>>>> and upsert stream conversion. I know that this increases the latency
>> and
>>>>> there is a place for a future global setting/user preference “emit the
>> data
>>>>> ASAP mode”, but emitting only on watermarks seems to me as a
>> better/more
>>>>> sane default.
>>>>> 
>>>>> 2.
>>>>> 
>>>>> With respect to the API discussion and implicit columns. The problem
>> for
>>>>> me so far is I’m not sure if I like the additionally complexity of
>>>>> `append()` solution, while implicit columns are definitely not in the
>>>>> spirit of SQL. Neither joins nor aggregations add extra unexpected
>> columns
>>>>> to the result without asking. This definitely can be confusing for the
>>>>> users since it brakes the convention. Thus I would lean towards
>> Fabian’s
>>>>> proposal of multi-argument `map(Expression*)` from those 3 options.
>>>>> 
>>>>> 3.
>>>>> 
>>>>> Another topic is that I’m not 100% convinced that we should be adding
>> new
>>>>> api functions for `map`,`aggregate`,`flatMap` and `flatAggregate`. I
>> think
>>>>> the same could be achieved by changing
>>>>> 
>>>>> table.map(F('x))
>>>>> 
>>>>> into
>>>>> 
>>>>> table.select(F('x)).unnest()
>>>>> or
>>>>> table.select(F('x).unnest())
>>>>> 
>>>>> Where `unnest()` means unnest row/tuple type into a columnar table.
>>>>> 
>>>>> table.flatMap(F('x))
>>>>> 
>>>>> Could be on the other hand also handled by
>>>>> 
>>>>> table.select(F('x))
>>>>> 
>>>>> By correctly deducing that F(x) is a multi row output function
>>>>> 
>>>>> Same might apply to `aggregate(F('x))`, but this maybe could be
>> replaced
>>>>> by:
>>>>> 
>>>>> table.groupBy(…).select(F('x).unnest())
>>>>> 
>>>>> Adding scalar functions should also be possible:
>>>>> 
>>>>> table.groupBy('k).select(F('x).unnest(), ‘k)
>>>>> 
>>>>> Maybe such approach would allow us to implement the same features in
>> the
>>>>> SQL as well?
>>>>> 
>>>>> Piotrek
>>>>> 
>>>>>> On 21 Nov 2018, at 09:43, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> Thank you all for the great proposal and discussion!
>>>>>> I also prefer to move on to the next step, so +1 for opening the JIRAs
>>>>> to
>>>>>> start the work.
>>>>>> We can have more detailed discussion there. Btw, we can start with
>> JIRAs
>>>>>> which we have agreed on.
>>>>>> 
>>>>>> Best,
>>>>>> Hequn
>>>>>> 
>>>>>> On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <wshaox...@gmail.com>
>>>>> wrote:
>>>>>> 
>>>>>>> +1. I agree that we should open the JIRAs to start the work. We may
>>>>>>> have better ideas on the flavor of the interface when
>> implement/review
>>>>>>> the code.
>>>>>>> 
>>>>>>> Regards,
>>>>>>> shaoxuan
>>>>>>> 
>>>>>>> 
>>>>>>> On 11/20/18, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>> Hi all,
>>>>>>>> 
>>>>>>>> Thanks all for the feedback.
>>>>>>>> 
>>>>>>>> @Piotr About not using abbreviations naming,  +1,I like
>>>>>>>> your proposal!Currently both DataSet and DataStream API are using
>>>>>>>> `aggregate`,
>>>>>>>> BTW,I find other language also not using abbreviations naming,such
>> as
>>>>> R.
>>>>>>>> 
>>>>>>>> Sometimes the interface of the API is really difficult to perfect,
>> we
>>>>>>> need
>>>>>>>> to spend a lot of time thinking and feedback from a large number of
>>>>>>> users,
>>>>>>>> and constantly improve, but for backward compatibility issues, we
>>>>> have to
>>>>>>>> adopt the most conservative approach when designing the API(Of
>>>>> course, I
>>>>>>> am
>>>>>>>> more in favor of developing more rich features, when we discuss
>>>>> clearly).
>>>>>>>> Therefore, I propose to divide the function implementation of
>>>>>>>> map/faltMap/agg/flatAgg into basic functions of JIRAs and JIRAs that
>>>>>>>> support time attributes and groupKeys. We can develop the features
>>>>> which
>>>>>>>> we  have already agreed on the design. And we will continue to
>> discuss
>>>>>>> the
>>>>>>>> uncertain design.
>>>>>>>> 
>>>>>>>> In fact, in addition to the design of APIs, there will be various
>>>>>>>> performance optimization details, such as: table Aggregate function
>>>>>>>> emitValue will generate multiple calculation results, in extreme
>>>>> cases,
>>>>>>>> each record will trigger a large number of retract messages, this
>> will
>>>>>>> have
>>>>>>>> poor performance,so we will also optimize the interface design, such
>>>>> as
>>>>>>>> adding the emitWithRetractValue interface (I have updated the google
>>>>> doc)
>>>>>>>> to allow the user to optionally perform incremental calculations,
>> thus
>>>>>>>> avoiding a large number of retracts. Details like this are difficult
>>>>> to
>>>>>>>> fully discuss in the mail list, so I recommend creating JIRAs/FLIP
>>>>> first,
>>>>>>>> we develop designs that have been agreed upon and continue to
>> discuss
>>>>>>>> non-deterministic designs!  What do you think? @Fabian & Piotr &
>>>>> XiaoWei
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Jincheng
>>>>>>>> 
>>>>>>>> Xiaowei Jiang <xiaow...@gmail.com> 于2018年11月19日周一 上午12:07写道:
>>>>>>>> 
>>>>>>>>> Hi Fabian & Piotr, thanks for the feedback!
>>>>>>>>> 
>>>>>>>>> I appreciate your concerns, both on timestamp attributes as well as
>>>>> on
>>>>>>>>> implicit group keys. At the same time, I'm also concerned with the
>>>>>>>>> proposed
>>>>>>>>> approach of allowing Expression* as parameters, especially for
>>>>>>>>> flatMap/flatAgg. So far, we never allowed a scalar expression to
>>>>> appear
>>>>>>>>> together with table expressions. With the Expression* approach,
>> this
>>>>>>> will
>>>>>>>>> happen for the parameters to flatMap/flatAgg. I'm a bit concerned
>> on
>>>>> if
>>>>>>>>> we
>>>>>>>>> fully understand the consequences when we try to extend our system
>> in
>>>>>>> the
>>>>>>>>> future. I would be extra cautious in doing this. To avoid this, I
>>>>> think
>>>>>>>>> an
>>>>>>>>> implicit group key for flatAgg is safer. For flatMap, if users want
>>>>> to
>>>>>>>>> keep
>>>>>>>>> the rowtime column, he can use crossApply/join instead. So we are
>> not
>>>>>>>>> losing any real functionality here.
>>>>>>>>> 
>>>>>>>>> Also a clarification on the following example:
>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>  .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>  .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>  .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>> If we did not have the select clause in this example, we will have
>>>>> 'w as
>>>>>>>>> a
>>>>>>>>> regular column in the output. It should not magically disappear.
>>>>>>>>> 
>>>>>>>>> The concern is not as strong for Table.map/Table.agg because we are
>>>>> not
>>>>>>>>> mixing scalar and table expressions. But we also want to be a bit
>>>>>>>>> consistent with these methods. If we used implicit group keys for
>>>>>>>>> Table.flatAgg, we probably should do the same for Table.agg. Now we
>>>>> only
>>>>>>>>> have to choose what to do with Table.map. I can see good arguments
>>>>> from
>>>>>>>>> both sides. But starting with a single Expression seems safer
>> because
>>>>>>>>> that
>>>>>>>>> we can always extend to Expression* in the future.
>>>>>>>>> 
>>>>>>>>> While thinking about this problem, it appears that we may need more
>>>>> work
>>>>>>>>> in
>>>>>>>>> our handling of watermarks for SQL/Table API. Our current way of
>>>>>>>>> propagating the watermarks from source all the way to sink might
>> not
>>>>> be
>>>>>>>>> optimal. For example, after a tumbling window, the watermark can
>>>>>>> actually
>>>>>>>>> be advanced to just before the expiring of next window. I think
>> that
>>>>> in
>>>>>>>>> general, each operator may need to generate new watermarks instead
>> of
>>>>>>>>> simply propagating them. Once we accept that watermarks may change
>>>>>>> during
>>>>>>>>> the execution, it appears that the timestamp columns may also
>>>>> change, as
>>>>>>>>> long as we have some way to associate watermark with it. My
>>>>> intuition is
>>>>>>>>> that once we have a through solution for the watermark issue, we
>> may
>>>>> be
>>>>>>>>> able to solve the problem we encountered for Table.map in a cleaner
>>>>> way.
>>>>>>>>> But this is a complex issue which deserves a discussion on its own.
>>>>>>>>> 
>>>>>>>>> Regards,
>>>>>>>>> Xiaowei
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <
>>>>>>> pi...@data-artisans.com>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi,
>>>>>>>>>> 
>>>>>>>>>> Isn’t the problem of multiple expressions limited only to
>> `flat***`
>>>>>>>>>> functions and to be more specific only to having two (or more)
>>>>>>>>>> different
>>>>>>>>>> table functions passed as an expressions? `.flatAgg(TableAggA('a),
>>>>>>>>>> scalarFunction1(‘b), scalarFunction2(‘c))` seems to be well
>> defined
>>>>>>>>>> (duplicate result of every scalar function to every record. Or am
>> I
>>>>>>>>> missing
>>>>>>>>>> something?
>>>>>>>>>> 
>>>>>>>>>> Another remark, I would be in favour of not using abbreviations
>> and
>>>>>>>>> naming
>>>>>>>>>> `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
>>>>>>>>>> 
>>>>>>>>>> Piotrek
>>>>>>>>>> 
>>>>>>>>>>> On 15 Nov 2018, at 14:15, Fabian Hueske <fhue...@gmail.com>
>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>> 
>>>>>>>>>>> I said before, that I think that the append() method is better
>> than
>>>>>>>>>>> implicitly forwarding keys, but still, I believe it adds
>>>>> unnecessary
>>>>>>>>>> boiler
>>>>>>>>>>> plate code.
>>>>>>>>>>> 
>>>>>>>>>>> Moreover, I haven't seen a convincing argument why
>> map(Expression*)
>>>>>>>>>>> is
>>>>>>>>>>> worse than map(Expression). In either case we need to do all
>> kinds
>>>>>>> of
>>>>>>>>>>> checks to prevent invalid use of functions.
>>>>>>>>>>> If the method is not correctly used, we can emit a good error
>>>>>>> message
>>>>>>>>> and
>>>>>>>>>>> documenting map(Expression*) will be easier than
>>>>>>>>>> map(append(Expression*)),
>>>>>>>>>>> in my opinion.
>>>>>>>>>>> I think we should not add unnessary syntax unless there is a good
>>>>>>>>> reason
>>>>>>>>>>> and to be honest, I haven't seen this reason yet.
>>>>>>>>>>> 
>>>>>>>>>>> Regarding the groupBy.agg() method, I think it should behave just
>>>>>>>>>>> like
>>>>>>>>>> any
>>>>>>>>>>> other method, i.e., not do any implicit forwarding.
>>>>>>>>>>> Let's take the example of the windowed group by, that you posted
>>>>>>>>> before.
>>>>>>>>>>> 
>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>> .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>>> .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>> 
>>>>>>>>>>> What happens if 'w.rowtime is not selected? What is the data type
>>>>> of
>>>>>>>>> the
>>>>>>>>>>> field 'w in the resulting Table? Is it a regular field at all or
>>>>>>> just
>>>>>>>>>>> a
>>>>>>>>>>> system field that disappears if it is not selected?
>>>>>>>>>>> 
>>>>>>>>>>> IMO, the following syntax is shorter, more explicit, and better
>>>>>>>>>>> aligned
>>>>>>>>>>> with the regular window.groupBy.select aggregations that are
>>>>>>>>>>> supported
>>>>>>>>>>> today.
>>>>>>>>>>> 
>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>> .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Best, Fabian
>>>>>>>>>>> 
>>>>>>>>>>> Am Mi., 14. Nov. 2018 um 08:37 Uhr schrieb jincheng sun <
>>>>>>>>>>> sunjincheng...@gmail.com>:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi Fabian/Xiaowei,
>>>>>>>>>>>> 
>>>>>>>>>>>> I am very sorry for my late reply! Glad to see your reply, and
>>>>>>>>>>>> sounds
>>>>>>>>>>>> pretty good!
>>>>>>>>>>>> I agree that the approach with append() which can clearly
>> defined
>>>>>>>>>>>> the
>>>>>>>>>>>> result schema is better which Fabian mentioned.
>>>>>>>>>>>> In addition and append() and also contains non-time attributes,
>>>>>>>>>>>> e.g.:
>>>>>>>>>>>> 
>>>>>>>>>>>> tab('name, 'age, 'address, 'rowtime)
>>>>>>>>>>>> tab.map(append(udf('name), 'address, 'rowtime).as('col1, 'col2,
>>>>>>>>>>>> 'address, 'rowtime)
>>>>>>>>>>>> .window(Tumble over 5.millis on 'rowtime as 'w)
>>>>>>>>>>>> .groupBy('w, 'address)
>>>>>>>>>>>> 
>>>>>>>>>>>> In this way the append() is very useful, and the behavior is
>> very
>>>>>>>>>> similar
>>>>>>>>>>>> to withForwardedFields() in DataSet.
>>>>>>>>>>>> So +1 to using append() approach for the map()&flatmap()!
>>>>>>>>>>>> 
>>>>>>>>>>>> But how about the agg() and flatAgg()? In agg/flatAgg case I
>> agree
>>>>>>>>>>>> Xiaowei's approach that define the keys to be implied in the
>>>>> result
>>>>>>>>>> table
>>>>>>>>>>>> and appears at the beginning, for example as follows:
>>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>> .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>>>> .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>>> 
>>>>>>>>>>>> What to you think? @Fabian @Xiaowei
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Jincheng
>>>>>>>>>>>> 
>>>>>>>>>>>> Fabian Hueske <fhue...@gmail.com> 于2018年11月9日周五 下午6:35写道:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for the summary!
>>>>>>>>>>>>> I like the approach with append() better than the implicit
>>>>>>>>>>>>> forwarding
>>>>>>>>>> as
>>>>>>>>>>>> it
>>>>>>>>>>>>> clearly indicates which fields are forwarded.
>>>>>>>>>>>>> However, I don't see much benefit over the flatMap(Expression*)
>>>>>>>>>> variant,
>>>>>>>>>>>> as
>>>>>>>>>>>>> we would still need to analyze the full expression tree to
>> ensure
>>>>>>>>> that
>>>>>>>>>> at
>>>>>>>>>>>>> most (or exactly?) one Scalar / TableFunction is used.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Fabian
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Am Do., 8. Nov. 2018 um 19:25 Uhr schrieb jincheng sun <
>>>>>>>>>>>>> sunjincheng...@gmail.com>:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> We are discussing very detailed content about this proposal.
>> We
>>>>>>>>>>>>>> are
>>>>>>>>>>>>> trying
>>>>>>>>>>>>>> to design the API in many aspects (functionality,
>> compatibility,
>>>>>>>>> ease
>>>>>>>>>>>> of
>>>>>>>>>>>>>> use, etc.). I think this is a very good process. Only such a
>>>>>>>>> detailed
>>>>>>>>>>>>>> discussion, In order to develop PR more clearly and smoothly
>> in
>>>>>>>>>>>>>> the
>>>>>>>>>>>> later
>>>>>>>>>>>>>> stage. I am very grateful to @Fabian and  @Xiaowei for
>> sharing a
>>>>>>>>>>>>>> lot
>>>>>>>>>> of
>>>>>>>>>>>>>> good ideas.
>>>>>>>>>>>>>> About the definition of method signatures I want to share my
>>>>>>>>>>>>>> points
>>>>>>>>>>>> here
>>>>>>>>>>>>>> which I am discussing with fabian in google doc (not yet
>>>>>>>>>>>>>> completed),
>>>>>>>>>> as
>>>>>>>>>>>>>> follows:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Assume we have a table:
>>>>>>>>>>>>>> val tab = util.addTable[(Long, String)]("MyTable", 'long,
>>>>>>> 'string,
>>>>>>>>>>>>>> 'proctime.proctime)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Approach 1:
>>>>>>>>>>>>>> case1: Map follows Source Table
>>>>>>>>>>>>>> val result =
>>>>>>>>>>>>>> tab.map(udf('string)).as('proctime, 'col1, 'col2)// proctime
>>>>>>>>> implied
>>>>>>>>>>>> in
>>>>>>>>>>>>>> the output
>>>>>>>>>>>>>> .window(Tumble over 5.millis on 'proctime as 'w)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> case2: FatAgg follows Window (Fabian mentioned above)
>>>>>>>>>>>>>> val result =
>>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>>>>    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>>>    .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
>>>>>>>>>>>>>>    .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Approach 2: Similar to Fabian‘s approach, which the result
>>>>> schema
>>>>>>>>>> would
>>>>>>>>>>>>> be
>>>>>>>>>>>>>> clearly defined, but add a built-in append UDF. That make
>>>>>>>>>>>>>> map/flatmap/agg/flatAgg interface only accept one Expression.
>>>>>>>>>>>>>> val result =
>>>>>>>>>>>>>> tab.map(append(udf('string), 'long, 'proctime)) as ('col1,
>>>>>>>>>>>>>> 'col2,
>>>>>>>>>>>>>> 'long, 'proctime)
>>>>>>>>>>>>>>  .window(Tumble over 5.millis on 'proctime as 'w)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Note: Append is a special UDF for built-in that can pass
>> through
>>>>>>>>>>>>>> any
>>>>>>>>>>>>>> column.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> So, May be we can defined the as  table.map(Expression)
>> first,
>>>>>>> If
>>>>>>>>>>>>>> necessary, we can extend to table.map(Expression*)  in the
>>>>> future
>>>>>>>>>>>>>> ?
>>>>>>>>>> Of
>>>>>>>>>>>>>> course, I also hope that we can do more perfection in this
>>>>>>>>>>>>>> proposal
>>>>>>>>>>>>> through
>>>>>>>>>>>>>> discussion.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Xiaowei Jiang <xiaow...@gmail.com> 于2018年11月7日周三 下午11:45写道:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi Fabian,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I think that the key question you raised is if we allow extra
>>>>>>>>>>>>> parameters
>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>> the methods map/flatMap/agg/flatAgg. I can see why allowing
>>>>> that
>>>>>>>>> may
>>>>>>>>>>>>>> appear
>>>>>>>>>>>>>>> more convenient in some cases. However, it might also cause
>>>>> some
>>>>>>>>>>>>>> confusions
>>>>>>>>>>>>>>> if we do that. For example, do we allow multiple UDFs in
>> these
>>>>>>>>>>>>>> expressions?
>>>>>>>>>>>>>>> If we do, the semantics may be weird to define, e.g. what
>> does
>>>>>>>>>>>>>>> table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean?
>>>>>>>>>>>>>>> Even
>>>>>>>>>>>>> though
>>>>>>>>>>>>>>> not allowing it may appear less powerful, but it can make
>>>>> things
>>>>>>>>> more
>>>>>>>>>>>>>>> intuitive too. In the case of agg/flatAgg, we can define the
>>>>>>> keys
>>>>>>>>> to
>>>>>>>>>>>> be
>>>>>>>>>>>>>>> implied in the result table and appears at the beginning. You
>>>>>>> can
>>>>>>>>>>>> use a
>>>>>>>>>>>>>>> select method if you want to modify this behavior. I think
>> that
>>>>>>>>>>>>>> eventually
>>>>>>>>>>>>>>> we will have some API which allows other expressions as
>>>>>>>>>>>>>>> additional
>>>>>>>>>>>>>>> parameters, but I think it's better to do that after we
>>>>>>> introduce
>>>>>>>>> the
>>>>>>>>>>>>>>> concept of nested tables. A lot of things we suggested here
>> can
>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>> considered as special cases of that. But things are much
>>>>> simpler
>>>>>>>>>>>>>>> if
>>>>>>>>>>>> we
>>>>>>>>>>>>>>> leave that to later.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Xiaowei
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <
>>>>> fhue...@gmail.com
>>>>>>>> 
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> * Re emit:
>>>>>>>>>>>>>>>> I think we should start with a well understood semantics of
>>>>>>> full
>>>>>>>>>>>>>>>> replacement. This is how the other agg functions work.
>>>>>>>>>>>>>>>> As was said before, there are open questions regarding an
>>>>>>> append
>>>>>>>>>>>> mode
>>>>>>>>>>>>>>>> (checkpointing, whether supporting retractions or not and if
>>>>>>> yes
>>>>>>>>>>>> how
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> declare them, ...).
>>>>>>>>>>>>>>>> Since this seems to be an optimization, I'd postpone it.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> * Re grouping keys:
>>>>>>>>>>>>>>>> I don't think we should automatically add them because the
>>>>>>>>>>>>>>>> result
>>>>>>>>>>>>>> schema
>>>>>>>>>>>>>>>> would not be intuitive.
>>>>>>>>>>>>>>>> Would they be added at the beginning of the tuple or at the
>>>>>>> end?
>>>>>>>>>>>> What
>>>>>>>>>>>>>>>> metadata fields of windows would be added? In which order
>>>>> would
>>>>>>>>>>>> they
>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>> added?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> However, we could support syntax like this:
>>>>>>>>>>>>>>>> val t: Table = ???
>>>>>>>>>>>>>>>> t
>>>>>>>>>>>>>>>> .window(Tumble ... as 'w)
>>>>>>>>>>>>>>>> .groupBy('a, 'b)
>>>>>>>>>>>>>>>> .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime
>>>>>>> as
>>>>>>>>>>>>>> 'rtime)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> The result schema would be clearly defined as [b, a, f1, f2,
>>>>>>>>>>>>>>>> ...,
>>>>>>>>>>>> fn,
>>>>>>>>>>>>>>> wend,
>>>>>>>>>>>>>>>> rtime]. (f1, f2, ...fn) are the result attributes of the
>> UDF.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> * Re Multi-staged evaluation:
>>>>>>>>>>>>>>>> I think this should be an optimization that can be applied
>> if
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>> UDF
>>>>>>>>>>>>>>>> implements the merge() method.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Am Mi., 7. Nov. 2018 um 08:01 Uhr schrieb Shaoxuan Wang <
>>>>>>>>>>>>>>>> wshaox...@gmail.com
>>>>>>>>>>>>>>>>> :
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Hi xiaowei,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Yes, I agree with you that the semantics of
>>>>>>>>>>>> TableAggregateFunction
>>>>>>>>>>>>>> emit
>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>> much more complex than AggregateFunction. The fundamental
>>>>>>>>>>>>> difference
>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>> that TableAggregateFunction emits a "table" while
>>>>>>>>>>>> AggregateFunction
>>>>>>>>>>>>>>>> outputs
>>>>>>>>>>>>>>>>> (a column of) a "row". In the case of AggregateFunction it
>>>>>>> only
>>>>>>>>>>>> has
>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>> mode which is “replacing” (complete update). But for
>>>>>>>>>>>>>>>>> TableAggregateFunction, it could be incremental (only emit
>>>>> the
>>>>>>>>>>>> new
>>>>>>>>>>>>>>>> updated
>>>>>>>>>>>>>>>>> results) update or complete update (always emit the entire
>>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>> “emit" is triggered).  From the performance perspective, we
>>>>>>>>>>>>>>>>> might
>>>>>>>>>>>>>> want
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> use incremental update. But we need review and design this
>>>>>>>>>>>>> carefully,
>>>>>>>>>>>>>>>>> especially taking into account the cases of the failover
>>>>>>>>>>>>>>>>> (instead
>>>>>>>>>>>>> of
>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>> back-up the ACC it may also needs to remember the emit
>>>>> offset)
>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> retractions, as the semantics of TableAggregateFunction
>> emit
>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>> different
>>>>>>>>>>>>>>>>> than other UDFs. TableFunction also emits a table, but it
>>>>> does
>>>>>>>>>>>> not
>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> worry this due to the nature of stateless.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>> Shaoxuan
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang
>>>>>>>>>>>>>>>>> <xiaow...@gmail.com
>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thanks for adding the public interfaces! I think that
>> it's a
>>>>>>>>>>>> very
>>>>>>>>>>>>>>> good
>>>>>>>>>>>>>>>>>> start. There are a few points that we need to have more
>>>>>>>>>>>>>> discussions.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> - TableAggregateFunction - this is a very complex beast,
>>>>>>>>>>>>>>> definitely
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> most complex user defined objects we introduced so far. I
>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>> quite some interesting questions here. For example, do we
>>>>>>>>>>>>> allow
>>>>>>>>>>>>>>>>>> multi-staged TableAggregate in this case? What is the
>>>>>>>>>>>>> semantics
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>> emit? Is
>>>>>>>>>>>>>>>>>> it amendments to the previous output, or replacing it? I
>>>>>>>>>>>> think
>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>> subject itself is worth a discussion to make sure we get
>>>>>>> the
>>>>>>>>>>>>>>> details
>>>>>>>>>>>>>>>>>> right.
>>>>>>>>>>>>>>>>>> - GroupedTable.agg - does the group keys automatically
>>>>>>>>>>>> appear
>>>>>>>>>>>>> in
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> output? how about the case of windowing aggregation?
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>> Xiaowei
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <
>>>>>>>>>>>>>>> sunjincheng...@gmail.com>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Hi, Xiaowei,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks for bring up the discuss of Table API Enhancement
>>>>>>>>>>>>> Outline
>>>>>>>>>>>>>> !
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I quickly looked at the overall content, these are good
>>>>>>>>>>>>>> expressions
>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>> our
>>>>>>>>>>>>>>>>>>> offline discussions. But from the points of my view, we
>>>>>>>>>>>> should
>>>>>>>>>>>>>> add
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> usage of public interfaces that we will introduce in this
>>>>>>>>>>>>>> propose.
>>>>>>>>>>>>>>>>> So, I
>>>>>>>>>>>>>>>>>>> added the following usage description of  interface and
>>>>>>>>>>>>> operators
>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>> google doc:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 1. Map Operator
>>>>>>>>>>>>>>>>>>> Map operator is a new operator of Table, Map operator
>> can
>>>>>>>>>>>>>>> apply a
>>>>>>>>>>>>>>>>>>> scalar function, and can return multi-column. The usage
>> as
>>>>>>>>>>>>>> follows:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>>>  .map(fun: ScalarFunction).as(‘a, ‘b, ‘c)
>>>>>>>>>>>>>>>>>>>  .select(‘a, ‘c)
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 2. FlatMap Operator
>>>>>>>>>>>>>>>>>>> FaltMap operator is a new operator of Table, FlatMap
>>>>>>>>>>>>> operator
>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>> apply
>>>>>>>>>>>>>>>>>>> a table function, and can return multi-row. The usage as
>>>>>>>>>>>>> follows:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>>>   .flatMap(fun: TableFunction).as(‘a, ‘b, ‘c)
>>>>>>>>>>>>>>>>>>>   .select(‘a, ‘c)
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 3. Agg Operator
>>>>>>>>>>>>>>>>>>> Agg operator is a new operator of Table/GroupedTable,
>> Agg
>>>>>>>>>>>>>>>> operator
>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>> apply a aggregate function, and can return multi-column.
>>>>> The
>>>>>>>>>>>>>> usage
>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>> follows:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>>>   .groupBy(‘a) // leave groupBy-Clause out to define
>>>>>>>>>>>> global
>>>>>>>>>>>>>>>>>> aggregates
>>>>>>>>>>>>>>>>>>>   .agg(fun: AggregateFunction).as(‘a, ‘b, ‘c)
>>>>>>>>>>>>>>>>>>>   .select(‘a, ‘c)
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 4.  FlatAgg Operator
>>>>>>>>>>>>>>>>>>> FlatAgg operator is a new operator of
>> Table/GroupedTable,
>>>>>>>>>>>>>>> FaltAgg
>>>>>>>>>>>>>>>>>>> operator can apply a table aggregate function, and can
>>>>>>> return
>>>>>>>>>>>>>>>>> multi-row.
>>>>>>>>>>>>>>>>>>> The usage as follows:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>>>    .groupBy(‘a) // leave groupBy-Clause out to define
>>>>>>>>>>>>> global
>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>> aggregates
>>>>>>>>>>>>>>>>>>>    .flatAgg(fun: TableAggregateFunction).as(‘a, ‘b, ‘c)
>>>>>>>>>>>>>>>>>>>    .select(‘a, ‘c)
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 5. TableAggregateFunction
>>>>>>>>>>>>>>>>>>>  The behavior of table aggregates is most like
>>>>>>>>>>>>>>>> GroupReduceFunction
>>>>>>>>>>>>>>>>>> did,
>>>>>>>>>>>>>>>>>>> which computed for a group of elements, and output  a
>> group
>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>> elements.
>>>>>>>>>>>>>>>>>>> The TableAggregateFunction can be applied on
>>>>>>>>>>>>>>> GroupedTable.flatAgg() .
>>>>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>>>> interface of TableAggregateFunction has a lot of content,
>>>>> so
>>>>>>>>>>>> I
>>>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>>> copy
>>>>>>>>>>>>>>>>>>> it here, Please look at the detail in google doc:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>> 
>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I will be very appreciate to anyone for reviewing and
>>>>>>>>>>>>> commenting.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> 
>>>>>>> 
>>>>> 
>> -----------------------------------------------------------------------------------
>>>>>>> 
>>>>>>> *Rome was not built in one day*
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>> 
>> -----------------------------------------------------------------------------------
>>>>>>> 
>>>>> 
>>>>> 
>> 
>> 


Reply via email to