On Thu, Oct 20, 2016 at 5:55 AM Fabian Hueske <fhue...@gmail.com> wrote:
> Hi everybody,
>
> I cross-posted the proposal also to the Apache Calcite dev mailing list to
> collect some feedback from the community.
> Tyler Akidau (Apache Beam committer) responded and commented on the
> proposal.
>
> I am moving our conversation from Google Doc comments to the mailing list,
> with Tyler's consent, to continue here.
>
> Tyler commented on this sentence:
>
> "Result refinement does not affect the semantics of a query and should
> therefore not be part of the query. Instead it is a property of the query
> evaluation."
>
> Tyler:
> ---
>
> I don't think I agree with this statement. For streams, the nature of
> refinement is critical to defining their shape.
> And for tables, as soon as you provide a parameter to configure when late
> data are dropped, you've affected the semantics of the query.
> As such, I would argue refinement is essential to defining query semantics
> any time streams are involved, and thus a reasonable justification for a
> syntax extension, e.g. the EMIT WHEN proposal:
> https://docs.google.com/document/d/1tSey4CeTrbb4VjWvtSA78OcU6BERXXDZ3t0HzSLij9Q/edit
>
> That said, extracting refinement semantics outside of the query seems like
> a fair compromise for the case where you're trying to support streaming
> within existing standard SQL syntax.
>
> Fabian:
> ---
>
> Yes, I know. We discussed this issue when drafting this proposal.
> When the query starts to drop late data, the computed result will only be
> an approximation of the defined query result. That should be clearly
> pointed out.
>
> Something that just came to my mind: shouldn't the watermark generation
> mode also be part of the query definition?
> Given the same query and the same refinement and state cleanup
> configuration, different watermark generation modes could lead to
> different results, because the watermark generation mode essentially
> defines when data is late, right?
>
> Tyler:
> ---
>
> Re watermark generation: that's a very good question. Watermarks are going
> to be a characteristic of each table/stream, but I think there are two
> places where the user might have a voice in what the watermarks look like:
>
> 1. Sources: Watermarks are typically established at ingress time into the
> system.
> It's very reasonable that you might want to observe a given table/stream
> with different watermark strategies (e.g. a 100th %ile watermark when you
> want accuracy at the cost of latency, and a 99th %ile watermark when you
> want better latency at the cost of correctness).
> But it's not clear to me that this choice should be part of the query,
> aside from choosing the appropriate table/stream to query. Not all sources
> are going to support all watermark modes.
> So I'd tend to think it would make more sense for the watermark mode to be
> a property of the source, defined outside of the query (as other source
> metadata are, e.g. the HBase table name for an HBase connector).
>
> 2. Grouping: Any time a grouping operation is performed (i.e. any time a
> table is created from a stream, if using the semantics proposed in my doc
> above), you can define how the grouping operation affects watermark
> progress. In general, when grouping within a window, the range of valid
> event times is [min timestamp of non-late data in the pane, ∞). In Beam, I
> believe we're going to provide the choice of { MIN_NON_LATE_TIMESTAMP,
> MAX_NON_LATE_TIMESTAMP, END_OF_WINDOW }, since those are the most relevant
> ones (and you can always safely move the timestamp beyond the end of the
> window later on).
> This choice I do believe should be part of the query, as it directly
> relates to the way the conversion from a stream to a table happens, which
> will affect the shape of any stream derived from that table later on.
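As a concrete illustration of point 1 above (the watermark strategy being a
property of the source, configured outside of any query), a minimal sketch
using Flink's DataStream API. The Score event type, the example records, and
the 10-second out-of-orderness bound are made up for this sketch and are not
part of the proposal:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SourceWatermarkSketch {

  // Made-up event type with an explicit event-time field (epoch millis).
  public static class Score {
    public String team;
    public int score;
    public long eventTime;

    public Score() {}

    public Score(String team, int score, long eventTime) {
      this.team = team;
      this.score = score;
      this.eventTime = eventTime;
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<Score> scores = env
        .fromElements(
            new Score("a", 3, 1000L),
            new Score("b", 5, 2000L))
        // The watermark strategy is attached to the source stream, not to a
        // query: here a simple heuristic that trails the largest observed
        // timestamp by 10 seconds. A stricter or looser bound could be
        // attached to the same source without touching any query that reads
        // from it.
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Score>(Time.seconds(10)) {
              @Override
              public long extractTimestamp(Score s) {
                return s.eventTime;
              }
            });

    scores.print();
    env.execute("source watermark sketch");
  }
}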
> I think the existing (largely implicit) proposal here is actually that the
> event time for a given window during grouping operations is defined by the
> user when they specify the value of the rowtime column in the result. I
> think the optimizer can figure out from that expression whether or not
> rowtime will evaluate to something >=
> MIN_TIMESTAMP_OF_ALL_NON_LATE_DATA_IN_THE_PANE. It might require a special
> aggregator that knows how to ignore late records. But being able to have
> the timestamp of an aggregate defined via the user-provided rowtime value
> would probably be preferable to a syntax extension.
>
> Thoughts?
>
> -----------
>
> For Flink's Stream SQL our plans were to require the input stream to
> provide watermarks, i.e. to treat watermarks as a property of the source,
> just as you described.
> It definitely makes sense to have the same stream with different watermark
> accuracy. In addition, the "Complete Result Offset" (as proposed in our
> document) can also be used to tune the result accuracy by allowing the
> evaluation of a window to be deferred. This would be similar to having
> watermarks with higher accuracy.

Interesting. That makes me wonder if it really should be two separate
concepts then:

1. Source watermark mode: basically a choice of the watermark algorithm used
at the source. E.g. you could imagine different types of heuristics being
made available for the same source.

2. Watermark trigger characteristics: a choice of the way a watermark would
be applied at trigger time. E.g. 100%, 90%, 100% + offset, etc. Essentially
variations on the watermark theme that can be applied to *any* watermark,
regardless of source implementation.

> For the timestamp assignment of window operations, our plan so far was to
> use the semantics of Flink's DataStream API, which is to assign
> END_OF_WINDOW. The timestamp would be addressable in a system column
> "rowtime", which cannot be modified, to ensure that users do not mess up
> the timestamp.
> But you are right, we could make the timestamp of window results
> configurable by explicitly defining a "rowtime" attribute in the output,
> just as in the examples of the document you shared:
>
> SELECT SUM(score), team, MIN(rowtime) AS rowtime  // new rowtime is the minimum rowtime of the grouped records
> FROM Stream
> GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), team
>
> That would of course mean that we cannot build upon the DataStream API
> windows and need to implement our own window operators (which might be
> inevitable in any case).
> I think the way to go is to start with the implicit END_OF_WINDOW and add
> the possibility to configure timestamps later.

That sounds like a very reasonable way to go. -Tyler
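For reference, a rough sketch of the hourly per-team aggregation discussed
above, written against Flink's DataStream API, which assigns END_OF_WINDOW
timestamps to window results out of the box. The Tuple3 input records and the
ascending-timestamp assigner are made up for this sketch; emitting
MIN(rowtime) as the result timestamp, as in the SQL example, would indeed
require a custom window operator, since the built-in operator always stamps
results with the window's maximum timestamp:

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EndOfWindowTimestampSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Made-up (team, score, rowtime) records; a real job would read them from
    // a watermarked source such as the one sketched further up.
    DataStream<Tuple3<String, Integer, Long>> scores = env
        .fromElements(
            Tuple3.of("a", 3, 1000L),
            Tuple3.of("b", 2, 1500L),
            Tuple3.of("a", 5, 2000L))
        .assignTimestampsAndWatermarks(
            new AscendingTimestampExtractor<Tuple3<String, Integer, Long>>() {
              @Override
              public long extractAscendingTimestamp(Tuple3<String, Integer, Long> t) {
                return t.f2; // rowtime
              }
            });

    // Hourly SUM(score) per team. The built-in window operator stamps every
    // emitted result with the window's maximum timestamp (END_OF_WINDOW);
    // it does not let the user pick MIN(rowtime) or MAX(rowtime) instead.
    scores
        .keyBy(0) // team
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .sum(1)   // score
        .print();

    env.execute("END_OF_WINDOW timestamp sketch");
  }
}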
> Best, Fabian
>
>
> 2016-10-18 16:43 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
> Hi everybody,
>
> at Flink Forward we had a BOF session about StreamSQL.
> After the conference, some folks and I sat down and drafted a proposal for
> Flink's StreamSQL semantics.
>
> -->
> https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU
>
> The proposal includes:
> - A definition of dynamic tables
> - How to convert streams into dynamic tables
> - How to convert dynamic tables into streams
> - How to query dynamic tables
> - Which types of queries to support
> - How to specify early results, update rate, and late refinements
> - How to control the size of the query state
> - How to write query results to external systems (Kafka, files, Cassandra,
>   HBase, ...)
> - How to make a query result accessible via Flink's queryable kv-state
> - A few examples of how StreamSQL queries can be defined
>
> The proposal does not include a work plan or task breakdown yet.
> This is something I'll work on in the next days.
>
> Please share your thoughts and opinions about the proposal on the mailing
> list.
>
> Thanks,
> Fabian