Hi Fabian, Thanks for this perspective. I agree with you. I will wait a couple more days to see if others provide additional feedback. After that I will start defining the Jira issue that we can refine. Also, as you mentioned, I believe there will be several iterations and refinements of the implementation (potentially starting from very simple, naive functionality and then extending it to provide more elegant semantics).
Dr. Radu Tudoran Senior Research Engineer - Big Data Expert IT R&D Division HUAWEI TECHNOLOGIES Duesseldorf GmbH European Research Center Riesstrasse 25, 80992 München E-mail: radu.tudo...@huawei.com Mobile: +49 15209084330 Telephone: +49 891588344173 HUAWEI TECHNOLOGIES Duesseldorf GmbH Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063, Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it! -----Original Message----- From: Fabian Hueske [mailto:fhue...@gmail.com] Sent: Friday, February 03, 2017 12:15 AM To: dev@flink.apache.org Subject: Re: STREAM SQL inner queries Hi Radu, I think mostly processing-time queries will fall into the category of queries that never require updating a previously emitted result. Event-time queries have to prepare for late-arriving data (which is not possible with processing time), and use cases that require event time usually do not want to drop late data. The query semantics that you intended for your first example in this thread should be implementable without updates for processing time. However, the query would need to be specified differently, similar to the following one, which was adapted from the Temporal Table document [1].
SELECT i2.amount, i1.id FROM InputStream2 AS i2, InputStream1 AS i1 WHERE procTime(i1) = ( SELECT MAX(procTime(i1_2)) FROM InputStream1 AS i1_2 WHERE procTime(i1_2) <= procTime(i2)) Note that the query is not a simple left join, i.e., the number of returned rows is larger than InputStream2 if more than one row in InputStream1 has the same procTime() (which might be OK if InputStream1 is only slowly changing). Also note that we have to think more about the procTime() function, which is intended to just be a marker for processing time. However, in case of join queries we need to indicate which table we refer to. So this detail needs to be fleshed out. I think in principle, we could start to work on such joins, but given that the condition is not trivial and involves a correlated subquery, it might take more time than initially expected to implement the optimization and translation of such joins. Best, Fabian [1] https://docs.google.com/document/d/1RvnLEEQK92axdAaZ9XIU5szpkbGqFMBtzYiIY4dHe0Q 2017-01-31 22:47 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>: > Hi, > > I understand the logic, and indeed, considering the "batch and stream > query equality", it supports the version you have proposed (with the > materialized view for InputStream2). > > You also mentioned there might be some queries that will never require > updating previously emitted results, such as queries that discard late > arriving data and do not compute early results. For these we can and > should apply runtime optimizations and simply append to the previously > emitted records. > > Should we thus try to identify such queries now and provide the inner > query implementation for them? Or should we wait and see how we deal > with materialized views first? > Basically, I am asking: what should we do now? > > BTW - thanks for the patience with this discussion and for brainstorming > on this!
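[Editorial note] The intended processing-time semantics of the query above — each InputStream2 row joins with the latest InputStream1 row whose processing time is at or before its own — can be sketched as a small simulation. This is Python for illustration only, not Flink code; function and stream names are hypothetical.

```python
# Toy simulation of the processing-time join discussed above: each
# InputStream2 row joins with the latest InputStream1 row whose
# processing time is <= its own. Tuple shapes are illustrative.

def temporal_join(stream1, stream2):
    """stream1: list of (proc_time, id); stream2: list of (proc_time, amount).
    Returns (amount, id) pairs, joining each stream2 row with the most
    recent stream1 row at or before its processing time."""
    results = []
    for t2, amount in stream2:
        # all stream1 rows visible at the processing time of this row
        candidates = [(t1, i) for t1, i in stream1 if t1 <= t2]
        if candidates:
            _, latest_id = max(candidates)  # max by proc_time
            results.append((amount, latest_id))
    return results

s1 = [(1, "Id1"), (4, "Id3")]
s2 = [(2, 10), (3, 11), (5, 9)]
print(temporal_join(s1, s2))  # [(10, 'Id1'), (11, 'Id1'), (9, 'Id3')]
```

Note that, as Fabian points out, if two stream1 rows shared the same procTime the real query would return more than one row per stream2 record; the sketch sidesteps this by picking one deterministically.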
> > -----Original Message----- > From: Fabian Hueske [mailto:fhue...@gmail.com] > Sent: Tuesday, January 31, 2017 3:45 PM > To: dev@flink.apache.org > Subject: Re: STREAM SQL inner queries > > Hi, > > If the goal is that the materialized result of a streaming query > should be equivalent to the result of a batch query on the > materialized input, we need to update previously emitted data. > Only appending to the already emitted results will not work in most of > the cases. > > In case of the join query you proposed > > SELECT STREAM amount, > (SELECT id FROM inputstream1 ORDER BY time LIMIT 1) AS field1 FROM > inputstream2 > > we would need to fully materialize inputstream2 (and all emitted > result rows). Inputstream1 would not need to be materialized. > When a new record from inputstream1 arrives, we would need to > discard all previously emitted rows and join the whole inputstream2 > with the new single value. > This of course does not work in practice. Hence, the query would not be > allowed by the optimizer. > > The problem could be fixed by explicitly specifying the join > condition that you are implicitly expecting, i.e., that each record > of inputstream2 is joined with the current single value of inputstream1. > If we relied on the engine to implicitly apply this join condition > by not updating previously emitted records, there would be two major problems: > > 1) the query would not compute what it states (given batch and stream > query equality) > 2) the result would not be well defined, i.e., it would not depend > solely on the data and the query but on the query processor, > ingestion speed, etc. > > There might be some queries that will never require updating > previously emitted results, such as queries that discard late arriving > data and do not compute early results. > For these we can and should apply runtime optimizations and simply > append to the previously emitted records.
> > The dynamic table document proposes two output modes: > - update mode by key > - append/retract mode, which retracts invalid results and appends new > results. > > A mode that only appends would of course be possible to implement but > would suffer from inconsistent semantics for most queries. > > Best, Fabian > > > > 2017-01-31 13:41 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>: > > > Hi, > > > > I was thinking about this reply... > > I am not sure I understand exactly why you would need to keep the > > whole state for Option 2. From my point of view this is not needed > > (and I see this as the easy case). The main reason is that you have > > the SINGLE_VALUE operator, which would imply that you do not keep the > > whole state but rather update a single value. This is of course valid > > only for the operators that do not require re-applying a full > > aggregation. For example, it would work for MIN, MAX, value selection > > (e.g. last value)...but would not work for SUM, AVERAGE or > > COUNT...unless it is an unbounded aggregation where you only update > > with the new values. Basically, we could fail/throw an exception, > > just like for the dynamic tables, in case there are not enough > > resources to compute the query. > > > > Nevertheless, I see the discussion going towards binding this with the > > concept of dynamic tables. In this case I would suggest that the > > distinction between the two options be made based on the existence > > of an ID in the stream. This is the idea that makes the > > differentiation between append tables and update tables. We could use > the same here. > > > > If the inner stream on which we apply the inner query has an ID, > > then option 1 (recompute and apply updates based on retraction and > > all the rest); otherwise, option 2 (only make an update in the > > operator that keeps the single value - if this is possible). > > > > Best regards, > > > > Radu > > > > -----Original Message----- > > From: Fabian Hueske [mailto:fhue...@gmail.com] > > Sent: Monday, January 30, 2017 8:39 PM > > To: dev@flink.apache.org > > Subject: Re: STREAM SQL inner queries > > > > Hi Radu, > > > > Updates of the result (materialized view) are not always simple appends. > > If the query is a non-windowed aggregation or a windowed aggregation (or > > join) with late data, some parts of the result need to be removed or > > updated. > > I think in order to implement the second option, we would need to emit > > the complete result for every update because we do not know which parts > > of the previous view became invalid. This is not practical, because it > > would mean holding the complete result as state and emitting the complete > > result for every update.
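[Editorial note] The retract-style emission discussed in this part of the thread can be made concrete with a toy grouped count: instead of re-emitting the complete result on every update, the operator emits a retraction for the invalidated row plus the new row, and only the rows that may still change are kept as state. Python for illustration only; not Flink code, names hypothetical.

```python
# Toy retract stream for a continuous GROUP BY count. Each input record
# updates one group; we emit ('-', old_row) to retract the now-invalid
# row and ('+', new_row) for its replacement. Illustrative only.

def grouped_count_with_retractions(records):
    counts = {}   # the only state held: current count per group
    emitted = []  # ('+', row) appends a result row, ('-', row) retracts one
    for user in records:
        if user in counts:
            emitted.append(('-', (user, counts[user])))  # retract stale row
        counts[user] = counts.get(user, 0) + 1
        emitted.append(('+', (user, counts[user])))      # emit updated row
    return emitted

out = grouped_count_with_retractions(["a", "b", "a"])
print(out)
# [('+', ('a', 1)), ('+', ('b', 1)), ('-', ('a', 1)), ('+', ('a', 2))]
```

An append-only mode would have to either re-emit everything or silently leave `('a', 1)` in the downstream view, which is exactly the inconsistency described above.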
> > In contrast, the first option sends retraction and update records to > > update the latest view. > > Moreover, we only need to hold as state those results that might be > > updated, and not the complete result. > > > > I agree that the discussion helps a lot. > > > > Best, Fabian > > > > 2017-01-30 15:49 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>: > > > > > Hi, > > > > > > I would like to ask for further clarification about the statement: > > > "a streaming query should be equivalent to the result of a batch > > > query that is executed on the materialized stream". > > > > > > I do agree with the principle, but the question that I would like > > > to ask is how we interpret the relation between a stream and the > > > materialized view of the stream at some point. If we consider that > > > we materialize the view on the elements we received on the stream > > > until moment X (let's say it had elements 1 2 3) and we apply an > > > SQL query, this should indeed give the exact same result as if > > > 1 2 3 were in a database/batch and we applied the same logic. > > > However, some time later, if we receive another element (e.g. 4), > > > do we have the same materialized view, which we update, or do we > > > consider a new state - a new materialized view and therefore a new > > > scenario? Basically, assume we take the last value.
Then we can have two > > > options: > > > > > > Option 1) At moment X the output is 3 (the last value of the > > > materialized view of 1 2 3 is 3), and at moment X+1 when 4 arrives, > > > the last value remains unchanged at 3 (it is the same materialized > > > view). Option 2) At moment X the output is 3 (the last value of the > > > materialized view of 1 2 3 is 3), and at moment X+1 when 4 arrives, > > > the last value is updated to 4 (it is a new materialized view and > > > the output is as if we applied the SQL query in the batch case with > > > all elements 1 2 3 4). > > > > > > I would assume (based on previous discussions and the panel at Flink > > > Forward) that we rather go for option 2. The correct output of a > > > SQL query on a stream is the one we would get by creating a > > > materialized view at that point in time and applying the query in > > > batch mode. When a new element arrives (the stream evolves), then > > > we get a new materialized view. > > > > > > If this is the case, as I assume, then I would say that SINGLE_VALUE > > > should be continuously updated as the stream on top of which it is > > > applied evolves. > > > > > > My 2 cents (anyway - I think the discussion is very useful and > > > hopefully applicable also to other operators/scenarios that we are > > > going to implement) > > > > > > -----Original Message----- > > > From: Fabian Hueske [mailto:fhue...@gmail.com] > > > Sent: Monday, January 30, 2017 2:09 PM > > > To: dev@flink.apache.org > > > Subject: Re: STREAM SQL inner queries > > > > > > Hi Radu, > > > > > > I think it is most important to get the semantics of a streaming > > > query right. > > > In my opinion, the result of a streaming query should be equivalent > > > to the result of a batch query that is executed on the materialized > > > stream.
> > > It should not matter whether you append the records received from > > > a Kafka topic to a table and execute a batch query on that table, > > > or run the same query continuously on the Kafka topic. > > > > > > It is correct that some queries become too expensive to compute if > > > we implement these semantics. > > > However, this is the price to pay for stream-batch consistent > > > semantics. > > > > > > Regarding the inner query case: I think a query should yield the > > > same result regardless of whether it is an inner or outer query. > > > This is one of the core principles of SQL that I would not change. > > > > > > Best, Fabian > > > > > > 2017-01-30 12:54 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>: > > > > > > > Hi Fabian, > > > > > > > > Thanks for the link and for the remarks. > > > > > > > > I do not imagine the behavior of the inner query necessarily along > > > > the lines you describe. I specifically refer to "This applies as > > > > well for the inner query. However, as the result of the inner > > > > query evolves, also the result of the join needs to be constantly > > > > recomputed. Hence, for every new result of (SELECT x FROM input1 > > > > ORDER BY time LIMIT 1), we would need to emit an update for each > > > > record that was joined before." > > > > > > > > If we consider such a working scenario, then the behavior would be > > > > something like the one below, if I understand correctly. Take for > > > > example the query "SELECT STREAM amount, (SELECT id FROM > > > > inputstream1) AS field1 FROM inputstream2" > > > > > > > > Stream1 Stream2 Output > > > > Id1 > > > > User1,10 (10,Id1) > > > > User2,11 (11,Id1) > > > > Id3 (10,Id3), (11,Id3) > > > > User3,9 (9,Id3) > > > > ... > > > > > > > > ...regardless of how we express the logic of the inner query > > > > (whether we use LIMIT 1 or not), I would expect that the outputs > > > > that were emitted are not retracted or modified in the future.
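[Editorial note] The non-retracting behavior Radu expects corresponds to a coFlatMap-style operator that holds only the latest value of inputstream1 and joins each arriving inputstream2 record against it, never revisiting past output. A minimal Python sketch of that idea (illustrative only; not Flink code, names hypothetical):

```python
# coFlatMap-style sketch: one input updates a single held value, the other
# joins against it. No buffering, no retractions of past output.

def single_value_join(events):
    """events: list of ('s1', id) or ('s2', amount), in arrival order.
    Emits (amount, current_id) for each s2 event once an s1 value exists."""
    current_id = None
    out = []
    for stream, value in events:
        if stream == 's1':
            current_id = value                # update the single held value
        elif current_id is not None:
            out.append((value, current_id))   # join, emit, never retract
    return out

trace = [('s1', 'Id1'), ('s2', 10), ('s2', 11), ('s1', 'Id3'), ('s2', 9)]
print(single_value_join(trace))  # [(10, 'Id1'), (11, 'Id1'), (9, 'Id3')]
```

The only state is the current single value, which is why no join buffers are needed under these semantics.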
> > > > In the previous example, the updates (10,Id3), (11,Id3) should > > > > never happen. With this in mind, although the inner query is > > > > translated to a LogicalJoin operator, the functionality is more > > > > similar to a union or a coFlatMap, where we only use one input as > > > > the holder of what to associate in the future with the other. > > > > Anyway, I do not see the need to have any buffers (as for the > > > > general case of joins) to compute the content for creating the > > > > output from the inner query. > > > > > > > > Regarding your previous comment about failing based on the > > > > SINGLE_VALUE verification: this is also something we just need to > > > > agree on. After all, as the implementation is decoupled from the > > > > parsing of the query, we can implement either of the behaviors: > > > > either throw an error when a second element or update happens in > > > > the second stream, or just update the single-value state for > > > > future use. > > > > > > > > All in all, I think we just need to clarify the expected behavior. > > > > Please let me know what you think. > > > > > > > > I agree with the approach of starting small - even with some very > > > > limited cases in which we support inner queries, and then extending > > > > or defining the general cases. > > > > > > > > Radu > > > > > > > > -----Original Message----- > > > > From: Fabian Hueske [mailto:fhue...@gmail.com] > > > > Sent: Monday, January 30, 2017 12:33 PM > > > > To: dev@flink.apache.org > > > > Subject: Re: STREAM SQL inner queries > > > > > > > > Hi Radu, > > > > > > > > I thought about your join proposal again and think there is an > > > > issue with the semantics. > > > > > > > > The problem is that the result of a query is recomputed as new > > > > data arrives in the dynamic table. > > > > This applies as well for the inner query. However, as the result > > > > of the inner query evolves, also the result of the join needs to > > > > be constantly recomputed.
Hence, for every new result of (SELECT x FROM > > > > input1 ORDER BY time LIMIT 1), we would need to emit an update > > > > for each record that was joined before. > > > > > > > > In order to prevent this, the join would need to have a time-based > > > > join predicate defining that a tuple of the outer query should > > > > join with the current value of the inner query at the time of its > > > > own timestamp. Such a predicate can be expressed in SQL but this > > > > is quite cumbersome. > > > > > > > > Julian Hyde (Apache Calcite committer) discussed similar use cases > > > > in a document and proposed something called Temporal Tables [1]. > > > > In some sense, the proposed dynamic tables are a special case of > > > > temporal tables, always reflecting the current point in time > > > > (i.e., not a previous point in time). > > > > > > > > Best, Fabian > > > > > > > > [1] > > > > https://docs.google.com/document/d/1RvnLEEQK92axdAaZ9XIU5szpkbGqFMBtzYiIY4dHe0Q > > > > > > > > 2017-01-27 21:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com>: > > > > > > > > > Hi Radu, > > > > > > > > > > I was a bit surprised that Calcite's parser accepted your query. > > > > > Hence, I checked the Calcite plan and had a look at the > > > > > documentation of Calcite's SqlSingleValueAggFunction: > > > > > > > > > > > SINGLE_VALUE aggregate function returns the input value if > > > > > > there is only > > > > > one value in the input; Otherwise it triggers a run-time error. > > > > > > > > > > So, the query would only pass if the inner query returns a single > > > > > value (which would usually not be the case for a stream). > > > > > The SINGLE_VALUE check is not added to the plan if the inner > > > > > query is guaranteed to return a single value (LIMIT 1, global > > > > > aggregation). > > > > > > > > > > Anyway, I agree that we could start to add the simple cases of > > > > > these joins for processing time.
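[Editorial note] The SINGLE_VALUE behavior quoted from Calcite's documentation can be pinned down with a tiny sketch. This is a hypothetical Python helper mimicking the described semantics, not Calcite code:

```python
def single_value(rows):
    """Mimics the quoted SINGLE_VALUE aggregate: returns the input value
    if there is exactly one row in the input; otherwise it triggers a
    run-time error (illustrative sketch of the documented behavior)."""
    if len(rows) != 1:
        raise RuntimeError(
            "SINGLE_VALUE: subquery returned %d rows, expected 1" % len(rows))
    return rows[0]

print(single_value([42]))  # 42
# single_value([1, 2])     # would raise RuntimeError: on a stream, the
#                          # subquery keeps producing rows, so the check fails
```

This is why, on an unbounded stream, a scalar subquery only passes the check when the plan guarantees a single value (LIMIT 1, global aggregation).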
> > > > > For event-time, I think we need to consider late arriving data > > > > > and support retraction values. > > > > > > > > > > Best, Fabian > > > > > > > > > > 2017-01-27 10:43 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>: > > > > > > > > > >> Hi all, > > > > >> > > > > >> Thanks for the feedback! > > > > >> > > > > >> I agree that we should get the semantics right, and after that > > > > >> we can implement it. I think it would be quite useful. Now, > > > > >> regarding the remarks you made: > > > > >> > > > > >> "> SELECT STREAM amount, (SELECT id FROM inputstream1) AS > > > > >> field1 FROM inputstream2 > > > > >> you are suggesting that the subquery (SELECT id FROM > > > > >> inputstream1) returns a single value (I assume the last > > > > >> received value). I would interpret the query differently and > > > > >> expect it to return the values of all rows of inputstream1 up > > > > >> to the current point in time." > > > > >> > > > > >> It is a good point. It is not so much that I wanted to suggest > > > > >> that this should be the syntax to use - I basically just relied > > > > >> on the logical operators that Calcite has parsed the query into > > > > >> (JOIN + SINGLE_VALUE). Based on this logical translation, I > > > > >> would say the correct implementation is to return one value, > > > > >> not necessarily the whole content of the stream. Anyway, we are > > > > >> not restricted to this, as we could potentially use different > > > > >> rules in Calcite to alter the resulting plan. > > > > >> > > > > >> However, if we decide that such queries should return the whole > > > > >> stream rather than a single value - we are indeed tapping into > > > > >> the problem of potentially unbounded cases. For this I do agree > > > > >> that the approach you proposed, relying on dynamic tables, is > > > > >> very good.
> > > > >> In such a case we would just pass to the upper operators the > > > > >> entire content of the dynamic table. > > > > >> For that matter, it works also for the single value (as the > > > > >> table would contain only one value). However, for the simple > > > > >> case of returning a single value we can provide an > > > > >> implementation even now, and we do not need to wait until the > > > > >> full functionality of dynamic tables is provided. > > > > >> > > > > >> At the same time, I also agree that the syntax "a FROM > > > > >> inputstream ORDER BY time LIMIT 1" is elegant. I have no issue > > > > >> with considering inner queries to be translated like this only > > > > >> when they have "LIMIT 1" specified, or only when they are > > > > >> provided directly in such a form. > > > > >> > > > > >> I will wait for additional remarks in order for us all to agree > > > > >> on a specific semantic, and then I will push this into a Jira > > > > >> issue to be further reviewed and validated. > > > > >> > > > > >> Best regards, > > > > >> > > > > >> Radu > > > > >> > > > > >> -----Original Message----- > > > > >> From: Fabian Hueske [mailto:fhue...@gmail.com] > > > > >> Sent: Thursday, January 26, 2017 4:51 PM > > > > >> To: dev@flink.apache.org > > > > >> Subject: Re: STREAM SQL inner queries > > > > >> > > > > >> Hi everybody, > > > > >> > > > > >> thanks for the proposal Radu. > > > > >> If I understood it correctly, you are proposing a left join > > > > >> between a stream and a single value (which is computed from a > > > > >> stream). > > > > >> This makes sense and should be a common use case.
> > > > >> However, I think some of your example queries do not return a > > > > >> single value as required for the join. > > > > >> > > > > >> In your example: > > > > >> > SELECT STREAM amount, (SELECT id FROM inputstream1) AS > > > > >> > field1 FROM inputstream2 > > > > >> > > > > >> you are suggesting that the subquery (SELECT id FROM > > > > >> inputstream1) returns a single value (I assume the last > > > > >> received value). > > > > >> I would interpret the query differently and expect it to return > > > > >> the values of all rows of inputstream1 up to the current point > > > > >> in time. > > > > >> IMO, a query like "SELECT a FROM inputstream ORDER BY time > > > > >> LIMIT 1" would capture the semantics better. > > > > >> > > > > >> The subquery > > > > >> > (SELECT AVERAGE(amount) OVER (ORDER BY timestamp RANGE > > > > >> > INTERVAL 1 HOUR PRECEDING) AS hour_sum FROM inputstream) > > > > >> > > > > >> has a similar problem and would return one row for each record > > > > >> of inputstream, i.e., not a single value. > > > > >> > > > > >> Anyway, if we get the semantics of the query that computes the > > > > >> single value right, I think this type of join should be well > > > > >> covered by the dynamic table proposal. > > > > >> The single-value input will be a dynamic table (of constant > > > > >> size = 1) which is continuously updated by the engine. > > > > >> Joining this table to a dynamic (append) table will result in a > > > > >> continuously growing dynamic table, which can be emitted as a > > > > >> stream. > > > > >> > > > > >> This would look very similar to what you proposed, but we would > > > > >> need to make sure that the single-value query actually returns > > > > >> a single value. > > > > >> > > > > >> @Xingcan Thanks for your feedback.
> > > > >> I would suggest moving the general discussion about the > > > > >> dynamic table proposal to the thread that Radu started (I > > > > >> responded there a few minutes ago). > > > > >> > > > > >> Just a few comments here: by logically converting a stream into > > > > >> a dynamic table, we have well-defined semantics for operations > > > > >> such as aggregations and joins. > > > > >> However, you are right that this does not mean that we can > > > > >> efficiently apply all operations on dynamic tables that we can > > > > >> apply on an actual batch table. Some operations are just too > > > > >> expensive or require too much state to be performed in a > > > > >> streaming fashion. So yes, there will be some restrictions, but > > > > >> that is rather due to the nature of stream processing than to > > > > >> the idea of dynamic tables, IMO. > > > > >> > > > > >> Best, > > > > >> Fabian > > > > >> > > > > >> 2017-01-26 11:33 GMT+01:00 Xingcan <xingc...@gmail.com>: > > > > >> > > > > >> > Hi all, > > > > >> > > > > > >> > I've read the document about dynamic tables. Honestly, I > > > > >> > think it's well-defined and ingeniously compromises between > > > > >> > batch and stream. There are two questions about the design. > > > > >> > > > > > >> > 1) Though it's fine to take the stream as a snapshot of a > > > > >> > dynamic table, a table is essentially a set while a stream is > > > > >> > essentially an ordered list (with xxTime). I'm not sure > > > > >> > whether the operations on a set will all suit a list (e.g. > > > > >> > union or merge?). Of course, we can add an "order by time" to > > > > >> > all SQL instances, but will it be suitable? > > > > >> > > > > > >> > 2) As Radu said, I also think the inner query is essential > > > > >> > for a query language.
(I didn't see any select from (select) in the document.) > > > > >> > The problem is, SQL is based on a closure theory, while we > > > > >> > cannot prove that for a stream. Can the result of a stream > > > > >> > operation be another input? > > > > >> > It depends. The window operator will convert "point of time" > > > > >> > events to "period of time" events, and I don't know whether > > > > >> > the nature of the data has changed. Also, the partial > > > > >> > emission will lead to heterogeneous results. > > > > >> > > > > > >> > BTW, the "Emission of dynamic tables" section seems to be a > > > > >> > little incompatible with the whole document... > > > > >> > > > > > >> > Best, > > > > >> > Xingcan > > > > >> > > > > > >> > On Thu, Jan 26, 2017 at 6:13 PM, Radu Tudoran > > > > >> > <radu.tudo...@huawei.com> > > > > >> > wrote: > > > > >> > > > > > >> > > Hi Shaoxuan, > > > > >> > > > > > > >> > > Thanks for the feedback! > > > > >> > > Regarding the proposal for relational queries that you > > > > >> > > referenced, I am a bit confused with respect to its purpose > > > > >> > > and evolution relative to the current implementation of > > > > >> > > stream SQL - is it supposed to replace this implementation, > > > > >> > > or to complement it? ...but I will send another email about > > > > >> > > this, as I guess this can be a standalone discussion thread. > > > > >> > > > > > > >> > > Also, regarding the stream-to-stream join, I intend to start > > > > >> > > another discussion about this such that we can decide all > > > > >> > > together if we can start some implementation/design now or > > > > >> > > we need to wait. > > > > >> > > > > > > >> > > Now, regarding the inner queries and the points you raised.
> > > > >> > > It is true that in general an inner query would work like > > > > >> > > any other join (which obviously requires some buffering > > > > >> > > capabilities and mechanisms to restrict the infinite growth > > > > >> > > of the join state). However, at least for some cases of > > > > >> > > inner queries we can support them without the need for > > > > >> > > buffering mechanisms or full support for inner join / left > > > > >> > > join. > > > > >> > > Basically, the logical operator into which an inner query > > > > >> > > is translated (a left join with an always-true condition) > > > > >> > > is to some extent more similar to a UNION - and the union > > > > >> > > implementation - than to the implementation we will have > > > > >> > > for the joins. This is why I believe we can already provide > > > > >> > > the support for this (I also tested a PoC implementation > > > > >> > > internally for this and it works). > > > > >> > > In terms of examples of when we could use this, please see > > > > >> > > the next 2 examples. Please let me know what you think and > > > > >> > > whether it is worth designing the Jira issue, perhaps with > > > > >> > > some more details (including the technical details). > > > > >> > > > > > > >> > > Consider the example below: > > > > >> > > > > > > >> > > SELECT STREAM user > > > > >> > > FROM inputstream > > > > >> > > WHERE amount > (SELECT STREAM MIN(amount2) FROM > > > > >> > > inputstream2) > > > > >> > > > > > > >> > > The point of this is to restrict the values you are > > > > >> > > selecting based on some value that you have from the other > > > > >> > > stream.
Consider the values below arriving in each stream:

inputstream | inputstream2 | Result
User1,100   |              | user1 (there is no value in inputstream2 yet, and the left join should not restrict the output in this case)
            | X,x,10       | nothing, as there is no event in inputstream to be output; the minimum becomes 10 from now on
User2,20    |              | user2 (20 is greater than 10, the minimum retained from inputstream2)
            | X,x,20       | nothing, as there is no event in inputstream to be output; the minimum remains 10
            | X,x,5        | nothing, as there is no event in inputstream to be output; the minimum becomes 5 from now on
User3,8     |              | user3 (8 is greater than 5)
....

The goal for the final usage of this is, among other things, to be able to define multiple window computations over the same input stream.
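The event-by-event behavior in the table above needs no buffering: the only state is the current minimum of inputstream2. A minimal Python simulation of these semantics (the function name and the interleaved-stream encoding are illustrative, not Flink API):

```python
def filter_by_running_min(events):
    """Simulate SELECT user FROM inputstream
       WHERE amount > (SELECT MIN(amount2) FROM inputstream2).

    `events` is an interleaved list, in arrival order, of
    ("left", user, amount) and ("right", amount2) tuples.
    Only the running minimum of the right stream is kept as state.
    """
    current_min = None  # evolving single value produced by the inner query
    results = []
    for event in events:
        if event[0] == "right":
            amount2 = event[1]
            current_min = amount2 if current_min is None else min(current_min, amount2)
        else:
            _, user, amount = event
            # Left-join semantics: if the inner query has not produced a
            # value yet, the predicate does not restrict the output.
            if current_min is None or amount > current_min:
                results.append(user)
    return results

stream = [
    ("left", "user1", 100),
    ("right", 10),
    ("left", "user2", 20),
    ("right", 20),
    ("right", 5),
    ("left", "user3", 8),
]
print(filter_by_running_min(stream))  # ['user1', 'user2', 'user3']
```

Running this on the sequence from the table reproduces exactly the three outputs listed there.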
Consider:

SELECT STREAM user
FROM inputstream
WHERE (SELECT STREAM AVG(amount) OVER (ORDER BY timestamp RANGE INTERVAL '1' HOUR PRECEDING) AS hour_avg FROM inputstream) < amount

Assume the following events, each arriving 30 minutes apart:
User1, 100 -> the average is 100; the topology that implements the query produces no output (100 is not > 100)
User2, 10  -> the average is 55; no output (10 is not > 55)
User3, 40  -> the average is 25 (over 10 and 40); the topology outputs User3 (40 is > 25)
....

Although the query as written depends on aggregates and windows, the operator that implements the inner query can be built independently of the functions contained in it. Also, there is no need for a window or for buffering to implement the logic for assembling the results from the inner query.
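As a sanity check on the walk-through above, here is a small Python simulation of the one-hour sliding-average predicate. Timestamps are in minutes; evicting events that are a full hour old or older reproduces the numbers in the example, and that strict bound is an assumption about the intended window semantics, not something the thread specifies:

```python
from collections import deque

def filter_above_hourly_avg(events):
    """Simulate WHERE amount > AVG(amount) OVER
       (ORDER BY ts RANGE INTERVAL '1' HOUR PRECEDING).

    `events` is a list of (ts_minutes, user, amount) tuples in timestamp
    order. The over-window average includes the current row.
    """
    window = deque()  # (ts, amount) pairs within the last hour
    results = []
    for ts, user, amount in events:
        # Evict events at or beyond the one-hour bound (assumed strict).
        while window and window[0][0] <= ts - 60:
            window.popleft()
        window.append((ts, amount))
        avg = sum(a for _, a in window) / len(window)
        if amount > avg:
            results.append(user)
    return results

events = [(0, "User1", 100), (30, "User2", 10), (60, "User3", 40)]
print(filter_above_hourly_avg(events))  # ['User3']
```

For User3 at minute 60, the event from minute 0 has been evicted, so the average is (10 + 40) / 2 = 25 and the row passes the predicate, matching the example.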
It needs to provide at least some kind of window bounds to complete the streaming SQL semantics. If this is an unbounded join/select, a mechanism for storing the infinite data has to be considered. I may not fully understand your proposal. Could you please provide more details about this inner query, say by giving some examples of input and output? It would also be great if you could explain the use case of this inner query. That would help us understand the semantics.

It should also be noted that we have recently decided to unify stream and batch queries under the same regular (batch) SQL. We have therefore removed the support for the STREAM keyword in Flink Streaming SQL. Over the past several months, Fabian and Xiaowei Jiang have started to work on the future Relational Queries on Flink streaming. Fabian has drafted a very good design doc, https://goo.gl/m31kkE. The design is based on a new concept of a dynamic table whose content changes over time and which can therefore be derived from streams. With dynamic tables, stream queries can be expressed in regular (batch) SQL. Apart from some syntactic sugar, there is not too much difference between a batch query and a stream query (in terms of what part of a query is executed and where). A stream query has additional characteristics concerning when to emit a result and how to refine the result with respect to retraction.
Hope this helps, and I look forward to working with you on streaming SQL.

Regards,
Shaoxuan

On Wed, Jan 25, 2017 at 9:49 PM, Radu Tudoran <radu.tudo...@huawei.com> wrote:

Hi all,

I would like to open a Jira issue (and then provide the implementation) for supporting inner queries. The idea is to be able to support SQL queries such as the ones presented in the scenarios below. The key idea is that supporting inner queries would require an implementation for:

- JOIN (type = left and condition = true): basically a simple implementation of a join function between two streams that does not require any window support behind the scenes, as there is no condition on which to perform the join.

- SINGLE_VALUE: this operator would need to provide one value to be further joined. In the context of streaming, this value should basically evolve with the contents of the window. It could be implemented with a flatmap function, as left joins also allow the mapping to be done with null values.

We can then extend this initial and simple implementation to provide support for joins in general (conditional joins, right joins, ...)
or we can isolate this implementation for the specific case of inner queries and go with a totally new design for stream-to-stream joins (which might be needed depending on the decision about how to support the conditional mapping).

What do you think about this?

Examples of scenarios to apply this to:

SELECT STREAM amount,
  (SELECT id FROM inputstream1) AS field1
FROM inputstream2

Translated to:

LogicalProject(amount=[$1], c=[$4])
  LogicalJoin(condition=[true], joinType=[left])
    LogicalTableScan(table=[[inputstream1]])
    LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
      LogicalProject(user_id=[$0])
        LogicalTableScan(table=[[inputstream2]])

Or from the same stream, perhaps interesting for applying some more complex operations within the inner query:

SELECT STREAM amount,
  (SELECT id FROM inputstream1) AS field1
FROM inputstream1

Translated to:

LogicalProject(amount=[$1], c=[$4])
  LogicalJoin(condition=[true], joinType=[left])
    LogicalTableScan(table=[[inputstream1]])
    LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
      LogicalProject(user_id=[$0])
        LogicalTableScan(table=[[inputstream1]])

Or used to do a projection:

SELECT STREAM amount, c
FROM (SELECT *, id AS c FROM inputstream1)

Translated to:

LogicalProject(amount=[$1], c=[$5])
  LogicalProject(time=[$0], amount=[$1], date=[$2], id=[$4], c=[$5])
    LogicalTableScan(table=[[inputstream1]])

Or, in the future, even:

SELECT STREAM amount, myagg
FROM (SELECT STREAM *, SUM(amount) OVER window AS myagg FROM inputstream1)

...
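The core of the proposal above (a left join with condition=[true] against a SINGLE_VALUE aggregate) can be sketched as a stateful flatmap over the main stream: the inner query collapses to one evolving value, and each main-stream record is emitted together with the current value, or with NULL when no value exists yet. This is an illustrative Python sketch of those semantics, not the actual Flink operator API:

```python
def left_join_single_value(events):
    """Sketch of LogicalJoin(condition=[true], joinType=[left]) over a
    SINGLE_VALUE aggregate.

    `events` is an interleaved list, in arrival order, of
    ("single", value) updates from the inner query and ("main", record)
    events from the outer stream. No buffering of either stream is
    required -- the only state is the latest single value.
    """
    current = None  # evolving SINGLE_VALUE state; None stands for SQL NULL
    out = []
    for tag, payload in events:
        if tag == "single":
            current = payload  # inner query emitted/updated its value
        else:
            # Left-join semantics: emit even when the value is still NULL.
            out.append((payload, current))
    return out

events = [("main", "a"), ("single", 7), ("main", "b"), ("single", 9), ("main", "c")]
print(left_join_single_value(events))  # [('a', None), ('b', 7), ('c', 9)]
```

The first record pairs with None because the inner query has produced nothing yet, which is exactly why the join must be a left join rather than an inner one.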