Hi Fabian, Timo,

I have created a prototype for removing the STREAM keyword and using the batch SQL parser for stream jobs.
This is the working branch: https://github.com/wuchong/flink/tree/remove-stream

Looking forward to your feedback.

- Jark Wu

> On Aug 24, 2016, at 4:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Starting with a prototype would be great, Jark.
> We had some trouble with Calcite's StreamableTable interface anyways. A few
> things can be simplified if we do not declare our tables as streamable.
> I would try to implement DataStreamTable (and all related classes and
> methods) equivalent to DataSetTables if possible.
>
> Best, Fabian
>
> 2016-08-24 6:27 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
>
>> Hi Fabian,
>>
>> You are right, the main thing we need to change for removing the STREAM
>> keyword is the table registration. If you would like, I can do a prototype.
>>
>> Hi Timo,
>>
>> I’m glad to contribute our work back to Flink. I will look into it and
>> create JIRAs in the next days.
>>
>> - Jark Wu
>>
>>> On Aug 24, 2016, at 12:13 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Hi Jark,
>>>
>>> We can think about removing the STREAM keyword or not. In principle,
>>> Calcite should allow the same windowing syntax on streaming and static
>>> tables (this is one of the main goals of Calcite). The Table API can also
>>> distinguish stream and batch without the STREAM keyword by looking at the
>>> ExecutionEnvironment.
>>> I think we would need to change the way that tables are registered in
>>> Calcite's catalog and also add more validation (check that time windows
>>> refer to a time column, etc).
>>> A prototype should help to see what the consequence of removing the STREAM
>>> keyword (which actually means changing the table registration; the parser
>>> stays the same) would be.
>>>
>>> Regarding streaming aggregates without window definition: We can certainly
>>> implement this feature in the Table API.
>>> There are a few points that need to be considered, like value expiration
>>> after a certain time of update inactivity (otherwise the state might grow
>>> infinitely). But these aspects should be rather easy to solve. I think for
>>> SQL, such running aggregates are a special case of the Sliding Windows as
>>> discussed in Calcite's StreamSQL document [1].
>>>
>>> Thanks also for the document! I'll take that into account when sketching
>>> the FLIP for streaming aggregation support.
>>>
>>> Cheers, Fabian
>>>
>>> [1] http://calcite.apache.org/docs/stream.html#sliding-windows
>>>
>>> 2016-08-23 13:09 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
>>>
>>>> Hi Fabian, Timo,
>>>>
>>>> Sorry for the late response.
>>>>
>>>> Regarding Calcite’s StreamSQL syntax, my only concerns are the STREAM
>>>> keyword and the lack of aggregation without a window, which make the
>>>> syntax differ between streaming and static tables. I don’t think Flink
>>>> should have a custom SQL syntax, but it’s better to have a consistent
>>>> syntax for batch and streaming. Regarding window syntax, I think it’s
>>>> good and reasonable to follow Calcite’s syntax. Actually, we implement
>>>> Blink SQL windows following Calcite’s syntax [1].
>>>>
>>>> In addition, I described the Blink SQL design, including UDF, UDTF, UDAF,
>>>> and windows, in a Google doc [1]. I hope it helps with the upcoming
>>>> Flink SQL design.
>>>>
>>>> +1 for creating FLIP
>>>>
>>>> [1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek
>>>>
>>>> - Jark Wu
>>>>
>>>>> On Aug 23, 2016, at 3:47 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I did a bit of prototyping yesterday to check to what extent Calcite
>>>>> supports window operations on streams if we would implement them for the
>>>>> Table API.
>>>>> For the Table API we do not go through Calcite's SQL parser and
>>>>> validator, but generate the logical plan (tree of RelNodes) ourselves,
>>>>> mostly using Calcite's RelBuilder.
>>>>> It turns out that Calcite does not restrict grouped aggregations on
>>>>> streams at this abstraction level, i.e., it does not perform any checks.
>>>>>
>>>>> I think it should be possible to implement windowed aggregates for the
>>>>> Table API. Once CALCITE-1345 [1] is implemented (and released), windowed
>>>>> aggregates are also supported by the SQL parser, validator, and
>>>>> optimizer.
>>>>> In order to make them work with our implementation we would need to adapt
>>>>> our solution to it (only internally), but I am sure we could reuse a lot
>>>>> of our initial implementation (Table API, validation, execution).
>>>>>
>>>>> I drafted an API proposal a few months ago [2] and could convert this
>>>>> into a FLIP to discuss the API and break it down into subtasks.
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Cheers, Fabian
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/CALCITE-1345
>>>>> [2] https://docs.google.com/document/d/19kSOAOINKCSWLBCKRq2WvNtmuaA9o3AyCh2ePqr3V5E
>>>>>
>>>>> 2016-08-19 11:04 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>>
>>>>>> Hi Jark,
>>>>>>
>>>>>> thanks for starting this discussion. Actually, I think we are "blocked"
>>>>>> on the internal handling of streaming windows in Calcite rather than on
>>>>>> the SQL parser. IMO, it should be possible to exchange or modify the
>>>>>> parser if we want that.
>>>>>>
>>>>>> Regarding Calcite's StreamSQL syntax: Except for the STREAM keyword,
>>>>>> Calcite closely follows the SQL standard (e.g., no special keywords like
>>>>>> WINDOW; instead, stream-specific aspects like tumbling windows are
>>>>>> expressed as functions such as TUMBLE [1]).
>>>>>> One main motivation of the Calcite community is to have the same syntax
>>>>>> for streaming and static tables. This includes support for tables which
>>>>>> are static and streaming at the same time (the example of [1] is a table
>>>>>> about orders to which new order records are added). When querying such a
>>>>>> table, the STREAM keyword is required to distinguish the cases of a
>>>>>> batch query which returns a result set and a standing query which
>>>>>> returns a result stream. In the context of Flink we can make the
>>>>>> distinction using the type of the TableEnvironment. So we could use the
>>>>>> batch parser, but would need to change a couple of things internally and
>>>>>> add checks for proper grouping on the timestamp column when doing
>>>>>> windows, etc. So far the discussion about the StreamSQL syntax rather
>>>>>> focused on the question whether 1) StreamSQL should follow the SQL
>>>>>> standard (as Calcite proposes) or 2) whether Flink should use a custom
>>>>>> syntax with stream-specific features. For instance, a tumbling window is
>>>>>> expressed in the GROUP BY clause [1] when following standard SQL, but it
>>>>>> could be defined using a special WINDOW keyword in a custom StreamSQL
>>>>>> dialect.
>>>>>>
>>>>>> You are right that we have a dependency on Calcite. However, I think
>>>>>> this dependency is rather in the internals than the parser, i.e., how
>>>>>> does the validator/optimizer support and handle monotone /
>>>>>> quasi-monotone attributes and windows. I am not sure how much is already
>>>>>> supported but the Calcite community is working on this [2]. I think we
>>>>>> need these features in Calcite unless we want to completely remove our
>>>>>> dependency on Calcite for StreamSQL. I would not be in favor of removing
>>>>>> Calcite at this point. We put a lot of effort into refactoring the Table
>>>>>> API internals.
>>>>>> Instead we should start to talk to the Calcite community and see how far
>>>>>> they are, what is missing, and how we can help.
>>>>>>
>>>>>> I will start a discussion on the Calcite dev mailing list in the next
>>>>>> days and ask about the status of StreamSQL.
>>>>>>
>>>>>> Best,
>>>>>> Fabian
>>>>>>
>>>>>> [1] http://calcite.apache.org/docs/stream.html#tumbling-windows-improved
>>>>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345