Hey, Julian,

That's awesome! I read through all the examples and it is really easy to
express most of our use cases now! Thanks a lot!
I have just a few additional points here:

Q5. Aligned tumbling window

TUMBLE does not have an align argument, so you need to use HOP.

SELECT STREAM START(rowtime),
  COUNT(*)
FROM Orders
GROUP BY HOP(rowtime,
    INTERVAL '1' HOUR,
    INTERVAL '1' HOUR,
    TIME '0:30')

Emits a row at 00:30 containing rows in [23:30, 00:30);
emits a row at 01:30 containing rows in [00:30, 01:30), etc.

It would be easy to extend TUMBLE to support align as well, right? It is
only a small piece of syntactic sugar, but I think it would be handy.
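
To show what I mean, here is a minimal Python sketch of an aligned TUMBLE, assuming it simply means HOP(t, e, e, align) with window boundaries at align + k * e. The name aligned_tumble and the fixed epoch origin are my own assumptions for illustration, not anything in Calcite.

```python
from datetime import datetime, timedelta

# Assumed origin for window boundaries (illustrative choice, not from Calcite).
EPOCH = datetime(1970, 1, 1)


def aligned_tumble(t, size, align=timedelta(0)):
    """Return the [start, end) tumbling window of length `size` containing t.

    Boundaries are assumed to fall at EPOCH + align + k * size.
    """
    origin = EPOCH + align
    # timedelta // timedelta floors toward negative infinity and yields an int,
    # so this is the index of the window that contains t.
    k = (t - origin) // size
    start = origin + k * size
    return start, start + size
```

For a row at 00:15 with a one-hour window and TIME '0:30' alignment this gives [23:30, 00:30), which matches the Q5 description above.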

Q6. Decaying average

SELECT STREAM END(rowtime),
  productId,
  SUM(unitPrice * EXP((rowtime - START(rowtime)) SECOND / INTERVAL '1' HOUR))
   / SUM(EXP((rowtime - START(rowtime)) SECOND / INTERVAL '1' HOUR))
FROM Orders
GROUP BY HOP(rowtime,
    INTERVAL '1' SECOND,
    INTERVAL '1' HOUR),
  productId

Emits a row at 00:00:00 containing rows in [23:00:00, 00:00:00);
emits a row at 00:00:01 containing rows in [23:00:01, 00:00:01).

So START() and END() should not affect when the window result is emitted;
they simply expose the key that we can use to identify a window, right? My
question is then: how do I identify the window key from the HOP()
definition? Can I assume that the window key is obtained by applying
START() / END() to the first argument of HOP()? And is it reasonable to
assume that the window key is unique after applying START() / END()?
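
To make the question concrete, here is a minimal Python sketch of how I read the HOP semantics in Q3 to Q6: emit points fall at align + k * emit, and the window emitted at time E covers [E - retain, E). The helper hop_windows and the epoch origin are hypothetical, my own reading rather than a confirmed definition.

```python
from datetime import datetime, timedelta

# Assumed origin for emit points (illustrative choice, not from Calcite).
EPOCH = datetime(1970, 1, 1)


def hop_windows(t, emit, retain, align=timedelta(0)):
    """List the (start, end) windows that contain a row with timestamp t."""
    origin = EPOCH + align
    # First emit point strictly after t, because windows are half-open [start, end).
    k = (t - origin) // emit + 1
    end = origin + k * emit
    windows = []
    # Keep stepping forward while the window ending at `end` still covers t.
    while end - retain <= t:
        windows.append((end - retain, end))
        end += emit
    return windows
```

Under this reading, a row at 00:50 in Q3 lands in four windows ending at 01:00, 01:30, 02:00 and 02:30. Each window has a distinct END (and, since START = END - retain, a distinct START), so either value would work as a key if the assumption holds.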

One last piece of the puzzle: is it possible to introduce a window
definition that supports dynamic window length (e.g. a window that is
defined by session ID and is closed when no more messages in the same
session arrive for 3 minutes)?
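
The behaviour I have in mind can be sketched in Python as below. This is only an illustration over a finite, time-ordered list of events; the function sessionize and the tuple layout are hypothetical, and a real streaming engine would close windows as time advances rather than when the next event arrives.

```python
from datetime import datetime, timedelta


def sessionize(events, gap=timedelta(minutes=3)):
    """Group (session_id, time) events into gap-closed session windows.

    `events` must be sorted by time. Returns a list of
    (session_id, first_time, last_time, count) tuples.
    """
    open_windows = {}  # session_id -> [first_time, last_time, count]
    closed = []
    for session_id, t in events:
        w = open_windows.get(session_id)
        if w is not None and t - w[1] >= gap:
            # The session was idle for at least `gap`: close the old window.
            closed.append((session_id, w[0], w[1], w[2]))
            w = None
        if w is None:
            open_windows[session_id] = [t, t, 1]
        else:
            w[1] = t
            w[2] += 1
    # End of the finite example input: flush whatever is still open.
    for session_id, w in open_windows.items():
        closed.append((session_id, w[0], w[1], w[2]))
    return closed
```

The window length here depends on the data, which is what distinguishes it from HOP and TUMBLE, where the boundaries are fixed by the emit, retain and align arguments.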

Thanks a lot!

-Yi

On Thu, Jun 25, 2015 at 1:44 PM, Julian Hyde <jh...@apache.org> wrote:

> Glad you like it. I've filled out the spec with some more examples, see
> below.
>
> Here are the proposed functions:
>
> * HOP(t, emit, retain)
> * HOP(t, emit, retain, align)
> * TUMBLE(t, emit)
>
> TUMBLE(t, e) is equivalent to HOP(t, e, e).
>
> HOP(t, e, r) is equivalent to HOP(t, e, r, TIME '00:00:00').
>
> Q1. One hour tumbling window:
>
> SELECT STREAM START(rowtime),
>   COUNT(*)
> FROM Orders
> GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR)
>
> Emits a row at 01:00 containing rows in [00:00, 01:00);
> emits a row at 02:00 containing rows in [01:00, 02:00), etc.
>
> Q2. Same as Q1, expressed using HOP:
>
> SELECT STREAM START(rowtime),
>   COUNT(*)
> FROM Orders
> GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
>
> Q3. Hopping window
>
> SELECT STREAM START(rowtime),
>   COUNT(*)
> FROM Orders
> GROUP BY HOP(rowtime,
>     INTERVAL '30' MINUTE,
>     INTERVAL '1:45' HOUR TO MINUTE)
>
> Emits a row at 01:00 containing rows in [23:15, 01:00);
> emits a row at 01:30 containing rows in [23:45, 01:30), etc.
>
> Q4. Aligned hopping window
>
> SELECT STREAM START(rowtime),
>   COUNT(*)
> FROM Orders
> GROUP BY HOP(rowtime,
>     INTERVAL '1:30' HOUR TO MINUTE,
>     INTERVAL '2' HOUR, TIME '0:30')
>
> Emits a row at 00:30 containing rows in [22:30, 00:30);
> emits a row at 02:00 containing rows in [00:00, 02:00), etc.
>
> Q5. Aligned tumbling window
>
> TUMBLE does not have an align argument, so you need to use HOP.
>
> SELECT STREAM START(rowtime),
>   COUNT(*)
> FROM Orders
> GROUP BY HOP(rowtime,
>     INTERVAL '1' HOUR,
>     INTERVAL '1' HOUR,
>     TIME '0:30')
>
> Emits a row at 00:30 containing rows in [23:30, 00:30);
> emits a row at 01:30 containing rows in [00:30, 01:30), etc.
>
> Q6. Decaying average
>
> SELECT STREAM END(rowtime),
>   productId,
>   SUM(unitPrice * EXP((rowtime - START(rowtime)) SECOND / INTERVAL '1' HOUR))
>    / SUM(EXP((rowtime - START(rowtime)) SECOND / INTERVAL '1' HOUR))
> FROM Orders
> GROUP BY HOP(rowtime,
>     INTERVAL '1' SECOND,
>     INTERVAL '1' HOUR),
>   productId
>
> Emits a row at 00:00:00 containing rows in [23:00:00, 00:00:00);
> emits a row at 00:00:01 containing rows in [23:00:01, 00:00:01).
>
> The expression weighs recent orders more heavily than older orders.
> Extending the window from 1 hour to 2 hours or 1 year would have
> virtually no effect on the accuracy of the result (but use more memory
> and compute).
>
> Note that we can use START inside an aggregate function (SUM) because
> its value is constant for all rows within a sub-total. Such nesting
> would not be allowed for typical aggregate functions (SUM, COUNT
> etc.). START and END behave more like the GROUPING() function in this
> regard.
>
> Q7. Non-streaming query
>
> HOP and TUMBLE were devised for a use case that occurs in streaming
> SQL, but they can be used in non-streaming queries. For example,
>
> SELECT START(rowtime),
>   COUNT(*)
> FROM Orders
> WHERE rowtime BETWEEN '2015-01-01 00:00:00'
>     AND '2015-01-18 00:00:00'
> GROUP BY TUMBLE(rowtime,
>     INTERVAL '2' HOUR)
>
> This is like Q1 (with a two-hour window and a filter on rowtime), but
> it omits the STREAM keyword, so it queries the table containing
> historical orders.
>
> Q8. Grouping sets
>
> It should be possible to mix HOP and TUMBLE in with GROUPING SETS but
> I haven't devised an example.
>
> While we're on the subject of GROUPING SETS, I should state for the record:
> * GROUPING SETS is valid for a streaming query provided that every
> grouping set contains a monotonic expression.
> * CUBE and ROLLUP are not valid for a streaming query, because they
> will produce at least one grouping set that aggregates everything (like
> "GROUP BY ()").
>
> Maybe we should allow CUBE and ROLLUP with an understanding that some
> levels of aggregation will never complete (because they have no
> monotonic expressions) and thus will never be emitted.
>
> Julian
>
> On Thu, Jun 25, 2015 at 7:32 AM, Milinda Pathirage
> <mpath...@umail.iu.edu> wrote:
> > Hi Julian,
> >
> > This is a great improvement over the previous hopping window. Thanks for
> > thinking this through. I would also like it if we could introduce a
> > TUMBLE function with more control over how we define the tumbling window
> > size. With the current FLOOR-based model we have to perform date/time
> > arithmetic to get tumbling windows such as 5-minute tumbling windows
> > (maybe there is a better way that I don't know of). A TUMBLE function
> > that can specify parameters such as window size would be nice. I am +1
> > for the other extensions as well.
> >
> > Thanks
> > Milinda
> >
> > On Wed, Jun 24, 2015 at 6:32 PM, Julian Hyde <jh...@apache.org> wrote:
> >
> >> Hi all,
> >>
> >> Forgive the cross-post. This is for Calcite devs interested in
> >> streaming and Samza devs interested in SQL.
> >>
> >> I've been thinking some more about how to implement hopping and
> >> tumbling windows in streaming SQL. I was previously at a loss to find
> >> a concise syntax that is consistent with SQL semantics, but I have
> >> found a syntax that I think can please everyone.
> >>
> >> Recall that a hopping window emits a sub-total every X seconds of
> >> records that have arrived over the last Y seconds. A tumbling window
> >> is a hopping window where X and Y are equal.
> >>
> >> In https://calcite.incubator.apache.org/docs/stream.html#hopping-windows
> >> I give an example, "emit, every hour, the number of each product
> >> ordered over the past three hours".
> >>
> >> That example gives a query in terms of a GROUP BY (in the HourlyTotals
> >> view) followed by a moving sum. I didn't think that it was possible to
> >> express using just one GROUP BY, because that would violate one of the
> >> principles of SQL: that each record entering a GROUP BY contributes to
> >> precisely one output record.
> >>
> >> But I've just realized that the CUBE, ROLLUP and GROUPING SETS
> >> operators (already in SQL) violate that principle. And if they can do
> >> it, we can do the same. So we could add another grouping function,
> >> HOP(t, emit, retain).
> >>
> >> The query would look like this:
> >>
> >> SELECT STREAM START(rowtime) AS rowtime,
> >>   productId,
> >>   SUM(units) AS sumUnits,
> >>   COUNT(*) AS c
> >> FROM Orders
> >> GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '3' HOUR),
> >>   productId
> >>
> >> Much nicer than the one in stream.html!
> >>
> >> The "trick" is that the HOP function is returning a list of rowtime
> >> values. For example, for row 1 {rowtime: '09:33', ...} it will return
> >> ['09:00', '10:00', '11:00']; for row 2 {rowtime: '10:05', ...} it will
> >> return ['10:00', '11:00', '12:00']. The system adds each row to
> >> several sub-totals, and emits each sub-total when it is complete. The
> >> sub-total for '09:00' will contain only row 1, and will be emitted at
> >> '10:00'; the sub-total for '10:00' will contain row 1 and row 2, and
> >> will be emitted at '11:00', and so forth.
> >>
> >> Returning multiple values is related to the flatMap function in Spark
> >> (and earlier selectMany in LINQ) and makes HOP's semantics similar to
> >> GROUPING SETS and therefore sound.
> >>
> >> START is a new aggregate function that returns the lower bound of the
> >> current sub-total; END similarly.
> >>
> >> Note that the "retain" argument does not need to be a whole multiple
> >> of the "emit" argument. This was a major limitation in the previous
> >> proposal.
> >>
> >> There are some straightforward extensions:
> >> * Define a TUMBLE function;
> >> * Add an "align" argument to HOP, to allow windows to start at, say, 5
> >> minutes past each hour;
> >> * Apply HOP to windows based on row-counts;
> >> * Allow user-defined windowing functions that similarly return a list
> >> of interval start-end points.
> >>
> >> Julian
> >>
> >
> >
> >
> > --
> > Milinda Pathirage
> >
> > PhD Student | Research Assistant
> > School of Informatics and Computing | Data to Insight Center
> > Indiana University
> >
> > twitter: milindalakmal
> > skype: milinda.pathirage
> > blog: http://milinda.pathirage.org
>
