Hi Shaoxuan,
I think StreamSQL must be executed in a table environment, so I
call this the Table API's StreamSQL. What do you call it: stream
Table API or StreamSQL? It is fu
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tblEnv = TableEnvironment.getTableEnvironment(env)
  val ds: DataStream[(String, Long, Long)] =
    env.readTextFile("/home/demo")
      .map(f => (f, 1L, 1L))
  tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
  val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE userID='A'")
So in my opinion, the grammar which is marked red should be
compatible with calcite's StreamSQL grammar.
By the way, thanks very much for telling me about the changes in
Flink StreamSQL. I will look at the new proposal.
Thanks!
From: Sean Wang [mailto:wshaox...@gmail.com]
Sent: October 13, 2016, 16:29
To: dev@flink.apache.org; Zhangrucong
Subject: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
Hi zhangrucong,
I am not sure what you mean by "table API'S StreamSQL"; I guess you
mean "stream TableAPI"?
TableAPI should be compatible with Calcite SQL. (By compatible, my
understanding is that both TableAPI and SQL will be translated to the
same logical plan - the same set of REL and REX.)
BTW, please note that we recently merged a change that removes the
STREAM keyword from Flink stream SQL (FLINK-4546). In our opinion,
batch and stream do not necessarily need to be differentiated at the
SQL level. The major difference between batch and stream is "WHEN and
HOW to emit the result".
We have been working on a new proposal with Fabian on this change. I
guess it will be sent out for review very soon.
Regards,
Shaoxuan
On Thu, Oct 13, 2016 at 2:29 PM, Zhangrucong <zhangruc...@huawei.com>
wrote:
Hi Shaoxuan,
Is the Table API's StreamSQL grammar compatible with Calcite's
StreamSQL grammar?
1. In Calcite, the tumble window is realized by using the function
TUMBLE or HOP. The function must be used with GROUP BY, like this:
SELECT
TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,
productId,
COUNT(*) AS c,
SUM(units) AS units
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
productId;
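
The grouping that the query above describes can be sketched in plain Scala, without Calcite or Flink. This is only an illustration under stated assumptions: timestamps are plain Long values in minutes, the third TUMBLE argument is treated as an alignment offset, and the names TumbleSketch, Order, tumbleEnd and aggregate are hypothetical, not part of either project's API.

```scala
object TumbleSketch {
  // Hypothetical record type mirroring the Orders columns used in the query.
  final case class Order(rowtime: Long, productId: String, units: Int)

  // End (exclusive) of the tumbling window that contains `ts`.
  // `size` and `offset` use the same unit as `ts`; windows start at
  // offset, offset + size, offset + 2 * size, and so on.
  def tumbleEnd(ts: Long, size: Long, offset: Long): Long = {
    val start = ts - Math.floorMod(ts - offset, size)
    start + size
  }

  // GROUP BY (window, productId) with COUNT(*) and SUM(units) per group.
  def aggregate(orders: Seq[Order], size: Long, offset: Long): Map[(Long, String), (Long, Long)] =
    orders
      .groupBy(o => (tumbleEnd(o.rowtime, size, offset), o.productId))
      .map { case (key, rows) => (key, (rows.size.toLong, rows.map(_.units.toLong).sum)) }
}
```

Every row falls into exactly one window, which is why the window function can appear as an ordinary GROUP BY key next to productId.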
2. The sliding window uses the keywords "window" and "over", like this:
SELECT *
FROM (
SELECT STREAM rowtime,
productId,
units,
AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,
AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7
FROM Orders
WINDOW product AS (
ORDER BY rowtime
PARTITION BY productId))
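
For comparison, the OVER form produces one output row per input row. The following plain Scala sketch shows that behaviour for a single RANGE ... PRECEDING frame. It assumes Long timestamps and uses a quadratic scan for readability; OverWindowSketch, Order and rangeAvg are hypothetical names chosen for this illustration.

```scala
object OverWindowSketch {
  // Hypothetical record type mirroring the Orders columns used in the query.
  final case class Order(rowtime: Long, productId: String, units: Int)

  // For each row, average `units` over the rows of the same product whose
  // rowtime lies in [row.rowtime - range, row.rowtime]. The frame always
  // contains the row itself, so it is never empty.
  def rangeAvg(orders: Seq[Order], range: Long): Seq[(Order, Double)] =
    orders.map { row =>
      val frame = orders.filter { o =>
        o.productId == row.productId &&
        o.rowtime <= row.rowtime &&
        o.rowtime >= row.rowtime - range
      }
      (row, frame.map(_.units).sum.toDouble / frame.size)
    }
}
```

Unlike the tumbling case, a single input row can contribute to the frames of several later rows, so the result is not a partition of the input.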
Thanks!
-----Original Message-----
From: 王绍翾(大沙) [mailto:shaoxuan....@alibaba-inc.com]
Sent: October 13, 2016, 2:03
To: dev@flink.apache.org
Subject: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
Hi Fabian, Timo, and Jark,
Thanks for kicking off this FLIP. This is a really great and
promising proposal. I have a few comments on the "window" operator
proposed in this FLIP (I hope it is not too late to bring this up).
First, a window is not always needed for stream aggregation. There
are cases where we want to do an aggregation on a stream, while the
query/emit strategy decides when to emit a streaming output.
Second, a window is needed when we want to do an aggregation over a
certain range, but a window is not an operator. We basically use a
window to define the range for the aggregation. In the TableAPI, a
window should be defined together with the "groupby" and "select"
operators, either inside a "groupby" operator or after an "over"
clause in a "select" operator. This will make the TableAPI work in a
similar manner to SQL.
For instance,
[A groupby without window]
<Table API>
  val res = tab
    .groupBy('a)
    .select('a, 'b.sum)
<SQL>
  SELECT a, SUM(b)
  FROM tab
  GROUP BY a
[A tumble window inside groupby]
<Table API>
  val res = tab
    .groupBy('a, tumble(10.minutes, 'rowtime))
    .select('a, 'b.sum)
<SQL>
  SELECT a, SUM(b)
  FROM tab
  GROUP BY a, TUMBLE(10.minutes, 'rowtime)
[A row tumble window after OVER]
<Table API>
    .groupby('a) // optional
    .select('a, 'b.count over rowTumble(10.minutes, 'rowtime))
<SQL>
  SELECT a, COUNT(b) OVER ROWTUMBLE(10.minutes, 'rowtime)
  FROM tab
  GROUP BY a
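
The first case (a groupby without a window) can be pictured as a running aggregate whose output is controlled by an emit strategy. The sketch below is plain Scala and assumes the simplest possible strategy, emitting an updated result after every input record; the thread does not fix the strategy, and RunningAggSketch and runningSum are hypothetical names.

```scala
object RunningAggSketch {
  // Keeps one running sum per key and emits the updated (key, sum) pair
  // after each input record. Emitting on every record is an assumption
  // made for this sketch, not something the proposal prescribes.
  def runningSum(input: Seq[(String, Long)]): Seq[(String, Long)] = {
    val initial = (Map.empty[String, Long], Vector.empty[(String, Long)])
    val (_, emitted) = input.foldLeft(initial) {
      case ((state, out), (key, value)) =>
        val updated = state.getOrElse(key, 0L) + value
        (state.updated(key, updated), out :+ ((key, updated)))
    }
    emitted
  }
}
```

Note that the per-key state map in this sketch is never cleaned up, so it grows with the number of distinct keys.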
Please let me know what you think.
Regards,
Shaoxuan
------------------------------------------------------------------
From: Fabian Hueske <fhue...@gmail.com>
Sent: Monday, September 26, 2016, 21:13
To: dev@flink.apache.org
Subject: Re: [DISCUSS] FLIP-11: Table API Stream Aggregations
Hi everybody,
Timo proposed our FLIP-11 a bit more than three weeks ago.
I will update the status of the FLIP to accepted.
Thanks,
Fabian
2016-09-19 9:16 GMT+02:00 Timo Walther <twal...@apache.org>:
Hi Jark,
yes, I think enough time has passed. We can start implementing the
changes.
What do you think, Fabian?
If there are no objections, I will create the subtasks in Jira today.
For FLIP-11/1 I have already implemented a prototype; I just have to
do some refactoring/documentation before opening a PR.
Timo
On 18/09/16 at 04:46, Jark Wu wrote:
Hi all,
It seems that there are no objections to the window design. So could
we open subtasks to start working on it now?
- Jark Wu
On September 7, 2016, at 4:29 PM, Jark Wu <wuchong...@alibaba-inc.com> wrote:
Hi Fabian,
Thanks for sharing your ideas.
They all make sense to me. Regarding reassigning timestamps, I
do not have a use case. I came up with this because DataStream
has a TimestampAssigner :)
+1 for this FLIP.
- Jark Wu
On September 7, 2016, at 2:59 PM, Fabian Hueske <fhue...@gmail.com> wrote:
Hi,
thanks for your comments and questions!
Actually, you are bringing up the points that Timo and I
discussed the most when designing the FLIP ;-)
- We also thought about the syntactic shortcut for running
aggregates like you proposed (table.groupBy('a).select(…)). Our
motivation to not allow this shortcut is to prevent users from
accidentally performing a "dangerous" operation. The problem
with unbounded sliding row-windows is that their state never
expires. If you have an evolving key space, you will likely run
into problems at some point because the operator state grows too
large. IMO, a row-window session is a better approach, because it
defines a timeout after which state can be discarded.
groupBy.select is a very common operation in batch, but its
semantics in streaming are very different. In my opinion it makes
sense to make users aware of these differences through the API.
- Reassigning timestamps and watermarks is a very delicate issue.
You are right that Calcite exposes this field, which is necessary
due to the semantics of SQL. However, even in Calcite you cannot
freely choose the timestamp attribute for streaming queries (it
must be a monotone or quasi-monotone attribute), which is hard to
reason about (and guarantee) after a few operators have been
applied. Streaming tables in Flink will likely have a time
attribute which is identical to the initial rowtime.
However, Flink does modify timestamps internally, e.g., for
records that are emitted from time windows, in order to ensure
that consecutive windows perform as expected. Modifying or
reassigning timestamps in the middle of a job can lead to
unexpected results which are very hard to reason about. Do you
have a concrete use case in mind for reassigning timestamps?
- The idea to represent rowtime and systime as objects is good.
Our motivation to go for reserved Scala symbols was to have a
uniform syntax for windows over streaming and batch tables. On
batch tables you can compute time windows over basically every
time attribute (they are treated similarly to grouping attributes,
with a bit of extra logic to extract the grouping key for
sliding and session windows). If you write window(Tumble over
10.minutes on 'rowtime) on a streaming table, 'rowtime would
indicate event-time. On a batch table with a 'rowtime attribute,
the same operator would be internally converted into a group
by. By going for the object approach we would lose this
compatibility (or would need to introduce an additional column
attribute to specify the window attribute for batch tables).
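
The state-size concern in the first point can be made concrete with a small plain Scala sketch. It is not Flink's state handling: it assumes records arrive in timestamp order, treats a key as expired once it has been idle for longer than a gap, and uses the hypothetical names SessionStateSketch, Acc and step.

```scala
object SessionStateSketch {
  // Per-key accumulator: running sum plus the timestamp of the last record.
  final case class Acc(sum: Long, lastSeen: Long)

  // Processes one record and returns the new state. Keys that have been
  // idle for longer than `gap` (relative to the incoming timestamp) are
  // discarded first, so the state stays bounded by the active key set.
  def step(state: Map[String, Acc], key: String, value: Long, ts: Long, gap: Long): Map[String, Acc] = {
    val live = state.filter { case (_, acc) => ts - acc.lastSeen <= gap }
    val sum = live.get(key).map(_.sum).getOrElse(0L) + value
    live.updated(key, Acc(sum, ts))
  }
}
```

With an effectively infinite gap nothing is ever discarded, which corresponds to the unbounded row-window case: every key that was ever seen stays in the map.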
As usual some of the design decisions are based on preferences.
Do they make sense to you? Let me know what you think.
Best, Fabian
2016-09-07 5:12 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
Hi all,
I have been on vacation for about five days, sorry to have missed
this great FLIP.
Yes, the non-windowed aggregate is a special case of the row-window,
and the proposal looks really good. Can we have a simplified form for
the special case? For example,
table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(…)
could be simplified to table.groupBy('a).select(…). The latter
would actually call the former.
Another question is about rowtime. As the FLIP says, DataStream and
StreamTableSource are responsible for assigning timestamps and
watermarks; furthermore, "rowtime" and "systemtime" are not real
columns. IMO, this is different from Calcite's rowtime, which is a
real column in the table. With the FLIP's approach, we will lose some
flexibility, because the timestamp column may be created after some
transformations or a join operation, not at the beginning. So why do
we have to define rowtime at the beginning (because of watermarks?)?
Can we have a way to define rowtime after the source table, like
TimestampAssigner?
Regarding the "allowLateness" method: I came up with a trick where we
make 'rowtime and 'systemtime Scala objects rather than symbol
expressions. The API would look like this:
  window(Tumble over 10.minutes on rowtime allowLateness as 'w)
The implementation would look like this:
  class TumblingWindow(size: Expression) extends Window {
    // `rowtime` selects event time; the result has allowLateness()
    def on(time: rowtime.type): TumblingEventTimeWindow =
      new TumblingEventTimeWindow(alias, 'rowtime, size)
    // `systemtime` selects processing time; no allowLateness()
    def on(time: systemtime.type): TumblingProcessingTimeWindow =
      new TumblingProcessingTimeWindow(alias, 'systemtime, size)
  }
  object rowtime
  object systemtime
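
The trick relies on overloading `on` by the singleton type of each marker object, so the compiler picks a return type that either has or lacks allowLateness. A self-contained sketch of just that mechanism follows. The window classes here are simplified stand-ins invented for the illustration (EventTimeWindow, ProcessingTimeWindow, Tumble); they are not the classes from the FLIP.

```scala
object TimeMarkerSketch {
  // Marker objects; each has its own singleton type (rowtime.type, ...).
  object rowtime
  object systemtime

  // Hypothetical stand-ins for the window classes named in the mail.
  final case class EventTimeWindow(sizeMillis: Long, lateness: Long = 0L) {
    def allowLateness(millis: Long): EventTimeWindow = copy(lateness = millis)
  }
  final case class ProcessingTimeWindow(sizeMillis: Long) // no allowLateness

  final class Tumble(sizeMillis: Long) {
    // Overloads are resolved at compile time by the marker's singleton type.
    def on(time: rowtime.type): EventTimeWindow = EventTimeWindow(sizeMillis)
    def on(time: systemtime.type): ProcessingTimeWindow = ProcessingTimeWindow(sizeMillis)
  }
}
```

With these definitions, `new Tumble(600000L).on(rowtime).allowLateness(5000L)` type-checks, while calling allowLateness on the result of `on(systemtime)` is rejected by the compiler because ProcessingTimeWindow has no such method.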
What do you think about this?
- Jark Wu
On September 6, 2016, at 11:00 PM, Timo Walther <twal...@apache.org> wrote:
Hi all,
I thought about the API of the FLIP again. If we allow the
"systemtime" attribute, we cannot implement nice method chaining
where the user can define "allowLateness" only on event time. So even
if the user has expressed that "systemtime" is used, we have to offer
an "allowLateness" method, because we have to assume that this
attribute can also be the batch event-time column, which is not very
nice.
  class TumblingWindow(size: Expression) extends Window {
    // returns a window type that has an allowLateness() method
    def on(timeField: Expression): TumblingEventTimeWindow =
      new TumblingEventTimeWindow(alias, timeField, size)
  }
What do you think?
Timo
On 05/09/16 at 10:41, Fabian Hueske wrote:
Hi Jark,
you had asked for non-windowed aggregates in the Table API a few
times. FLIP-11 proposes row-window aggregates, which are a
generalization of running aggregates (SlideRow unboundedPreceding).
Can you have a look at the FLIP and give feedback on whether this is
what you are looking for?
Improvement suggestions are very welcome as well.
Thank you,
Fabian
2016-09-01 16:12 GMT+02:00 Timo Walther <twal...@apache.org>:
Hi all!
Fabian and I worked on a FLIP for Stream Aggregations in the Table
API. You can find FLIP-11 here:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
Motivation for the FLIP:
The Table API is a declarative API to define queries on static and
streaming tables. So far, only projection, selection, and union are
supported operations on streaming tables.
This FLIP proposes to add support for different types of aggregations
on top of streaming tables. In particular, we seek to support:
- Group-window aggregates, i.e., aggregates which are computed for a
group of elements. A (time or row-count) window is required to bound
the infinite input stream into a finite group.
- Row-window aggregates, i.e., aggregates which are computed for each
row, based on a window (range) of preceding and succeeding rows.
Each type of aggregate shall be supported on keyed/grouped or
non-keyed/grouped data streams for streaming tables as well as batch
tables.
We are looking forward to your feedback.
Timo
--
Freundliche Grüße / Kind Regards
Timo Walther
Follow me: @twalthr
https://www.linkedin.com/in/twalthr