I don't think we came to a conclusion yet.
Are you suggesting that 'w would disappear if we do the following:

val tabX = tab.window(Tumble ... as 'w)
    .groupBy('w, 'k1, 'k2)
    .flatAgg(tableAgg('a))
    .select('*)

Given that tableAgg returns [col1, col2], what would the schema of tabX be?

1) [col1, col2]: fine with me
2) [k1, k2, col1, col2]: I'm very skeptical about this option because it is
IMO inconsistent. Users will ask, where did 'w go.
2) [w, k1, k2, col1, col2]: back to our original problem. What's the data
type of 'w?

If we go for option 1, we can still allow to select keys.

val tabX2 = tab.window(Tumble ... as 'w)
    .groupBy('w, 'k1, 'k2)
    .flatAgg(tableAgg('a))
    .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)

The good thing about enforcing select is that the result of flatAgg is not
a Table, i.e., the definition of the operation is not completed yet.
On the other hand, we would need to list all result attributes of the table
function if we want to select keys. We could to that with a short cut:

val tabX3 = tab.window(Tumble ... as 'w)
    .groupBy('w, 'k1, 'k2)
    .flatAgg(tableAgg('a))
    .select('w.rowtime, 'k1, 'k2, '*)


Am Di., 27. Nov. 2018 um 10:46 Uhr schrieb jincheng sun <
sunjincheng...@gmail.com>:

> Thanks Fabian, if we enforcing select, as i said before user should using
> 'w.start, 'w.end, 'w.rowtime, 'w.proctime, 'w.XXX' etc.  In this way we
> should not defined the type of 'w, we can keep the current way of using 'w.
> I'll file the JIRA. and open the PR ASAP.
>
> Thanks,
> Jincheng
>
> Fabian Hueske <fhue...@gmail.com> 于2018年11月27日周二 下午4:50写道:
>
> > I thought about it again.
> > Enforcing select won't help us with the choice of how to represent 'w.
> >
> > select('*) and
> > select('a, 'b, 'w).
> >
> > would still be valid expressions and we need to decide how 'w is
> > represented.
> > As I said before, Tuple, Row, and Map have disadvantages because the
> syntax
> > 'w.rowtime or 'w.end would not be supported.
> >
> > Am Di., 27. Nov. 2018 um 01:05 Uhr schrieb jincheng sun <
> > sunjincheng...@gmail.com>:
> >
> > > Before we have a good support for nest-table, may be forcing the use of
> > > select is good way, at least not causing compatibility issues.
> > >
> > > Fabian Hueske <fhue...@gmail.com> 于2018年11月26日周一 下午6:48写道:
> > >
> > > > I think the question is what is the data type of 'w.
> > > >
> > > > Until now, I assumed it would be a nested tuple (Row or Tuple).
> > > > Accessing nested fields in Row, Tuple or Pojo is done with get, i.e.,
> > > > 'w.get("rowtime").
> > > > Using a Map would not help because the access would be 'w.at
> > ("rowtime").
> > > >
> > > > We can of course also enforce the select() if aggregate /
> flatAggregate
> > > do
> > > > not return a Table but some kind of AggregatedTable that does not
> > provide
> > > > any other function than select().
> > > >
> > > > Am Mo., 26. Nov. 2018 um 01:12 Uhr schrieb jincheng sun <
> > > > sunjincheng...@gmail.com>:
> > > >
> > > > > Yes,I agree the problem is needs attention. IMO. It depends on how
> we
> > > > > define the ‘w type. The way you above defines the 'w type as a
> tuple.
> > > If
> > > > > you serialize 'w to a Map, the compatibility will be better. Even
> > more
> > > we
> > > > > can define ‘w as a special type. UDF and Sink can't be used
> directly.
> > > > Must
> > > > > use 'w.start, 'w.end, 'w.rowtime, 'w.proctime, 'w.XXX', and I will
> be
> > > > very
> > > > > grateful if you can share your solution to this problem,  and we
> also
> > > can
> > > > > discuss it carefully in the PR to be opened. What to you think?
> > > > >
> > > > > Fabian Hueske <fhue...@gmail.com> 于2018年11月23日周五 下午6:21写道:
> > > > >
> > > > > > Something like:
> > > > > >
> > > > > > val x = tab.window(Tumble ... as 'w)
> > > > > >     .groupBy('w, 'k1, 'k2)
> > > > > >     .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > > > > >
> > > > > > x.insertInto("sinkTable") // fails because result schema has
> > changed
> > > > from
> > > > > > ((start, end, rowtime), k1, k2, col1, col2) to ((start, end,
> > rowtime,
> > > > > > newProperty), k1, k2, col1, col2)
> > > > > > x.select(myUdf('w)) // fails because UDF expects (start, end,
> > > rowtime)
> > > > > and
> > > > > > not (start, end, rowtime, newProperty)
> > > > > >
> > > > > > Basically, every time when the composite type 'w is used as a
> > whole.
> > > > > >
> > > > > >
> > > > > > Am Fr., 23. Nov. 2018 um 10:45 Uhr schrieb jincheng sun <
> > > > > > sunjincheng...@gmail.com>:
> > > > > >
> > > > > > > Hi Fabian,
> > > > > > >
> > > > > > > I don't fully understand the question you mentioned:
> > > > > > >
> > > > > > > Any query that relies on the composite type with three fields
> > will
> > > > fail
> > > > > > >
> > > > > > > after adding a forth field.
> > > > > > >
> > > > > > >
> > > > > > > I am appreciate if you can give some detail examples ?
> > > > > > >
> > > > > > > Regards,
> > > > > > > JIncheng
> > > > > > >
> > > > > > >
> > > > > > > Fabian Hueske <fhue...@gmail.com> 于2018年11月23日周五 下午4:41写道:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > My concerns are about the case when there is no additional
> > > select()
> > > > > > > method,
> > > > > > > > i.e.,
> > > > > > > >
> > > > > > > > tab.window(Tumble ... as 'w)
> > > > > > > >     .groupBy('w, 'k1, 'k2)
> > > > > > > >     .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> > > > > > > >
> > > > > > > > In this case, 'w is a composite field consisting of three
> > fields
> > > > > (end,
> > > > > > > > start, rowtime).
> > > > > > > > Once we add a new property, it would need to be added to the
> > > > > composite
> > > > > > > > type.
> > > > > > > > Any query that relies on the composite type with three fields
> > > will
> > > > > fail
> > > > > > > > after adding a forth field.
> > > > > > > >
> > > > > > > > Best, Fabian
> > > > > > > >
> > > > > > > > Am Fr., 23. Nov. 2018 um 02:01 Uhr schrieb jincheng sun <
> > > > > > > > sunjincheng...@gmail.com>:
> > > > > > > >
> > > > > > > > > Thanks Fabian,
> > > > > > > > >
> > > > > > > > > Thanks a lot for your feedback, and very important and
> > > necessary
> > > > > > design
> > > > > > > > > reminders!
> > > > > > > > >
> > > > > > > > > Yes, your are right!  Spark is the specified grouping
> columns
> > > > > > displayed
> > > > > > > > > before 1.3, but the grouping columns are implicitly passed
> in
> > > > > > spark1.4
> > > > > > > > and
> > > > > > > > > later. The reason for changing this behavior is that due to
> > the
> > > > > user
> > > > > > > > > feedback. Although implicit delivery will have the
> drawbacks
> > > you
> > > > > > > > mentioned,
> > > > > > > > > this approach is really convenient for the user.
> > > > > > > > > I agree that grouping on windows we have to pay attention
> to
> > > the
> > > > > > > handling
> > > > > > > > > of the window's properties, because we may introduce new
> > window
> > > > > > > property.
> > > > > > > > > So, from the points of view, We delay the processing of the
> > > > window
> > > > > > > > > property, ie: we pass the complex type 'w on the tableAPI,
> > and
> > > > > > provide
> > > > > > > > > different property method operations in the SELECT
> according
> > to
> > > > the
> > > > > > > type
> > > > > > > > of
> > > > > > > > > 'w, such as: 'w.start, 'w.end, 'w.xxx , in the TableAPI
> will
> > > > limit
> > > > > > and
> > > > > > > > > verify the attribute operations that 'w has. An example is
> as
> > > > > > follows:
> > > > > > > > >
> > > > > > > > > tab.window(Tumble ... as 'w)
> > > > > > > > >     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > > > > > > > >     .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> //
> > 'w
> > > > is
> > > > > > > > > composite field
> > > > > > > > >     .select('k1, 'col1, 'w.rowtime as 'ts, 'w.xxx as 'xx)
> //
> > In
> > > > > > select
> > > > > > > we
> > > > > > > > > will limit and verify  that ’w.xx is allowed
> > > > > > > > >
> > > > > > > > > I am not sure if I fully understand your concerns, if there
> > any
> > > > > > > > understand
> > > > > > > > > are mistakes, please correct me. Any feedback is
> appreciate!
> > > > > > > > >
> > > > > > > > > Bests,
> > > > > > > > > Jincheng
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Fabian Hueske <fhue...@gmail.com> 于2018年11月22日周四
> 下午10:13写道:
> > > > > > > > >
> > > > > > > > > > Hi all,
> > > > > > > > > >
> > > > > > > > > > First of all, it is correct that the flatMap(Expression*)
> > and
> > > > > > > > > > flatAggregate(Expression*) methods would mix scalar and
> > table
> > > > > > values.
> > > > > > > > > > This would be a new concept that is not present in the
> > > current
> > > > > API.
> > > > > > > > > > From my point of view, the semantics are quite clear,
> but I
> > > > > > > understand
> > > > > > > > > that
> > > > > > > > > > others are more careful and worry about future
> extensions.
> > > > > > > > > >
> > > > > > > > > > I am fine with going for single expression arguments for
> > > map()
> > > > > and
> > > > > > > > > > flatMap(). We can later expand them to Expression* if we
> > feel
> > > > the
> > > > > > > need
> > > > > > > > > and
> > > > > > > > > > are more comfortable about the implications.
> > > > > > > > > > Whenever, a time attribute needs to be forwarded, users
> can
> > > > fall
> > > > > > back
> > > > > > > > to
> > > > > > > > > > join(TableFunction) as Xiaowei mentioned.
> > > > > > > > > > So we restrict the usability of the new methods but don't
> > > lose
> > > > > > > > > > functionality and don't prevent future extensions.
> > > > > > > > > >
> > > > > > > > > > The aggregate() and flatAggregate() case is more
> difficult
> > > > > because
> > > > > > > > > implicit
> > > > > > > > > > forwarding of grouping fields cannot be changed later
> > without
> > > > > > > breaking
> > > > > > > > > the
> > > > > > > > > > API.
> > > > > > > > > > There are other APIs (e.g., Spark) that also implicitly
> > > forward
> > > > > the
> > > > > > > > > > grouping columns. So this is not uncommon.
> > > > > > > > > > However, I personally don't like that approach, because
> it
> > is
> > > > > > > implicit
> > > > > > > > > and
> > > > > > > > > > introduces a new behavior that is not present in the
> > current
> > > > API.
> > > > > > > > > >
> > > > > > > > > > One thing to consider here is the handling of grouping on
> > > > > windows.
> > > > > > > > > > If I understood Xiaowei correctly, a composite field that
> > is
> > > > > named
> > > > > > > like
> > > > > > > > > the
> > > > > > > > > > window alias (e.g., 'w) would be implicitly added to the
> > > result
> > > > > of
> > > > > > > > > > aggregate() or flatAggregate().
> > > > > > > > > > The composite field would have fields like (start, end,
> > > > rowtime)
> > > > > or
> > > > > > > > > (start,
> > > > > > > > > > end, proctime) depending on the window type.
> > > > > > > > > > If we would ever introduce a fourth window property, we
> > might
> > > > > break
> > > > > > > > > > existing queries.
> > > > > > > > > > Is this something that we should worry about?
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Fabian
> > > > > > > > > >
> > > > > > > > > > Am Do., 22. Nov. 2018 um 14:03 Uhr schrieb Piotr
> Nowojski <
> > > > > > > > > > pi...@data-artisans.com>:
> > > > > > > > > >
> > > > > > > > > > > Hi Jincheng,
> > > > > > > > > > >
> > > > > > > > > > > #1) ok, got it.
> > > > > > > > > > >
> > > > > > > > > > > #3)
> > > > > > > > > > > > From points of my view I we can using
> > > > > > > > > > > > `Expression`, and after the discussion decided to use
> > > > > > > Expression*,
> > > > > > > > > then
> > > > > > > > > > > > improve it. In any case, we can use Expression, and
> > there
> > > > is
> > > > > an
> > > > > > > > > > > opportunity
> > > > > > > > > > > > to become Expression* (compatibility). If we use
> > > > Expression*
> > > > > > > > > directly,
> > > > > > > > > > it
> > > > > > > > > > > > is difficult for us to become Expression, which will
> > > break
> > > > > the
> > > > > > > > > > > > compatibility between versions.  What do you think?
> > > > > > > > > > >
> > > > > > > > > > > I don’t think that’s the case here. If we start with
> > single
> > > > > param
> > > > > > > > > > > `flatMap(Expression)`, it will need implicit columns to
> > be
> > > > > > present
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > > > result, which:
> > > > > > > > > > >
> > > > > > > > > > > a) IMO it brakes SQL convention (that’s why I’m against
> > > this)
> > > > > > > > > > > b) we can not later easily introduce
> > `flatMap(Expression*)`
> > > > > > without
> > > > > > > > > those
> > > > > > > > > > > implicit columns, without braking the compatibility or
> at
> > > > least
> > > > > > > > without
> > > > > > > > > > > making `flatMap(Expression*)` and `flatMap(Expression)`
> > > > > terribly
> > > > > > > > > > > inconsistent.
> > > > > > > > > > >
> > > > > > > > > > > To elaborate on (a). It’s not nice if our own API is
> > > > > inconsistent
> > > > > > > and
> > > > > > > > > it
> > > > > > > > > > > sometimes behaves one way and sometimes another way:
> > > > > > > > > > >
> > > > > > > > > > > table.groupBy(‘k).select(scalarAggregateFunction(‘v))
> =>
> > > > single
> > > > > > > > column
> > > > > > > > > > > result, just the output of `scalarAggregateFunction`
> > > > > > > > > > > vs
> > > > > > > > > > >
> > table.groupBy(‘k).flatAggregate(tableAggregateFunction(‘v))
> > > > =>
> > > > > > both
> > > > > > > > > > result
> > > > > > > > > > > of `tableAggregateFunction` plus key (and an optional
> > > window
> > > > > > > context
> > > > > > > > ?)
> > > > > > > > > > >
> > > > > > > > > > > Thus I think we have to now decide which way we want to
> > > jump,
> > > > > > since
> > > > > > > > > later
> > > > > > > > > > > will be too late. Or again, am I missing something? :)
> > > > > > > > > > >
> > > > > > > > > > > Piotrek
> > > > > > > > > > >
> > > > > > > > > > > > On 22 Nov 2018, at 02:07, jincheng sun <
> > > > > > sunjincheng...@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Hi Piotrek,
> > > > > > > > > > > > #1)We have unbounded and bounded group window
> > aggregate,
> > > > for
> > > > > > > > > unbounded
> > > > > > > > > > > case
> > > > > > > > > > > > we should early fire the result with retract message,
> > we
> > > > can
> > > > > > not
> > > > > > > > > using
> > > > > > > > > > > > watermark, because unbounded aggregate never
> finished.
> > > (for
> > > > > > > > > improvement
> > > > > > > > > > > we
> > > > > > > > > > > > can introduce micro-batch in feature),  for bounded
> > > window
> > > > we
> > > > > > > never
> > > > > > > > > > > support
> > > > > > > > > > > > early fire, so we do not need retract.
> > > > > > > > > > > > #3)  About validation of
> `table.select(F(‘a).unnest(),
> > > ‘b,
> > > > > > > > > > > > G(‘c).unnest())/table.flatMap(F(‘a), ‘b,
> scalarG(‘c))`
> > > > Fabian
> > > > > > had
> > > > > > > > > > > mentioned
> > > > > > > > > > > > above, please look at the prior mail.  For
> > > > > > `table.flatMap(F(‘a),
> > > > > > > > ‘b,
> > > > > > > > > > > > scalarG(‘c))` that we concerned, i.e.:  we should
> > discuss
> > > > the
> > > > > > > issue
> > > > > > > > > of
> > > > > > > > > > > > `Expression*` vs `Expression`. From points of my
> view I
> > > we
> > > > > can
> > > > > > > > using
> > > > > > > > > > > > `Expression`, and after the discussion decided to use
> > > > > > > Expression*,
> > > > > > > > > then
> > > > > > > > > > > > improve it. In any case, we can use Expression, and
> > there
> > > > is
> > > > > an
> > > > > > > > > > > opportunity
> > > > > > > > > > > > to become Expression* (compatibility). If we use
> > > > Expression*
> > > > > > > > > directly,
> > > > > > > > > > it
> > > > > > > > > > > > is difficult for us to become Expression, which will
> > > break
> > > > > the
> > > > > > > > > > > > compatibility between versions.  What do you think?
> > > > > > > > > > > >
> > > > > > > > > > > > If there anything not clearly, welcome any
> > > > > > feedback!Agains,thanks
> > > > > > > > for
> > > > > > > > > > > share
> > > > > > > > > > > > your thoughts!
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Jincheng
> > > > > > > > > > > >
> > > > > > > > > > > > Piotr Nowojski <pi...@data-artisans.com>
> > 于2018年11月21日周三
> > > > > > > 下午9:37写道:
> > > > > > > > > > > >
> > > > > > > > > > > >> Hi Jincheng,
> > > > > > > > > > > >>
> > > > > > > > > > > >>> #1) No,watermark solves the issue of the late
> event.
> > > > Here,
> > > > > > the
> > > > > > > > > > > >> performance
> > > > > > > > > > > >>> problem is caused by the update emit mode. i.e.:
> When
> > > > > current
> > > > > > > > > > > calculation
> > > > > > > > > > > >>> result is output, the previous calculation result
> > needs
> > > > to
> > > > > be
> > > > > > > > > > > retracted.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Hmm, yes I missed this. For time-windowed cases
> (some
> > > > > > > > > > > >> aggregate/flatAggregate cases) emitting only on
> > > watermark
> > > > > > should
> > > > > > > > > solve
> > > > > > > > > > > the
> > > > > > > > > > > >> problem. For non time windowed cases it would reduce
> > the
> > > > > > amount
> > > > > > > of
> > > > > > > > > > > >> retractions, right? Or am I still missing something?
> > > > > > > > > > > >>
> > > > > > > > > > > >>> #3)I still hope to keep the simplicity that select
> > only
> > > > > > support
> > > > > > > > > > > projected
> > > > > > > > > > > >>> scalar, we can hardly tell the semantics of
> > > > > > > > tab.select(flatmap('a),
> > > > > > > > > > 'b,
> > > > > > > > > > > >>> flatmap('d)).
> > > > > > > > > > > >>
> > > > > > > > > > > >> table.select(F(‘a).unnest(), ‘b, G(‘c).unnest())
> > > > > > > > > > > >>
> > > > > > > > > > > >> Could be rejected during some validation phase. On
> the
> > > > other
> > > > > > > hand:
> > > > > > > > > > > >>
> > > > > > > > > > > >> table.select(F(‘a).unnest(), ‘b, scalarG(‘c))
> > > > > > > > > > > >> or
> > > > > > > > > > > >> table.flatMap(F(‘a), ‘b, scalarG(‘c))
> > > > > > > > > > > >>
> > > > > > > > > > > >> Could work and be more or less a syntax sugar for
> > cross
> > > > > apply.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Piotrek
> > > > > > > > > > > >>
> > > > > > > > > > > >>> On 21 Nov 2018, at 12:16, jincheng sun <
> > > > > > > sunjincheng...@gmail.com
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> Hi shaoxuan & Hequn,
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> Thanks for your suggestion,I'll file the JIRAs
> later.
> > > > > > > > > > > >>> We can prepare PRs while continuing to move forward
> > the
> > > > > > ongoing
> > > > > > > > > > > >> discussion.
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> Regards,
> > > > > > > > > > > >>> Jincheng
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> jincheng sun <sunjincheng...@gmail.com>
> > 于2018年11月21日周三
> > > > > > > 下午7:07写道:
> > > > > > > > > > > >>>
> > > > > > > > > > > >>>> Hi Piotrek,
> > > > > > > > > > > >>>> Thanks for your feedback, and thanks for  share
> your
> > > > > > thoughts!
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> #1) No,watermark solves the issue of the late
> event.
> > > > Here,
> > > > > > the
> > > > > > > > > > > >> performance
> > > > > > > > > > > >>>> problem is caused by the update emit mode. i.e.:
> > When
> > > > > > current
> > > > > > > > > > > >> calculation
> > > > > > > > > > > >>>> result is output, the previous calculation result
> > > needs
> > > > to
> > > > > > be
> > > > > > > > > > > retracted.
> > > > > > > > > > > >>>> #2) As I mentioned above we should continue the
> > > > discussion
> > > > > > > until
> > > > > > > > > we
> > > > > > > > > > > >> solve
> > > > > > > > > > > >>>> the problems raised by Xiaowei and Fabian.
> > > > > > > > > > > >>>> #3)I still hope to keep the simplicity that select
> > > only
> > > > > > > support
> > > > > > > > > > > >> projected
> > > > > > > > > > > >>>> scalar, we can hardly tell the semantics of
> > > > > > > > > tab.select(flatmap('a),
> > > > > > > > > > > 'b,
> > > > > > > > > > > >>>> flatmap('d)).
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Thanks,
> > > > > > > > > > > >>>> Jincheng
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Piotr Nowojski <pi...@data-artisans.com>
> > > 于2018年11月21日周三
> > > > > > > > 下午5:24写道:
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> Hi,
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> 1.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>> In fact, in addition to the design of APIs,
> there
> > > will
> > > > > be
> > > > > > > > > various
> > > > > > > > > > > >>>>>> performance optimization details, such as: table
> > > > > Aggregate
> > > > > > > > > > function
> > > > > > > > > > > >>>>>> emitValue will generate multiple calculation
> > > results,
> > > > in
> > > > > > > > extreme
> > > > > > > > > > > >> cases,
> > > > > > > > > > > >>>>>> each record will trigger a large number of
> retract
> > > > > > messages,
> > > > > > > > > this
> > > > > > > > > > > will
> > > > > > > > > > > >>>>> have
> > > > > > > > > > > >>>>>> poor performance
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Can this be solved/mitigated by emitting the
> > results
> > > > only
> > > > > > on
> > > > > > > > > > > >> watermarks?
> > > > > > > > > > > >>>>> I think that was the path that we decided to take
> > > both
> > > > > for
> > > > > > > > > Temporal
> > > > > > > > > > > >> Joins
> > > > > > > > > > > >>>>> and upsert stream conversion. I know that this
> > > > increases
> > > > > > the
> > > > > > > > > > latency
> > > > > > > > > > > >> and
> > > > > > > > > > > >>>>> there is a place for a future global setting/user
> > > > > > preference
> > > > > > > > > “emit
> > > > > > > > > > > the
> > > > > > > > > > > >> data
> > > > > > > > > > > >>>>> ASAP mode”, but emitting only on watermarks seems
> > to
> > > me
> > > > > as
> > > > > > a
> > > > > > > > > > > >> better/more
> > > > > > > > > > > >>>>> sane default.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> 2.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> With respect to the API discussion and implicit
> > > > columns.
> > > > > > The
> > > > > > > > > > problem
> > > > > > > > > > > >> for
> > > > > > > > > > > >>>>> me so far is I’m not sure if I like the
> > additionally
> > > > > > > complexity
> > > > > > > > > of
> > > > > > > > > > > >>>>> `append()` solution, while implicit columns are
> > > > > definitely
> > > > > > > not
> > > > > > > > in
> > > > > > > > > > the
> > > > > > > > > > > >>>>> spirit of SQL. Neither joins nor aggregations add
> > > extra
> > > > > > > > > unexpected
> > > > > > > > > > > >> columns
> > > > > > > > > > > >>>>> to the result without asking. This definitely can
> > be
> > > > > > > confusing
> > > > > > > > > for
> > > > > > > > > > > the
> > > > > > > > > > > >>>>> users since it brakes the convention. Thus I
> would
> > > lean
> > > > > > > towards
> > > > > > > > > > > >> Fabian’s
> > > > > > > > > > > >>>>> proposal of multi-argument `map(Expression*)`
> from
> > > > those
> > > > > 3
> > > > > > > > > options.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> 3.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Another topic is that I’m not 100% convinced that
> > we
> > > > > should
> > > > > > > be
> > > > > > > > > > adding
> > > > > > > > > > > >> new
> > > > > > > > > > > >>>>> api functions for `map`,`aggregate`,`flatMap` and
> > > > > > > > > `flatAggregate`.
> > > > > > > > > > I
> > > > > > > > > > > >> think
> > > > > > > > > > > >>>>> the same could be achieved by changing
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> table.map(F('x))
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> into
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> table.select(F('x)).unnest()
> > > > > > > > > > > >>>>> or
> > > > > > > > > > > >>>>> table.select(F('x).unnest())
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Where `unnest()` means unnest row/tuple type
> into a
> > > > > > columnar
> > > > > > > > > table.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> table.flatMap(F('x))
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Could be on the other hand also handled by
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> table.select(F('x))
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> By correctly deducing that F(x) is a multi row
> > output
> > > > > > > function
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Same might apply to `aggregate(F('x))`, but this
> > > maybe
> > > > > > could
> > > > > > > be
> > > > > > > > > > > >> replaced
> > > > > > > > > > > >>>>> by:
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> table.groupBy(…).select(F('x).unnest())
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Adding scalar functions should also be possible:
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> table.groupBy('k).select(F('x).unnest(), ‘k)
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Maybe such approach would allow us to implement
> the
> > > > same
> > > > > > > > features
> > > > > > > > > > in
> > > > > > > > > > > >> the
> > > > > > > > > > > >>>>> SQL as well?
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Piotrek
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>> On 21 Nov 2018, at 09:43, Hequn Cheng <
> > > > > > chenghe...@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Hi,
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Thank you all for the great proposal and
> > discussion!
> > > > > > > > > > > >>>>>> I also prefer to move on to the next step, so +1
> > for
> > > > > > opening
> > > > > > > > the
> > > > > > > > > > > JIRAs
> > > > > > > > > > > >>>>> to
> > > > > > > > > > > >>>>>> start the work.
> > > > > > > > > > > >>>>>> We can have more detailed discussion there. Btw,
> > we
> > > > can
> > > > > > > start
> > > > > > > > > with
> > > > > > > > > > > >> JIRAs
> > > > > > > > > > > >>>>>> which we have agreed on.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Best,
> > > > > > > > > > > >>>>>> Hequn
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <
> > > > > > > > > > wshaox...@gmail.com
> > > > > > > > > > > >
> > > > > > > > > > > >>>>> wrote:
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>> +1. I agree that we should open the JIRAs to
> > start
> > > > the
> > > > > > > work.
> > > > > > > > We
> > > > > > > > > > may
> > > > > > > > > > > >>>>>>> have better ideas on the flavor of the
> interface
> > > when
> > > > > > > > > > > >> implement/review
> > > > > > > > > > > >>>>>>> the code.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Regards,
> > > > > > > > > > > >>>>>>> shaoxuan
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> On 11/20/18, jincheng sun <
> > > sunjincheng...@gmail.com>
> > > > > > > wrote:
> > > > > > > > > > > >>>>>>>> Hi all,
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Thanks all for the feedback.
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> @Piotr About not using abbreviations naming,
> > +1,I
> > > > > like
> > > > > > > > > > > >>>>>>>> your proposal!Currently both DataSet and
> > > DataStream
> > > > > API
> > > > > > > are
> > > > > > > > > > using
> > > > > > > > > > > >>>>>>>> `aggregate`,
> > > > > > > > > > > >>>>>>>> BTW,I find other language also not using
> > > > abbreviations
> > > > > > > > > > naming,such
> > > > > > > > > > > >> as
> > > > > > > > > > > >>>>> R.
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Sometimes the interface of the API is really
> > > > difficult
> > > > > > to
> > > > > > > > > > perfect,
> > > > > > > > > > > >> we
> > > > > > > > > > > >>>>>>> need
> > > > > > > > > > > >>>>>>>> to spend a lot of time thinking and feedback
> > from
> > > a
> > > > > > large
> > > > > > > > > number
> > > > > > > > > > > of
> > > > > > > > > > > >>>>>>> users,
> > > > > > > > > > > >>>>>>>> and constantly improve, but for backward
> > > > compatibility
> > > > > > > > issues,
> > > > > > > > > > we
> > > > > > > > > > > >>>>> have to
> > > > > > > > > > > >>>>>>>> adopt the most conservative approach when
> > > designing
> > > > > the
> > > > > > > > API(Of
> > > > > > > > > > > >>>>> course, I
> > > > > > > > > > > >>>>>>> am
> > > > > > > > > > > >>>>>>>> more in favor of developing more rich
> features,
> > > when
> > > > > we
> > > > > > > > > discuss
> > > > > > > > > > > >>>>> clearly).
> > > > > > > > > > > >>>>>>>> Therefore, I propose to divide the function
> > > > > > implementation
> > > > > > > > of
> > > > > > > > > > > >>>>>>>> map/faltMap/agg/flatAgg into basic functions
> of
> > > > JIRAs
> > > > > > and
> > > > > > > > > JIRAs
> > > > > > > > > > > that
> > > > > > > > > > > >>>>>>>> support time attributes and groupKeys. We can
> > > > develop
> > > > > > the
> > > > > > > > > > features
> > > > > > > > > > > >>>>> which
> > > > > > > > > > > >>>>>>>> we  have already agreed on the design. And we
> > will
> > > > > > > continue
> > > > > > > > to
> > > > > > > > > > > >> discuss
> > > > > > > > > > > >>>>>>> the
> > > > > > > > > > > >>>>>>>> uncertain design.
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> In fact, in addition to the design of APIs,
> > there
> > > > will
> > > > > > be
> > > > > > > > > > various
> > > > > > > > > > > >>>>>>>> performance optimization details, such as:
> table
> > > > > > Aggregate
> > > > > > > > > > > function
> > > > > > > > > > > >>>>>>>> emitValue will generate multiple calculation
> > > > results,
> > > > > in
> > > > > > > > > extreme
> > > > > > > > > > > >>>>> cases,
> > > > > > > > > > > >>>>>>>> each record will trigger a large number of
> > retract
> > > > > > > messages,
> > > > > > > > > > this
> > > > > > > > > > > >> will
> > > > > > > > > > > >>>>>>> have
> > > > > > > > > > > >>>>>>>> poor performance,so we will also optimize the
> > > > > interface
> > > > > > > > > design,
> > > > > > > > > > > such
> > > > > > > > > > > >>>>> as
> > > > > > > > > > > >>>>>>>> adding the emitWithRetractValue interface (I
> > have
> > > > > > updated
> > > > > > > > the
> > > > > > > > > > > google
> > > > > > > > > > > >>>>> doc)
> > > > > > > > > > > >>>>>>>> to allow the user to optionally perform
> > > incremental
> > > > > > > > > > calculations,
> > > > > > > > > > > >> thus
> > > > > > > > > > > >>>>>>>> avoiding a large number of retracts. Details
> > like
> > > > this
> > > > > > are
> > > > > > > > > > > difficult
> > > > > > > > > > > >>>>> to
> > > > > > > > > > > >>>>>>>> fully discuss in the mail list, so I recommend
> > > > > creating
> > > > > > > > > > JIRAs/FLIP
> > > > > > > > > > > >>>>> first,
> > > > > > > > > > > >>>>>>>> we develop designs that have been agreed upon
> > and
> > > > > > continue
> > > > > > > > to
> > > > > > > > > > > >> discuss
> > > > > > > > > > > >>>>>>>> non-deterministic designs!  What do you think?
> > > > > @Fabian &
> > > > > > > > > Piotr &
> > > > > > > > > > > >>>>> XiaoWei
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Best,
> > > > > > > > > > > >>>>>>>> Jincheng
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Xiaowei Jiang <xiaow...@gmail.com>
> > 于2018年11月19日周一
> > > > > > > > 上午12:07写道:
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>> Hi Fabian & Piotr, thanks for the feedback!
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> I appreciate your concerns, both on timestamp
> > > > > > attributes
> > > > > > > as
> > > > > > > > > > well
> > > > > > > > > > > as
> > > > > > > > > > > >>>>> on
> > > > > > > > > > > >>>>>>>>> implicit group keys. At the same time, I'm
> also
> > > > > > concerned
> > > > > > > > > with
> > > > > > > > > > > the
> > > > > > > > > > > >>>>>>>>> proposed
> > > > > > > > > > > >>>>>>>>> approach of allowing Expression* as
> parameters,
> > > > > > > especially
> > > > > > > > > for
> > > > > > > > > > > >>>>>>>>> flatMap/flatAgg. So far, we never allowed a
> > > scalar
> > > > > > > > expression
> > > > > > > > > > to
> > > > > > > > > > > >>>>> appear
> > > > > > > > > > > >>>>>>>>> together with table expressions. With the
> > > > Expression*
> > > > > > > > > approach,
> > > > > > > > > > > >> this
> > > > > > > > > > > >>>>>>> will
> > > > > > > > > > > >>>>>>>>> happen for the parameters to flatMap/flatAgg.
> > > I'm a
> > > > > bit
> > > > > > > > > > concerned
> > > > > > > > > > > >> on
> > > > > > > > > > > >>>>> if
> > > > > > > > > > > >>>>>>>>> we
> > > > > > > > > > > >>>>>>>>> fully understand the consequences when we try
> > to
> > > > > extend
> > > > > > > our
> > > > > > > > > > > system
> > > > > > > > > > > >> in
> > > > > > > > > > > >>>>>>> the
> > > > > > > > > > > >>>>>>>>> future. I would be extra cautious in doing
> > this.
> > > To
> > > > > > avoid
> > > > > > > > > > this, I
> > > > > > > > > > > >>>>> think
> > > > > > > > > > > >>>>>>>>> an
> > > > > > > > > > > >>>>>>>>> implicit group key for flatAgg is safer. For
> > > > flatMap,
> > > > > > if
> > > > > > > > > users
> > > > > > > > > > > want
> > > > > > > > > > > >>>>> to
> > > > > > > > > > > >>>>>>>>> keep
> > > > > > > > > > > >>>>>>>>> the rowtime column, he can use
> crossApply/join
> > > > > instead.
> > > > > > > So
> > > > > > > > we
> > > > > > > > > > are
> > > > > > > > > > > >> not
> > > > > > > > > > > >>>>>>>>> losing any real functionality here.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> Also a clarification on the following
> example:
> > > > > > > > > > > >>>>>>>>> tab.window(Tumble ... as 'w)
> > > > > > > > > > > >>>>>>>>>  .groupBy('w, 'k1, 'k2) // 'w should be a
> group
> > > > key.
> > > > > > > > > > > >>>>>>>>>  .flatAgg(tableAgg('a)).as('w, 'k1, 'k2,
> 'col1,
> > > > > 'col2)
> > > > > > > > > > > >>>>>>>>>  .select('k1, 'col1, 'w.rowtime as 'rtime)
> > > > > > > > > > > >>>>>>>>> If we did not have the select clause in this
> > > > example,
> > > > > > we
> > > > > > > > will
> > > > > > > > > > > have
> > > > > > > > > > > >>>>> 'w as
> > > > > > > > > > > >>>>>>>>> a
> > > > > > > > > > > >>>>>>>>> regular column in the output. It should not
> > > > magically
> > > > > > > > > > disappear.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> The concern is not as strong for
> > > > Table.map/Table.agg
> > > > > > > > because
> > > > > > > > > we
> > > > > > > > > > > are
> > > > > > > > > > > >>>>> not
> > > > > > > > > > > >>>>>>>>> mixing scalar and table expressions. But we
> > also
> > > > want
> > > > > > to
> > > > > > > > be a
> > > > > > > > > > bit
> > > > > > > > > > > >>>>>>>>> consistent with these methods. If we used
> > > implicit
> > > > > > group
> > > > > > > > keys
> > > > > > > > > > for
> > > > > > > > > > > >>>>>>>>> Table.flatAgg, we probably should do the same
> > for
> > > > > > > > Table.agg.
> > > > > > > > > > Now
> > > > > > > > > > > we
> > > > > > > > > > > >>>>> only
> > > > > > > > > > > >>>>>>>>> have to choose what to do with Table.map. I
> can
> > > see
> > > > > > good
> > > > > > > > > > > arguments
> > > > > > > > > > > >>>>> from
> > > > > > > > > > > >>>>>>>>> both sides. But starting with a single
> > Expression
> > > > > seems
> > > > > > > > safer
> > > > > > > > > > > >> because
> > > > > > > > > > > >>>>>>>>> that
> > > > > > > > > > > >>>>>>>>> we can always extend to Expression* in the
> > > future.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> While thinking about this problem, it appears
> > > that
> > > > we
> > > > > > may
> > > > > > > > > need
> > > > > > > > > > > more
> > > > > > > > > > > >>>>> work
> > > > > > > > > > > >>>>>>>>> in
> > > > > > > > > > > >>>>>>>>> our handling of watermarks for SQL/Table API.
> > Our
> > > > > > current
> > > > > > > > way
> > > > > > > > > > of
> > > > > > > > > > > >>>>>>>>> propagating the watermarks from source all
> the
> > > way
> > > > to
> > > > > > > sink
> > > > > > > > > > might
> > > > > > > > > > > >> not
> > > > > > > > > > > >>>>> be
> > > > > > > > > > > >>>>>>>>> optimal. For example, after a tumbling
> window,
> > > the
> > > > > > > > watermark
> > > > > > > > > > can
> > > > > > > > > > > >>>>>>> actually
> > > > > > > > > > > >>>>>>>>> be advanced to just before the expiring of
> next
> > > > > > window. I
> > > > > > > > > think
> > > > > > > > > > > >> that
> > > > > > > > > > > >>>>> in
> > > > > > > > > > > >>>>>>>>> general, each operator may need to generate
> new
> > > > > > > watermarks
> > > > > > > > > > > instead
> > > > > > > > > > > >> of
> > > > > > > > > > > >>>>>>>>> simply propagating them. Once we accept that
> > > > > watermarks
> > > > > > > may
> > > > > > > > > > > change
> > > > > > > > > > > >>>>>>> during
> > > > > > > > > > > >>>>>>>>> the execution, it appears that the timestamp
> > > > columns
> > > > > > may
> > > > > > > > also
> > > > > > > > > > > >>>>> change, as
> > > > > > > > > > > >>>>>>>>> long as we have some way to associate
> watermark
> > > > with
> > > > > > it.
> > > > > > > My
> > > > > > > > > > > >>>>> intuition is
> > > > > > > > > > > >>>>>>>>> that once we have a through solution for the
> > > > > watermark
> > > > > > > > issue,
> > > > > > > > > > we
> > > > > > > > > > > >> may
> > > > > > > > > > > >>>>> be
> > > > > > > > > > > >>>>>>>>> able to solve the problem we encountered for
> > > > > Table.map
> > > > > > > in a
> > > > > > > > > > > cleaner
> > > > > > > > > > > >>>>> way.
> > > > > > > > > > > >>>>>>>>> But this is a complex issue which deserves a
> > > > > discussion
> > > > > > > on
> > > > > > > > > its
> > > > > > > > > > > own.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> Regards,
> > > > > > > > > > > >>>>>>>>> Xiaowei
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> On Fri, Nov 16, 2018 at 12:34 AM Piotr
> > Nowojski <
> > > > > > > > > > > >>>>>>> pi...@data-artisans.com>
> > > > > > > > > > > >>>>>>>>> wrote:
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> Hi,
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> Isn’t the problem of multiple expressions
> > > limited
> > > > > only
> > > > > > > to
> > > > > > > > > > > >> `flat***`
> > > > > > > > > > > >>>>>>>>>> functions and to be more specific only to
> > having
> > > > two
> > > > > > (or
> > > > > > > > > more)
> > > > > > > > > > > >>>>>>>>>> different
> > > > > > > > > > > >>>>>>>>>> table functions passed as an expressions?
> > > > > > > > > > > `.flatAgg(TableAggA('a),
> > > > > > > > > > > >>>>>>>>>> scalarFunction1(‘b), scalarFunction2(‘c))`
> > seems
> > > > to
> > > > > be
> > > > > > > > well
> > > > > > > > > > > >> defined
> > > > > > > > > > > >>>>>>>>>> (duplicate result of every scalar function
> to
> > > > every
> > > > > > > > record.
> > > > > > > > > Or
> > > > > > > > > > > am
> > > > > > > > > > > >> I
> > > > > > > > > > > >>>>>>>>> missing
> > > > > > > > > > > >>>>>>>>>> something?
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> Another remark, I would be in favour of not
> > > using
> > > > > > > > > > abbreviations
> > > > > > > > > > > >> and
> > > > > > > > > > > >>>>>>>>> naming
> > > > > > > > > > > >>>>>>>>>> `agg` -> `aggregate`, `flatAgg` ->
> > > > `flatAggregate`.
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> Piotrek
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> On 15 Nov 2018, at 14:15, Fabian Hueske <
> > > > > > > > fhue...@gmail.com
> > > > > > > > > >
> > > > > > > > > > > >> wrote:
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> Hi Jincheng,
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> I said before, that I think that the
> append()
> > > > > method
> > > > > > is
> > > > > > > > > > better
> > > > > > > > > > > >> than
> > > > > > > > > > > >>>>>>>>>>> implicitly forwarding keys, but still, I
> > > believe
> > > > it
> > > > > > > adds
> > > > > > > > > > > >>>>> unnecessary
> > > > > > > > > > > >>>>>>>>>> boiler
> > > > > > > > > > > >>>>>>>>>>> plate code.
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> Moreover, I haven't seen a convincing
> > argument
> > > > why
> > > > > > > > > > > >> map(Expression*)
> > > > > > > > > > > >>>>>>>>>>> is
> > > > > > > > > > > >>>>>>>>>>> worse than map(Expression). In either case
> we
> > > > need
> > > > > to
> > > > > > > do
> > > > > > > > > all
> > > > > > > > > > > >> kinds
> > > > > > > > > > > >>>>>>> of
> > > > > > > > > > > >>>>>>>>>>> checks to prevent invalid use of functions.
> > > > > > > > > > > >>>>>>>>>>> If the method is not correctly used, we can
> > > emit
> > > > a
> > > > > > good
> > > > > > > > > error
> > > > > > > > > > > >>>>>>> message
> > > > > > > > > > > >>>>>>>>> and
> > > > > > > > > > > >>>>>>>>>>> documenting map(Expression*) will be easier
> > > than
> > > > > > > > > > > >>>>>>>>>> map(append(Expression*)),
> > > > > > > > > > > >>>>>>>>>>> in my opinion.
> > > > > > > > > > > >>>>>>>>>>> I think we should not add unnessary syntax
> > > unless
> > > > > > there
> > > > > > > > is
> > > > > > > > > a
> > > > > > > > > > > good
> > > > > > > > > > > >>>>>>>>> reason
> > > > > > > > > > > >>>>>>>>>>> and to be honest, I haven't seen this
> reason
> > > yet.
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> Regarding the groupBy.agg() method, I think
> > it
> > > > > should
> > > > > > > > > behave
> > > > > > > > > > > just
> > > > > > > > > > > >>>>>>>>>>> like
> > > > > > > > > > > >>>>>>>>>> any
> > > > > > > > > > > >>>>>>>>>>> other method, i.e., not do any implicit
> > > > forwarding.
> > > > > > > > > > > >>>>>>>>>>> Let's take the example of the windowed
> group
> > > by,
> > > > > that
> > > > > > > you
> > > > > > > > > > > posted
> > > > > > > > > > > >>>>>>>>> before.
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> tab.window(Tumble ... as 'w)
> > > > > > > > > > > >>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a
> > group
> > > > key.
> > > > > > > > > > > >>>>>>>>>>> .agg(agg('a)).as('w, 'k1, 'k2, 'col1,
> 'col2)
> > > > > > > > > > > >>>>>>>>>>> .select('k1, 'col1, 'w.rowtime as 'rtime)
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> What happens if 'w.rowtime is not selected?
> > > What
> > > > is
> > > > > > the
> > > > > > > > > data
> > > > > > > > > > > type
> > > > > > > > > > > >>>>> of
> > > > > > > > > > > >>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>> field 'w in the resulting Table? Is it a
> > > regular
> > > > > > field
> > > > > > > at
> > > > > > > > > all
> > > > > > > > > > > or
> > > > > > > > > > > >>>>>>> just
> > > > > > > > > > > >>>>>>>>>>> a
> > > > > > > > > > > >>>>>>>>>>> system field that disappears if it is not
> > > > selected?
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> IMO, the following syntax is shorter, more
> > > > > explicit,
> > > > > > > and
> > > > > > > > > > better
> > > > > > > > > > > >>>>>>>>>>> aligned
> > > > > > > > > > > >>>>>>>>>>> with the regular window.groupBy.select
> > > > aggregations
> > > > > > > that
> > > > > > > > > are
> > > > > > > > > > > >>>>>>>>>>> supported
> > > > > > > > > > > >>>>>>>>>>> today.
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> tab.window(Tumble ... as 'w)
> > > > > > > > > > > >>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a
> > group
> > > > key.
> > > > > > > > > > > >>>>>>>>>>> .agg('w.rowtime as 'rtime, 'k1, 'k2,
> agg('a))
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> Best, Fabian
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> Am Mi., 14. Nov. 2018 um 08:37 Uhr schrieb
> > > > jincheng
> > > > > > > sun <
> > > > > > > > > > > >>>>>>>>>>> sunjincheng...@gmail.com>:
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> Hi Fabian/Xiaowei,
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> I am very sorry for my late reply! Glad to
> > see
> > > > > your
> > > > > > > > reply,
> > > > > > > > > > and
> > > > > > > > > > > >>>>>>>>>>>> sounds
> > > > > > > > > > > >>>>>>>>>>>> pretty good!
> > > > > > > > > > > >>>>>>>>>>>> I agree that the approach with append()
> > which
> > > > can
> > > > > > > > clearly
> > > > > > > > > > > >> defined
> > > > > > > > > > > >>>>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>> result schema is better which Fabian
> > > mentioned.
> > > > > > > > > > > >>>>>>>>>>>> In addition and append() and also contains
> > > > > non-time
> > > > > > > > > > > attributes,
> > > > > > > > > > > >>>>>>>>>>>> e.g.:
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> tab('name, 'age, 'address, 'rowtime)
> > > > > > > > > > > >>>>>>>>>>>> tab.map(append(udf('name), 'address,
> > > > > > > 'rowtime).as('col1,
> > > > > > > > > > > 'col2,
> > > > > > > > > > > >>>>>>>>>>>> 'address, 'rowtime)
> > > > > > > > > > > >>>>>>>>>>>> .window(Tumble over 5.millis on 'rowtime
> as
> > > 'w)
> > > > > > > > > > > >>>>>>>>>>>> .groupBy('w, 'address)
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> In this way the append() is very useful,
> and
> > > the
> > > > > > > > behavior
> > > > > > > > > is
> > > > > > > > > > > >> very
> > > > > > > > > > > >>>>>>>>>> similar
> > > > > > > > > > > >>>>>>>>>>>> to withForwardedFields() in DataSet.
> > > > > > > > > > > >>>>>>>>>>>> So +1 to using append() approach for the
> > > > > > > > map()&flatmap()!
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> But how about the agg() and flatAgg()? In
> > > > > > agg/flatAgg
> > > > > > > > > case I
> > > > > > > > > > > >> agree
> > > > > > > > > > > >>>>>>>>>>>> Xiaowei's approach that define the keys to
> > be
> > > > > > implied
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > > > >>>>> result
> > > > > > > > > > > >>>>>>>>>> table
> > > > > > > > > > > >>>>>>>>>>>> and appears at the beginning, for example
> as
> > > > > > follows:
> > > > > > > > > > > >>>>>>>>>>>> tab.window(Tumble ... as 'w)
> > > > > > > > > > > >>>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a
> > group
> > > > > key.
> > > > > > > > > > > >>>>>>>>>>>> .agg(agg('a)).as('w, 'k1, 'k2, 'col1,
> 'col2)
> > > > > > > > > > > >>>>>>>>>>>> .select('k1, 'col1, 'w.rowtime as 'rtime)
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> What to you think? @Fabian @Xiaowei
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> Thanks,
> > > > > > > > > > > >>>>>>>>>>>> Jincheng
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> Fabian Hueske <fhue...@gmail.com>
> > > 于2018年11月9日周五
> > > > > > > > 下午6:35写道:
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>> Hi Jincheng,
> > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>> Thanks for the summary!
> > > > > > > > > > > >>>>>>>>>>>>> I like the approach with append() better
> > than
> > > > the
> > > > > > > > > implicit
> > > > > > > > > > > >>>>>>>>>>>>> forwarding
> > > > > > > > > > > >>>>>>>>>> as
> > > > > > > > > > > >>>>>>>>>>>> it
> > > > > > > > > > > >>>>>>>>>>>>> clearly indicates which fields are
> > forwarded.
> > > > > > > > > > > >>>>>>>>>>>>> However, I don't see much benefit over
> the
> > > > > > > > > > > flatMap(Expression*)
> > > > > > > > > > > >>>>>>>>>> variant,
> > > > > > > > > > > >>>>>>>>>>>> as
> > > > > > > > > > > >>>>>>>>>>>>> we would still need to analyze the full
> > > > > expression
> > > > > > > tree
> > > > > > > > > to
> > > > > > > > > > > >> ensure
> > > > > > > > > > > >>>>>>>>> that
> > > > > > > > > > > >>>>>>>>>> at
> > > > > > > > > > > >>>>>>>>>>>>> most (or exactly?) one Scalar /
> > TableFunction
> > > > is
> > > > > > > used.
> > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>> Best,
> > > > > > > > > > > >>>>>>>>>>>>> Fabian
> > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>> Am Do., 8. Nov. 2018 um 19:25 Uhr schrieb
> > > > > jincheng
> > > > > > > sun
> > > > > > > > <
> > > > > > > > > > > >>>>>>>>>>>>> sunjincheng...@gmail.com>:
> > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> Hi all,
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> We are discussing very detailed content
> > > about
> > > > > this
> > > > > > > > > > proposal.
> > > > > > > > > > > >> We
> > > > > > > > > > > >>>>>>>>>>>>>> are
> > > > > > > > > > > >>>>>>>>>>>>> trying
> > > > > > > > > > > >>>>>>>>>>>>>> to design the API in many aspects
> > > > > (functionality,
> > > > > > > > > > > >> compatibility,
> > > > > > > > > > > >>>>>>>>> ease
> > > > > > > > > > > >>>>>>>>>>>> of
> > > > > > > > > > > >>>>>>>>>>>>>> use, etc.). I think this is a very good
> > > > process.
> > > > > > > Only
> > > > > > > > > > such a
> > > > > > > > > > > >>>>>>>>> detailed
> > > > > > > > > > > >>>>>>>>>>>>>> discussion, In order to develop PR more
> > > > clearly
> > > > > > and
> > > > > > > > > > smoothly
> > > > > > > > > > > >> in
> > > > > > > > > > > >>>>>>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>> later
> > > > > > > > > > > >>>>>>>>>>>>>> stage. I am very grateful to @Fabian and
> > > > > @Xiaowei
> > > > > > > for
> > > > > > > > > > > >> sharing a
> > > > > > > > > > > >>>>>>>>>>>>>> lot
> > > > > > > > > > > >>>>>>>>>> of
> > > > > > > > > > > >>>>>>>>>>>>>> good ideas.
> > > > > > > > > > > >>>>>>>>>>>>>> About the definition of method
> signatures
> > I
> > > > want
> > > > > > to
> > > > > > > > > share
> > > > > > > > > > my
> > > > > > > > > > > >>>>>>>>>>>>>> points
> > > > > > > > > > > >>>>>>>>>>>> here
> > > > > > > > > > > >>>>>>>>>>>>>> which I am discussing with fabian in
> > google
> > > > doc
> > > > > > (not
> > > > > > > > yet
> > > > > > > > > > > >>>>>>>>>>>>>> completed),
> > > > > > > > > > > >>>>>>>>>> as
> > > > > > > > > > > >>>>>>>>>>>>>> follows:
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> Assume we have a table:
> > > > > > > > > > > >>>>>>>>>>>>>> val tab = util.addTable[(Long,
> > > > > String)]("MyTable",
> > > > > > > > > 'long,
> > > > > > > > > > > >>>>>>> 'string,
> > > > > > > > > > > >>>>>>>>>>>>>> 'proctime.proctime)
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> Approach 1:
> > > > > > > > > > > >>>>>>>>>>>>>> case1: Map follows Source Table
> > > > > > > > > > > >>>>>>>>>>>>>> val result =
> > > > > > > > > > > >>>>>>>>>>>>>> tab.map(udf('string)).as('proctime,
> 'col1,
> > > > > > 'col2)//
> > > > > > > > > > proctime
> > > > > > > > > > > >>>>>>>>> implied
> > > > > > > > > > > >>>>>>>>>>>> in
> > > > > > > > > > > >>>>>>>>>>>>>> the output
> > > > > > > > > > > >>>>>>>>>>>>>> .window(Tumble over 5.millis on
> 'proctime
> > as
> > > > 'w)
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> case2: FatAgg follows Window (Fabian
> > > mentioned
> > > > > > > above)
> > > > > > > > > > > >>>>>>>>>>>>>> val result =
> > > > > > > > > > > >>>>>>>>>>>>>> tab.window(Tumble ... as 'w)
> > > > > > > > > > > >>>>>>>>>>>>>>    .groupBy('w, 'k1, 'k2) // 'w should
> be
> > a
> > > > > group
> > > > > > > key.
> > > > > > > > > > > >>>>>>>>>>>>>>    .flatAgg(tabAgg('a)).as('k1, 'k2, 'w,
> > > > 'col1,
> > > > > > > 'col2)
> > > > > > > > > > > >>>>>>>>>>>>>>    .select('k1, 'col1, 'w.rowtime as
> > 'rtime)
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> Approach 2: Similar to Fabian‘s
> approach,
> > > > which
> > > > > > the
> > > > > > > > > result
> > > > > > > > > > > >>>>> schema
> > > > > > > > > > > >>>>>>>>>> would
> > > > > > > > > > > >>>>>>>>>>>>> be
> > > > > > > > > > > >>>>>>>>>>>>>> clearly defined, but add a built-in
> append
> > > > UDF.
> > > > > > That
> > > > > > > > > make
> > > > > > > > > > > >>>>>>>>>>>>>> map/flatmap/agg/flatAgg interface only
> > > accept
> > > > > one
> > > > > > > > > > > Expression.
> > > > > > > > > > > >>>>>>>>>>>>>> val result =
> > > > > > > > > > > >>>>>>>>>>>>>> tab.map(append(udf('string), 'long,
> > > > 'proctime))
> > > > > as
> > > > > > > > > ('col1,
> > > > > > > > > > > >>>>>>>>>>>>>> 'col2,
> > > > > > > > > > > >>>>>>>>>>>>>> 'long, 'proctime)
> > > > > > > > > > > >>>>>>>>>>>>>>  .window(Tumble over 5.millis on
> 'proctime
> > > as
> > > > > 'w)
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> Note: Append is a special UDF for
> built-in
> > > > that
> > > > > > can
> > > > > > > > pass
> > > > > > > > > > > >> through
> > > > > > > > > > > >>>>>>>>>>>>>> any
> > > > > > > > > > > >>>>>>>>>>>>>> column.
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> So, May be we can defined the as
> > > > > > > > table.map(Expression)
> > > > > > > > > > > >> first,
> > > > > > > > > > > >>>>>>> If
> > > > > > > > > > > >>>>>>>>>>>>>> necessary, we can extend to
> > > > > table.map(Expression*)
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > > > >>>>> future
> > > > > > > > > > > >>>>>>>>>>>>>> ?
> > > > > > > > > > > >>>>>>>>>> Of
> > > > > > > > > > > >>>>>>>>>>>>>> course, I also hope that we can do more
> > > > > perfection
> > > > > > > in
> > > > > > > > > this
> > > > > > > > > > > >>>>>>>>>>>>>> proposal
> > > > > > > > > > > >>>>>>>>>>>>> through
> > > > > > > > > > > >>>>>>>>>>>>>> discussion.
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> Thanks,
> > > > > > > > > > > >>>>>>>>>>>>>> Jincheng
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> Xiaowei Jiang <xiaow...@gmail.com>
> > > > > 于2018年11月7日周三
> > > > > > > > > > 下午11:45写道:
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>> Hi Fabian,
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>> I think that the key question you
> raised
> > is
> > > > if
> > > > > we
> > > > > > > > allow
> > > > > > > > > > > extra
> > > > > > > > > > > >>>>>>>>>>>>> parameters
> > > > > > > > > > > >>>>>>>>>>>>>> in
> > > > > > > > > > > >>>>>>>>>>>>>>> the methods map/flatMap/agg/flatAgg. I
> > can
> > > > see
> > > > > > why
> > > > > > > > > > allowing
> > > > > > > > > > > >>>>> that
> > > > > > > > > > > >>>>>>>>> may
> > > > > > > > > > > >>>>>>>>>>>>>> appear
> > > > > > > > > > > >>>>>>>>>>>>>>> more convenient in some cases. However,
> > it
> > > > > might
> > > > > > > also
> > > > > > > > > > cause
> > > > > > > > > > > >>>>> some
> > > > > > > > > > > >>>>>>>>>>>>>> confusions
> > > > > > > > > > > >>>>>>>>>>>>>>> if we do that. For example, do we allow
> > > > > multiple
> > > > > > > UDFs
> > > > > > > > > in
> > > > > > > > > > > >> these
> > > > > > > > > > > >>>>>>>>>>>>>> expressions?
> > > > > > > > > > > >>>>>>>>>>>>>>> If we do, the semantics may be weird to
> > > > define,
> > > > > > > e.g.
> > > > > > > > > what
> > > > > > > > > > > >> does
> > > > > > > > > > > >>>>>>>>>>>>>>>
> table.groupBy('k).flatAgg(TableAggA('a),
> > > > > > > > TableAggB('b))
> > > > > > > > > > > mean?
> > > > > > > > > > > >>>>>>>>>>>>>>> Even
> > > > > > > > > > > >>>>>>>>>>>>> though
> > > > > > > > > > > >>>>>>>>>>>>>>> not allowing it may appear less
> powerful,
> > > but
> > > > > it
> > > > > > > can
> > > > > > > > > make
> > > > > > > > > > > >>>>> things
> > > > > > > > > > > >>>>>>>>> more
> > > > > > > > > > > >>>>>>>>>>>>>>> intuitive too. In the case of
> > agg/flatAgg,
> > > we
> > > > > can
> > > > > > > > > define
> > > > > > > > > > > the
> > > > > > > > > > > >>>>>>> keys
> > > > > > > > > > > >>>>>>>>> to
> > > > > > > > > > > >>>>>>>>>>>> be
> > > > > > > > > > > >>>>>>>>>>>>>>> implied in the result table and appears
> > at
> > > > the
> > > > > > > > > beginning.
> > > > > > > > > > > You
> > > > > > > > > > > >>>>>>> can
> > > > > > > > > > > >>>>>>>>>>>> use a
> > > > > > > > > > > >>>>>>>>>>>>>>> select method if you want to modify
> this
> > > > > > behavior.
> > > > > > > I
> > > > > > > > > > think
> > > > > > > > > > > >> that
> > > > > > > > > > > >>>>>>>>>>>>>> eventually
> > > > > > > > > > > >>>>>>>>>>>>>>> we will have some API which allows
> other
> > > > > > > expressions
> > > > > > > > as
> > > > > > > > > > > >>>>>>>>>>>>>>> additional
> > > > > > > > > > > >>>>>>>>>>>>>>> parameters, but I think it's better to
> do
> > > > that
> > > > > > > after
> > > > > > > > we
> > > > > > > > > > > >>>>>>> introduce
> > > > > > > > > > > >>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>>> concept of nested tables. A lot of
> things
> > > we
> > > > > > > > suggested
> > > > > > > > > > here
> > > > > > > > > > > >> can
> > > > > > > > > > > >>>>>>>>>>>>>>> be
> > > > > > > > > > > >>>>>>>>>>>>>>> considered as special cases of that.
> But
> > > > things
> > > > > > are
> > > > > > > > > much
> > > > > > > > > > > >>>>> simpler
> > > > > > > > > > > >>>>>>>>>>>>>>> if
> > > > > > > > > > > >>>>>>>>>>>> we
> > > > > > > > > > > >>>>>>>>>>>>>>> leave that to later.
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>> Regards,
> > > > > > > > > > > >>>>>>>>>>>>>>> Xiaowei
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian
> > > Hueske
> > > > <
> > > > > > > > > > > >>>>> fhue...@gmail.com
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>> wrote:
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>> Hi,
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>> * Re emit:
> > > > > > > > > > > >>>>>>>>>>>>>>>> I think we should start with a well
> > > > understood
> > > > > > > > > semantics
> > > > > > > > > > > of
> > > > > > > > > > > >>>>>>> full
> > > > > > > > > > > >>>>>>>>>>>>>>>> replacement. This is how the other agg
> > > > > functions
> > > > > > > > work.
> > > > > > > > > > > >>>>>>>>>>>>>>>> As was said before, there are open
> > > questions
> > > > > > > > regarding
> > > > > > > > > > an
> > > > > > > > > > > >>>>>>> append
> > > > > > > > > > > >>>>>>>>>>>> mode
> > > > > > > > > > > >>>>>>>>>>>>>>>> (checkpointing, whether supporting
> > > > retractions
> > > > > > or
> > > > > > > > not
> > > > > > > > > > and
> > > > > > > > > > > if
> > > > > > > > > > > >>>>>>> yes
> > > > > > > > > > > >>>>>>>>>>>> how
> > > > > > > > > > > >>>>>>>>>>>>> to
> > > > > > > > > > > >>>>>>>>>>>>>>>> declare them, ...).
> > > > > > > > > > > >>>>>>>>>>>>>>>> Since this seems to be an
> optimization,
> > > I'd
> > > > > > > postpone
> > > > > > > > > it.
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>> * Re grouping keys:
> > > > > > > > > > > >>>>>>>>>>>>>>>> I don't think we should automatically
> > add
> > > > them
> > > > > > > > because
> > > > > > > > > > the
> > > > > > > > > > > >>>>>>>>>>>>>>>> result
> > > > > > > > > > > >>>>>>>>>>>>>> schema
> > > > > > > > > > > >>>>>>>>>>>>>>>> would not be intuitive.
> > > > > > > > > > > >>>>>>>>>>>>>>>> Would they be added at the beginning
> of
> > > the
> > > > > > tuple
> > > > > > > or
> > > > > > > > > at
> > > > > > > > > > > the
> > > > > > > > > > > >>>>>>> end?
> > > > > > > > > > > >>>>>>>>>>>> What
> > > > > > > > > > > >>>>>>>>>>>>>>>> metadata fields of windows would be
> > added?
> > > > In
> > > > > > > which
> > > > > > > > > > order
> > > > > > > > > > > >>>>> would
> > > > > > > > > > > >>>>>>>>>>>> they
> > > > > > > > > > > >>>>>>>>>>>>> be
> > > > > > > > > > > >>>>>>>>>>>>>>>> added?
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>> However, we could support syntax like
> > > this:
> > > > > > > > > > > >>>>>>>>>>>>>>>> val t: Table = ???
> > > > > > > > > > > >>>>>>>>>>>>>>>> t
> > > > > > > > > > > >>>>>>>>>>>>>>>> .window(Tumble ... as 'w)
> > > > > > > > > > > >>>>>>>>>>>>>>>> .groupBy('a, 'b)
> > > > > > > > > > > >>>>>>>>>>>>>>>> .flatAgg('b, 'a, myAgg(row('*)),
> 'w.end
> > as
> > > > > > 'wend,
> > > > > > > > > > > 'w.rowtime
> > > > > > > > > > > >>>>>>> as
> > > > > > > > > > > >>>>>>>>>>>>>> 'rtime)
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>> The result schema would be clearly
> > defined
> > > > as
> > > > > > [b,
> > > > > > > a,
> > > > > > > > > f1,
> > > > > > > > > > > f2,
> > > > > > > > > > > >>>>>>>>>>>>>>>> ...,
> > > > > > > > > > > >>>>>>>>>>>> fn,
> > > > > > > > > > > >>>>>>>>>>>>>>> wend,
> > > > > > > > > > > >>>>>>>>>>>>>>>> rtime]. (f1, f2, ...fn) are the result
> > > > > > attributes
> > > > > > > of
> > > > > > > > > the
> > > > > > > > > > > >> UDF.
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>> * Re Multi-staged evaluation:
> > > > > > > > > > > >>>>>>>>>>>>>>>> I think this should be an optimization
> > > that
> > > > > can
> > > > > > be
> > > > > > > > > > applied
> > > > > > > > > > > >> if
> > > > > > > > > > > >>>>>>>>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>> UDF
> > > > > > > > > > > >>>>>>>>>>>>>>>> implements the merge() method.
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>> Best, Fabian
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>> Am Mi., 7. Nov. 2018 um 08:01 Uhr
> > schrieb
> > > > > > Shaoxuan
> > > > > > > > > Wang
> > > > > > > > > > <
> > > > > > > > > > > >>>>>>>>>>>>>>>> wshaox...@gmail.com
> > > > > > > > > > > >>>>>>>>>>>>>>>>> :
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> Hi xiaowei,
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> Yes, I agree with you that the
> > semantics
> > > of
> > > > > > > > > > > >>>>>>>>>>>> TableAggregateFunction
> > > > > > > > > > > >>>>>>>>>>>>>> emit
> > > > > > > > > > > >>>>>>>>>>>>>>>> is
> > > > > > > > > > > >>>>>>>>>>>>>>>>> much more complex than
> > AggregateFunction.
> > > > The
> > > > > > > > > > fundamental
> > > > > > > > > > > >>>>>>>>>>>>> difference
> > > > > > > > > > > >>>>>>>>>>>>>> is
> > > > > > > > > > > >>>>>>>>>>>>>>>>> that TableAggregateFunction emits a
> > > "table"
> > > > > > while
> > > > > > > > > > > >>>>>>>>>>>> AggregateFunction
> > > > > > > > > > > >>>>>>>>>>>>>>>> outputs
> > > > > > > > > > > >>>>>>>>>>>>>>>>> (a column of) a "row". In the case of
> > > > > > > > > AggregateFunction
> > > > > > > > > > > it
> > > > > > > > > > > >>>>>>> only
> > > > > > > > > > > >>>>>>>>>>>> has
> > > > > > > > > > > >>>>>>>>>>>>>> one
> > > > > > > > > > > >>>>>>>>>>>>>>>>> mode which is “replacing” (complete
> > > > update).
> > > > > > But
> > > > > > > > for
> > > > > > > > > > > >>>>>>>>>>>>>>>>> TableAggregateFunction, it could be
> > > > > incremental
> > > > > > > > (only
> > > > > > > > > > > emit
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>>>>>>> new
> > > > > > > > > > > >>>>>>>>>>>>>>>> updated
> > > > > > > > > > > >>>>>>>>>>>>>>>>> results) update or complete update
> > > (always
> > > > > emit
> > > > > > > the
> > > > > > > > > > > entire
> > > > > > > > > > > >>>>>>>>>>>>>>>>> table
> > > > > > > > > > > >>>>>>>>>>>>> when
> > > > > > > > > > > >>>>>>>>>>>>>>>>> “emit" is triggered).  From the
> > > performance
> > > > > > > > > > perspective,
> > > > > > > > > > > we
> > > > > > > > > > > >>>>>>>>>>>>>>>>> might
> > > > > > > > > > > >>>>>>>>>>>>>> want
> > > > > > > > > > > >>>>>>>>>>>>>>> to
> > > > > > > > > > > >>>>>>>>>>>>>>>>> use incremental update. But we need
> > > review
> > > > > and
> > > > > > > > design
> > > > > > > > > > > this
> > > > > > > > > > > >>>>>>>>>>>>> carefully,
> > > > > > > > > > > >>>>>>>>>>>>>>>>> especially taking into account the
> > cases
> > > of
> > > > > the
> > > > > > > > > > failover
> > > > > > > > > > > >>>>>>>>>>>>>>>>> (instead
> > > > > > > > > > > >>>>>>>>>>>>> of
> > > > > > > > > > > >>>>>>>>>>>>>>> just
> > > > > > > > > > > >>>>>>>>>>>>>>>>> back-up the ACC it may also needs to
> > > > remember
> > > > > > the
> > > > > > > > > emit
> > > > > > > > > > > >>>>> offset)
> > > > > > > > > > > >>>>>>>>>>>> and
> > > > > > > > > > > >>>>>>>>>>>>>>>>> retractions, as the semantics of
> > > > > > > > > TableAggregateFunction
> > > > > > > > > > > >> emit
> > > > > > > > > > > >>>>>>>>>>>>>>>>> are
> > > > > > > > > > > >>>>>>>>>>>>>>>> different
> > > > > > > > > > > >>>>>>>>>>>>>>>>> than other UDFs. TableFunction also
> > > emits a
> > > > > > > table,
> > > > > > > > > but
> > > > > > > > > > it
> > > > > > > > > > > >>>>> does
> > > > > > > > > > > >>>>>>>>>>>> not
> > > > > > > > > > > >>>>>>>>>>>>>> need
> > > > > > > > > > > >>>>>>>>>>>>>>>> to
> > > > > > > > > > > >>>>>>>>>>>>>>>>> worry this due to the nature of
> > > stateless.
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> Regards,
> > > > > > > > > > > >>>>>>>>>>>>>>>>> Shaoxuan
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM
> Xiaowei
> > > > Jiang
> > > > > > > > > > > >>>>>>>>>>>>>>>>> <xiaow...@gmail.com
> > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>> wrote:
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> Hi Jincheng,
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> Thanks for adding the public
> > > interfaces! I
> > > > > > think
> > > > > > > > > that
> > > > > > > > > > > >> it's a
> > > > > > > > > > > >>>>>>>>>>>> very
> > > > > > > > > > > >>>>>>>>>>>>>>> good
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> start. There are a few points that
> we
> > > need
> > > > > to
> > > > > > > have
> > > > > > > > > > more
> > > > > > > > > > > >>>>>>>>>>>>>> discussions.
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> - TableAggregateFunction - this is a
> > > very
> > > > > > > complex
> > > > > > > > > > beast,
> > > > > > > > > > > >>>>>>>>>>>>>>> definitely
> > > > > > > > > > > >>>>>>>>>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> most complex user defined objects we
> > > > > > introduced
> > > > > > > so
> > > > > > > > > > far.
> > > > > > > > > > > I
> > > > > > > > > > > >>>>>>>>>>>>> think
> > > > > > > > > > > >>>>>>>>>>>>>>>> there
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> are
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> quite some interesting questions
> here.
> > > For
> > > > > > > > example,
> > > > > > > > > do
> > > > > > > > > > > we
> > > > > > > > > > > >>>>>>>>>>>>> allow
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> multi-staged TableAggregate in this
> > > case?
> > > > > What
> > > > > > > is
> > > > > > > > > the
> > > > > > > > > > > >>>>>>>>>>>>> semantics
> > > > > > > > > > > >>>>>>>>>>>>>> of
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> emit? Is
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> it amendments to the previous
> output,
> > or
> > > > > > > replacing
> > > > > > > > > > it? I
> > > > > > > > > > > >>>>>>>>>>>> think
> > > > > > > > > > > >>>>>>>>>>>>>>> that
> > > > > > > > > > > >>>>>>>>>>>>>>>>> this
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> subject itself is worth a discussion
> > to
> > > > make
> > > > > > > sure
> > > > > > > > we
> > > > > > > > > > get
> > > > > > > > > > > >>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>>> details
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> right.
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> - GroupedTable.agg - does the group
> > keys
> > > > > > > > > automatically
> > > > > > > > > > > >>>>>>>>>>>> appear
> > > > > > > > > > > >>>>>>>>>>>>> in
> > > > > > > > > > > >>>>>>>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> output? how about the case of
> > windowing
> > > > > > > > aggregation?
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> Regards,
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> Xiaowei
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM
> > jincheng
> > > > sun
> > > > > <
> > > > > > > > > > > >>>>>>>>>>>>>>> sunjincheng...@gmail.com>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> wrote:
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> Hi, Xiaowei,
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> Thanks for bring up the discuss of
> > > Table
> > > > > API
> > > > > > > > > > > Enhancement
> > > > > > > > > > > >>>>>>>>>>>>> Outline
> > > > > > > > > > > >>>>>>>>>>>>>> !
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> I quickly looked at the overall
> > > content,
> > > > > > these
> > > > > > > > are
> > > > > > > > > > good
> > > > > > > > > > > >>>>>>>>>>>>>> expressions
> > > > > > > > > > > >>>>>>>>>>>>>>>> of
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> our
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> offline discussions. But from the
> > > points
> > > > of
> > > > > > my
> > > > > > > > > view,
> > > > > > > > > > we
> > > > > > > > > > > >>>>>>>>>>>> should
> > > > > > > > > > > >>>>>>>>>>>>>> add
> > > > > > > > > > > >>>>>>>>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> usage of public interfaces that we
> > will
> > > > > > > introduce
> > > > > > > > > in
> > > > > > > > > > > this
> > > > > > > > > > > >>>>>>>>>>>>>> propose.
> > > > > > > > > > > >>>>>>>>>>>>>>>>> So, I
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> added the following usage
> description
> > > of
> > > > > > > > interface
> > > > > > > > > > and
> > > > > > > > > > > >>>>>>>>>>>>> operators
> > > > > > > > > > > >>>>>>>>>>>>>>> in
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> google doc:
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> 1. Map Operator
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> Map operator is a new operator of
> > > Table,
> > > > > Map
> > > > > > > > > operator
> > > > > > > > > > > >> can
> > > > > > > > > > > >>>>>>>>>>>>>>> apply a
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> scalar function, and can return
> > > > > multi-column.
> > > > > > > The
> > > > > > > > > > usage
> > > > > > > > > > > >> as
> > > > > > > > > > > >>>>>>>>>>>>>> follows:
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> val res = tab
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>  .map(fun: ScalarFunction).as(‘a,
> ‘b,
> > > ‘c)
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>  .select(‘a, ‘c)
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> 2. FlatMap Operator
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> FaltMap operator is a new operator
> of
> > > > > Table,
> > > > > > > > > FlatMap
> > > > > > > > > > > >>>>>>>>>>>>> operator
> > > > > > > > > > > >>>>>>>>>>>>>>> can
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> apply
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> a table function, and can return
> > > > multi-row.
> > > > > > The
> > > > > > > > > usage
> > > > > > > > > > > as
> > > > > > > > > > > >>>>>>>>>>>>> follows:
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> val res = tab
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>   .flatMap(fun:
> TableFunction).as(‘a,
> > > ‘b,
> > > > > ‘c)
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>   .select(‘a, ‘c)
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> 3. Agg Operator
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> Agg operator is a new operator of
> > > > > > > > > Table/GroupedTable,
> > > > > > > > > > > >> Agg
> > > > > > > > > > > >>>>>>>>>>>>>>>> operator
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> can
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> apply a aggregate function, and can
> > > > return
> > > > > > > > > > > multi-column.
> > > > > > > > > > > >>>>> The
> > > > > > > > > > > >>>>>>>>>>>>>> usage
> > > > > > > > > > > >>>>>>>>>>>>>>> as
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> follows:
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> val res = tab
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>   .groupBy(‘a) // leave
> > groupBy-Clause
> > > > out
> > > > > to
> > > > > > > > > define
> > > > > > > > > > > >>>>>>>>>>>> global
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> aggregates
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>   .agg(fun:
> AggregateFunction).as(‘a,
> > > ‘b,
> > > > > ‘c)
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>   .select(‘a, ‘c)
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> 4.  FlatAgg Operator
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> FlatAgg operator is a new operator
> of
> > > > > > > > > > > >> Table/GroupedTable,
> > > > > > > > > > > >>>>>>>>>>>>>>> FaltAgg
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> operator can apply a table
> aggregate
> > > > > > function,
> > > > > > > > and
> > > > > > > > > > can
> > > > > > > > > > > >>>>>>> return
> > > > > > > > > > > >>>>>>>>>>>>>>>>> multi-row.
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> The usage as follows:
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> val res = tab
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>    .groupBy(‘a) // leave
> > groupBy-Clause
> > > > out
> > > > > > to
> > > > > > > > > define
> > > > > > > > > > > >>>>>>>>>>>>> global
> > > > > > > > > > > >>>>>>>>>>>>>>>> table
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> aggregates
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>    .flatAgg(fun:
> > > > > > TableAggregateFunction).as(‘a,
> > > > > > > > ‘b,
> > > > > > > > > > ‘c)
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>    .select(‘a, ‘c)
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> 5. TableAggregateFunction
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>  The behavior of table aggregates
> is
> > > most
> > > > > > like
> > > > > > > > > > > >>>>>>>>>>>>>>>> GroupReduceFunction
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> did,
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> which computed for a group of
> > elements,
> > > > and
> > > > > > > > > output  a
> > > > > > > > > > > >> group
> > > > > > > > > > > >>>>>>>>>>>> of
> > > > > > > > > > > >>>>>>>>>>>>>>>>> elements.
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> The TableAggregateFunction can be
> > > applied
> > > > > on
> > > > > > > > > > > >>>>>>>>>>>>>>> GroupedTable.flatAgg() .
> > > > > > > > > > > >>>>>>>>>>>>>>>>> The
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> interface of TableAggregateFunction
> > > has a
> > > > > lot
> > > > > > > of
> > > > > > > > > > > content,
> > > > > > > > > > > >>>>> so
> > > > > > > > > > > >>>>>>>>>>>> I
> > > > > > > > > > > >>>>>>>>>>>>>>> don't
> > > > > > > > > > > >>>>>>>>>>>>>>>>> copy
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> it here, Please look at the detail
> in
> > > > > google
> > > > > > > doc:
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> I will be very appreciate to anyone
> > for
> > > > > > > reviewing
> > > > > > > > > and
> > > > > > > > > > > >>>>>>>>>>>>> commenting.
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> Best,
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>> Jincheng
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> --
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> -----------------------------------------------------------------------------------
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> *Rome was not built in one day*
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> -----------------------------------------------------------------------------------
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to