Let me propose an alternative approach. The deliverables and the technology stack would be different, but I think we still fulfill the spirit of the proposal, and there are benefits from better interoperability, standards compliance, and building on existing code that already works.
First, I propose that we implement not a SQL-like language, but standard SQL with streaming extensions. The extensions should be minimal, and should be consistent with the look and feel of the language (e.g. SQL tends to use keywords like OVER rather than punctuation marks like '['). If there is a way to achieve something within standard SQL, we should use it. And any extensions we make should preserve SQL's principles of being a closed language (you can nest structures to arbitrary depth, and you can re-use queries as if they were relations) and of having a sound semantics. The language would allow queries involving streams, mixtures of streams and relations, and only relations. A query that only used relations would be 100% standard SQL, and a query that used a stream S would, with luck, be very similar to one that used a table T with the same contents as S. Second, I propose that we use Calcite's SQL parser, validator, and logical algebra. We could also use its JDBC driver infrastructure. I agree with the consensus that CQL is a good basis for extending relational algebra to streams. The question is, can we shoehorn CQL's algebra extensions into SQL? I believe we can, as follows: The ISTREAM operator is represented by the STREAM keyword after SELECT (DSTREAM and RSTREAM are much less important, so can be deferred) Streams included in the FROM clause implicitly become relations (but they are “streamable” relations, and the planner will very likely leverage this when finding a viable implementation) To use a particular window of a stream, not its entire history, follow it with an OVER clause Use SQL standard constructs for datetime literals (TIMESTAMP ‘2015-01-29 12:18:34.123’, DATE ‘2015-01-29’), interval literals (INTERVAL ‘2:30’ HOUR TO MINUTE, INTERVAL ‘5’ MONTH), windowed aggregates (“AVG(price) OVER (PARTITION BY productId RANGE BETWEEN 10 ROWS PRECEDING AND 5 ROWS FOLLOWING)”). Some examples. # Identity SELECT STREAM * FROM Orders; # Filter and project SELECT STREAM state, quantity FROM Orders WHERE state = ‘CA’; # Windowed aggregation SELECT STREAM product, AVG(price) OVER this_week AS avg_price FROM Orders WINDOW this_week AS (PARTITION BY product ORDER BY rowtime RANGE INTERVAL ‘7’ DAY PRECEDING); # Aggregation # At the top of each hour, emit totals for each product SELECT STREAM product, trunc(rowtime to hour) AS rowtime, COUNT(*) AS c FROM Orders GROUP BY product, trunc(rowtime to hour) # Relational query on recent history of a stream SELECT product, COUNT(*) FROM Orders OVER (ORDER BY rowtime RANGE ‘1’ HOUR PRECEDING) GROUP BY product; or alternatively SELECT product, COUNT(*) FROM Orders WHERE rowtime BETWEEN now - INTERVAL ‘1’ HOUR AND now GROUP BY product; # Stream-table join producing a stream SELECT STREAM * FROM Orders AS o JOIN Products AS p ON o.productId = p.productId WHERE o.price < p.list_price / 2; # Stream-stream join producing a stream SELECT STREAM o.rowtime, o.orderId, s.shipmentId, s.rowtime AS shipTime FROM Orders AS o JOIN Shipments AS s ON o.orderId = s.orderId AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL ‘1’ HOUR ORDER BY o.rowtime; # Union SELECT STREAM rowtime, customerId FROM Orders UNION ALL SELECT STREAM rowtime, customerId FROM SupportCalls; Note that, as in standard SQL, windowed aggregation emits the same number of rows as it consumes, whereas GROUP BY collapses rows. The windowed aggregation and GROUP BY examples both leverage the fact that “rowtime” is sorted (and “trunc(rowtime to hour)” can be deduced to be sorted). If it were not, the system would not allow the queries. Calcite already has a SQL parser, validator, metadata SPI (that you can use to declare what schemas, tables, streams, columns are available), and a logical algebra. The logical algebra consists of TableScan, Filter, Project, Union, Join, Aggregate, Window, Sort, Values (and a few others). Calcite allows you to define transformation rules that combine operator calls to produce semantically equivalent operator calls, and has an engine that applies lots of transformation rules, optionally guided by a cost model. I am working on a prototype of Calcite that adds streaming [ https://github.com/julianhyde/incubator-calcite/tree/chi <https://github.com/julianhyde/incubator-calcite/tree/chi> ]. Just two new operators are needed: Delta (converts a relation to a stream) and Chi (converts a stream to a relation). And a few rules, such as one that maps Delta(Filter(condition, r)) to Filter(condition, Delta(r)), are sufficient to transform the logical algebra into something that could be implemented. My prototype can parse SELECT STREAM * From Orders convert it to LogicalDelta LogicalProject(id=[$0], product=[$1], quantity=[$2]) EnumerableTableScan(table=[[STREAMS, ORDERS]]) and simplify to EnumerableStreamScan(table=[[STREAMS, ORDERS]]) The next step would be to write rules to convert EnumerableStreamScan (and several other operators) to physical algebra operators SamzaStreamScan etc. In summary, I think this approach should be given serious consideration. An extended standard SQL is much more useful than a SQL-like language, and I believe I have shown that we can add the necessary extensions to SQL without destroying it. Building a SQL parser, validator, relational algebra, JDBC driver and planning framework is a massive amount of work and 90% of the functionality is identical in a streaming and non-streaming system. Lastly, building a stack based on extended standard SQL does not preclude adding other high-level languages on top of the algebra at a later date. Julian > On Jan 28, 2015, at 5:36 PM, Yi Pan <nickpa...@gmail.com> wrote: > > Hi, Julian, > > Thanks for explanation. I got your point that the physical layer > "stream-scan" can be used to get the delta(filter(..)) in the logical > algebra. > My question on this model is: > If a window operation is implemented as filter(tuple.isInWindow(), > stream-scan(Orders)) in the physical layer, it still involves "stream-scan" > on a stream and output a delta relation; while the output of the select > operator is a delta relation and should have a scan(<select-result>) > operation to output it to the stream. > > Or, unless that you were referring to the model that even in the physical > operators for non-stream SQL, the operators should operate on "streams" of > tuples coming into the operator logic. Just that the "streams" are > generated by scanning the regular relational table during the operation. > Then, I agree that essentially the physical operators for non-stream and > stream queries may be merged in one model. Am I interpreting your idea > correctly? > > On Wed, Jan 28, 2015 at 4:52 PM, Julian Hyde <jul...@hydromatic.net> wrote: > >> Consider this simple query (I'll express in 3 equivalent ways): >> >> * select stream * from Orders where state = 'CA' (in streaming SQL) >> * istream [ select * from Orders where state = 'CA' ] (in CQL) >> * delta(filter(state = 'CA', scan(Orders))) (in logical algebra) >> >> In CQL there are no named streams, just streamable tables. So we have to >> ask for the istream of it. >> >> But in Samza or any other streaming system, Orders is a stream. You can >> simply convert the logical algebra >> >> delta(filter(state = 'CA', scan(Orders))) >> >> to the physical algebra >> >> filter(state = 'CA', stream-scan(Orders)) >> >> In the physical algebra the data stays in streaming format all the way >> through. >> >> My point was that stream-to-relation and relation-to-stream occur in EVERY >> CQL query (and logical algebra) but do not necessarily occur in the >> physical algebra. >> >> Julian >> >> >>> On Jan 28, 2015, at 2:18 PM, Yi Pan <nickpa...@gmail.com> wrote: >>> >>> Hi, Julian, >>> >>> Thanks! I think we all agreed on the point to isolate between SQL AST and >>> the logical algebra. >>> >>> Focusing on your comment below: >>> "The stream-to-relation and relation-to-stream operators are in the >> logical >>> algebra but very likely have disappeared by the time you get to the >>> physical algebra. And the physical algebra introduces new constructs like >>> lookups into time-varying materializations and partitioning." >>> >>> In our case, the physical algebra is the Samza operators. I found it hard >>> to understand how we can make the stream-to-relation and >> relation-to-stream >>> operators going away. For example, window operator is a construct to >> create >>> a time-varying materializations of relation and istream operators is a >>> construct to take the insertions of new rows in a time-varying relation >> and >>> output to a stream of tuples. I agree on your comments on rstream, which >>> seems just have academic meanings. But I am not sure w/o the physical >>> operators performing the relation/stream conversions, how do we implement >>> the window operator? >>> >>> -Yi >>> >>> >>> On Wed, Jan 28, 2015 at 2:01 PM, Julian Hyde <jul...@hydromatic.net> >> wrote: >>> >>>> >>>> On Jan 28, 2015, at 10:02 AM, Yi Pan <nickpa...@gmail.com> wrote: >>>> >>>>> I try to understand your comments below: "But there is not a simple >>>>> mapping between >>>>> true SQL and a data-flow graph that you can execute." What is the >>>> specific >>>>> meaning of this statement? Could you elaborate on this a bit more? >>>> >>>> The structure of a SQL query (and its AST) is different to the structure >>>> of the relational algebra that it translates to. The elements of a SQL >>>> query are its clauses (FROM, WHERE, GROUP BY, SELECT, HAVING, ORDER BY) >> and >>>> the elements of a relational algebra expression are the relational >>>> operators (scan, join, filter, aggregate, project, sort) and for simple >>>> queries there is a simple mapping. But the mapping becomes complex when >>>> there are sub-queries and especially correlations, but even a 3-way >> outer >>>> join can be complex. In Calcite, SqlToRelConverter, which performs this >>>> task, started off 100 lines long and is now 5,000. >>>> >>>> My point was that you shouldn’t conflate the SQL AST with the logical >>>> algebra. It sounds like the point is already taken. >>>> >>>> In non-streaming databases, it is almost possible to execute the logical >>>> algebra as is. (You need to use iterators, i.e. convert relations into >>>> streams, and when joining, you need to be careful not to create >> cartesian >>>> products before you start applying filters, but otherwise you’re safe.) >>>> >>>> But in streaming databases, the logical algebra is not implementable. >> You >>>> cannot literally implement the stream-to-relation or relation-to-stream >>>> operators, or, heaven forbid, the r-stream, that re-transmits the whole >>>> table every clock-tick. So in addition to the logical algebra you need a >>>> physical algebra. The stream-to-relation and relation-to-stream >> operators >>>> are in the logical algebra but very likely have disappeared by the time >> you >>>> get to the physical algebra. And the physical algebra introduces new >>>> constructs like lookups into time-varying materializations and >> partitioning. >>>> >>>> Julian >> >>