Let me propose an alternative approach. The deliverables and the technology 
stack would be different, but I think we still fulfill the spirit of the 
proposal, and there are benefits from better interoperability, standards 
compliance, and building on existing code that already works.
First, I propose that we implement not a SQL-like language, but standard SQL 
with streaming extensions. The extensions should be minimal, and should be 
consistent with the look and feel of the language (e.g. SQL tends to use 
keywords like OVER rather than punctuation marks like '['). If there is a way 
to achieve something within standard SQL, we should use it. And any extensions 
we make should preserve SQL's principles of being a closed language (you can 
nest structures to arbitrary depth, and you can re-use queries as if they were 
relations) and of having a sound semantics.

The language would allow queries involving streams, mixtures of streams and 
relations, and only relations. A query that only used relations would be 100% 
standard SQL, and a query that used a stream S would, with luck, be very 
similar to one that used a table T with the same contents as S.

Second, I propose that we use Calcite's SQL parser, validator, and logical 
algebra. We could also use its JDBC driver infrastructure.

I agree with the consensus that CQL is a good basis for extending relational 
algebra to streams. The question is, can we shoehorn CQL's algebra extensions 
into SQL? I believe we can, as follows:
The ISTREAM operator is represented by the STREAM keyword after SELECT (DSTREAM 
and RSTREAM are much less important, so can be deferred)
Streams included in the FROM clause implicitly become relations (but they are 
“streamable” relations, and the planner will very likely leverage this when 
finding a viable implementation)
To use a particular window of a stream, not its entire history, follow it with 
an OVER clause

Use SQL standard constructs for datetime literals (TIMESTAMP ‘2015-01-29 
12:18:34.123’, DATE ‘2015-01-29’), interval literals (INTERVAL ‘2:30’ HOUR TO 
MINUTE, INTERVAL ‘5’ MONTH), windowed aggregates (“AVG(price) OVER (PARTITION 
BY productId RANGE BETWEEN 10 ROWS PRECEDING AND 5 ROWS FOLLOWING)”).

Some examples.

# Identity 
SELECT STREAM *
FROM Orders;

# Filter and project
SELECT STREAM state, quantity
FROM Orders
WHERE state = ‘CA’;

# Windowed aggregation
SELECT STREAM product,
   AVG(price) OVER this_week AS avg_price
FROM Orders
WINDOW this_week AS (PARTITION BY product ORDER BY rowtime RANGE INTERVAL ‘7’ 
DAY PRECEDING);

# Aggregation
# At the top of each hour, emit totals for each product
SELECT STREAM product, trunc(rowtime to hour) AS rowtime, COUNT(*) AS c
FROM Orders
GROUP BY product, trunc(rowtime to hour)

# Relational query on recent history of a stream
SELECT product, COUNT(*)
FROM Orders OVER (ORDER BY rowtime RANGE ‘1’ HOUR PRECEDING)
GROUP BY product;

or alternatively

SELECT product, COUNT(*)
FROM Orders
WHERE rowtime BETWEEN now - INTERVAL ‘1’ HOUR AND now
GROUP BY product;

# Stream-table join producing a stream
SELECT STREAM *
FROM Orders AS o
  JOIN Products AS p ON o.productId = p.productId
WHERE o.price < p.list_price / 2;

# Stream-stream join producing a stream
SELECT STREAM o.rowtime, o.orderId, s.shipmentId, s.rowtime AS shipTime
FROM Orders AS o
  JOIN Shipments AS s
  ON o.orderId = s.orderId
   AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL ‘1’ HOUR
ORDER BY o.rowtime;

# Union
SELECT STREAM rowtime, customerId FROM Orders
UNION ALL
SELECT STREAM rowtime, customerId FROM SupportCalls;

Note that, as in standard SQL, windowed aggregation emits the same number of 
rows as it consumes, whereas GROUP BY collapses rows. The windowed aggregation 
and GROUP BY examples both leverage the fact that “rowtime” is sorted (and 
“trunc(rowtime to hour)” can be deduced to be sorted). If it were not, the 
system would not allow the queries.

Calcite already has a SQL parser, validator, metadata SPI (that you can use to 
declare what schemas, tables, streams, columns are available), and a logical 
algebra. The logical algebra consists of TableScan, Filter, Project, Union, 
Join, Aggregate, Window, Sort, Values (and a few others). Calcite allows you to 
define transformation rules that combine operator calls to produce semantically 
equivalent operator calls, and has an engine that applies lots of 
transformation rules, optionally guided by a cost model.

I am working on a prototype of Calcite that adds streaming [ 
https://github.com/julianhyde/incubator-calcite/tree/chi 
<https://github.com/julianhyde/incubator-calcite/tree/chi> ]. Just two new 
operators are needed: Delta (converts a relation to a stream) and Chi (converts 
a stream to a relation). And a few rules, such as one that maps 
Delta(Filter(condition, r)) to Filter(condition, Delta(r)), are sufficient to 
transform the logical algebra into something that could be implemented. My 
prototype can parse

SELECT STREAM * From Orders

convert it to

LogicalDelta
  LogicalProject(id=[$0], product=[$1], quantity=[$2])
    EnumerableTableScan(table=[[STREAMS, ORDERS]])

and simplify to

EnumerableStreamScan(table=[[STREAMS, ORDERS]])

The next step would be to write rules to convert EnumerableStreamScan (and 
several other operators) to physical algebra operators SamzaStreamScan etc.


In summary, I think this approach should be given serious consideration. An 
extended standard SQL is much more useful than a SQL-like language, and I 
believe I have shown that we can add the necessary extensions to SQL without 
destroying it.

Building a SQL parser, validator, relational algebra, JDBC driver and planning 
framework is a massive amount of work and 90% of the functionality is identical 
in a streaming and non-streaming system.

Lastly, building a stack based on extended standard SQL does not preclude 
adding other high-level languages on top of the algebra at a later date.

Julian


> On Jan 28, 2015, at 5:36 PM, Yi Pan <nickpa...@gmail.com> wrote:
> 
> Hi, Julian,
> 
> Thanks for explanation. I got your point that the physical layer
> "stream-scan" can be used to get the delta(filter(..)) in the logical
> algebra.
> My question on this model is:
> If a window operation is implemented as filter(tuple.isInWindow(),
> stream-scan(Orders)) in the physical layer, it still involves "stream-scan"
> on a stream and output a delta relation; while the output of the select
> operator is a delta relation and should have a scan(<select-result>)
> operation to output it to the stream.
> 
> Or, unless that you were referring to the model that even in the physical
> operators for non-stream SQL, the operators should operate on "streams" of
> tuples coming into the operator logic. Just that the "streams" are
> generated by scanning the regular relational table during the operation.
> Then, I agree that essentially the physical operators for non-stream and
> stream queries may be merged in one model. Am I interpreting your idea
> correctly?
> 
> On Wed, Jan 28, 2015 at 4:52 PM, Julian Hyde <jul...@hydromatic.net> wrote:
> 
>> Consider this simple query (I'll express in 3 equivalent ways):
>> 
>> * select stream * from Orders where state = 'CA' (in streaming SQL)
>> * istream [ select * from Orders where state = 'CA' ] (in CQL)
>> * delta(filter(state = 'CA', scan(Orders))) (in logical algebra)
>> 
>> In CQL there are no named streams, just streamable tables. So we have to
>> ask for the istream of it.
>> 
>> But in Samza or any other streaming system, Orders is a stream. You can
>> simply convert the logical algebra
>> 
>>  delta(filter(state = 'CA', scan(Orders)))
>> 
>> to the physical algebra
>> 
>>  filter(state = 'CA', stream-scan(Orders))
>> 
>> In the physical algebra the data stays in streaming format all the way
>> through.
>> 
>> My point was that stream-to-relation and relation-to-stream occur in EVERY
>> CQL query (and logical algebra) but do not necessarily occur in the
>> physical algebra.
>> 
>> Julian
>> 
>> 
>>> On Jan 28, 2015, at 2:18 PM, Yi Pan <nickpa...@gmail.com> wrote:
>>> 
>>> Hi, Julian,
>>> 
>>> Thanks! I think we all agreed on the point to isolate between SQL AST and
>>> the logical algebra.
>>> 
>>> Focusing on your comment below:
>>> "The stream-to-relation and relation-to-stream operators are in the
>> logical
>>> algebra but very likely have disappeared by the time you get to the
>>> physical algebra. And the physical algebra introduces new constructs like
>>> lookups into time-varying materializations and partitioning."
>>> 
>>> In our case, the physical algebra is the Samza operators. I found it hard
>>> to understand how we can make the stream-to-relation and
>> relation-to-stream
>>> operators going away. For example, window operator is a construct to
>> create
>>> a time-varying materializations of relation and istream operators is a
>>> construct to take the insertions of new rows in a time-varying relation
>> and
>>> output to a stream of tuples. I agree on your comments on rstream, which
>>> seems just have academic meanings. But I am not sure w/o the physical
>>> operators performing the relation/stream conversions, how do we implement
>>> the window operator?
>>> 
>>> -Yi
>>> 
>>> 
>>> On Wed, Jan 28, 2015 at 2:01 PM, Julian Hyde <jul...@hydromatic.net>
>> wrote:
>>> 
>>>> 
>>>> On Jan 28, 2015, at 10:02 AM, Yi Pan <nickpa...@gmail.com> wrote:
>>>> 
>>>>> I try to understand your comments below: "But there is not a simple
>>>>> mapping between
>>>>> true SQL and a data-flow graph that you can execute." What is the
>>>> specific
>>>>> meaning of this statement? Could you elaborate on this a bit more?
>>>> 
>>>> The structure of a SQL query (and its AST) is different to the structure
>>>> of the relational algebra that it translates to. The elements of a SQL
>>>> query are its clauses (FROM, WHERE, GROUP BY, SELECT, HAVING, ORDER BY)
>> and
>>>> the elements of a relational algebra expression are the relational
>>>> operators (scan, join, filter, aggregate, project, sort) and for simple
>>>> queries there is a simple mapping. But the mapping becomes complex when
>>>> there are sub-queries and especially correlations, but even a 3-way
>> outer
>>>> join can be complex. In Calcite, SqlToRelConverter, which performs this
>>>> task, started off 100 lines long and is now 5,000.
>>>> 
>>>> My point was that you shouldn’t conflate the SQL AST with the logical
>>>> algebra. It sounds like the point is already taken.
>>>> 
>>>> In non-streaming databases, it is almost possible to execute the logical
>>>> algebra as is. (You need to use iterators, i.e. convert relations into
>>>> streams, and when joining, you need to be careful not to create
>> cartesian
>>>> products before you start applying filters, but otherwise you’re safe.)
>>>> 
>>>> But in streaming databases, the logical algebra is not implementable.
>> You
>>>> cannot literally implement the stream-to-relation or relation-to-stream
>>>> operators, or, heaven forbid, the r-stream, that re-transmits the whole
>>>> table every clock-tick. So in addition to the logical algebra you need a
>>>> physical algebra. The stream-to-relation and relation-to-stream
>> operators
>>>> are in the logical algebra but very likely have disappeared by the time
>> you
>>>> get to the physical algebra. And the physical algebra introduces new
>>>> constructs like lookups into time-varying materializations and
>> partitioning.
>>>> 
>>>> Julian
>> 
>> 

Reply via email to