Hi everybody,

I cross-posted the proposal to the Apache Calcite dev mailing list as well, to collect some feedback from the community. Tyler Akidau (Apache Beam committer) responded and commented on the proposal.
I am moving our conversation from the Google Doc comments to the mailing list, with Tyler's consent, to continue it here. Tyler commented on this sentence:

> "Result refinement does not affect the semantics of a query and should therefore not be part of the query. Instead it is a property of the query evaluation."

Tyler:
---
I don't think I agree with this statement. For streams, the nature of refinement is critical to defining their shape. And for tables, as soon as you provide a parameter to configure when late data are dropped, you've affected the semantics of the query. As such, I would argue refinement is essential to defining query semantics any time streams are involved, and thus a reasonable justification for a syntax extension, e.g. the EMIT WHEN proposal:
https://docs.google.com/document/d/1tSey4CeTrbb4VjWvtSA78OcU6BERXXDZ3t0HzSLij9Q/edit

That said, extracting refinement semantics outside of the query seems like a fair compromise for the case where you're trying to support streaming within existing standard SQL syntax.

Fabian:
---
Yes, I know. We discussed this issue when drafting this proposal. Once the query starts to drop late data, the computed result is only an approximation of the defined query result. That should be clearly pointed out.

Something that just came to my mind: shouldn't the watermark generation mode also be part of the query definition? Given the same query and the same refinement and state-cleanup configuration, different watermark generation modes could lead to different results, because the mode essentially defines when data is late, right?

Tyler:
---
Re watermark generation: that's a very good question. Watermarks are going to be a characteristic of each table/stream, but I think there are two places where the user might have a voice in what the watermarks look like:

1. Sources: Watermarks are typically established at ingress time into the system.
It's very reasonable that you might want to observe a given table/stream with different watermark strategies (e.g. a 100th-percentile watermark when you want accuracy at the cost of latency, and a 99th-percentile watermark when you want better latency at the cost of correctness). But it's not clear to me that this choice should be part of the query, aside from choosing the appropriate table/stream to query. Not all sources are going to support all watermark modes. So I'd tend to think it makes more sense for the watermark mode to be a property of the source, defined outside of the query (as other source metadata are, e.g. the HBase table name for an HBase connector).

2. Grouping: Any time a grouping operation is performed (i.e. any time a table is created from a stream, if using the semantics proposed in my doc above), you can define how the grouping operation affects watermark progress. In general, when grouping within a window, the range of valid event times is [min timestamp of non-late data in the pane, ∞). In Beam, I believe we're going to provide the choice of { MIN_NON_LATE_TIMESTAMP, MAX_NON_LATE_TIMESTAMP, END_OF_WINDOW }, since those are the most relevant ones (and you can always safely move the timestamp beyond the end of the window later on). This choice I do believe should be part of the query, as it directly relates to how the conversion from a stream to a table happens, which will affect the shape of any stream derived from that table later on.

I think the existing (largely implicit) proposal here is actually that the event time for a given window during grouping operations is defined by the user when they specify the value of the rowtime column in the result. I think the optimizer can figure out from that expression whether or not rowtime will evaluate to something >= MIN_TIMESTAMP_OF_ALL_NON_LATE_DATA_IN_THE_PANE. It might require a special aggregator that knows how to ignore late records.
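To make the three choices concrete, here is a minimal Python sketch (illustrative only, not the Beam API; the function name is made up) of how the result timestamp of one window pane would be picked under each mode:

```python
def pane_timestamp(non_late_timestamps, window_end, mode):
    """Pick the event-time timestamp for the result of one window pane.

    non_late_timestamps: event times of the non-late records in the pane.
    window_end: the end timestamp of the window.
    Any valid choice must be >= min(non_late_timestamps), the lower
    bound of the valid range [min non-late timestamp, infinity).
    """
    if mode == "MIN_NON_LATE_TIMESTAMP":
        return min(non_late_timestamps)
    if mode == "MAX_NON_LATE_TIMESTAMP":
        return max(non_late_timestamps)
    if mode == "END_OF_WINDOW":
        return window_end
    raise ValueError("unknown mode: " + mode)

pane = [12, 17, 15]  # event times of the non-late records in one pane
print(pane_timestamp(pane, window_end=20, mode="MIN_NON_LATE_TIMESTAMP"))  # -> 12
print(pane_timestamp(pane, window_end=20, mode="MAX_NON_LATE_TIMESTAMP"))  # -> 17
print(pane_timestamp(pane, window_end=20, mode="END_OF_WINDOW"))           # -> 20
```

Note that MIN_NON_LATE_TIMESTAMP is the only mode that holds the watermark back to the earliest record of the pane; the other two let it advance further, which is consistent with the point above that a timestamp can always safely be moved later, but never earlier.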
But being able to have the timestamp of an aggregate defined via the user-provided rowtime value would probably be preferable to a syntax extension. Thoughts?

-----------
For Flink's Stream SQL, our plan was to require the input stream to provide watermarks, i.e. to treat watermarks as a property of the source, just as you described. It definitely makes sense to observe the same stream with different watermark accuracies. In addition, the "Complete Result Offset" (as proposed in our document) can also be used to tune the result accuracy, by deferring the evaluation of a window. This would be similar to having watermarks with higher accuracy.

For the timestamp assignment of window operations, our plan so far was to use the semantics of Flink's DataStream API, which is to assign END_OF_WINDOW. The timestamp would be addressable in a system column "rowtime", which cannot be modified, to ensure that users do not mess up the timestamp. But you are right, we could make the timestamp of window results configurable by explicitly defining a "rowtime" attribute in the output, just as in the examples of the document you shared:

  SELECT SUM(score), team, MIN(rowtime) AS rowtime  -- new rowtime is the minimum rowtime of the grouped records
  FROM Stream
  GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), team

That would of course mean that we cannot build upon the DataStream API windows and need to implement our own window operators (which might be inevitable in any case). I think the way to go is to start with the implicit END_OF_WINDOW and add the possibility to configure timestamps later.

Best,
Fabian

2016-10-18 16:43 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
> Hi everybody,
>
> at Flink Forward we had a BOF session about StreamSQL.
> After the conference, some folks and I sat down and drafted a proposal for
> Flink's StreamSQL semantics.
>
> --> https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU
>
> The proposal includes:
> - A definition for dynamic tables
> - How to convert streams into dynamic tables
> - How to convert dynamic tables into streams
> - How to query dynamic tables
> - Which types of queries to support
> - How to specify early results, update rate, and late refinements
> - How to control the size of the query state
> - How to write query results to external systems (Kafka, files, Cassandra,
> HBase, ...)
> - How to make a query result accessible via Flink's queryable kv-state
> - A few examples how StreamSQL queries can be defined
>
> The proposal does not include a workplan or task breakdown yet.
> This is something I'll work on in the next days.
>
> Please share your thoughts and opinions about the proposal on the mailing
> list.
>
> Thanks,
> Fabian
>
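P.S.: To illustrate the timestamp-assignment discussion above, the tumbling-window query with an explicit MIN(rowtime) can be sketched in a few lines of Python (illustrative only, not the DataStream API; the function and mode names are made up):

```python
from collections import defaultdict

def tumble(rows, size, rowtime_mode="END_OF_WINDOW"):
    """Group (rowtime, team, score) rows into tumbling windows of the
    given size and sum the scores per (window, team).

    rowtime_mode picks the rowtime of each result row: the implicit
    END_OF_WINDOW default, or MIN_ROWTIME as in "MIN(rowtime) AS rowtime".
    """
    groups = defaultdict(list)
    for rowtime, team, score in rows:
        window_start = (rowtime // size) * size  # TUMBLE assigns each row to one window
        groups[(window_start, team)].append((rowtime, score))
    result = []
    for (window_start, team), members in sorted(groups.items()):
        total = sum(score for _, score in members)
        if rowtime_mode == "END_OF_WINDOW":
            new_rowtime = window_start + size
        else:  # "MIN_ROWTIME": earliest rowtime of the grouped records
            new_rowtime = min(rowtime for rowtime, _ in members)
        result.append((new_rowtime, team, total))
    return result

rows = [(0, "A", 3), (30, "A", 2), (70, "B", 5)]
print(tumble(rows, size=60))                              # -> [(60, 'A', 5), (120, 'B', 5)]
print(tumble(rows, size=60, rowtime_mode="MIN_ROWTIME"))  # -> [(0, 'A', 5), (70, 'B', 5)]
```

The two outputs show why the choice matters downstream: under END_OF_WINDOW the result rows carry later timestamps than under MIN_ROWTIME, so any stream derived from the result sees watermark progress differently.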