Hi all,

I've read the document about dynamic tables. Honestly, I think it is well defined and ingeniously reconciles batch and stream. I have two questions about the design.

1) Though it is fine to take a stream as a snapshot of a dynamic table, a table is essentially a set while a stream is essentially an ordered list (ordered by some time attribute). I'm not sure that every operation on a set is also suitable for a list (e.g., union or merge?). Of course, we could add an "ORDER BY time" to every SQL statement, but would that be appropriate?

2) As Radu said, I also think inner queries are essential for a query language (I didn't see any "select from (select)" in the document; I sketch one below). The problem is that SQL relies on closure: the result of an operation on relations is again a relation, and we cannot prove the same property for a stream. Can the result of a stream operation be the input of another? It depends. The window operator converts "point of time" events into "period of time" events, and I'm not sure whether the nature of the data has changed. Also, partial emission will lead to heterogeneous results.
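For concreteness, the kind of nested query I have in mind might look like the sketch below (the table and column names are invented for illustration, and I'm not claiming the current design already covers it):

SELECT user_id, MAX(hourly_sum)
FROM (
  SELECT user_id,
         SUM(amount) OVER (ORDER BY rowtime RANGE INTERVAL '1' HOUR PRECEDING) AS hourly_sum
  FROM clicks
) AS t
GROUP BY user_id

Here the inner query already turns point-of-time events into period-of-time aggregates, and the outer query then has to consume that derived result as if it were an ordinary table.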
BTW, the "Emission of dynamic tables" section seems to be a little inconsistent with the rest of the document...

Best,
Xingcan

On Thu, Jan 26, 2017 at 6:13 PM, Radu Tudoran <radu.tudo...@huawei.com> wrote:

> Hi Shaoxuan,
>
> Thanks for the feedback!
> Regarding the proposal for relational queries that you referenced, I am a
> bit confused about its purpose and its relation to the current
> implementation of stream SQL: is it supposed to replace this
> implementation or to complement it? I will send another email about this,
> as I guess it can be a standalone discussion thread.
>
> Also, regarding the stream-to-stream join, I intend to start another
> discussion so that we can decide together whether we can start some
> implementation/design now or whether we need to wait.
>
> Now, regarding the inner queries and the points you raised. It is true
> that in general an inner join would work like any other join (which
> obviously requires some buffering capabilities and mechanisms to restrict
> the infinite growth of the join state). However, at least some cases of
> inner queries can be supported without buffering mechanisms or full
> support for inner/left joins. Basically, the logical operator into which
> an inner query is translated (a left join with an always-true condition)
> is to some extent closer to UNION, and the union implementation, than to
> the implementation we will have for joins. This is why I believe we can
> already provide support for this (I also tested a PoC implementation
> internally and it works). For examples of when we could use this, please
> see the next two examples. Please let me know what you think and whether
> it is worth designing the JIRA issue with some more details (including
> the technical details).
>
> Consider the example below:
>
> SELECT STREAM user
> FROM inputstream
> WHERE amount > (SELECT STREAM MIN(amount2) FROM inputstream2)
>
> The point of this is to restrict the values you are selecting based on a
> value that comes from the other stream. Consider the values below arriving
> in each stream:
>
> inputstream    inputstream2   Result
> User1, 100                    User1 (there is no value in inputstream2 yet,
>                               and the left join should not restrict the
>                               output in this case)
>                X, x, 10       nothing, as there is no event in inputstream
>                               to be output; the minimum becomes 10
> User2, 20                     User2 (20 is greater than 10, the minimum
>                               retained from inputstream2)
>                X, x, 20       nothing, as there is no event in inputstream
>                               to be output; the minimum remains 10
>                X, x, 5        nothing, as there is no event in inputstream
>                               to be output; the minimum becomes 5
> User3, 8                      User3 (8 is greater than 5)
> ....
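> As a rough sketch of the translation I have in mind (an illustration, not
> the exact plan Calcite produces), the query above behaves roughly like an
> explicit left join on an always-true condition:
>
> SELECT user
> FROM inputstream
> LEFT JOIN (SELECT MIN(amount2) AS min_amount2 FROM inputstream2) AS m
>   ON TRUE
> WHERE m.min_amount2 IS NULL   -- nothing seen yet on inputstream2, do not restrict
>    OR amount > m.min_amount2
>
> The right-hand side collapses to a single evolving value, which is why no
> buffering or window state is needed to assemble the result.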
> The goal for the final usage of this is to be able, among other things, to
> define multiple window computations over the same input stream. Consider:
>
> SELECT STREAM user
> FROM inputstream
> WHERE (SELECT STREAM AVERAGE(amount) OVER (ORDER BY timestamp
>        RANGE INTERVAL 1 HOUR PRECEDING) AS hour_sum FROM inputstream) < amount
>
> Assume the following events, each arriving 30 minutes apart:
> User1, 100 -> the average is 100 and the topology that implements the query
>               emits nothing (100 is not > 100)
> User2, 10  -> the average is 55 and the topology emits nothing
>               (10 is not > 55)
> User3, 40  -> the average is 25 (the average of 10 and 40) and the topology
>               emits User3 (40 is > 25)
> ....
>
> Although this query depends on aggregates and windows, the operator that
> implements the inner query can be implemented independently of the
> functions contained in the query. Also, there is no need for a window or
> buffering to implement the logic for assembling the results from the inner
> query.
>
> Best regards,
>
> -----Original Message-----
> From: Shaoxuan Wang [mailto:wshaox...@gmail.com]
> Sent: Thursday, January 26, 2017 4:36 AM
> To: dev@flink.apache.org
> Subject: Re: STREAM SQL inner queries
>
> Hi Radu,
> Similar to the stream-stream join, this stream-stream inner query does not
> seem to be well defined. It needs at least some kind of window bounds to
> complete the streaming SQL semantics. If this is an unbounded join/select,
> a mechanism for storing the infinite data has to be considered. I may not
> fully understand your proposal. Could you please provide more details
> about this inner query, for example some sample inputs and outputs? It
> would also be great if you could explain the use case of this inner query.
> This would help us understand the semantics.
>
> It should also be noted that we have recently decided to unify stream and
> batch queries under the same regular (batch) SQL. Therefore we have removed
> support for the STREAM keyword in Flink streaming SQL. In the past several
> months, Fabian and Xiaowei Jiang have started to work on the future
> relational queries on Flink streaming. Fabian has drafted a very good
> design doc, https://goo.gl/m31kkE. The design is based on a new concept,
> the dynamic table, whose content changes over time and can therefore be
> derived from a stream. With dynamic tables, stream queries can be expressed
> via regular (batch) SQL. Besides some syntactic sugar, there is not much
> difference between a batch query and a stream query in terms of what is
> executed and where. A stream query has additional characteristics regarding
> when to emit a result and how to refine the result, considering retraction.
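> For instance (just to illustrate the syntax direction, not a statement
> about what is implemented today), the first query from your example would
> then be written as a regular query over dynamic tables, without the STREAM
> keyword:
>
> SELECT user
> FROM inputstream
> WHERE amount > (SELECT MIN(amount2) FROM inputstream2)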
> Hope this helps, and I look forward to working with you on streaming SQL.
>
> Regards,
> Shaoxuan
>
> On Wed, Jan 25, 2017 at 9:49 PM, Radu Tudoran <radu.tudo...@huawei.com>
> wrote:
>
> > Hi all,
> >
> > I would like to open a JIRA issue (and then provide the implementation)
> > for supporting inner queries. The idea is to be able to support SQL
> > queries like the ones presented in the scenarios below. The key idea is
> > that supporting inner queries requires implementations of:
> >
> > -> JOIN (type = left, condition = true): basically a simple
> >    implementation of a join between two streams that does not require
> >    any window support behind the scenes, as there is no condition on
> >    which to perform the join.
> >
> > -> SINGLE_VALUE: this operator provides the single value to be joined.
> >    In a streaming context this value should evolve with the contents of
> >    the window. It could be implemented with a flatmap function, since a
> >    left join also allows mapping with null values.
> >
> > We can then extend this initial, simple implementation to support joins
> > in general (conditional joins, right joins, ...), or we can isolate this
> > implementation for the specific case of inner queries and go with a
> > completely new design for stream-to-stream joins (which might be needed,
> > depending on the decision about how to support the conditional mapping).
> >
> > What do you think about this?
> >
> > Examples of scenarios where this applies:
> >
> > SELECT STREAM amount,
> >   (SELECT id FROM inputstream1) AS field1
> > FROM inputstream2
> >
> > Translated to
> > LogicalProject(amount=[$1], c=[$4])
> >   LogicalJoin(condition=[true], joinType=[left])
> >     LogicalTableScan(table=[[inputstream1]])
> >     LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
> >       LogicalProject(user_id=[$0])
> >         LogicalTableScan(table=[[inputstream2]])
> >
> > Or from the same stream (perhaps interesting for applying more complex
> > operations within the inner query):
> >
> > SELECT STREAM amount,
> >   (SELECT id FROM inputstream1) AS field1
> > FROM inputstream1
> >
> > Translated to
> > LogicalProject(amount=[$1], c=[$4])
> >   LogicalJoin(condition=[true], joinType=[left])
> >     LogicalTableScan(table=[[inputstream1]])
> >     LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
> >       LogicalProject(user_id=[$0])
> >         LogicalTableScan(table=[[inputstream1]])
> >
> > Or used to do the projection:
> >
> > SELECT STREAM amount, c FROM (SELECT *, id AS c FROM inputstream1)
> >
> > Translated to
> > LogicalProject(amount=[$1], c=[$5])
> >   LogicalProject(time=[$0], amount=[$1], date=[$2], id=[$4], c=[$5])
> >     LogicalTableScan(table=[[inputstream1]])
> >
> > Or, in the future, even
> >
> > SELECT STREAM amount, myagg
> > FROM (SELECT STREAM *, SUM(amount) OVER window AS myagg FROM inputstream1)
> > ...
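> > As a sketch only (the time column and the window bound are placeholders,
> > and this says nothing about what is supported today), the named window in
> > the last example could be spelled out inline as:
> >
> > SELECT STREAM amount, myagg
> > FROM (SELECT STREAM *,
> >              SUM(amount) OVER (ORDER BY timestamp
> >                                RANGE INTERVAL 1 HOUR PRECEDING) AS myagg
> >       FROM inputstream1)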