On Thu, Oct 20, 2016 at 5:55 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi everybody,
>
> I cross posted the proposal also to the Apache Calcite dev mailing list to
> collect some feedback from the community.
> Tyler Akidau (Apache Beam committer) responded and commented on the
> proposal.
>
> I am moving our conversion from Google Doc comments to the mailing list
> with Tyler's consent to continue here.
>
> Tyler commented on this sentence:
>
> > "Result refinement does not affect the semantics of a query and should
> therefore not be part of the query. Instead it is a property of the query
> evaluation."
>
> Tyler:
> ---
>
> I don't think I agree with this statement. For streams, the nature of
> refinement is critical to defining their shape.
> And for tables, as soon as you provide a parameter to configure when late
> data are dropped, you've affected the semantics of the query.
> As such, I would argue refinement is essential to defining query semantics
> any time streams are involved, and thus a reasonable justification for
> syntax extension, e.g. the EMIT WHEN proposal:
> https://docs.google.com/document/d/1tSey4CeTrbb4VjWvtSA78OcU6BERXXDZ3t0HzSLij9Q/edit
>
> That said, extracting refinement semantics outside of the query seems like
> a fair compromise for the case where you're trying to support streaming
> within existing standard SQL syntax.
>
> Fabian:
> ---
>
> Yes, I know. We discussed this issue when drafting this proposal.
> When the query starts to drop late data, the computed result will only be
> an approximation of the defined query result. That should be clearly
> pointed out.
>
> Something that just came to my mind: shouldn't the watermark generation
> mode also be part of the query definition?
> Given the same query and same refinement and state cleanup configuration,
> different watermark generation modes could lead to different results
> because it essentially defines when data is late, right?
>
> Tyler:
> ---
>
> Re watermark generation: that's a very good question. Watermarks are going
> to be a characteristic of each table/stream, but I think there are two
> places where the user might have a voice in what the watermarks look like:
>
> 1. Sources: Watermarks are typically established at ingress time into the
> system.
> It's very reasonable that you might want to observe a given table/stream
> with different watermark strategies (e.g. a 100th %ile watermark when you
> want accuracy at the cost of latency, & a 99th %ile watermark when you want
> better latency at the cost of correctness).
> But it's not clear to me that this choice should be part of the query,
> aside from choosing the appropriate table/stream to query. Not all sources
> are going to support all watermark modes.
> So I'd tend to think it would make more sense for the watermark mode to be
> a property of the source, defined outside of the query (as other source
> metadata are, e.g. HBase table name for an HBase connector).
>
> 2. Grouping: Any time a grouping operation is performed (i.e. any time a
> table is created from a stream, if using the semantics proposed in my doc
> above), you can define how the grouping operation affects watermark
> progress. In general, when grouping within a window, the range of valid
> event times is [min timestamp of non-late data in the pane, ∞). In Beam, I
> believe we're going to provide the choice of { MIN_NON_LATE_TIMESTAP,
> MAX_NON_LATE_TIMESTAMP, END_OF_WINDOW }, since those are the most relevant
> ones (& you can always safely move the timestamp beyond the end of the
> window later on). This choice I do believe should be a part of query, as it
> directly relates to the way conversion from a stream to a table is
> happening, which will affect the shape of any stream derived from that
> table later on.
>
> I think the existing (largely implicit) proposal here is actually that the
> event time for a given window during grouping operations is defined by the
> user when they specify the value of the rowtime column in the result. I
> think the optimizer can figure out from that expression whether or not
> rowtime will evaluate to something >=
> MIN_TIMESTAMP_OF_ALL_NON_LATE_DATA_IN_THE_PANE. It might require a special
> aggregator that knows how to ignore late records. But being able to have
> the timestamp of an aggregate defined via the user-provided rowtime value
> would probably be preferable to a syntax extension.
>
> Thoughts?
>
> -----------
>
>
> For Flink's Stream SQL our plans were to require the input stream to
> provide watermarks, i.e. treat watermarks as a property of the source, just
> as you described.
> It definitely makes sense to have the same stream with different watermark
> accuracy. In addition, the "Complete Result Offset" (as proposed in our
> document) can also be used to tune the result accuracy by allowing to defer
> the evaluation of a window. This would be similar to having watermarks with
> higher accuracy.
>

Interesting. That makes me wonder if it really should be two separate
concepts then:

   1. Source watermark mode: basically a choice of the watermark algorithm
   used at the source. E.g. you could imagine different types of heuristics
   being made available for the same source.

   2. Watermark trigger characteristics: a choice of the way a watermark
   would be applied at trigger time. E.g. 100%, 90%, 100% + offset, etc.
   Essentially variations on the watermark theme that can be applied to
   *any* watermark, regardless of source implementation.

For the timestamp assignment of window operations, our plan so far was to
> use the semantics of Flink's DataStream API which is to assign
> END_OF_WINDOW. The timestamp would be addressable in a system column
> "rowtime", which cannot be modified to ensure that users do not mess-up the
> timestamp.
> But you are right, we could allow to make the timestamp of window results
> configurable, by explicitly defining a "rowtime" attribute in the output
> just as in the examples of the document you shared:
>
>
> *SELECT SUM(score), team, MIN(rowtime) AS rowtime // new rowtime is
> minimum rowtime of grouped recordsFROM StreamGROUP BY TUMBLE(rowtime,
> INTERVAL '1' HOUR), team*
>
> That would of course mean that we cannot build upon the DataStream API
> windows and need to implement our own window operators (which might be
> inevitable in any case).
> I think the way to go is to start with the implicit END_OF_WINDOW and add
> the possibility to configure timestamps later.
>

That sounds like a very reasonable way to go.

-Tyler


>
>
> Best, Fabian
>
>
> 2016-10-18 16:43 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
> Hi everybody,
>
> at Flink Forward we had a BOF session about StreamSQL.
> After the conference, some folks and I sat down and drafted a proposal for
> Flink's StreamSQL semantics.
>
> -->
> https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU
>
> The proposal includes:
> - A definition for dynamic tables
> - How to convert streams into dynamic tables
> - How to convert dynamic tables into streams
> - How to query dynamic tables
> - Which types of queries to support
> - How to specify early results, update rate, and late refinements
> - How to control the size of the query state
> - How to write query results to external systems (Kafka, files, Cassandra,
> HBase, ...)
> - How to make a query result accessible via Flink's queryable kv-state
> - A few examples how StreamSQL queries can be defined
>
> The proposal does not include a workplan or task breakdown yet.
> This is something I'll work on in the next days.
>
> Please share your thoughts and opinions about the proposal on the mailing
> list.
>
> Thanks,
> Fabian
>
>
>

Reply via email to