Hi, Julian,

Thanks for sharing your idea! It is interesting and well organized. Let me try to summarize the main differences between your proposal and the current one:
- the '[]' syntax used to define the window specification is removed, with OVER applied to the stream/table instead
- join/select output can be either a stream or a table, since both become legitimate data sources in a query.
There are still two points that I want to comment on:

a. In the current proposal, the window operator can specify a step size for window advancement. In your examples, it seems that all windows advance by a step size of 1. Hence, the output of the query will be a "continuous" stream that reports a moving average, instead of a fixed-size window average across all rows. Is it easy to extend your model for that use case?

b. For output streams, there is no definition of "partitions". I remember that we talked about this briefly in our early discussion and you commented that it should not be in SQL. I would argue that a specification of how to partition the output stream is needed in the current system, since we lack two things: i) automatically identifying the key that can be used for optimal partitioning of a query's output; ii) auto-scaling the number of partitions of a stream if the initial partitioning is not enough. Hence, we have to give the user a way to tell the system how to partition. I am curious what exactly the reasons are that you believe partitioning should not be added to the SQL syntax?

BTW, it seems that even with your proposed model, the current physical operators won't be affected, which sounds good.

Thanks!

-Yi

On Thu, Jan 29, 2015 at 1:54 PM, Julian Hyde <julianh...@gmail.com> wrote:
> Let me propose an alternative approach. The deliverables and the technology stack would be different, but I think we still fulfill the spirit of the proposal, and there are benefits from better interoperability, standards compliance, and building on existing code that already works.
>
> First, I propose that we implement not a SQL-like language, but standard SQL with streaming extensions. The extensions should be minimal, and should be consistent with the look and feel of the language (e.g. SQL tends to use keywords like OVER rather than punctuation marks like '[').
> If there is a way to achieve something within standard SQL, we should use it. And any extensions we make should preserve SQL's principles of being a closed language (you can nest structures to arbitrary depth, and you can re-use queries as if they were relations) and of having sound semantics.
>
> The language would allow queries involving streams, mixtures of streams and relations, and only relations. A query that only used relations would be 100% standard SQL, and a query that used a stream S would, with luck, be very similar to one that used a table T with the same contents as S.
>
> Second, I propose that we use Calcite's SQL parser, validator, and logical algebra. We could also use its JDBC driver infrastructure.
>
> I agree with the consensus that CQL is a good basis for extending relational algebra to streams. The question is, can we shoehorn CQL's algebra extensions into SQL? I believe we can, as follows:
> The ISTREAM operator is represented by the STREAM keyword after SELECT (DSTREAM and RSTREAM are much less important, so can be deferred)
> Streams included in the FROM clause implicitly become relations (but they are "streamable" relations, and the planner will very likely leverage this when finding a viable implementation)
> To use a particular window of a stream, not its entire history, follow it with an OVER clause
>
> Use SQL standard constructs for datetime literals (TIMESTAMP '2015-01-29 12:18:34.123', DATE '2015-01-29'), interval literals (INTERVAL '2:30' HOUR TO MINUTE, INTERVAL '5' MONTH), and windowed aggregates ("AVG(price) OVER (PARTITION BY productId ORDER BY rowtime ROWS BETWEEN 10 PRECEDING AND 5 FOLLOWING)").
>
> Some examples.
>
> # Identity
> SELECT STREAM *
> FROM Orders;
>
> # Filter and project
> SELECT STREAM state, quantity
> FROM Orders
> WHERE state = 'CA';
>
> # Windowed aggregation
> SELECT STREAM product,
>   AVG(price) OVER this_week AS avg_price
> FROM Orders
> WINDOW this_week AS (PARTITION BY product ORDER BY rowtime RANGE INTERVAL '7' DAY PRECEDING);
>
> # Aggregation
> # At the top of each hour, emit totals for each product
> SELECT STREAM product, trunc(rowtime to hour) AS rowtime, COUNT(*) AS c
> FROM Orders
> GROUP BY product, trunc(rowtime to hour);
>
> # Relational query on recent history of a stream
> SELECT product, COUNT(*)
> FROM Orders OVER (ORDER BY rowtime RANGE INTERVAL '1' HOUR PRECEDING)
> GROUP BY product;
>
> or alternatively
>
> SELECT product, COUNT(*)
> FROM Orders
> WHERE rowtime BETWEEN now - INTERVAL '1' HOUR AND now
> GROUP BY product;
>
> # Stream-table join producing a stream
> SELECT STREAM *
> FROM Orders AS o
> JOIN Products AS p ON o.productId = p.productId
> WHERE o.price < p.list_price / 2;
>
> # Stream-stream join producing a stream
> SELECT STREAM o.rowtime, o.orderId, s.shipmentId, s.rowtime AS shipTime
> FROM Orders AS o
> JOIN Shipments AS s
>   ON o.orderId = s.orderId
>   AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR
> ORDER BY o.rowtime;
>
> # Union
> SELECT STREAM rowtime, customerId FROM Orders
> UNION ALL
> SELECT STREAM rowtime, customerId FROM SupportCalls;
>
> Note that, as in standard SQL, windowed aggregation emits the same number of rows as it consumes, whereas GROUP BY collapses rows. The windowed aggregation and GROUP BY examples both leverage the fact that "rowtime" is sorted (and "trunc(rowtime to hour)" can be deduced to be sorted). If it were not, the system would not allow the queries.
>
> Calcite already has a SQL parser, validator, metadata SPI (that you can use to declare what schemas, tables, streams, and columns are available), and a logical algebra.
> The logical algebra consists of TableScan, Filter, Project, Union, Join, Aggregate, Window, Sort, Values (and a few others). Calcite allows you to define transformation rules that combine operator calls to produce semantically equivalent operator calls, and has an engine that applies lots of transformation rules, optionally guided by a cost model.
>
> I am working on a prototype of Calcite that adds streaming [https://github.com/julianhyde/incubator-calcite/tree/chi]. Just two new operators are needed: Delta (converts a relation to a stream) and Chi (converts a stream to a relation). And a few rules, such as one that maps Delta(Filter(condition, r)) to Filter(condition, Delta(r)), are sufficient to transform the logical algebra into something that could be implemented. My prototype can parse
>
> SELECT STREAM * From Orders
>
> convert it to
>
> LogicalDelta
>   LogicalProject(id=[$0], product=[$1], quantity=[$2])
>     EnumerableTableScan(table=[[STREAMS, ORDERS]])
>
> and simplify it to
>
> EnumerableStreamScan(table=[[STREAMS, ORDERS]])
>
> The next step would be to write rules to convert EnumerableStreamScan (and several other operators) to physical algebra operators (SamzaStreamScan etc.).
>
> In summary, I think this approach should be given serious consideration. An extended standard SQL is much more useful than a SQL-like language, and I believe I have shown that we can add the necessary extensions to SQL without destroying it.
>
> Building a SQL parser, validator, relational algebra, JDBC driver, and planning framework is a massive amount of work, and 90% of the functionality is identical in a streaming and non-streaming system.
>
> Lastly, building a stack based on extended standard SQL does not preclude adding other high-level languages on top of the algebra at a later date.
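[Editor's note: the row-count contrast drawn in the examples above, between windowed aggregation (one output row per input row) and GROUP BY (one output row per group), can be checked on any conventional SQL engine. A minimal sketch using Python's bundled sqlite3, assuming a SQLite build with window-function support (3.25+); the table and column names mirror the Orders examples in this thread:]

```python
# Contrast windowed aggregation with GROUP BY on a tiny Orders table.
# Assumes SQLite >= 3.25 (window-function support), bundled with Python 3.8+.
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE Orders (rowtime TEXT, product TEXT, price REAL)")
con.executemany(
    "INSERT INTO Orders VALUES (?, ?, ?)",
    [
        ("2015-01-29 10:00:00", "paint", 10.0),
        ("2015-01-29 11:00:00", "paint", 20.0),
        ("2015-01-29 12:00:00", "brush", 5.0),
    ],
)

# Windowed aggregation: emits the same number of rows as it consumes.
windowed = con.execute(
    """SELECT product,
              AVG(price) OVER (PARTITION BY product ORDER BY rowtime) AS avg_price
       FROM Orders"""
).fetchall()

# GROUP BY: collapses rows to one per group.
grouped = con.execute(
    "SELECT product, COUNT(*) FROM Orders GROUP BY product"
).fetchall()

print(len(windowed))  # 3 rows in, 3 rows out
print(len(grouped))   # 2 groups
```
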
> Julian
>
>
> > On Jan 28, 2015, at 5:36 PM, Yi Pan <nickpa...@gmail.com> wrote:
> >
> > Hi, Julian,
> >
> > Thanks for the explanation. I got your point that the physical-layer "stream-scan" can be used to get the delta(filter(..)) in the logical algebra.
> > My question on this model is:
> > If a window operation is implemented as filter(tuple.isInWindow(), stream-scan(Orders)) in the physical layer, it still involves a "stream-scan" on a stream and outputs a delta relation, while the output of the select operator is a delta relation and should have a scan(<select-result>) operation to output it to the stream.
> >
> > Or perhaps you were referring to a model in which even the physical operators for non-stream SQL operate on "streams" of tuples coming into the operator logic, and those "streams" are simply generated by scanning the regular relational table during the operation. In that case, I agree that essentially the physical operators for non-stream and stream queries may be merged into one model. Am I interpreting your idea correctly?
> >
> > On Wed, Jan 28, 2015 at 4:52 PM, Julian Hyde <jul...@hydromatic.net> wrote:
> >
> >> Consider this simple query (I'll express it in 3 equivalent ways):
> >>
> >> * select stream * from Orders where state = 'CA' (in streaming SQL)
> >> * istream [ select * from Orders where state = 'CA' ] (in CQL)
> >> * delta(filter(state = 'CA', scan(Orders))) (in logical algebra)
> >>
> >> In CQL there are no named streams, just streamable tables. So we have to ask for the istream of it.
> >>
> >> But in Samza or any other streaming system, Orders is a stream. You can simply convert the logical algebra
> >>
> >> delta(filter(state = 'CA', scan(Orders)))
> >>
> >> to the physical algebra
> >>
> >> filter(state = 'CA', stream-scan(Orders))
> >>
> >> In the physical algebra the data stays in streaming format all the way through.
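[Editor's note: the logical-to-physical rewrite discussed above, delta(filter(cond, scan(Orders))) into filter(cond, stream-scan(Orders)), can be sketched as a toy model in plain Python. The function names are illustrative only, not Calcite or Samza APIs; lists stand in for materialized relations and generators for streams:]

```python
# Toy model of the rewrite: both plans yield the same stream, but the
# physical plan never materializes the relation.
orders = [
    {"orderId": 1, "state": "CA"},
    {"orderId": 2, "state": "NY"},
    {"orderId": 3, "state": "CA"},
]
cond = lambda row: row["state"] == "CA"

# Logical plan: delta(filter(cond, scan(Orders)))
def scan(table):              # relation: fully materialized
    return list(table)

def filter_rel(pred, rel):    # relational filter over a materialized list
    return [row for row in rel if pred(row)]

def delta(rel):               # relation-to-stream: emit each insert
    yield from rel

# Physical plan: filter(cond, stream-scan(Orders))
def stream_scan(table):       # rows stay in streaming form throughout
    yield from table

def filter_stream(pred, stream):
    return (row for row in stream if pred(row))

logical = list(delta(filter_rel(cond, scan(orders))))
physical = list(filter_stream(cond, stream_scan(orders)))
print(logical == physical)  # True: the rewrite preserves semantics
```
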
> >>
> >> My point was that stream-to-relation and relation-to-stream occur in EVERY CQL query (and logical algebra) but do not necessarily occur in the physical algebra.
> >>
> >> Julian
> >>
> >>
> >>> On Jan 28, 2015, at 2:18 PM, Yi Pan <nickpa...@gmail.com> wrote:
> >>>
> >>> Hi, Julian,
> >>>
> >>> Thanks! I think we all agreed on the point of isolating the SQL AST from the logical algebra.
> >>>
> >>> Focusing on your comment below:
> >>> "The stream-to-relation and relation-to-stream operators are in the logical algebra but very likely have disappeared by the time you get to the physical algebra. And the physical algebra introduces new constructs like lookups into time-varying materializations and partitioning."
> >>>
> >>> In our case, the physical algebra is the Samza operators. I find it hard to understand how we can make the stream-to-relation and relation-to-stream operators go away. For example, the window operator is a construct that creates a time-varying materialization of a relation, and the istream operator is a construct that takes the insertions of new rows in a time-varying relation and outputs them as a stream of tuples. I agree with your comment on rstream, which seems to have only academic meaning. But without physical operators performing the relation/stream conversions, I am not sure how we would implement the window operator.
> >>>
> >>> -Yi
> >>>
> >>>
> >>> On Wed, Jan 28, 2015 at 2:01 PM, Julian Hyde <jul...@hydromatic.net> wrote:
> >>>
> >>>>
> >>>> On Jan 28, 2015, at 10:02 AM, Yi Pan <nickpa...@gmail.com> wrote:
> >>>>
> >>>>> I am trying to understand your comment below: "But there is not a simple mapping between true SQL and a data-flow graph that you can execute." What is the specific meaning of this statement? Could you elaborate on this a bit more?
> >>>>
> >>>> The structure of a SQL query (and its AST) is different from the structure of the relational algebra that it translates to. The elements of a SQL query are its clauses (FROM, WHERE, GROUP BY, SELECT, HAVING, ORDER BY), the elements of a relational algebra expression are the relational operators (scan, join, filter, aggregate, project, sort), and for simple queries there is a simple mapping. But the mapping becomes complex when there are sub-queries and especially correlations; even a 3-way outer join can be complex. In Calcite, SqlToRelConverter, which performs this task, started off 100 lines long and is now 5,000.
> >>>>
> >>>> My point was that you shouldn't conflate the SQL AST with the logical algebra. It sounds like the point is already taken.
> >>>>
> >>>> In non-streaming databases, it is almost possible to execute the logical algebra as is. (You need to use iterators, i.e. convert relations into streams, and when joining, you need to be careful not to create cartesian products before you start applying filters, but otherwise you're safe.)
> >>>>
> >>>> But in streaming databases, the logical algebra is not implementable. You cannot literally implement the stream-to-relation or relation-to-stream operators, or, heaven forbid, the r-stream, which re-transmits the whole table every clock-tick. So in addition to the logical algebra you need a physical algebra. The stream-to-relation and relation-to-stream operators are in the logical algebra but very likely have disappeared by the time you get to the physical algebra. And the physical algebra introduces new constructs like lookups into time-varying materializations and partitioning.
> >>>>
> >>>> Julian
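[Editor's note: the point above about r-stream being unimplementable, versus istream's delta behavior, can be illustrated with a toy model of a time-varying relation as a sequence of snapshots, one per clock-tick. This is illustrative Python only, not any system's API:]

```python
# Toy model: a time-varying relation as a list of snapshots, one per tick.
snapshots = [{1}, {1, 2}, {1, 2, 3}]

def rstream(snaps):
    # Re-transmits the whole table at every tick, so output volume grows
    # quadratically with history; this is why a literal r-stream cannot be
    # a physical operator.
    for t, snap in enumerate(snaps):
        for row in sorted(snap):
            yield (t, row)

def istream(snaps):
    # Emits only the inserts since the previous tick, so output volume is
    # proportional to the actual changes.
    prev = set()
    for t, snap in enumerate(snaps):
        for row in sorted(snap - prev):
            yield (t, row)
        prev = snap

print(len(list(rstream(snapshots))))  # 6 tuples re-sent across 3 ticks
print(len(list(istream(snapshots))))  # 3 inserts, one per tick
```
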