Starting with a prototype would be great, Jark. We had some trouble with Calcite's StreamableTable interface anyway, and a few things can be simplified if we do not declare our tables as streamable. If possible, I would implement DataStreamTable (and all related classes and methods) the same way as DataSetTable.
Best,
Fabian

2016-08-24 6:27 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:

> Hi Fabian,
>
> You are right, the main thing we need to change to remove the STREAM keyword is the table registration. If you like, I can do a prototype.
>
> Hi Timo,
>
> I’m glad to contribute our work back to Flink. I will look into it and create JIRAs in the next few days.
>
> - Jark Wu
>
> On Aug 24, 2016, at 12:13 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi Jark,
> >
> > We can think about whether to remove the STREAM keyword. In principle, Calcite should allow the same windowing syntax on streaming and static tables (this is one of the main goals of Calcite). The Table API can also distinguish stream and batch without the STREAM keyword by looking at the ExecutionEnvironment.
> > I think we would need to change the way tables are registered in Calcite's catalog and also add more validation (check that time windows refer to a time column, etc.).
> > A prototype should help to see what the consequences of removing the STREAM keyword would be (which actually means changing the table registration; the parser stays the same).
> >
> > Regarding streaming aggregates without a window definition: we can certainly implement this feature in the Table API. There are a few points that need to be considered, like value expiration after a certain time of update inactivity (otherwise the state might grow infinitely). But these aspects should be rather easy to solve. I think for SQL, such running aggregates are a special case of the sliding windows discussed in Calcite's StreamSQL document [1].
> >
> > Thanks also for the document! I'll take it into account when sketching the FLIP for streaming aggregation support.
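To make the point about running aggregates and value expiration concrete: a non-windowed aggregate keeps one piece of state per key and emits an updated result for every incoming row, and dropping a key after a period of update inactivity keeps that state bounded. The Python sketch below only illustrates the idea; the class `RunningSumWithExpiry` and its `idle_timeout` parameter are hypothetical and do not correspond to any Flink or Calcite API. It also assumes row timestamps serve as the clock and arrive in non-decreasing order.

```python
class RunningSumWithExpiry:
    """Hypothetical per-key running SUM over an unbounded stream.

    There is no window clause: every incoming row updates and re-emits the
    aggregate for its key. A key's state is dropped once it has not been
    updated for more than `idle_timeout` time units, so state stays bounded.
    """

    def __init__(self, idle_timeout):
        self.idle_timeout = idle_timeout
        self.state = {}  # key -> (running_sum, time_of_last_update)

    def process(self, key, value, ts):
        """Consume one row and return the updated (key, running_sum)."""
        # Row timestamps act as the clock; assumed to be non-decreasing.
        self._expire(now=ts)
        total, _ = self.state.get(key, (0, ts))
        total += value
        self.state[key] = (total, ts)
        return key, total

    def _expire(self, now):
        # Remove every key whose last update is older than the inactivity timeout.
        stale = [k for k, (_, last) in self.state.items()
                 if now - last > self.idle_timeout]
        for k in stale:
            del self.state[k]
```

For example, with `idle_timeout=10`, rows `("a", 1)` at time 0 and `("a", 2)` at time 5 yield running sums 1 and 3, but a row `("a", 1)` at time 20 yields 1 again, because the state for `"a"` expired after 15 idle time units. Restarting from zero is the accuracy-for-bounded-state trade-off that expiration implies.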
> >
> > Cheers,
> > Fabian
> >
> > [1] http://calcite.apache.org/docs/stream.html#sliding-windows
> >
> > 2016-08-23 13:09 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
> >
> >> Hi Fabian, Timo,
> >>
> >> Sorry for the late response.
> >>
> >> Regarding Calcite’s StreamSQL syntax, my only concerns are the STREAM keyword and the lack of aggregation without a window, which together lead to different syntax for streaming and static tables. I don’t think Flink should have a custom SQL syntax; it’s better to have a consistent syntax for batch and streaming. Regarding the window syntax, I think it’s good and reasonable to follow Calcite’s syntax. Actually, we implemented windows in Blink SQL following Calcite’s syntax [1].
> >>
> >> In addition, I have described the Blink SQL design, including UDF, UDTF, UDAF, and windows, in a Google doc [1]. I hope it helps with the upcoming Flink SQL design.
> >>
> >> +1 for creating a FLIP
> >>
> >> [1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek
> >>
> >> - Jark Wu
> >>
> >>> On Aug 23, 2016, at 3:47 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I did a bit of prototyping yesterday to check to what extent Calcite supports window operations on streams if we implement them for the Table API.
> >>> For the Table API, we do not go through Calcite's SQL parser and validator, but generate the logical plan (a tree of RelNodes) ourselves, mostly using Calcite's RelBuilder.
> >>> It turns out that Calcite does not restrict grouped aggregations on streams at this abstraction level, i.e., it does not perform any checks.
> >>>
> >>> I think it should be possible to implement windowed aggregates for the Table API. Once CALCITE-1345 [1] is implemented (and released), windowed aggregates will also be supported by the SQL parser, validator, and optimizer.
> >>> To make them work with our implementation, we would need to adapt our solution to it (only internally), but I am sure we could reuse a lot of our initial implementation (Table API, validation, execution).
> >>>
> >>> I drafted an API proposal a few months ago [2] and could convert it into a FLIP to discuss the API and break it down into subtasks.
> >>>
> >>> What do you think?
> >>>
> >>> Cheers,
> >>> Fabian
> >>>
> >>> [1] https://issues.apache.org/jira/browse/CALCITE-1345
> >>> [2] https://docs.google.com/document/d/19kSOAOINKCSWLBCKRq2WvNtmuaA9o3AyCh2ePqr3V5E
> >>>
> >>> 2016-08-19 11:04 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
> >>>
> >>>> Hi Jark,
> >>>>
> >>>> thanks for starting this discussion. Actually, I think we are "blocked" by the internal handling of streaming windows in Calcite rather than by the SQL parser. IMO, it should be possible to exchange or modify the parser if we want that.
> >>>>
> >>>> Regarding Calcite's StreamSQL syntax: except for the STREAM keyword, Calcite closely follows the SQL standard (e.g., there are no special keywords like WINDOW; instead, stream-specific aspects like tumbling windows are expressed as functions such as TUMBLE [1]). One main motivation of the Calcite community is to have the same syntax for streaming and static tables. This includes support for tables that are static and streaming at the same time (the example in [1] is a table of orders to which new order records are added). When querying such a table, the STREAM keyword is required to distinguish a batch query, which returns a result set, from a standing query, which returns a result stream. In the context of Flink, we can make this distinction using the type of the TableEnvironment.
> >>>> So we could use the batch parser, but would need to change a couple of things internally and add checks for proper grouping on the timestamp column when doing windows, etc. So far, the discussion about the StreamSQL syntax has focused on whether 1) StreamSQL should follow the SQL standard (as Calcite proposes) or 2) Flink should use a custom syntax with stream-specific features. For instance, a tumbling window is expressed in the GROUP BY clause [1] when following standard SQL, but it could be defined using a special WINDOW keyword in a custom StreamSQL dialect.
> >>>>
> >>>> You are right that we have a dependency on Calcite. However, I think this dependency lies in the internals rather than in the parser, i.e., in how the validator/optimizer supports and handles monotone / quasi-monotone attributes and windows. I am not sure how much is already supported, but the Calcite community is working on this [2]. I think we need these features in Calcite unless we want to remove our dependency on Calcite for StreamSQL completely. I would not be in favor of removing Calcite at this point; we put a lot of effort into refactoring the Table API internals. Instead, we should start talking to the Calcite community and see how far they are, what is missing, and how we can help.
> >>>>
> >>>> I will start a discussion on the Calcite dev mailing list in the next few days and ask about the status of StreamSQL.
> >>>>
> >>>> Best,
> >>>> Fabian
> >>>>
> >>>> [1] http://calcite.apache.org/docs/stream.html#tumbling-windows-improved
> >>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345
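For reference, here is a minimal sketch of what a standard-SQL tumbling window in the GROUP BY clause, roughly `GROUP BY TUMBLE(rowtime, <size>), productId`, computes, together with the kind of validation check discussed in the thread (the window must be defined on the table's time column). It is written in Python over a finite list of rows purely to illustrate the semantics. The functions `tumble_start` and `tumbling_sum` and the column names are hypothetical, windows are assumed to be aligned to time 0, and none of this reflects Calcite's or Flink's actual implementation.

```python
from collections import defaultdict


def tumble_start(ts, size):
    """Start of the tumbling window [k*size, (k+1)*size) that contains ts."""
    return ts - ts % size


def tumbling_sum(rows, window_col, rowtime_col, key_col, value_col, size):
    """Hypothetical evaluation of
        SELECT <key_col>, SUM(<value_col>)
        FROM t GROUP BY TUMBLE(<window_col>, size), <key_col>
    over a finite list of dict rows. Returns {(window_start, key): sum}.
    """
    # Validation: a time window may only be defined on the time column.
    if window_col != rowtime_col:
        raise ValueError(
            f"TUMBLE must refer to time column {rowtime_col!r}, "
            f"not {window_col!r}")
    sums = defaultdict(int)
    for row in rows:
        start = tumble_start(row[window_col], size)
        sums[(start, row[key_col])] += row[value_col]
    return dict(sums)
```

With rows at times 1, 59, 61, and 62 and a window size of 60, the first two rows fall into the window starting at 0 and the last two into the window starting at 60. On an actual stream, a window's result could only be emitted once no more rows for it can arrive, which is where the validator/optimizer handling of monotone / quasi-monotone time attributes comes in.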