Hi Jincheng & Fabian,

+1 from my point of view.

I like the idea that you have to close `flatAggregate` with a `select` 
statement. It would be consistent with the normal `groupBy`, and it indeed 
solves the problem of mixing table and scalar functions. 
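
To make this concrete, here is a minimal sketch in plain Scala (it is not Flink 
code). The names `Table`, `GroupedTable` and `FlatAggregateTable` are 
hypothetical stand-ins, and columns are modelled as plain strings. The only 
point is that the result of `flatAggregate` offers nothing but `select`, and 
that this `select` rejects `*`:

```scala
// Toy model in plain Scala, not the Flink Table API.
// All type names are hypothetical; columns are modelled as strings.
final case class Table(columns: Seq[String])

// Result of flatAggregate: it deliberately exposes select() and nothing else.
final class FlatAggregateTable(keys: Seq[String], aggColumns: Seq[String]) {
  def select(fields: String*): Table = {
    // '* is rejected, mirroring "no select('*) in the first version".
    require(!fields.contains("*"), "select('*) is not supported after flatAggregate")
    val known = (keys ++ aggColumns).toSet
    val unknown = fields.filterNot(known.contains)
    require(unknown.isEmpty, "unknown fields: " + unknown.mkString(", "))
    Table(fields.toList)
  }
}

final class GroupedTable(keys: Seq[String]) {
  // The aggregate's result columns are passed in directly to keep the toy small.
  def flatAggregate(aggColumns: String*): FlatAggregateTable =
    new FlatAggregateTable(keys, aggColumns.toList)
}

object SelectOnlyDemo {
  def main(args: Array[String]): Unit = {
    val grouped = new GroupedTable(Seq("w.rowtime", "k1", "k2"))
    val result = grouped
      .flatAggregate("col1", "col2")
      .select("w.rowtime", "k1", "k2", "col1", "col2")
    println(result.columns.mkString(", "))
  }
}
```

Because `FlatAggregateTable` is not a `Table`, calls such as `insertInto` 
directly on the aggregate result simply do not compile in this model; the 
operation is only complete once `select` has been called.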

I would be against supporting a `select('*)` that returns only `col1` and `col2`. 
To me, `select('*)` means "give me everything that you have", and it would 
be very confusing that:

  tab.window(Tumble ... as 'w)
   .groupBy('w, 'k1, 'k2)
   .flatAggregate(tableAgg('a))
   .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)

would return more columns compared to:

  tab.window(Tumble ... as 'w)
   .groupBy('w, 'k1, 'k2)
   .flatAggregate(tableAgg('a))
   .select('*)
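
The two possible readings of `'*` can be put side by side in a small toy model. 
This is plain Scala with hypothetical names (`StarSemantics`, 
`starAsAggColumns`, `starAsEverything`), not Table API code, and it assumes 
that `tableAgg` returns `col1` and `col2` as in the examples above:

```scala
// Toy model in plain Scala, not the Flink Table API.
// The object and method names are hypothetical.
object StarSemantics {
  val groupKeys: Seq[String]  = Seq("w.rowtime", "k1", "k2") // side-passed from groupBy
  val aggColumns: Seq[String] = Seq("col1", "col2")          // returned by tableAgg

  // Reading A: '* expands to the result columns of the aggregate only.
  def starAsAggColumns: Seq[String] = aggColumns

  // Reading B: '* expands to everything that can be selected.
  def starAsEverything: Seq[String] = groupKeys ++ aggColumns

  def main(args: Array[String]): Unit = {
    val explicitSelect = Seq("w.rowtime", "k1", "k2", "col1", "col2")
    // Under reading A, the explicit select returns more columns than '*.
    println(explicitSelect.size > starAsAggColumns.size)
    // Under reading B, '* matches the explicit select.
    println(starAsEverything == explicitSelect)
  }
}
```

Reading A is the one I find confusing: an explicit column list ends up wider 
than "everything".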

I would also be fine with never supporting `.select('*)`. After all, that would 
be consistent with regular aggregation, where it doesn't make sense and is not 
supported either:

  tab.window(Tumble ... as 'w)
   .groupBy('w, 'k1, 'k2)
   .select('*)

Piotrek

> On 29 Nov 2018, at 10:51, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi,
> 
> OK, not supporting select('*) in the first version, sounds like a good
> first step, +1 for this.
> 
> However, I don't think that select('*) returning only the result columns of
> the agg function would be a significant break in semantics.
> Since aggregate()/flatAggregate() is the last command and (visibly) only
> forwards the result columns of the agg function, returning those for '*
> seems fine with me.
> The grouping columns (+window properties) are "side-passed" from the
> groupBy.
> 
> So, IMO, both select('*) and select('k1, 'k2, 'w.rowtime, '*) have well
> defined semantics. But we can add these shortcuts later.
> 
> Best,
> Fabian
> 
> 
> On 28 Nov 2018, at 11:13, jincheng sun <sunjincheng...@gmail.com> wrote:
> 
>> Hi Fabian,
>> 
>> Thank you for listing the detailed example of forcing the use of select.
>> 
>> If I didn't make it clear before, I would like to share my thoughts  about
>> the group keys here:
>> 
>>  1. agg/flatagg(Expression) keeps a single Expression;
>>  2. The way to force users to use select is as follows (as you mentioned):
>> 
>>   val tabX2 = tab.window(Tumble ... as 'w)
>>    .groupBy('w, 'k1, 'k2)
>>    .flatAgg(tableAgg('a))
>>    .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)
>> 
>> 
>>  3. IMO, we should not support select "*" on a window-grouped table in the
>>     first version.
>>     But I am fine if you want to support the shortcut, i.e.,
>>     `select('w.rowtime, 'k1, 'k2, '*)`, where "*" does not include the
>>     group keys.
>>     I am just a little worried that "*" represents all columns on a
>>     non-grouped table but not on a grouped table.
>>     The semantics of "*" are not uniform in this case.
>> 
>> What do you think?
>> 
>> Thanks,
>> Jincheng
>> 
>> 
>> On 27 Nov 2018, at 19:27, Fabian Hueske <fhue...@gmail.com> wrote:
>> 
>>> I don't think we came to a conclusion yet.
>>> Are you suggesting that 'w would disappear if we do the following:
>>> 
>>> val tabX = tab.window(Tumble ... as 'w)
>>>    .groupBy('w, 'k1, 'k2)
>>>    .flatAgg(tableAgg('a))
>>>    .select('*)
>>> 
>>> Given that tableAgg returns [col1, col2], what would the schema of tabX be?
>>> 
>>> 1) [col1, col2]: fine with me
>>> 2) [k1, k2, col1, col2]: I'm very skeptical about this option because it is
>>> IMO inconsistent. Users will ask, where did 'w go.
>>> 3) [w, k1, k2, col1, col2]: back to our original problem. What's the data
>>> type of 'w?
>>> 
>>> If we go for option 1, we can still allow to select keys.
>>> 
>>> val tabX2 = tab.window(Tumble ... as 'w)
>>>    .groupBy('w, 'k1, 'k2)
>>>    .flatAgg(tableAgg('a))
>>>    .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)
>>> 
>>> The good thing about enforcing select is that the result of flatAgg is not
>>> a Table, i.e., the definition of the operation is not completed yet.
>>> On the other hand, we would need to list all result attributes of the table
>>> function if we want to select keys. We could do that with a shortcut:
>>> 
>>> val tabX3 = tab.window(Tumble ... as 'w)
>>>    .groupBy('w, 'k1, 'k2)
>>>    .flatAgg(tableAgg('a))
>>>    .select('w.rowtime, 'k1, 'k2, '*)
>>> 
>>> 
>>> On 27 Nov 2018, at 10:46, jincheng sun <sunjincheng...@gmail.com> wrote:
>>> 
>>>> Thanks Fabian. If we enforce select, then, as I said before, users should
>>>> use 'w.start, 'w.end, 'w.rowtime, 'w.proctime, 'w.XXX, etc. This way we
>>>> do not need to define the type of 'w and can keep the current way of
>>>> using 'w.
>>>> I'll file the JIRA and open the PR ASAP.
>>>> 
>>>> Thanks,
>>>> Jincheng
>>>> 
>>>> On 27 Nov 2018, at 16:50, Fabian Hueske <fhue...@gmail.com> wrote:
>>>> 
>>>>> I thought about it again.
>>>>> Enforcing select won't help us with the choice of how to represent 'w.
>>>>> 
>>>>> select('*) and
>>>>> select('a, 'b, 'w).
>>>>> 
>>>>> would still be valid expressions and we need to decide how 'w is
>>>>> represented.
>>>>> As I said before, Tuple, Row, and Map have disadvantages because the
>>>>> syntax 'w.rowtime or 'w.end would not be supported.
>>>>> 
>>>>> On 27 Nov 2018, at 01:05, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>> 
>>>>>> Until we have good support for nested tables, forcing the use of
>>>>>> select may be a good way to go; at least it does not cause
>>>>>> compatibility issues.
>>>>>> 
>>>>>> On 26 Nov 2018, at 18:48, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>> 
>>>>>>> I think the question is what is the data type of 'w.
>>>>>>> 
>>>>>>> Until now, I assumed it would be a nested tuple (Row or Tuple).
>>>>>>> Accessing nested fields in Row, Tuple or Pojo is done with get, i.e.,
>>>>>>> 'w.get("rowtime").
>>>>>>> Using a Map would not help because the access would be
>>>>>>> 'w.at("rowtime").
>>>>>>> 
>>>>>>> We can of course also enforce the select() if aggregate /
>>>>>>> flatAggregate do not return a Table but some kind of AggregatedTable
>>>>>>> that does not provide any other function than select().
>>>>>>> 
>>>>>>> On 26 Nov 2018, at 01:12, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Yes, I agree the problem needs attention. IMO, it depends on how we
>>>>>>>> define the type of 'w. Your example above defines 'w as a tuple. If
>>>>>>>> 'w is serialized to a Map instead, compatibility will be better. We
>>>>>>>> could even define 'w as a special type that UDFs and sinks cannot
>>>>>>>> use directly; users would have to use 'w.start, 'w.end, 'w.rowtime,
>>>>>>>> 'w.proctime, 'w.XXX. I would be very grateful if you could share
>>>>>>>> your solution to this problem, and we can also discuss it carefully
>>>>>>>> in the PR to be opened. What do you think?
>>>>>>>> 
>>>>>>>> On 23 Nov 2018, at 18:21, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Something like:
>>>>>>>>> 
>>>>>>>>> val x = tab.window(Tumble ... as 'w)
>>>>>>>>>    .groupBy('w, 'k1, 'k2)
>>>>>>>>>    .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>> 
>>>>>>>>> // fails because the result schema has changed from
>>>>>>>>> // ((start, end, rowtime), k1, k2, col1, col2) to
>>>>>>>>> // ((start, end, rowtime, newProperty), k1, k2, col1, col2)
>>>>>>>>> x.insertInto("sinkTable")
>>>>>>>>> // fails because the UDF expects (start, end, rowtime)
>>>>>>>>> // and not (start, end, rowtime, newProperty)
>>>>>>>>> x.select(myUdf('w))
>>>>>>>>> 
>>>>>>>>> Basically, every time the composite type 'w is used as a whole.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On 23 Nov 2018, at 10:45, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Fabian,
>>>>>>>>>> 
>>>>>>>>>> I don't fully understand the issue you mentioned:
>>>>>>>>>> 
>>>>>>>>>> Any query that relies on the composite type with three fields
>>>>>>>>>> will fail after adding a fourth field.
>>>>>>>>>> 
>>>>>>>>>> I would appreciate it if you could give some detailed examples.
>>>>>>>>>> 
>>>>>>>>>> Regards,
>>>>>>>>>> JIncheng
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On 23 Nov 2018, at 16:41, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi,
>>>>>>>>>>> 
>>>>>>>>>>> My concerns are about the case when there is no additional
>>>>>>>>>>> select() method, i.e.,
>>>>>>>>>>> 
>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>    .groupBy('w, 'k1, 'k2)
>>>>>>>>>>>    .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>>> 
>>>>>>>>>>> In this case, 'w is a composite field consisting of three
>>>>>>>>>>> fields (end, start, rowtime).
>>>>>>>>>>> Once we add a new property, it would need to be added to the
>>>>>>>>>>> composite type.
>>>>>>>>>>> Any query that relies on the composite type with three fields
>>>>>>>>>>> will fail after adding a fourth field.
>>>>>>>>>>> 
>>>>>>>>>>> Best, Fabian
>>>>>>>>>>> 
>>>>>>>>>>> On 23 Nov 2018, at 02:01, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Thanks Fabian,
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks a lot for your feedback, and for the very important and
>>>>>>>>>>>> necessary design reminders!
>>>>>>>>>>>> 
>>>>>>>>>>>> Yes, you are right! Before Spark 1.3 the grouping columns had
>>>>>>>>>>>> to be specified explicitly, but in Spark 1.4 and later they are
>>>>>>>>>>>> passed implicitly. This behavior was changed because of user
>>>>>>>>>>>> feedback. Although implicit forwarding has the drawbacks you
>>>>>>>>>>>> mentioned, this approach is really convenient for the user.
>>>>>>>>>>>> I agree that when grouping on windows we have to pay attention
>>>>>>>>>>>> to the handling of the window's properties, because we may
>>>>>>>>>>>> introduce new window properties.
>>>>>>>>>>>> So, from this point of view, we delay the processing of the
>>>>>>>>>>>> window property, i.e., we pass the complex type 'w through the
>>>>>>>>>>>> Table API and provide different property methods in select
>>>>>>>>>>>> according to the type of 'w, such as 'w.start, 'w.end, 'w.xxx.
>>>>>>>>>>>> The Table API will limit and verify the property operations
>>>>>>>>>>>> that 'w supports. An example is as follows:
>>>>>>>>>>>> 
>>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>>    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>    // 'w is a composite field
>>>>>>>>>>>>    .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>>>>    // In select we will limit and verify that 'w.xxx is allowed
>>>>>>>>>>>>    .select('k1, 'col1, 'w.rowtime as 'ts, 'w.xxx as 'xx)
>>>>>>>>>>>> 
>>>>>>>>>>>> I am not sure if I fully understand your concerns; if I have
>>>>>>>>>>>> misunderstood anything, please correct me. Any feedback is
>>>>>>>>>>>> appreciated!
>>>>>>>>>>>> 
>>>>>>>>>>>> Bests,
>>>>>>>>>>>> Jincheng
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On 22 Nov 2018, at 22:13, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> First of all, it is correct that the flatMap(Expression*) and
>>>>>>>>>>>>> flatAggregate(Expression*) methods would mix scalar and table
>>>>>>>>>>>>> values.
>>>>>>>>>>>>> This would be a new concept that is not present in the current
>>>>>>>>>>>>> API.
>>>>>>>>>>>>> From my point of view, the semantics are quite clear, but I
>>>>>>>>>>>>> understand that others are more careful and worry about future
>>>>>>>>>>>>> extensions.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I am fine with going for single expression arguments for map()
>>>>>>>>>>>>> and flatMap(). We can later expand them to Expression* if we
>>>>>>>>>>>>> feel the need and are more comfortable about the implications.
>>>>>>>>>>>>> Whenever a time attribute needs to be forwarded, users can fall
>>>>>>>>>>>>> back to join(TableFunction) as Xiaowei mentioned.
>>>>>>>>>>>>> So we restrict the usability of the new methods but don't lose
>>>>>>>>>>>>> functionality and don't prevent future extensions.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The aggregate() and flatAggregate() case is more difficult
>>>>>>>>>>>>> because implicit forwarding of grouping fields cannot be
>>>>>>>>>>>>> changed later without breaking the API.
>>>>>>>>>>>>> There are other APIs (e.g., Spark) that also implicitly forward
>>>>>>>>>>>>> the grouping columns. So this is not uncommon.
>>>>>>>>>>>>> However, I personally don't like that approach, because it is
>>>>>>>>>>>>> implicit and introduces a new behavior that is not present in
>>>>>>>>>>>>> the current API.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> One thing to consider here is the handling of grouping on
>>>>>>>>>>>>> windows.
>>>>>>>>>>>>> If I understood Xiaowei correctly, a composite field that is
>>>>>>>>>>>>> named like the window alias (e.g., 'w) would be implicitly
>>>>>>>>>>>>> added to the result of aggregate() or flatAggregate().
>>>>>>>>>>>>> The composite field would have fields like (start, end,
>>>>>>>>>>>>> rowtime) or (start, end, proctime) depending on the window
>>>>>>>>>>>>> type.
>>>>>>>>>>>>> If we would ever introduce a fourth window property, we might
>>>>>>>>>>>>> break existing queries.
>>>>>>>>>>>>> Is this something that we should worry about?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Fabian
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On 22 Nov 2018, at 14:03, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> #1) ok, got it.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> #3)
>>>>>>>>>>>>>>> From my point of view, we can start with `Expression` and,
>>>>>>>>>>>>>>> if the discussion later settles on Expression*, improve it
>>>>>>>>>>>>>>> then. In any case, if we use Expression there is still an
>>>>>>>>>>>>>>> opportunity to move to Expression* (compatibly). If we use
>>>>>>>>>>>>>>> Expression* directly, it is difficult to go back to
>>>>>>>>>>>>>>> Expression, which would break compatibility between
>>>>>>>>>>>>>>> versions. What do you think?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I don't think that's the case here. If we start with the
>>>>>>>>>>>>>> single-param `flatMap(Expression)`, it will need implicit
>>>>>>>>>>>>>> columns to be present in the result, which:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> a) IMO breaks SQL convention (that's why I'm against this)
>>>>>>>>>>>>>> b) means we cannot later easily introduce
>>>>>>>>>>>>>> `flatMap(Expression*)` without those implicit columns,
>>>>>>>>>>>>>> without breaking compatibility or at least without making
>>>>>>>>>>>>>> `flatMap(Expression*)` and `flatMap(Expression)` terribly
>>>>>>>>>>>>>> inconsistent.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> To elaborate on (a): it's not nice if our own API is
>>>>>>>>>>>>>> inconsistent and sometimes behaves one way and sometimes
>>>>>>>>>>>>>> another way:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> table.groupBy('k).select(scalarAggregateFunction('v))
>>>>>>>>>>>>>> => single column result, just the output of
>>>>>>>>>>>>>> `scalarAggregateFunction`
>>>>>>>>>>>>>> vs
>>>>>>>>>>>>>> table.groupBy('k).flatAggregate(tableAggregateFunction('v))
>>>>>>>>>>>>>> => both the result of `tableAggregateFunction` plus the key
>>>>>>>>>>>>>> (and an optional window context?)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thus I think we have to decide now which way we want to
>>>>>>>>>>>>>> jump, since later will be too late. Or again, am I missing
>>>>>>>>>>>>>> something? :)
>>> :)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 22 Nov 2018, at 02:07, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi Piotrek,
>>>>>>>>>>>>>>> #1) We have unbounded and bounded group window aggregates.
>>>>>>>>>>>>>>> For the unbounded case we should fire results early with
>>>>>>>>>>>>>>> retract messages; we cannot use the watermark, because an
>>>>>>>>>>>>>>> unbounded aggregate never finishes. (As an improvement, we
>>>>>>>>>>>>>>> can introduce micro-batching in the future.) For bounded
>>>>>>>>>>>>>>> windows we never support early firing, so we do not need
>>>>>>>>>>>>>>> retraction.
>>>>>>>>>>>>>>> #3) About validation of
>>>>>>>>>>>>>>> `table.select(F('a).unnest(), 'b, G('c).unnest())` /
>>>>>>>>>>>>>>> `table.flatMap(F('a), 'b, scalarG('c))`: Fabian mentioned
>>>>>>>>>>>>>>> this above, please look at the prior mail. For
>>>>>>>>>>>>>>> `table.flatMap(F('a), 'b, scalarG('c))`, which is what
>>>>>>>>>>>>>>> concerns us, we should discuss the issue of `Expression*`
>>>>>>>>>>>>>>> vs `Expression`. From my point of view, we can start with
>>>>>>>>>>>>>>> `Expression` and, if the discussion later settles on
>>>>>>>>>>>>>>> Expression*, improve it then. In any case, if we use
>>>>>>>>>>>>>>> Expression there is still an opportunity to move to
>>>>>>>>>>>>>>> Expression* (compatibly). If we use Expression* directly,
>>>>>>>>>>>>>>> it is difficult to go back to Expression, which would
>>>>>>>>>>>>>>> break compatibility between versions. What do you think?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> If anything is unclear, any feedback is welcome! Again,
>>>>>>>>>>>>>>> thanks for sharing your thoughts!
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 21 Nov 2018, at 21:37, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> #1) No, watermark solves the issue of the late event.
>>>>>>>>>>>>>>>>> Here, the performance problem is caused by the update
>>>>>>>>>>>>>>>>> emit mode, i.e., when the current calculation result is
>>>>>>>>>>>>>>>>> output, the previous calculation result needs to be
>>>>>>>>>>>>>>>>> retracted.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hmm, yes I missed this. For time-windowed cases (some
>>>>>>>>>>>>>>>> aggregate/flatAggregate cases) emitting only on watermark
>>>>>>>>>>>>>>>> should solve the problem. For non-time-windowed cases it
>>>>>>>>>>>>>>>> would reduce the amount of retractions, right? Or am I
>>>>>>>>>>>>>>>> still missing something?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> #3) I still hope to keep the simplicity that select only
>>>>>>>>>>>>>>>>> support projected scalar, we can hardly tell the
>>>>>>>>>>>>>>>>> semantics of
>>>>>>>>>>>>>>>>> tab.select(flatmap('a), 'b, flatmap('d)).
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> table.select(F('a).unnest(), 'b, G('c).unnest())
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Could be rejected during some validation phase. On the
>>>>>>>>>>>>>>>> other hand:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> table.select(F('a).unnest(), 'b, scalarG('c))
>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>> table.flatMap(F('a), 'b, scalarG('c))
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Could work and be more or less a syntax sugar for cross
>>>>>>>>>>>>>>>> apply.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On 21 Nov 2018, at 12:16, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Hi shaoxuan & Hequn,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks for your suggestion, I'll file the JIRAs later.
>>>>>>>>>>>>>>>>> We can prepare PRs while continuing to move forward the
>>>>>>>>>>>>>>>>> ongoing discussion.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On 21 Nov 2018, at 19:07, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi Piotrek,
>>>>>>>>>>>>>>>>>> Thanks for your feedback, and thanks for sharing your
>>>>>>>>>>>>>>>>>> thoughts!
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> #1) No, the watermark solves the issue of late events.
>>>>>>>>>>>>>>>>>> Here, the performance problem is caused by the update
>>>>>>>>>>>>>>>>>> emit mode, i.e., when the current calculation result is
>>>>>>>>>>>>>>>>>> output, the previous calculation result needs to be
>>>>>>>>>>>>>>>>>> retracted.
>>>>>>>>>>>>>>>>>> #2) As I mentioned above, we should continue the
>>>>>>>>>>>>>>>>>> discussion until we solve the problems raised by
>>>>>>>>>>>>>>>>>> Xiaowei and Fabian.
>>>>>>>>>>>>>>>>>> #3) I still hope to keep the simplicity that select
>>>>>>>>>>>>>>>>>> only supports projected scalars; we can hardly tell the
>>>>>>>>>>>>>>>>>> semantics of
>>>>>>>>>>>>>>>>>> tab.select(flatmap('a), 'b, flatmap('d)).
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On 21 Nov 2018, at 17:24, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 1.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> In fact, in addition to the design of APIs, there
>>>>>>>>>>>>>>>>>>>> will be various performance optimization details,
>>>>>>>>>>>>>>>>>>>> such as: table Aggregate function emitValue will
>>>>>>>>>>>>>>>>>>>> generate multiple calculation results, in extreme
>>>>>>>>>>>>>>>>>>>> cases, each record will trigger a large number of
>>>>>>>>>>>>>>>>>>>> retract messages, this will have poor performance
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Can this be solved/mitigated by emitting the results
>>>>>>>>>>>>>>>>>>> only on watermarks? I think that was the path that we
>>>>>>>>>>>>>>>>>>> decided to take both for Temporal Joins and upsert
>>>>>>>>>>>>>>>>>>> stream conversion. I know that this increases the
>>>>>>>>>>>>>>>>>>> latency and there is a place for a future global
>>>>>>>>>>>>>>>>>>> setting/user preference "emit the data ASAP mode", but
>>>>>>>>>>>>>>>>>>> emitting only on watermarks seems to me a better/more
>>>>>>>>>>>>>>>>>>> sane default.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 2.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> With respect to the API discussion and implicit
>>>>>>>>>>>>>>>>>>> columns: the problem for me so far is that I'm not
>>>>>>>>>>>>>>>>>>> sure I like the additional complexity of the
>>>>>>>>>>>>>>>>>>> `append()` solution, while implicit columns are
>>>>>>>>>>>>>>>>>>> definitely not in the spirit of SQL. Neither joins nor
>>>>>>>>>>>>>>>>>>> aggregations add extra unexpected columns to the
>>>>>>>>>>>>>>>>>>> result without asking. This can definitely be
>>>>>>>>>>>>>>>>>>> confusing for users since it breaks the convention.
>>>>>>>>>>>>>>>>>>> Thus I would lean towards Fabian's proposal of
>>>>>>>>>>>>>>>>>>> multi-argument `map(Expression*)` from those 3
>>>>>>>>>>>>>>>>>>> options.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 3.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Another topic is that I'm not 100% convinced that we
>>>>>>>>>>>>>>>>>>> should be adding new API functions for `map`,
>>>>>>>>>>>>>>>>>>> `aggregate`, `flatMap` and `flatAggregate`. I think
>>>>>>>>>>>>>>>>>>> the same could be achieved by changing
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> table.map(F('x))
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> into
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> table.select(F('x)).unnest()
>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>> table.select(F('x).unnest())
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> where `unnest()` means unnesting a row/tuple type into
>>>>>>>>>>>>>>>>>>> a columnar table.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> table.flatMap(F('x))
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> could on the other hand also be handled by
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> table.select(F('x))
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> by correctly deducing that F(x) is a multi-row output
>>>>>>>>>>>>>>>>>>> function.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> The same might apply to `aggregate(F('x))`, but this
>>>>>>>>>>>>>>>>>>> could maybe be replaced by:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> table.groupBy(…).select(F('x).unnest())
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Adding scalar functions should also be possible:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> table.groupBy('k).select(F('x).unnest(), 'k)
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Maybe such an approach would allow us to implement the
>>>>>>>>>>>>>>>>>>> same features in SQL as well?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On 21 Nov 2018, at 09:43, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thank you all for the great proposal and discussion!
>>>>>>>>>>>>>>>>>>>> I also prefer to move on to the next step, so +1 for
>>>>>>>>>>>>>>>>>>>> opening the JIRAs to start the work.
>>>>>>>>>>>>>>>>>>>> We can have more detailed discussion there. Btw, we
>>>>>>>>>>>>>>>>>>>> can start with JIRAs which we have agreed on.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>> Hequn
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <wshaox...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> +1. I agree that we should open the JIRAs to start
>>>>>>>>>>>>>>>>>>>>> the work. We may have better ideas on the flavor of
>>>>>>>>>>>>>>>>>>>>> the interface when we implement/review the code.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>> shaoxuan
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> On 11/20/18, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Thanks all for the feedback.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> @Piotr About not using abbreviated names: +1, I
>>>>>>>>>>>>>>>>>>>>>> like your proposal! Currently both the DataSet and
>>>>>>>>>>>>>>>>>>>>>> DataStream APIs use `aggregate`.
>>>>>>>>>>>>>>>>>>>>>> BTW, I find that other languages, such as R, also
>>>>>>>>>>>>>>>>>>>>>> avoid abbreviated names.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Sometimes the interface of an API is really
>>>>>>>>>>>>>>>>>>>>>> difficult to get perfect; we need to spend a lot of
>>>>>>>>>>>>>>>>>>>>>> time thinking, gather feedback from a large number
>>>>>>>>>>>>>>>>>>>>>> of users, and constantly improve. But because of
>>>>>>>>>>>>>>>>>>>>>> backward compatibility, we have to adopt the most
>>>>>>>>>>>>>>>>>>>>>> conservative approach when designing the API. (Of
>>>>>>>>>>>>>>>>>>>>>> course, I am more in favor of developing richer
>>>>>>>>>>>>>>>>>>>>>> features once we have discussed them clearly.)
>>>>>>>>>>>>>>>>>>>>>> Therefore, I propose to divide the implementation
>>>>>>>>>>>>>>>>>>>>>> of map/flatMap/agg/flatAgg into JIRAs for the basic
>>>>>>>>>>>>>>>>>>>>>> functions and JIRAs that support time attributes
>>>>>>>>>>>>>>>>>>>>>> and group keys. We can develop the features whose
>>>>>>>>>>>>>>>>>>>>>> design we have already agreed on, and we will
>>>>>>>>>>>>>>>>>>>>>> continue to discuss the uncertain design.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> In fact, in addition to the design of the APIs,
>>>>>>>>>>>>>>>>>>>>>> there will be various performance optimization
>>>>>>>>>>>>>>>>>>>>>> details. For example, a table aggregate function's
>>>>>>>>>>>>>>>>>>>>>> emitValue will generate multiple calculation
>>>>>>>>>>>>>>>>>>>>>> results; in extreme cases, each record will trigger
>>>>>>>>>>>>>>>>>>>>>> a large number of retract messages, which will
>>>>>>>>>>>>>>>>>>>>>> perform poorly. So we will also optimize the
>>>>>>>>>>>>>>>>>>>>>> interface design, such as adding the
>>>>>>>>>>>>>>>>>>>>>> emitWithRetractValue interface (I have updated the
>>>>>>>>>>>>>>>>>>>>>> google doc) to allow the user to optionally perform
>>>>>>>>>>>>>>>>>>>>>> incremental calculations, thus avoiding a large
>>>>>>>>>>>>>>>>>>>>>> number of retractions. Details like this are
>>>>>>>>>>>>>>>>>>>>>> difficult to fully discuss on the mailing list, so
>>>>>>>>>>>>>>>>>>>>>> I recommend creating JIRAs/FLIP first; we develop
>>>>>>>>>>>>>>>>>>>>>> the designs that have been agreed upon and continue
>>>>>>>>>>>>>>>>>>>>>> to discuss the undecided ones! What do you think?
>>>>>>>>>>>>>>>>>>>>>> @Fabian & Piotr & XiaoWei
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> On 19 Nov 2018, at 00:07, Xiaowei Jiang <xiaow...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Hi Fabian & Piotr, thanks for the feedback!
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> I appreciate your concerns, both on timestamp attributes as well as
>>>>>>>>>>>>>>>>>>>>>>> on implicit group keys. At the same time, I'm also concerned about
>>>>>>>>>>>>>>>>>>>>>>> the proposed approach of allowing Expression* as parameters,
>>>>>>>>>>>>>>>>>>>>>>> especially for flatMap/flatAgg. So far, we have never allowed a
>>>>>>>>>>>>>>>>>>>>>>> scalar expression to appear together with table expressions. With
>>>>>>>>>>>>>>>>>>>>>>> the Expression* approach, this will happen for the parameters to
>>>>>>>>>>>>>>>>>>>>>>> flatMap/flatAgg. I'm a bit concerned about whether we fully
>>>>>>>>>>>>>>>>>>>>>>> understand the consequences when we try to extend our system in the
>>>>>>>>>>>>>>>>>>>>>>> future. I would be extra cautious in doing this. To avoid this, I
>>>>>>>>>>>>>>>>>>>>>>> think an implicit group key for flatAgg is safer. For flatMap, if
>>>>>>>>>>>>>>>>>>>>>>> users want to keep the rowtime column, they can use crossApply/join
>>>>>>>>>>>>>>>>>>>>>>> instead. So we are not losing any real functionality here.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Also a clarification on the following example:
>>>>>>>>>>>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>>>>>>>>>>>>   .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>>>>>>>>>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>>>>>>>>>>>>>> If we did not have the select clause in this example, we would have
>>>>>>>>>>>>>>>>>>>>>>> 'w as a regular column in the output. It should not magically
>>>>>>>>>>>>>>>>>>>>>>> disappear.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> The concern is not as strong for Table.map/Table.agg because we are
>>>>>>>>>>>>>>>>>>>>>>> not mixing scalar and table expressions. But we also want to be
>>>>>>>>>>>>>>>>>>>>>>> somewhat consistent with these methods. If we used implicit group
>>>>>>>>>>>>>>>>>>>>>>> keys for Table.flatAgg, we probably should do the same for
>>>>>>>>>>>>>>>>>>>>>>> Table.agg. Now we only have to choose what to do with Table.map. I
>>>>>>>>>>>>>>>>>>>>>>> can see good arguments from both sides. But starting with a single
>>>>>>>>>>>>>>>>>>>>>>> Expression seems safer because we can always extend to Expression*
>>>>>>>>>>>>>>>>>>>>>>> in the future.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> While thinking about this problem, it appears that we may need more
>>>>>>>>>>>>>>>>>>>>>>> work in our handling of watermarks for SQL/Table API. Our current
>>>>>>>>>>>>>>>>>>>>>>> way of propagating the watermarks from source all the way to sink
>>>>>>>>>>>>>>>>>>>>>>> might not be optimal. For example, after a tumbling window, the
>>>>>>>>>>>>>>>>>>>>>>> watermark can actually be advanced to just before the expiry of the
>>>>>>>>>>>>>>>>>>>>>>> next window. I think that in general, each operator may need to
>>>>>>>>>>>>>>>>>>>>>>> generate new watermarks instead of simply propagating them. Once we
>>>>>>>>>>>>>>>>>>>>>>> accept that watermarks may change during the execution, it appears
>>>>>>>>>>>>>>>>>>>>>>> that the timestamp columns may also change, as long as we have some
>>>>>>>>>>>>>>>>>>>>>>> way to associate a watermark with them. My intuition is that once we
>>>>>>>>>>>>>>>>>>>>>>> have a thorough solution for the watermark issue, we may be able to
>>>>>>>>>>>>>>>>>>>>>>> solve the problem we encountered for Table.map in a cleaner way.
>>>>>>>>>>>>>>>>>>>>>>> But this is a complex issue which deserves a discussion of its own.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>> Xiaowei
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Isn’t the problem of multiple expressions limited only to `flat***`
>>>>>>>>>>>>>>>>>>>>>>>> functions and, to be more specific, only to having two (or more)
>>>>>>>>>>>>>>>>>>>>>>>> different table functions passed as expressions?
>>>>>>>>>>>>>>>>>>>>>>>> `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))`
>>>>>>>>>>>>>>>>>>>>>>>> seems to be well defined (duplicate the result of every scalar
>>>>>>>>>>>>>>>>>>>>>>>> function to every record). Or am I missing something?
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Another remark, I would be in favour of not using abbreviations and
>>>>>>>>>>>>>>>>>>>>>>>> naming `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> On 15 Nov 2018, at 14:15, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> I said before that I think that the append() method is better than
>>>>>>>>>>>>>>>>>>>>>>>>> implicitly forwarding keys, but still, I believe it adds
>>>>>>>>>>>>>>>>>>>>>>>>> unnecessary boilerplate code.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Moreover, I haven't seen a convincing argument why map(Expression*)
>>>>>>>>>>>>>>>>>>>>>>>>> is worse than map(Expression). In either case we need to do all
>>>>>>>>>>>>>>>>>>>>>>>>> kinds of checks to prevent invalid use of functions.
>>>>>>>>>>>>>>>>>>>>>>>>> If the method is not correctly used, we can emit a good error
>>>>>>>>>>>>>>>>>>>>>>>>> message, and documenting map(Expression*) will be easier than
>>>>>>>>>>>>>>>>>>>>>>>>> map(append(Expression*)), in my opinion.
>>>>>>>>>>>>>>>>>>>>>>>>> I think we should not add unnecessary syntax unless there is a good
>>>>>>>>>>>>>>>>>>>>>>>>> reason and, to be honest, I haven't seen this reason yet.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Regarding the groupBy.agg() method, I think it should behave just
>>>>>>>>>>>>>>>>>>>>>>>>> like any other method, i.e., not do any implicit forwarding.
>>>>>>>>>>>>>>>>>>>>>>>>> Let's take the example of the windowed group by that you posted
>>>>>>>>>>>>>>>>>>>>>>>>> before.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>>>>>>>>>>>>>>   .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>>>>>>>>>>>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> What happens if 'w.rowtime is not selected? What is the data type
>>>>>>>>>>>>>>>>>>>>>>>>> of the field 'w in the resulting Table? Is it a regular field at
>>>>>>>>>>>>>>>>>>>>>>>>> all or just a system field that disappears if it is not selected?
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> IMO, the following syntax is shorter, more explicit, and better
>>>>>>>>>>>>>>>>>>>>>>>>> aligned with the regular window.groupBy.select aggregations that
>>>>>>>>>>>>>>>>>>>>>>>>> are supported today.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>>>>>>>>>>>>>>   .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, 14 Nov 2018 at 08:37, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Fabian/Xiaowei,
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> I am very sorry for my late reply! Glad to see your reply, and it
>>>>>>>>>>>>>>>>>>>>>>>>>> sounds pretty good!
>>>>>>>>>>>>>>>>>>>>>>>>>> I agree that the approach with append() that Fabian mentioned is
>>>>>>>>>>>>>>>>>>>>>>>>>> better, since it clearly defines the result schema.
>>>>>>>>>>>>>>>>>>>>>>>>>> In addition, append() can also contain non-time attributes, e.g.:
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> tab('name, 'age, 'address, 'rowtime)
>>>>>>>>>>>>>>>>>>>>>>>>>> tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2, 'address, 'rowtime)
>>>>>>>>>>>>>>>>>>>>>>>>>>   .window(Tumble over 5.millis on 'rowtime as 'w)
>>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('w, 'address)
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> In this way the append() is very useful, and the behavior is very
>>>>>>>>>>>>>>>>>>>>>>>>>> similar to withForwardedFields() in DataSet.
>>>>>>>>>>>>>>>>>>>>>>>>>> So +1 to using the append() approach for map() & flatMap()!
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> But how about agg() and flatAgg()? In the agg/flatAgg case I agree
>>>>>>>>>>>>>>>>>>>>>>>>>> with Xiaowei's approach of defining the keys to be implied in the
>>>>>>>>>>>>>>>>>>>>>>>>>> result table and to appear at the beginning, for example:
>>>>>>>>>>>>>>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>>>>>>>>>>>>>>>   .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>>>>>>>>>>>>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> What do you think? @Fabian @Xiaowei
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Fabian Hueske <fhue...@gmail.com> wrote on Fri, Nov 9, 2018 at 6:35 PM:
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the summary!
>>>>>>>>>>>>>>>>>>>>>>>>>>> I like the approach with append() better than the implicit
>>>>>>>>>>>>>>>>>>>>>>>>>>> forwarding as it clearly indicates which fields are forwarded.
>>>>>>>>>>>>>>>>>>>>>>>>>>> However, I don't see much benefit over the flatMap(Expression*)
>>>>>>>>>>>>>>>>>>>>>>>>>>> variant, as we would still need to analyze the full expression
>>>>>>>>>>>>>>>>>>>>>>>>>>> tree to ensure that at most (or exactly?) one Scalar /
>>>>>>>>>>>>>>>>>>>>>>>>>>> TableFunction is used.
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>> Fabian
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, 8 Nov 2018 at 19:25, jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> We are discussing very detailed content about this proposal. We
>>>>>>>>>>>>>>>>>>>>>>>>>>>> are trying to design the API in many aspects (functionality,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> compatibility, ease of use, etc.). I think this is a very good
>>>>>>>>>>>>>>>>>>>>>>>>>>>> process. Only with such a detailed discussion can we develop the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> PRs more clearly and smoothly in the later stage. I am very
>>>>>>>>>>>>>>>>>>>>>>>>>>>> grateful to @Fabian and @Xiaowei for sharing a lot of good ideas.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> About the definition of method signatures, I want to share my
>>>>>>>>>>>>>>>>>>>>>>>>>>>> points here, which I am discussing with Fabian in the Google doc
>>>>>>>>>>>>>>>>>>>>>>>>>>>> (not yet completed), as follows:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Assume we have a table:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Approach 1:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> case1: Map follows Source Table
>>>>>>>>>>>>>>>>>>>>>>>>>>>> val result =
>>>>>>>>>>>>>>>>>>>>>>>>>>>> tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .window(Tumble over 5.millis on 'proctime as 'w)
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> case2: FlatAgg follows Window (Fabian mentioned above)
>>>>>>>>>>>>>>>>>>>>>>>>>>>> val result =
>>>>>>>>>>>>>>>>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Approach 2: Similar to Fabian's approach, in which the result
>>>>>>>>>>>>>>>>>>>>>>>>>>>> schema would be clearly defined, but with a built-in append UDF
>>>>>>>>>>>>>>>>>>>>>>>>>>>> added. That makes the map/flatmap/agg/flatAgg interface accept
>>>>>>>>>>>>>>>>>>>>>>>>>>>> only one Expression.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> val result =
>>>>>>>>>>>>>>>>>>>>>>>>>>>> tab.map(append(udf('string), 'long, 'proctime)) as ('col1, 'col2, 'long, 'proctime)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .window(Tumble over 5.millis on 'proctime as 'w)
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Note: append is a special built-in UDF that can pass through any
>>>>>>>>>>>>>>>>>>>>>>>>>>>> column.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> So, maybe we can define it as table.map(Expression) first and, if
>>>>>>>>>>>>>>>>>>>>>>>>>>>> necessary, extend it to table.map(Expression*) in the future? Of
>>>>>>>>>>>>>>>>>>>>>>>>>>>> course, I also hope that we can further refine this proposal
>>>>>>>>>>>>>>>>>>>>>>>>>>>> through discussion.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Xiaowei Jiang <xiaow...@gmail.com> wrote on Wed, Nov 7, 2018 at 11:45 PM:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Fabian,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think that the key question you raised is whether we allow extra
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> parameters in the methods map/flatMap/agg/flatAgg. I can see why
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> allowing that may appear more convenient in some cases. However,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it might also cause some confusion if we do that. For example, do
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> we allow multiple UDFs in these expressions? If we do, the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> semantics may be weird to define, e.g. what does
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Even though not allowing it may appear less powerful, it can make
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> things more intuitive too. In the case of agg/flatAgg, we can
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> define the keys to be implied in the result table and to appear
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> at the beginning. You can use a select method if you want to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> modify this behavior. I think that eventually we will have some
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> API which allows other expressions as additional parameters, but
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think it's better to do that after we introduce the concept of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> nested tables. A lot of things we suggested here can be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> considered as special cases of that. But things are much simpler
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> if we leave that to later.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Xiaowei
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * Re emit:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think we should start with a well understood semantics of full
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> replacement. This is how the other agg functions work.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As was said before, there are open questions regarding an append
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mode (checkpointing, whether supporting retractions or not and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> if yes how to declare them, ...).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Since this seems to be an optimization, I'd postpone it.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * Re grouping keys:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I don't think we should automatically add them because the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> result schema would not be intuitive.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would they be added at the beginning of the tuple or at the end?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> What metadata fields of windows would be added? In which order
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would they be added?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> However, we could support syntax like this:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> val t: Table = ???
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> t
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .window(Tumble ... as 'w)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('a, 'b)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The result schema would be clearly defined as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [b, a, f1, f2, ..., fn, wend, rtime]. (f1, f2, ..., fn) are the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> result attributes of the UDF.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * Re Multi-staged evaluation:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think this should be an optimization that can be applied if
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the UDF implements the merge() method.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, 7 Nov 2018 at 08:01, Shaoxuan Wang <wshaox...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi xiaowei,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I agree with you that the semantics of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> TableAggregateFunction emit are much more complex than those of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> AggregateFunction. The fundamental difference is that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> TableAggregateFunction emits a "table" while AggregateFunction
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> outputs (a column of) a "row". AggregateFunction has only one
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mode, which is "replacing" (complete update). But for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> TableAggregateFunction, it could be an incremental update (only
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> emit the newly updated results) or a complete update (always
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> emit the entire table when "emit" is triggered). From the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> performance perspective, we might want to use incremental
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> update. But we need to review and design this carefully,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> especially taking into account the cases of failover (instead
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of just backing up the ACC, it may also need to remember the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> emit offset) and retractions, as the semantics of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> TableAggregateFunction emit are different from other UDFs.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> TableFunction also emits a table, but it does not need to worry
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> about this because it is stateless.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Shaoxuan
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for adding the public interfaces! I think that it's a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> very good start. There are a few points that need more
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> discussion.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - TableAggregateFunction - this is a very complex beast,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   definitely the most complex user-defined object we have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   introduced so far. I think there are quite a few interesting
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   questions here. For example, do we allow multi-staged
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   TableAggregate in this case? What are the semantics of emit?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   Does it amend the previous output, or replace it? I think
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   that this subject itself is worth a discussion to make sure
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   we get the details right.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - GroupedTable.agg - do the group keys automatically appear in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   the output? How about the case of windowing aggregation?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Xiaowei
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi, Xiaowei,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for bringing up the discussion of the Table API Enhancement
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Outline!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I quickly looked at the overall content; it captures our offline
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> discussions well. But from my point of view, we should add the usage
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of the public interfaces that we will introduce in this proposal.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So, I added the following usage descriptions of the interfaces and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> operators to the google doc:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. Map Operator
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Map is a new operator of Table. It applies a scalar function and can
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> return multiple columns. The usage is as follows:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .map(fun: ScalarFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .select('a, 'c)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. FlatMap Operator
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> FlatMap is a new operator of Table. It applies a table function and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> can return multiple rows. The usage is as follows:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .flatMap(fun: TableFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .select('a, 'c)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. Agg Operator
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Agg is a new operator of Table/GroupedTable. It applies an aggregate
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> function and can return multiple columns. The usage is as follows:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('a) // leave groupBy-Clause out to define global aggregates
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .agg(fun: AggregateFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .select('a, 'c)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 4. FlatAgg Operator
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> FlatAgg is a new operator of Table/GroupedTable. It applies a table
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> aggregate function and can return multiple rows. The usage is as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> follows:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .groupBy('a) // leave groupBy-Clause out to define global table aggregates
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   .select('a, 'c)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
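To make the row and column cardinality of the four operators listed above concrete, here is a minimal sketch on plain Scala collections. It does not use the proposed Table API at all: the `OperatorShapes` object, the sample rows, and the squaring, min/max and top-2 logic are assumptions chosen purely for illustration.

```scala
// Plain-Scala model of the four operator shapes; no Flink dependency.
// A row is modelled as a tuple and a table as a Seq of rows (both assumptions).
object OperatorShapes {
  val tab: Seq[(String, Int)] = Seq(("x", 1), ("x", 4), ("x", 3), ("y", 2))

  // map: one input row -> exactly one output row, possibly with more columns.
  val mapped: Seq[(String, Int, Int)] =
    tab.map { case (k, v) => (k, v, v * v) }

  // flatMap: one input row -> zero or more output rows.
  val flatMapped: Seq[(String, Int)] =
    tab.flatMap { case (k, v) => Seq((k, v), (k, v + 10)) }

  // agg: all rows of a group -> one output row with several columns (min, max).
  val aggregated: Map[String, (Int, Int)] =
    tab.groupBy(_._1).map { case (k, rows) =>
      val vs = rows.map(_._2)
      k -> (vs.min, vs.max)
    }

  // flatAgg: all rows of a group -> zero or more output rows (here: top 2 values).
  val flatAggregated: Map[String, Seq[Int]] =
    tab.groupBy(_._1).map { case (k, rows) =>
      k -> rows.map(_._2).sorted(Ordering[Int].reverse).take(2)
    }

  def main(args: Array[String]): Unit = {
    println(mapped)
    println(flatMapped)
    println(aggregated)
    println(flatAggregated)
  }
}
```

In this model the group key is carried separately from the aggregate result, which mirrors the open question in the thread about whether group keys should appear in the output automatically.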
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 5. TableAggregateFunction
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The behavior of table aggregates is most like that of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> GroupReduceFunction, which computes over a group of elements and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> outputs a group of elements. The TableAggregateFunction can be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> applied on GroupedTable.flatAgg(). The interface of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> TableAggregateFunction has a lot of content, so I don't copy it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> here. Please look at the details in the google doc:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
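As a rough illustration of the "group of elements in, group of elements out" behavior described for table aggregates, the following plain-Scala sketch folds each group into an accumulator and then emits several rows. The `TableAgg` trait, its method names (`createAccumulator`, `accumulate`, `emit`), and the `Top2` example are hypothetical stand-ins, not the interface from the design doc.

```scala
// Hypothetical stand-in for a table aggregate; NOT the interface from the design doc.
trait TableAgg[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def accumulate(acc: ACC, in: IN): ACC
  def emit(acc: ACC): Seq[OUT] // zero or more output rows per group
}

// Keeps the two largest values seen so far (illustrative example only).
object Top2 extends TableAgg[Int, List[Int], Int] {
  def createAccumulator(): List[Int] = Nil
  def accumulate(acc: List[Int], in: Int): List[Int] =
    (in :: acc).sorted(Ordering[Int].reverse).take(2)
  def emit(acc: List[Int]): Seq[Int] = acc
}

object TableAggDemo {
  // Folds every group through the aggregate, then emits that group's rows.
  def flatAgg[K, IN, ACC, OUT](
      rows: Seq[(K, IN)],
      agg: TableAgg[IN, ACC, OUT]): Map[K, Seq[OUT]] =
    rows.groupBy(_._1).map { case (k, group) =>
      val acc = group.map(_._2)
        .foldLeft(agg.createAccumulator())((a, in) => agg.accumulate(a, in))
      k -> agg.emit(acc)
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(("x", 1), ("x", 4), ("x", 3), ("y", 2))
    println(flatAgg(rows, Top2))
  }
}
```

This sketch emits the complete result for each group once all rows are consumed, which corresponds to the "replacing" reading of emit; the "amendments to the previous output" reading raised earlier in the thread would need an incremental emit and is not modelled here.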
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I would very much appreciate anyone reviewing and commenting.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>> -----------------------------------------------------------------------------------
>>>>>>>>>>>>>>>>>>>>> *Rome was not built in one day*
>>>>>>>>>>>>>>>>>>>>> -----------------------------------------------------------------------------------

