Hey, Julian,

That's awesome! I read through all the examples, and it is really easy to express most of our use cases now! Thanks a lot!
I have just a few additional points here:

Q5. Aligned tumbling window

TUMBLE does not have an align argument, so you need to use HOP.

SELECT STREAM START(rowtime),
  COUNT(*)
FROM Orders
GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR, TIME '0:30')

Emits a row at 00:30 containing rows in [23:30, 00:30);
emits a row at 01:30 containing rows in [00:30, 01:30), etc.

It would be easy to extend TUMBLE to support align as well, right? This is
just a small piece of syntactic sugar, but I think it will be handy.

Q6. Decaying average

SELECT STREAM END(rowtime),
  productId,
  SUM(unitPrice * EXP((rowtime - START(rowtime)) SECOND / INTERVAL '1' HOUR))
    / SUM(EXP((rowtime - START(rowtime)) SECOND / INTERVAL '1' HOUR))
FROM Orders
GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '1' HOUR),
  productId

Emits a row at 00:00:00 containing rows in [23:00:00, 00:00:00);
emits a row at 00:00:01 containing rows in [23:00:01, 00:00:01).

So START() and END() should not affect when the window result is emitted;
they simply signify the key that we can use to identify a window, right?
The question is: how do I identify the window key from the HOP()
definition? Can I assume that the window key is obtained by applying
START() / END() to the first argument of the HOP() definition? And is it
reasonable to assume that the window key is unique after applying the
START() / END() function?

One last piece of the puzzle: is it possible to introduce a window
definition that supports a dynamic window length (e.g. a window defined by
session ID that is closed when no more messages in the same session arrive
for 3 minutes)?

Thanks a lot!

-Yi

On Thu, Jun 25, 2015 at 1:44 PM, Julian Hyde <jh...@apache.org> wrote:

> Glad you like it. I've filled out the spec with some more examples; see
> below.
>
> Here are the proposed functions:
>
> * HOP(t, emit, retain)
> * HOP(t, emit, retain, align)
> * TUMBLE(t, emit)
>
> TUMBLE(t, e) is equivalent to HOP(t, e, e).
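A rough way to check the window bounds quoted in this thread is to model HOP directly. The Python sketch below is an illustration under assumptions, not Calcite's implementation: `hop_windows` and `tumble_windows` are hypothetical names, windows are assumed to end at align + k * emit for every integer k and to cover the `retain` interval just before each end (the [start, end) ranges given in the examples), and `align` is assumed to be measured from midnight on 1970-01-01 so that the grid is the same every day.

```python
from datetime import datetime, timedelta

# Assumed origin for the "align" offset (not specified in the proposal).
EPOCH = datetime(1970, 1, 1)


def hop_windows(t, emit, retain, align=timedelta(0)):
    """List the [start, end) windows of a hypothetical HOP(t, emit, retain, align).

    Assumes windows end at EPOCH + align + k * emit for every integer k and
    that each window covers the `retain` interval just before its end.
    """
    origin = EPOCH + align
    # First window end strictly after t (window ends are exclusive).
    end = origin + ((t - origin) // emit + 1) * emit
    windows = []
    # Keep stepping forward while the window still starts at or before t.
    while end - retain <= t:
        windows.append((end - retain, end))
        end += emit
    return windows


def tumble_windows(t, emit):
    """TUMBLE(t, e) modeled as HOP(t, e, e), as stated in the thread."""
    return hop_windows(t, emit, emit)
```

Under these assumptions, `hop_windows(datetime(2015, 6, 25, 0, 10), timedelta(hours=1), timedelta(hours=1), timedelta(minutes=30))` yields the single window [23:30, 00:30), as in Q5, and a zero `align` reproduces the unaligned Q1 and Q2 windows.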
>
> HOP(t, e, r) is equivalent to HOP(t, e, r, TIME '00:00:00').
>
> Q1. One-hour tumbling window:
>
> SELECT STREAM START(rowtime),
>   COUNT(*)
> FROM Orders
> GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR)
>
> Emits a row at 01:00 containing rows in [00:00, 01:00);
> emits a row at 02:00 containing rows in [01:00, 02:00), etc.
>
> Q2. Same as Q1, expressed using HOP:
>
> SELECT STREAM START(rowtime),
>   COUNT(*)
> FROM Orders
> GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
>
> Q3. Hopping window
>
> SELECT STREAM START(rowtime),
>   COUNT(*)
> FROM Orders
> GROUP BY HOP(rowtime,
>   INTERVAL '30' MINUTE,
>   INTERVAL '1:45' HOUR TO MINUTE)
>
> Emits a row at 01:00 containing rows in [23:15, 01:00);
> emits a row at 01:30 containing rows in [23:45, 01:30), etc.
>
> Q4. Aligned hopping window
>
> SELECT STREAM START(rowtime),
>   COUNT(*)
> FROM Orders
> GROUP BY HOP(rowtime,
>   INTERVAL '1:30' HOUR TO MINUTE,
>   INTERVAL '2' HOUR,
>   TIME '0:30')
>
> Emits a row at 00:30 containing rows in [22:30, 00:30);
> emits a row at 02:00 containing rows in [00:00, 02:00), etc.
>
> Q5. Aligned tumbling window
>
> TUMBLE does not have an align argument, so you need to use HOP.
>
> SELECT STREAM START(rowtime),
>   COUNT(*)
> FROM Orders
> GROUP BY HOP(rowtime,
>   INTERVAL '1' HOUR,
>   INTERVAL '1' HOUR,
>   TIME '0:30')
>
> Emits a row at 00:30 containing rows in [23:30, 00:30);
> emits a row at 01:30 containing rows in [00:30, 01:30), etc.
>
> Q6. Decaying average
>
> SELECT STREAM END(rowtime),
>   productId,
>   SUM(unitPrice * EXP((rowtime - START(rowtime)) SECOND / INTERVAL '1' HOUR))
>     / SUM(EXP((rowtime - START(rowtime)) SECOND / INTERVAL '1' HOUR))
> FROM Orders
> GROUP BY HOP(rowtime,
>   INTERVAL '1' SECOND,
>   INTERVAL '1' HOUR),
>   productId
>
> Emits a row at 00:00:00 containing rows in [23:00:00, 00:00:00);
> emits a row at 00:00:01 containing rows in [23:00:01, 00:00:01).
>
> The expression weighs recent orders more heavily than older orders.
> Extending the window from 1 hour to 2 hours or 1 year would have
> virtually no effect on the accuracy of the result (but would use more
> memory and compute).
>
> Note that we use START inside an aggregate function (SUM) because it
> is a value that is constant for all rows within a sub-total. This
> would not be allowed for typical aggregate functions (SUM, COUNT,
> etc.). START and END behave more like the GROUPING() function in this
> regard.
>
> Q7. Non-streaming query
>
> HOP and TUMBLE were devised for a use case that occurs in streaming
> SQL, but they can be used in non-streaming queries. For example,
>
> SELECT START(rowtime),
>   COUNT(*)
> FROM Orders
> WHERE rowtime BETWEEN '2015-01-01 00:00:00'
>   AND '2015-01-18 00:00:00'
> GROUP BY TUMBLE(rowtime,
>   INTERVAL '2' HOUR)
>
> This is similar to Q1, but omits the STREAM keyword, so it queries the
> table containing historical orders.
>
> Q8. Grouping sets
>
> It should be possible to mix HOP and TUMBLE with GROUPING SETS, but
> I haven't devised an example.
>
> While we're on the subject of GROUPING SETS, I should state for the record:
> * GROUPING SETS is valid for a streaming query provided that every
>   grouping set contains a monotonic expression.
> * CUBE and ROLLUP are not valid for streaming queries, because they
>   produce at least one grouping set that aggregates everything (like
>   "GROUP BY ()").
>
> Maybe we should allow CUBE and ROLLUP with the understanding that some
> levels of aggregation will never complete (because they have no
> monotonic expressions) and thus will never be emitted.
>
> Julian
>
> On Thu, Jun 25, 2015 at 7:32 AM, Milinda Pathirage
> <mpath...@umail.iu.edu> wrote:
> > Hi Julian,
> >
> > This is a great improvement over the previous hopping window. Thanks for
> > thinking through this. I would also like it if we could introduce a TUMBLE
> > function with more control over how we define the tumbling window size.
> > With the current FLOOR-based model, we have to perform date/time
> > arithmetic to get tumbling windows such as 5-minute tumbling windows
> > (maybe there is a better way that I don't know of). But a TUMBLE function
> > that can specify parameters such as the window size would be nice. I am
> > +1 for the other extensions as well.
> >
> > Thanks
> > Milinda
> >
> > On Wed, Jun 24, 2015 at 6:32 PM, Julian Hyde <jh...@apache.org> wrote:
> >
> >> Hi all,
> >>
> >> Forgive the cross-post. This is for Calcite devs interested in
> >> streaming and Samza devs interested in SQL.
> >>
> >> I've been thinking some more about how to implement hopping and
> >> tumbling windows in streaming SQL. I was previously at a loss to find
> >> a concise syntax that is consistent with SQL semantics, but I have
> >> found a syntax that I think can please everyone.
> >>
> >> Recall that a hopping window emits a sub-total every X seconds of
> >> records that have arrived over the last Y seconds. A tumbling window
> >> is a hopping window where X and Y are equal.
> >>
> >> In https://calcite.incubator.apache.org/docs/stream.html#hopping-windows
> >> I give an example, "emit, every hour, the number of each product
> >> ordered over the past three hours".
> >>
> >> That example gives a query in terms of a GROUP BY (in the HourlyTotals
> >> view) followed by a moving sum. I didn't think that it was possible to
> >> express it using just one GROUP BY, because that would violate one of
> >> the principles of SQL: that each record entering a GROUP BY contributes
> >> to precisely one output record.
> >>
> >> But I've just realized that the CUBE, ROLLUP and GROUPING SETS
> >> operators (already in SQL) violate that principle. And if they can do
> >> it, we can do the same. So we could add another grouping function,
> >> HOP(t, emit, retain).
> >>
> >> The query would look like this:
> >>
> >> SELECT STREAM START(rowtime) AS rowtime,
> >>   productId,
> >>   SUM(units) AS sumUnits,
> >>   COUNT(*) AS c
> >> FROM Orders
> >> GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '3' HOUR),
> >>   productId
> >>
> >> Much nicer than the one in stream.html!
> >>
> >> The "trick" is that the HOP function returns a list of rowtime
> >> values. For example, for row 1 {rowtime: '09:33', ...} it will return
> >> ['09:00', '10:00', '11:00']; for row 2 {rowtime: '10:05', ...} it will
> >> return ['10:00', '11:00', '12:00']. The system adds each row to
> >> several sub-totals, and emits each sub-total when it is complete. The
> >> sub-total for '09:00' will contain only row 1, and will be emitted at
> >> '10:00'; the sub-total for '10:00' will contain row 1 and row 2, and
> >> will be emitted at '11:00', and so forth.
> >>
> >> Returning multiple values is related to the flatMap function in Spark
> >> (and the earlier SelectMany in LINQ) and makes HOP's semantics similar
> >> to GROUPING SETS, and therefore sound.
> >>
> >> START is a new aggregate function that returns the lower bound of the
> >> current sub-total; END similarly returns the upper bound.
> >>
> >> Note that the "retain" argument does not need to be a whole multiple
> >> of the "emit" argument. This was a major limitation in the previous
> >> proposal.
> >>
> >> There are some straightforward extensions:
> >> * Define a TUMBLE function;
> >> * Add an "align" argument to HOP, to allow windows to start at, say, 5
> >>   minutes past each hour;
> >> * Apply HOP to windows based on row counts;
> >> * Allow user-defined windowing functions that similarly return a list
> >>   of interval start-end points.
> >>
> >> Julian
> >>
> >
> > --
> > Milinda Pathirage
> >
> > PhD Student | Research Assistant
> > School of Informatics and Computing | Data to Insight Center
> > Indiana University
> >
> > twitter: milindalakmal
> > skype: milinda.pathirage
> > blog: http://milinda.pathirage.org
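The "trick" in Julian's original proposal, one row feeding several sub-totals that are each emitted once complete, can be modeled in a few lines. This Python sketch is a hypothetical illustration, not Calcite or Samza code: `hop` and `hop_group` are made-up names, rows are assumed to arrive in rowtime order, window ends are assumed to be aligned to midnight on 1970-01-01, and a sub-total is emitted as soon as a row at or after its window end arrives. It labels each sub-total by its [start, end) bounds (plus productId) rather than by the single times used in the example, so row 1 at 09:33 goes into [07:00, 10:00), [08:00, 11:00) and [09:00, 12:00), whose ends are the emit times 10:00, 11:00 and 12:00.

```python
from datetime import datetime, timedelta

# Assumed origin for aligning window ends (not specified in the proposal).
EPOCH = datetime(1970, 1, 1)


def hop(t, emit, retain):
    """Hypothetical HOP: every [start, end) window that contains t."""
    end = EPOCH + ((t - EPOCH) // emit + 1) * emit  # first end after t
    windows = []
    while end - retain <= t:
        windows.append((end - retain, end))
        end += emit
    return windows


def hop_group(rows, emit, retain):
    """Sketch of GROUP BY HOP(rowtime, emit, retain), productId.

    `rows` holds (rowtime, product_id, units) tuples, assumed to arrive in
    non-decreasing rowtime order. Returns the emitted sub-totals as
    (start, end, product_id, sum_units, count) tuples, plus the sub-totals
    still open when the input runs out.
    """
    open_totals = {}  # (start, end, product_id) -> [sum_units, count]
    emitted = []
    for rowtime, product_id, units in rows:
        # Treat rowtime as the watermark: windows ending by now are complete.
        done = sorted(key for key in open_totals if key[1] <= rowtime)
        for key in done:
            sum_units, count = open_totals.pop(key)
            emitted.append((*key, sum_units, count))
        # The flatMap step: one row feeds every window that contains it.
        for start, end in hop(rowtime, emit, retain):
            total = open_totals.setdefault((start, end, product_id), [0, 0])
            total[0] += units
            total[1] += 1
    return emitted, open_totals
```

Feeding it row 1 at 09:33 and row 2 at 10:05, plus a hypothetical third row at 12:30 to push windows to completion, with emit = 1 hour and retain = 3 hours, emits [07:00, 10:00) with one row, then [08:00, 11:00) and [09:00, 12:00) with two rows each, which is consistent with the example's description of which rows each sub-total holds. Keying by both bounds sidesteps the question of whether START or END alone is unique; in this particular model, with fixed emit and retain, either bound alone would also identify a window.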