Hi Radu,

I was a bit surprised that Calcite's parser accepted your query. Hence, I checked the Calcite plan and had a look at the documentation of Calcite's SqlSingleValueAggFunction:
> SINGLE_VALUE aggregate function returns the input value if there is only one value in the input; otherwise it triggers a run-time error.

So, the query would only pass if the inner query returns a single value (which would usually not be the case for a stream). The SINGLE_VALUE check is not added to the plan if the inner query is guaranteed to return a single value (LIMIT 1, global aggregation).

Anyway, I agree that we could start to add the simple cases of these joins for processing time. For event time, I think we need to consider late-arriving data and support retraction values.

Best, Fabian

2017-01-27 10:43 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi all,
>
> Thanks for the feedback!
>
> I agree that we should get the semantics right first and implement it
> afterwards. I think it would be quite useful. Now, regarding the remarks
> you made:
>
> "> SELECT STREAM amount, (SELECT id FROM inputstream1) AS field1 FROM
> inputstream2
> you are suggesting that the subquery (SELECT id FROM inputstream1)
> returns a single value (I assume the last received value). I would
> interpret the query differently and expect it to return the values of all
> rows of inputstream1 up to the current point in time."
>
> That is a good point. It is not so much that I wanted to suggest that this
> should be the syntax to use - I basically just relied on the logical
> operators that Calcite has parsed the query into (JOIN + SINGLE_VALUE).
> Based on this logical translation, I would say the correct implementation
> is to return one value, not necessarily the whole content of the stream.
> Anyway, we are not restricted to this, as we could potentially use
> different rules in Calcite to alter the resulting plan.
>
> However, if we decide that such queries should return the whole stream
> rather than a single value, we are indeed tapping into the problem of
> potentially unbounded cases.
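For concreteness, the SINGLE_VALUE contract quoted above can be sketched in a few lines of Python. This is a simplification, not Calcite's actual SqlSingleValueAggFunction; in particular, returning None for an empty input is an assumption based on general scalar-subquery semantics, not something stated in the quoted documentation.

```python
def single_value(rows):
    """Mimic the SINGLE_VALUE aggregate contract: return the sole input
    value, or fail at run time if there is more than one input row.
    Assumption (not from the quoted docs): an empty input yields None,
    following scalar-subquery NULL semantics."""
    it = iter(rows)
    try:
        value = next(it)
    except StopIteration:
        return None  # empty scalar subquery -> NULL
    for _ in it:
        # A second row exists: this is the run-time error the docs describe.
        raise RuntimeError("SINGLE_VALUE: more than one input row")
    return value
```

On a stream, the input would rarely stay at a single row, which is exactly why the check usually fails unless LIMIT 1 or a global aggregation bounds the subquery.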
> For this, I do agree that the approach you proposed of relying on dynamic
> tables is very good. In such a case we would just pass the entire content
> of the dynamic table to the upper operators. For that matter, it also
> works for the single value (as the table would contain only one value).
> However, for the simple case of returning a single value we can provide
> an implementation even now, and we do not need to wait until the full
> functionality of dynamic tables is available.
>
> At the same time, I also agree that the syntax "a FROM inputstream ORDER
> BY time LIMIT 1" is elegant. I have no issue with translating inner
> queries like this only when they have "LIMIT 1" specified, or only when
> they are provided directly in such a form.
>
> I will wait for additional remarks so that we can all agree on a specific
> semantic, and then I will push this into a JIRA issue to be further
> reviewed and validated.
>
> Best regards,
>
> Dr. Radu Tudoran
> Senior Research Engineer - Big Data Expert
> IT R&D Division
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
>
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
>
> -----Original Message-----
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Thursday, January 26, 2017 4:51 PM
> To: dev@flink.apache.org
> Subject: Re: STREAM SQL inner queries
>
> Hi everybody,
>
> thanks for the proposal Radu.
> If I understood it correctly, you are proposing a left join between a
> stream and a single value (which is computed from a stream).
> This makes sense and should be a common use case.
>
> However, I think some of your example queries do not return a single
> value as required for the join.
>
> In your example:
>
> SELECT STREAM amount, (SELECT id FROM inputstream1) AS field1 FROM
> inputstream2
>
> you are suggesting that the subquery (SELECT id FROM inputstream1)
> returns a single value (I assume the last received value).
> I would interpret the query differently and expect it to return the
> values of all rows of inputstream1 up to the current point in time.
> IMO, a query like "SELECT a FROM inputstream ORDER BY time LIMIT 1" would
> capture the semantics better.
>
> The subquery
>
> (SELECT AVG(amount) OVER (ORDER BY timestamp RANGE INTERVAL 1 HOUR
> PRECEDING) AS hour_avg FROM inputstream)
>
> has a similar problem and would return one row for each record of
> inputstream, i.e., not a single value.
>
> Anyway, if we get the semantics of the query that computes the single
> value right, I think this type of join should be well covered by the
> dynamic table proposal.
> The single value input will be a dynamic table (of constant size = 1)
> which is continuously updated by the engine.
> Joining this table to a dynamic (append) table will result in a
> continuously growing dynamic table, which can be emitted as a stream.
>
> This would look very similar to what you proposed, but we would need to
> make sure that the single value query actually returns a single value.
>
> @Xingcan Thanks for your feedback.
> I would suggest moving the general discussion about the dynamic table
> proposal to the thread that Radu started (I responded there a few minutes
> ago).
>
> Just a few comments here: By logically converting a stream into a dynamic
> table, we have well-defined semantics for operations such as aggregations
> and joins.
> However, you are right that this does not mean that we can efficiently
> apply all operations on dynamic tables that we can apply on an actual
> batch table. Some operations are just too expensive or require too much
> state to be performed in a streaming fashion. So yes, there will be some
> restrictions, but that is due to the nature of stream processing rather
> than to the idea of dynamic tables, IMO.
>
> Best,
> Fabian
>
> 2017-01-26 11:33 GMT+01:00 Xingcan <xingc...@gmail.com>:
>
> > Hi all,
> >
> > I've read the document about dynamic tables. Honestly, I think it's
> > well defined and ingeniously compromises between batch and stream. I
> > have two questions about the design.
> >
> > 1) Though it's fine to take the stream as a snapshot of a dynamic
> > table, a table is essentially a set while a stream is essentially an
> > ordered list (with xxTime). I'm not sure if the operations on a set
> > will all suit a list (e.g., union or merge?). Of course, we can add
> > an "order by time" to all SQL statements, but would that be suitable?
> >
> > 2) As Radu said, I also think inner queries are essential for a query
> > language. (I didn't see any SELECT FROM (SELECT) in the document.) The
> > problem is, SQL is based on closure properties, which we cannot prove
> > for a stream. Can the result of a stream operation be another input?
> > It depends. The window operator will convert "point of time" events to
> > "period of time" events, and I don't know if the nature of the data
> > has changed. Also, partial emission will lead to heterogeneous results.
> >
> > BTW, the "Emission of dynamic tables" section seems to be a little
> > incompatible with the rest of the document...
> >
> > Best,
> > Xingcan
> >
> > On Thu, Jan 26, 2017 at 6:13 PM, Radu Tudoran <radu.tudo...@huawei.com>
> > wrote:
> >
> > > Hi Shaoxuan,
> > >
> > > Thanks for the feedback!
> > > Regarding the proposal for relational queries that you referenced, I
> > > am a bit confused about its purpose and evolution with respect to
> > > the current implementation of stream SQL - is it supposed to replace
> > > this implementation, or to complement it? I will send another email
> > > about this, as I guess it can be a standalone discussion thread.
> > >
> > > Also, regarding the stream-to-stream join, I intend to start another
> > > discussion about it so that we can decide all together whether we
> > > can start some implementation/design now or need to wait.
> > >
> > > Now, regarding the inner queries and the points you raised. It is
> > > true that in general an inner join would work like any other join
> > > (which obviously requires some buffering capabilities and mechanisms
> > > to restrict the infinite growth of the join state). However, at
> > > least some cases of inner queries can be supported without the need
> > > for buffering mechanisms or full support for inner join / left join.
> > > Basically, the logical operator an inner query is translated into (a
> > > left join with an always-true condition) is to some extent more
> > > similar to UNION - and to the UNION implementation - than to the
> > > implementation we will have for joins. This is why I believe we can
> > > already provide support for this (I also tested a PoC implementation
> > > internally, and it works).
> > > In terms of examples of when we could use this, please see the next
> > > 2 examples.
> > > Please let me know what you think and whether it is worth creating
> > > the JIRA issue, perhaps with some more details (including the
> > > technical details).
> > >
> > > Consider the example below:
> > >
> > > SELECT STREAM user
> > > FROM inputstream
> > > WHERE amount > (SELECT STREAM Min(amount2) FROM inputstream2)
> > >
> > > The point of this is to restrict the values you are selecting based
> > > on some value that you have from the other stream. Consider the
> > > events below arriving on the two streams, in order:
> > >
> > > inputstream: User1, 100 -> output: User1 (because there is no value
> > > in inputstream2 yet, and the left join should not restrict the
> > > output in this case)
> > > inputstream2: X, x, 10 -> no output, as there is no event in
> > > inputstream to be emitted; the Min becomes 10
> > > inputstream: User2, 20 -> output: User2 (because 20 is greater than
> > > 10, the minimum retained from inputstream2)
> > > inputstream2: X, x, 20 -> no output; the Min remains 10
> > > inputstream2: X, x, 5 -> no output; the Min becomes 5
> > > inputstream: User3, 8 -> output: User3 (because 8 is greater than 5)
> > > ...
> > >
> > > The goal, ultimately, is to be able, among other things, to define
> > > multiple window computations over the same input stream.
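The event-by-event walkthrough above can be replayed with a short Python sketch. This is a simulation under stated assumptions, not Flink code: the two streams are interleaved into one list of tagged tuples, and a running minimum stands in for the SINGLE_VALUE side of the left join.

```python
def min_filter(events):
    """Simulate: SELECT STREAM user FROM inputstream
       WHERE amount > (SELECT STREAM Min(amount2) FROM inputstream2).
    `events` interleaves ('left', user, amount) records from inputstream
    with ('right', amount2) records from inputstream2, in arrival order."""
    current_min = None  # no value seen on inputstream2 yet
    out = []
    for event in events:
        if event[0] == 'right':
            amount2 = event[1]
            current_min = amount2 if current_min is None else min(current_min, amount2)
        else:
            _, user, amount = event
            # Left join against an empty right side: do not restrict output.
            if current_min is None or amount > current_min:
                out.append(user)
    return out
```

Replaying the events of the walkthrough yields `['User1', 'User2', 'User3']`, matching the table: User1 passes because no minimum exists yet, User2 because 20 > 10, and User3 because 8 > 5.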
> > > Consider:
> > >
> > > SELECT STREAM user
> > > FROM inputstream
> > > WHERE (SELECT STREAM AVG(amount) OVER (ORDER BY timestamp
> > > RANGE INTERVAL 1 HOUR PRECEDING) AS hour_avg FROM inputstream)
> > > < amount
> > >
> > > Assume the following events arrive, one every 30 minutes:
> > > User1, 100 -> the average is 100 and the topology that implements
> > > the query emits nothing (100 is not > 100)
> > > User2, 10 -> the average is 55 and the topology emits nothing
> > > (10 is not > 55)
> > > User3, 40 -> the average is 25 ((10+40)/2, as the first event has
> > > fallen out of the 1-hour window) and the topology emits User3
> > > (40 > 25)
> > > ...
> > > Although the query as written depends on aggregates and windows, the
> > > operator that implements the inner query can be implemented
> > > independently of the functions contained in the query. Also, there
> > > is no need for a window or buffering to implement the logic for
> > > assembling the results from the inner query.
> > >
> > > Best regards,
> > >
> > > -----Original Message-----
> > > From: Shaoxuan Wang [mailto:wshaox...@gmail.com]
> > > Sent: Thursday, January 26, 2017 4:36 AM
> > > To: dev@flink.apache.org
> > > Subject: Re: STREAM SQL inner queries
> > >
> > > Hi Radu,
> > > Similar to the stream-stream join, this stream-stream inner query
> > > does not seem to be well defined. It needs to provide at least some
> > > kind of window bounds to complete the streaming SQL semantics. If
> > > this is an unbounded join/select, a mechanism for storing the
> > > infinite data has to be considered. I may not fully understand your
> > > proposal. Could you please provide more details about this inner
> > > query, say, by giving some examples of input and output? It would
> > > also be great if you could explain the use case of this inner query.
> > > This helps us to understand the semantics.
> > >
> > > It should also be noted that we have recently decided to unify
> > > stream and batch queries with the same regular (batch) SQL.
> > > Therefore we have removed the support for the STREAM keyword in
> > > Flink streaming SQL. In the past several months, Fabian and Xiaowei
> > > Jiang have started to work on the future Relational Queries on Flink
> > > streaming. Fabian has drafted a very good design doc,
> > > https://goo.gl/m31kkE. The design is based on a new concept of
> > > dynamic tables, whose content changes over time and can thereby be
> > > derived from streams. With dynamic tables, stream queries can be
> > > expressed in regular (batch) SQL. Besides some syntactic sugar,
> > > there is not too much difference between a batch query and a stream
> > > query (in terms of what and where a query is executed). A stream
> > > query has additional characteristics in terms of when to emit a
> > > result and how to refine the result considering retractions.
> > >
> > > Hope this helps, and I look forward to working with you on streaming
> > > SQL.
> > >
> > > Regards,
> > > Shaoxuan
> > >
> > > On Wed, Jan 25, 2017 at 9:49 PM, Radu Tudoran
> > > <radu.tudo...@huawei.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I would like to open a JIRA issue (and then provide the
> > > > implementation) for supporting inner queries. The idea is to be
> > > > able to support SQL queries such as the ones presented in the
> > > > scenarios below. The key idea is that supporting inner queries
> > > > would require implementations of:
> > > >
> > > > - JOIN (type = left and condition = true) - basically a simple
> > > > implementation of a join function between 2 streams that does not
> > > > require any window support behind the scenes, as there is no
> > > > condition on which to perform the join
> > > >
> > > > - SINGLE_VALUE - this operator would be required to provide one
> > > > value to be further joined.
> > > > In the context of streaming, this value should basically evolve
> > > > with the contents of the window. This could be implemented with a
> > > > flatMap function, as left joins would also allow mapping with null
> > > > values.
> > > >
> > > > We can then extend this initial, simple implementation to provide
> > > > support for joins in general (conditional joins, right joins, ...),
> > > > or we can isolate this implementation for the specific case of
> > > > inner queries and go with a totally new design for stream-to-stream
> > > > joins (which might be needed depending on the decision about how
> > > > to support the conditional mapping).
> > > >
> > > > What do you think about this?
> > > >
> > > > Examples of scenarios to apply:
> > > >
> > > > SELECT STREAM amount,
> > > > (SELECT id FROM inputstream1) AS field1 FROM inputstream2
> > > >
> > > > Translated to
> > > > LogicalProject(amount=[$1], c=[$4])
> > > >   LogicalJoin(condition=[true], joinType=[left])
> > > >     LogicalTableScan(table=[[inputstream2]])
> > > >     LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
> > > >       LogicalProject(user_id=[$0])
> > > >         LogicalTableScan(table=[[inputstream1]])
> > > >
> > > > Or from the same stream - perhaps interesting for applying some
> > > > more complex operations within the inner query:
> > > >
> > > > SELECT STREAM amount, (SELECT id FROM inputstream1) AS field1
> > > > FROM inputstream1
> > > >
> > > > Translated to
> > > > LogicalProject(amount=[$1], c=[$4])
> > > >   LogicalJoin(condition=[true], joinType=[left])
> > > >     LogicalTableScan(table=[[inputstream1]])
> > > >     LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
> > > >       LogicalProject(user_id=[$0])
> > > >         LogicalTableScan(table=[[inputstream1]])
> > > >
> > > > Or used to do the projection:
> > > >
> > > > SELECT STREAM amount, c FROM (SELECT *, id AS c FROM inputstream1)
> > > >
> > > > Translated to
> > > > LogicalProject(amount=[$1], c=[$5])
> > > >   LogicalProject(time=[$0], amount=[$1], date=[$2],
id=[$4], c=[$5])
> > > >     LogicalTableScan(table=[[inputstream1]])
> > > >
> > > > Or, in the future, even:
> > > >
> > > > SELECT STREAM amount, myagg FROM (SELECT STREAM *, SUM(amount)
> > > > OVER window AS myagg FROM inputstream1) ...
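The one-hour-window walkthrough earlier in the thread can also be sketched in Python. This is a simulation, not Flink code, and it makes two explicit assumptions: timestamps are in seconds, and the row exactly one hour old is excluded from the window, matching the walkthrough's arithmetic (standard SQL `RANGE ... PRECEDING` is inclusive of the boundary).

```python
from collections import deque

def above_window_average(events, window=3600):
    """Emit each user whose amount exceeds the average amount of the
    preceding hour (including the current row), per the query:
    SELECT user FROM inputstream WHERE amount >
      (AVG(amount) OVER (ORDER BY ts RANGE INTERVAL 1 HOUR PRECEDING)).
    `events` is a list of (ts_seconds, user, amount) in time order."""
    buf = deque()  # (ts, amount) pairs inside the current window
    out = []
    for ts, user, amount in events:
        buf.append((ts, amount))
        # Drop rows one hour old or older (exclusive boundary, matching
        # the walkthrough's arithmetic; SQL would keep the boundary row).
        while buf and buf[0][0] <= ts - window:
            buf.popleft()
        avg = sum(a for _, a in buf) / len(buf)
        if amount > avg:
            out.append(user)
    return out
```

Replaying the walkthrough's events, one every 30 minutes, emits only User3: at the third event the first row has aged out of the window, so the average is (10+40)/2 = 25 and 40 > 25.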