Hi Jark,
your code looks good and it also simplifies many parts. So the STREAM
keyword is no longer optional but invalid now, right? What happens if the
keyword appears in a query?
Timo
On 29/08/16 at 05:40, Jark Wu wrote:
Hi Fabian, Timo,
I have created a prototype for removing the STREAM keyword and using the
batch SQL parser for stream jobs.
This is the working branch: https://github.com/wuchong/flink/tree/remove-stream
Looking forward to your feedback.
- Jark Wu
On Aug 24, 2016, at 4:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:
Starting with a prototype would be great, Jark.
We had some trouble with Calcite's StreamableTable interface anyways. A few
things can be simplified if we do not declare our tables as streamable.
I would try to implement DataStreamTable (and all related classes and
methods) equivalent to DataSetTables if possible.
Best, Fabian
2016-08-24 6:27 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
Hi Fabian,
You are right, the main thing we need to change to remove the STREAM
keyword is the table registration. If you would like, I can do a prototype.
Hi Timo,
I’m glad to contribute our work back to Flink. I will look into it and
create JIRAs in the next few days.
- Jark Wu
On Aug 24, 2016, at 12:13 AM, Fabian Hueske <fhue...@gmail.com> wrote:
Hi Jark,
We can think about removing the STREAM keyword or not. In principle,
Calcite should allow the same windowing syntax on streaming and static
tables (this is one of the main goals of Calcite). The Table API can also
distinguish stream and batch without the STREAM keyword by looking at the
ExecutionEnvironment.
I think we would need to change the way that tables are registered in
Calcite's catalog and also add more validation (check that time windows
refer to a time column, etc).
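To make the two points above concrete, here is a minimal sketch in plain
Python. It is not Flink or Calcite code: Column, Table, ValidationError,
query_mode, and validate_time_window are hypothetical names invented for
illustration. It only shows the idea that the environment type, rather than
a keyword in the query, selects streaming or batch semantics, and that a
time window is rejected unless it refers to a time column.

```python
from dataclasses import dataclass


# Hypothetical stand-ins for illustration; not Flink or Calcite classes.
@dataclass(frozen=True)
class Column:
    name: str
    is_time_attribute: bool = False


@dataclass(frozen=True)
class Table:
    name: str
    columns: tuple


class ValidationError(Exception):
    pass


def query_mode(environment_kind):
    """Pick the query semantics from the environment type, not from a keyword."""
    if environment_kind == "stream":
        return "standing query, result stream"
    if environment_kind == "batch":
        return "one-off query, result set"
    raise ValueError(f"unknown environment kind: {environment_kind}")


def validate_time_window(table, window_column):
    """Reject a time window that does not refer to a time column."""
    by_name = {c.name: c for c in table.columns}
    if window_column not in by_name:
        raise ValidationError(
            f"table '{table.name}' has no column '{window_column}'")
    if not by_name[window_column].is_time_attribute:
        raise ValidationError(
            f"window column '{window_column}' is not a time column")
```

With a table that has a time column rowtime and a regular column
productId, validate_time_window accepts rowtime and raises for productId.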
A prototype should help to see what the consequences of removing the
STREAM keyword would be (which actually means changing the table
registration; the parser stays the same).
Regarding streaming aggregates without window definition: We can
certainly implement this feature in the Table API. There are a few points
that need
to be considered like value expiration after a certain time of update
inactivity (otherwise the state might grow infinitely). But these aspects
should be rather easy to solve. I think for SQL, such running aggregates
are a special case of the Sliding Windows as discussed in Calcite's
StreamSQL document [1].
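The value-expiration point can be sketched as follows. This is a toy model
in plain Python, not Flink state handling: the class ExpiringRunningSum, its
ttl parameter, and the explicit now argument are assumptions made for
illustration. It keeps a running SUM per key and drops the state of a key
once it has seen no update for ttl time units, so the state stays bounded.

```python
class ExpiringRunningSum:
    """Running SUM per key; a key's state is dropped after `ttl` time
    units without an update (hypothetical sketch, not Flink code)."""

    def __init__(self, ttl):
        self.ttl = ttl
        self._state = {}  # key -> (running_sum, last_update_time)

    def expire(self, now):
        """Remove every key that has been inactive for at least `ttl`."""
        stale = [key for key, (_, last) in self._state.items()
                 if now - last >= self.ttl]
        for key in stale:
            del self._state[key]

    def update(self, key, value, now):
        """Add `value` to the running sum of `key` and return the new sum."""
        self.expire(now)
        total, _ = self._state.get(key, (0, now))
        total += value
        self._state[key] = (total, now)
        return total

    def keys(self):
        return set(self._state)
```

A key that expires and later receives a new record starts again from zero,
which is one possible semantics; whether that is acceptable is exactly the
kind of question the design would need to settle.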
Thanks also for the document! I'll take that into account when sketching
the FLIP for streaming aggregation support.
Cheers, Fabian
[1] http://calcite.apache.org/docs/stream.html#sliding-windows
2016-08-23 13:09 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
Hi Fabian, Timo,
Sorry for the late response.
Regarding Calcite’s StreamSQL syntax, my only concerns are the STREAM
keyword and the lack of aggregation without a window, which make the
syntax differ between streaming and static tables. I don’t think Flink
should have a custom SQL syntax, but it would be better to have a
consistent syntax for batch and streaming. Regarding the window syntax, I
think it is good and reasonable to follow Calcite’s syntax. Actually, we
implemented the Blink SQL window following Calcite’s syntax [1].
In addition, I describe the Blink SQL design, including UDF, UDTF, UDAF,
and Window, in a Google doc [1]. I hope that helps with the upcoming Flink
SQL design.
+1 for creating FLIP
[1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek
- Jark Wu
On Aug 23, 2016, at 3:47 PM, Fabian Hueske <fhue...@gmail.com> wrote:
Hi,
I did a bit of prototyping yesterday to check to what extent Calcite
supports window operations on streams if we implemented them for the
Table API.
For the Table API we do not go through Calcite's SQL parser and
validator, but generate the logical plan (tree of RelNodes) ourselves,
mostly using Calcite's RelBuilder.
It turns out that Calcite does not restrict grouped aggregations on
streams at this abstraction level, i.e., it does not perform any checks.
I think it should be possible to implement windowed aggregates for the
Table API. Once CALCITE-1345 [1] is implemented (and released), windowed
aggregates are also supported by the SQL parser, validator, and
optimizer.
In order to make them work with our implementation we would need to
adapt our solution to it (only internally), but I am sure we could reuse
a lot of our initial implementation (Table API, validation, execution).
I drafted an API proposal a few months ago [2] and could convert this
into a FLIP to discuss the API and break it down into subtasks.
What do you think?
Cheers, Fabian
[1] https://issues.apache.org/jira/browse/CALCITE-1345
[2] https://docs.google.com/document/d/19kSOAOINKCSWLBCKRq2WvNtmuaA9o3AyCh2ePqr3V5E
2016-08-19 11:04 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
Hi Jark,
thanks for starting this discussion. Actually, I think we are "blocked"
on the internal handling of streaming windows in Calcite rather than on
the SQL parser. IMO, it should be possible to exchange or modify the
parser if we want that.
Regarding Calcite's StreamSQL syntax: Except for the STREAM keyword,
Calcite closely follows the SQL standard. For example, there are no
special keywords like WINDOW; instead, stream-specific aspects like
tumbling windows are expressed as functions such as TUMBLE [1]. One main
motivation of the Calcite community is to have the same syntax for
streaming and static tables. This includes support for tables which are
static and streaming at the same time (the example in [1] is a table
about orders to which new order records are added). When querying such a
table, the STREAM keyword is required to distinguish a batch query,
which returns a result set, from a standing query, which returns a
result stream. In the context of Flink, we can make this distinction
using the type of the TableEnvironment. So we could use the batch
parser, but would need to change a couple of things internally and add
checks for proper grouping on the timestamp column when doing windows,
etc.
So far, the discussion about the StreamSQL syntax has focused on the
question whether 1) StreamSQL should follow the SQL standard (as Calcite
proposes) or 2) Flink should use a custom syntax with stream-specific
features. For instance, a tumbling window is expressed in the GROUP BY
clause [1] when following standard SQL, but it could be defined using a
special WINDOW keyword in a custom StreamSQL dialect.
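As a rough illustration of what grouping by a tumbling window means, here
is a small sketch in plain Python. It is not how Calcite or Flink evaluate
such a query; tumble_start and tumbling_count are hypothetical helper names,
and timestamps are assumed to be integers in the same unit as the window
size. The comment shows the standard-SQL-style form from [1], where the
window is just a function in the GROUP BY clause.

```python
from collections import defaultdict

# Query form described in [1], with the window as a GROUP BY function:
#   SELECT STREAM productId, COUNT(*)
#   FROM Orders
#   GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId


def tumble_start(timestamp, size):
    """Start of the fixed-size, non-overlapping window containing `timestamp`."""
    return timestamp - (timestamp % size)


def tumbling_count(rows, size):
    """Count rows per (window start, key).

    `rows` is an iterable of (timestamp, key) pairs.
    """
    counts = defaultdict(int)
    for timestamp, key in rows:
        counts[(tumble_start(timestamp, size), key)] += 1
    return dict(counts)
```

Because every row maps to exactly one window, this is an ordinary grouped
aggregation with one extra grouping expression, which is why no dedicated
WINDOW keyword is needed in the standard-SQL variant.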
You are right that we have a dependency on Calcite. However, I think
this dependency is rather in the internals than the parser, i.e., how
does the validator/optimizer support and handle monotone /
quasi-monotone attributes and windows. I am not sure how much is already
supported, but the Calcite community is working on this [2]. I think we
need these features in Calcite unless we want to completely remove our
dependency on Calcite for StreamSQL. I would not be in favor of removing
Calcite at this point. We put a lot of effort into refactoring the Table
API internals. Instead, we should start to talk to the Calcite community
and see how far they are, what is missing, and how we can help.
I will start a discussion on the Calcite dev mailing list in the next
few days and ask about the status of StreamSQL.
Best,
Fabian
[1] http://calcite.apache.org/docs/stream.html#tumbling-windows-improved
[2] https://issues.apache.org/jira/browse/CALCITE-1345
--
Freundliche Grüße / Kind Regards
Timo Walther
Follow me: @twalthr
https://www.linkedin.com/in/twalthr