Hi Radu,

I thought about your join proposal again and think there is an issue with
the semantics.

The problem is that the result of a query is recomputed as new data arrives
in the dynamic table.
This also applies to the inner query. However, as the result of the inner
query evolves, the result of the join needs to be constantly recomputed as
well. Hence, for every new result of (SELECT x FROM input1 ORDER BY
time LIMIT 1), we would need to emit an update for each record that was
joined before.
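
To make the cost concrete, here is a minimal Python sketch of these
semantics. It is not Flink code; the function name recomputing_join and the
("outer", row) / ("inner", value) event encoding are made up for
illustration only.

```python
def recomputing_join(events):
    """Left join of an append-only outer stream with a single, evolving value.

    `events` is a list of ("outer", row) or ("inner", value) pairs in
    arrival order (a hypothetical encoding, chosen for this sketch).
    Returns every message the join would have to emit.
    """
    joined = []     # all outer rows joined so far (grows without bound)
    current = None  # current single value of the inner query
    emitted = []
    for kind, payload in events:
        if kind == "outer":
            # A new outer row joins with the current inner value.
            joined.append(payload)
            emitted.append(("insert", payload, current))
        else:
            # The inner result changed: every earlier join result is stale
            # and has to be updated.
            current = payload
            for row in joined:
                emitted.append(("update", row, current))
    return emitted


events = [("outer", "a"), ("outer", "b"), ("inner", 1),
          ("outer", "c"), ("inner", 2)]
messages = recomputing_join(events)
```

In this sketch, each change of the inner value triggers one update per
previously joined row, so both the state and the number of emitted updates
grow with the length of the outer stream.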

In order to prevent this, the join would need to have a time-based join
predicate defining that a tuple of the outer query should join with the
value that the inner query had at the time of the tuple's own timestamp.
Such a predicate can be expressed in SQL, but this is quite cumbersome.
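
As a rough illustration of such a point-in-time join, here is a small Python
sketch. The class VersionedValue and the function point_in_time_join are
hypothetical names, and the sketch assumes that updates of the inner value
arrive in timestamp order.

```python
import bisect


class VersionedValue:
    """History of a single value, keyed by the time each version became valid."""

    def __init__(self):
        self._times = []
        self._values = []

    def update(self, time, value):
        # Assumption of this sketch: updates arrive in timestamp order.
        self._times.append(time)
        self._values.append(value)

    def value_at(self, time):
        # Latest version whose timestamp is <= `time`, or None if none exists.
        i = bisect.bisect_right(self._times, time)
        return self._values[i - 1] if i else None


def point_in_time_join(outer_rows, inner):
    """Join each (timestamp, row) with the inner value valid at that timestamp."""
    return [(t, row, inner.value_at(t)) for t, row in outer_rows]


inner = VersionedValue()
inner.update(10, 1)
inner.update(20, 2)
outer_rows = [(5, "a"), (10, "b"), (15, "c"), (25, "d")]
result = point_in_time_join(outer_rows, inner)

# A later change of the inner value does not affect rows joined earlier.
inner.update(30, 3)
result_after = point_in_time_join(outer_rows, inner)
```

With this predicate, a result row depends only on the inner value at the
row's own timestamp, so later changes of the inner query do not require
updates for previously emitted results.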

Julian Hyde (Apache Calcite committer) discussed similar use cases in a
document and proposed something called Temporal Tables [1].
In some sense, the proposed dynamic tables are a special case of temporal
tables that always reflect the current point in time (i.e., not a previous
point in time).

Best, Fabian

[1]
https://docs.google.com/document/d/1RvnLEEQK92axdAaZ9XIU5szpkbGqFMBtzYiIY4dHe0Q

2017-01-27 21:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Radu,
>
> I was a bit surprised that Calcite's parser accepted your query.
> Hence, I checked the Calcite plan and had a look at the documentation of
> Calcite's SqlSingleValueAggFunction:
>
> > SINGLE_VALUE aggregate function returns the input value if there is only
> one value in the input; Otherwise it triggers a run-time error.
>
> So, the query would only pass if the inner query returns a single value
> (which would usually not be the case for a stream).
> The SINGLE_VALUE check is not added to the plan if the inner query is
> guaranteed to return a single value (LIMIT 1, global aggregation).
>
> Anyway, I agree that we could start to add the simple cases of these joins
> for processing time.
> For event-time, I think we need to consider late arriving data and support
> retraction values.
>
> Best, Fabian
>
> 2017-01-27 10:43 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
>
>> Hi all,
>>
>> Thanks for the feedback!
>>
>> I agree that we should get the semantics right first and then implement
>> it. I think it would be quite useful. Now, regarding the remarks you made:
>>
>>
>> "> SELECT STREAM amount, (SELECT id FROM  inputstream1) AS field1 FROM
>> inputstream2
>> you are suggesting that the subquery (SELECT id FROM  inputstream1)
>> returns a single value (I assume the last received value). I would
>> interpret the query differently and expect it to return the values of all
>> rows of the inputstream1 up to the current point in time.
>> "
>>
>> It is a good point. It is not so much that I wanted to suggest that this
>> should be the syntax to use - I basically just relied on the logical
>> operators that Calcite has parsed the query into (JOIN + SINGLE_VALUE).
>> Based on this logical translation, I would say the correct implementation
>> is to return one value, not necessarily the whole
>> content of the stream. Anyway, we are not restricted to this, as we could
>> potentially use different rules in Calcite to alter the resulting plan.
>>
>> However, if we decide that such queries should return the whole stream
>> rather than a single value, we are indeed running into the problem of
>> potentially unbounded cases. For this, I agree that the approach you
>> proposed, relying on dynamic tables, is very good. In such a case we would
>> just pass the entire content of the dynamic table to the upper operators.
>> For that matter, it also works for the single value (as the table would
>> contain only one value). However, for the simple case of returning a single
>> value, we can provide an implementation even now and do not need to wait
>> until the full functionality of dynamic tables is provided.
>>
>> At the same time, I also agree that the syntax "SELECT a FROM inputstream
>> ORDER BY time LIMIT 1" is elegant. I have no issue with translating inner
>> queries like this only when they have "LIMIT 1" specified, or only when
>> they are provided directly in such a form.
>>
>> I will wait for additional remarks so that we can all agree on specific
>> semantics, and then I will push this into a JIRA issue to be further
>> reviewed and validated.
>>
>> Best regards,
>>
>>
>>
>> Dr. Radu Tudoran
>> Senior Research Engineer - Big Data Expert
>> IT R&D Division
>>
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> European Research Center
>> Riesstrasse 25, 80992 München
>>
>> E-mail: radu.tudo...@huawei.com
>> Mobile: +49 15209084330
>> Telephone: +49 891588344173
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
>>
>> -----Original Message-----
>> From: Fabian Hueske [mailto:fhue...@gmail.com]
>> Sent: Thursday, January 26, 2017 4:51 PM
>> To: dev@flink.apache.org
>> Subject: Re: STREAM SQL inner queries
>>
>> Hi everybody,
>>
>> thanks for the proposal Radu.
>> If I understood it correctly, you are proposing a left join between a
>> stream and a single value (which is computed from a stream).
>> This makes sense and should be a common use case.
>>
>> However, I think some of your example queries do not return a single
>> value as required for the join.
>>
>> In your example:
>> > SELECT STREAM amount, (SELECT id FROM  inputstream1) AS field1 FROM
>> inputstream2
>>
>> you are suggesting that the subquery (SELECT id FROM  inputstream1)
>> returns a single value (I assume the last received value).
>> I would interpret the query differently and expect it to return the
>> values of all rows of the inputstream1 up to the current point in time.
>> IMO, a query like "SELECT a FROM inputstream ORDER BY time LIMIT 1" would
>> capture the semantics better.
>>
>> The subquery
>> > (SELECT AVERAGE(amount) OVER (ORDER BY timestamp RANGE INTERVAL 1 HOUR
>> PRECEDING) AS hour_sum FROM inputstream)
>>
>> has a similar problem and would return one row for each record of
>> inputstream, i.e., not a single value.
>>
>> Anyway, if we get the semantics of the query that computes the single
>> value right, I think this type of join should be well covered by the
>> dynamic table proposal.
>> The single value input will be a dynamic table (of constant size = 1)
>> which is continuously updated by the engine.
>> Joining this table to a dynamic (append) table will result in a
>> continuously growing dynamic table, which can be emitted as a stream.
>>
>> This would look very similar to what you proposed, but we would need to
>> make sure that the single value query actually returns a single value.
>>
>> @Xingcan Thanks for your feedback.
>> I would suggest moving the general discussion about the dynamic table
>> proposal to the thread that Radu started (I responded there a few minutes
>> ago).
>>
>> Just a few comments here: By logically converting a stream into a dynamic
>> table we have well-defined semantics for operations such as aggregations
>> and joins.
>> However, you are right that this does not mean that we can efficiently
>> apply all operations on dynamic tables that we can apply on an actual batch
>> table. Some operations are just too expensive or require too much state to
>> be performed in a streaming fashion. So yes, there will be some
>> restrictions, but that is due to the nature of stream processing rather
>> than to the idea of dynamic tables, IMO.
>>
>> Best,
>> Fabian
>>
>>
>> 2017-01-26 11:33 GMT+01:00 Xingcan <xingc...@gmail.com>:
>>
>> > Hi all,
>> >
>> > I've read the document about dynamic tables. Honestly, I think it's
>> > well-defined and ingeniously reconciles batch and stream. I have
>> > two questions about the design.
>> >
>> > 1) Though it's fine to take the stream as a snapshot of a dynamic
>> > table, a table is essentially a set while a stream is essentially an
>> > ordered list (with xxTime). I'm not sure whether the operations on a
>> > set will all be suitable for a list (e.g., union or merge?). Of course,
>> > we can add an "order by time" to all SQL instances, but will it be
>> > suitable?
>> >
>> > 2) As Radu said, I also think inner queries are essential for a query
>> > language. (I didn't see any select from (select) in the document). The
>> > problem is that SQL is based on the closure property, while we cannot
>> > prove that for a stream. Can the result of a stream operation be
>> > another input?
>> > It depends. The window operator will convert "point of time" events to
>> > "period of time" events and I don't know if the nature of the data has
>> > changed. Also, partial emission will lead to heterogeneous results.
>> >
>> > BTW, the "Emission of dynamic tables" section seems to be a little
>> > incompatible with the whole document...
>> >
>> > Best,
>> > Xingcan
>> >
>> > On Thu, Jan 26, 2017 at 6:13 PM, Radu Tudoran
>> > <radu.tudo...@huawei.com>
>> > wrote:
>> >
>> > > Hi Shaoxuan,
>> > >
>> > > Thanks for the feedback!
>> > > Regarding the proposal for relational queries that you referenced, I
>> > > am a bit confused about its purpose and evolution with respect to the
>> > > current implementation of stream SQL - is it supposed to replace this
>> > > implementation or to complement it? I will send another email
>> > > about this, as I guess this can be a standalone discussion thread.
>> > >
>> > > Also, regarding the stream-to-stream join, I intend to start another
>> > > discussion so that we can decide all together whether we can start
>> > > some implementation/design now or need to wait.
>> > >
>> > > Now, regarding the inner queries and the points you raised. It is
>> > > true that in general an inner join would work like any other join
>> > > (which obviously requires some buffering capabilities and mechanisms
>> > > to restrict the infinite growth of the join state).
>> > > However, at least for some cases of inner queries, we can support them
>> > > without the need for buffering mechanisms or full support for inner
>> > > join / left join.
>> > > Basically, the logical operator into which an inner query is translated
>> > > (a left join with an always-true condition) is to some extent more
>> > > similar to UNION, and to the union implementation, than to the
>> > > implementation we will have for joins. This is why I believe we can
>> > > already provide support for this (I also tested a PoC implementation
>> > > internally and it works).
>> > > As for when we could use this, please see the next 2
>> > > examples. Please let me know what you think and whether it is
>> > > worth designing the JIRA issue, perhaps with some more details
>> > > (including the technical details).
>> > >
>> > > Consider the example below:
>> > >
>> > > SELECT STREAM user
>> > >         FROM inputstream
>> > >         WHERE amount > (SELECT STREAM Min(amount2)
>> > >                         FROM inputstream2)
>> > >
>> > > The point of this is to restrict the values you are selecting based
>> > > on some value that you have from the other stream. Consider the
>> > > values below that come in each stream
>> > >
>> > > inputstream   inputstream2   Result
>> > > User1, 100                   user1 (because there is no value in
>> > >                              inputstream2 and the left join should
>> > >                              not restrict the output in this case)
>> > >               X, x, 10       nothing, as there is no event in
>> > >                              inputstream to be output; Min becomes
>> > >                              10 from now on
>> > > User2, 20                    user2 (because 20 is greater than 10,
>> > >                              the minimum retained from inputstream2)
>> > >               X, x, 20       nothing, as there is no event in
>> > >                              inputstream to be output; Min remains 10
>> > >               X, x, 5        nothing, as there is no event in
>> > >                              inputstream to be output; Min becomes
>> > >                              5 from now on
>> > > User3, 8                     User3 (because 8 is greater than 5)
>> > > ....
>> > >
>> > >
>> > > The goal for the final usage of this is to be able among others to
>> > > define multiple window processing on the same input stream. Consider:
>> > >
>> > > SELECT STREAM user
>> > >         FROM inputstream
>> > >         WHERE (SELECT STREAM AVERAGE(amount) OVER (ORDER BY timestamp
>> > >                RANGE INTERVAL 1 HOUR PRECEDING) AS hour_sum
>> > >                FROM inputstream) < amount
>> > >
>> > >
>> > > Assume you have the following events, each arriving 30 minutes apart:
>> > > User1, 100   -> Average is 100 and the output of the topology that
>> > > implements the query is NULL (no output, as 100 is not greater than 100)
>> > > User2, 10    -> Average is 55 and the output of the topology that
>> > > implements the query is NULL (no output, as 10 is not greater than 55)
>> > > User3, 40    -> Average is 25 ((10 + 40) / 2) and the output of the
>> > > topology that implements the query is User3 (40 is greater than 25)
>> > > ....
>> > > Although the query as it is depends on aggregates and windows, the
>> > > operator that implements the inner query can be implemented
>> > > independently of the functions contained in the query. Also,
>> > > there is no need for a window or buffering to implement the logic
>> > > for assembling the results from the inner query.
>> > >
>> > >
>> > > Best regards,
>> > >
>> > > -----Original Message-----
>> > > From: Shaoxuan Wang [mailto:wshaox...@gmail.com]
>> > > Sent: Thursday, January 26, 2017 4:36 AM
>> > > To: dev@flink.apache.org
>> > > Subject: Re: STREAM SQL inner queries
>> > >
>> > >  Hi Radu,
>> > > Similar to the stream-stream join, this stream-stream inner query
>> > > does not seem to be well defined. It needs to provide at least some
>> > > kind of window bounds to complete the streaming SQL semantics. If this
>> > > is an unbounded join/select, a mechanism for storing the infinite data
>> > > has to be considered. I may not fully understand your proposal.
>> > > Could you please provide more details about this inner query, say, by
>> > > giving some examples of input and output? It would also be great if
>> > > you could explain the use case of this inner query. This would help us
>> > > understand the semantics.
>> > >
>> > > It should also be noted that we have recently decided to unify
>> > > stream and batch queries with the same regular (batch) SQL. Therefore
>> > > we have removed the support for the STREAM keyword in Flink streaming
>> > > SQL. In the past several months, Fabian and Xiaowei Jiang have started
>> > > to work on the future Relational Queries on Flink streaming. Fabian has
>> > > drafted a very good design doc, https://goo.gl/m31kkE. The design is
>> > > based on the new concept of a dynamic table whose content changes over
>> > > time and can thereby be derived from streams. With this dynamic table,
>> > > stream queries can be done via regular (batch) SQL. Besides some
>> > > syntactic sugar, there is not much difference between batch queries
>> > > and stream queries (in terms of what is executed and where). Stream
>> > > queries have additional characteristics regarding when to emit a
>> > > result and how to refine the result considering retraction.
>> > >
>> > > Hope this helps and look forward to working with you on streaming SQL.
>> > >
>> > > Regards,
>> > > Shaoxuan
>> > >
>> > >
>> > > On Wed, Jan 25, 2017 at 9:49 PM, Radu Tudoran
>> > > <radu.tudo...@huawei.com>
>> > > wrote:
>> > >
>> > > > Hi all,
>> > > >
>> > > > I would like to open a JIRA issue (and then provide the
>> > > > implementation) for supporting inner queries. The idea is to be
>> > > > able to support SQL queries such as the ones presented in the
>> > > > scenarios below.
>> > > > The key idea is that supporting inner queries would require
>> > > > implementations for:
>> > > >
>> > > > -> JOIN (type = left and condition = true) - Basically this is a
>> > > > simple implementation of a join function between 2 streams that
>> > > > does not require any window support behind the scenes, as there is
>> > > > no condition on which to perform the join
>> > > >
>> > > > -> SINGLE_VALUE - this operator would need to provide one value
>> > > > to be further joined. In the context of streaming, this value
>> > > > should basically evolve with the contents of the window. This
>> > > > could be implemented with a flatmap function, as left joins would
>> > > > also allow mapping with null values
>> > > >
>> > > > We can then extend this initial and simple implementation to
>> > > > provide support for joins in general (conditional joins, right
>> > > > joins, ...) or we can isolate this implementation for this specific
>> > > > case of inner queries and go with a totally new design for
>> > > > stream-to-stream joins (this might be needed depending on the
>> > > > decision on how to support the conditional mapping).
>> > > >
>> > > > What do you think about this?
>> > > >
>> > > > Examples of scenarios to apply
>> > > >
>> > > > SELECT STREAM amount,
>> > > > (SELECT id FROM  inputstream1) AS field1 FROM inputstream2
>> > > >
>> > > > Translated to
>> > > > LogicalProject(amount=[$1], c=[$4])
>> > > >     LogicalJoin(condition=[true], joinType=[left])
>> > > >       LogicalTableScan(table=[[inputstream1]])
>> > > >       LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
>> > > >         LogicalProject(user_id=[$0])
>> > > >           LogicalTableScan(table=[[inputstream2]])
>> > > >
>> > > > Or from the same stream - perhaps interesting for applying some
>> > > > more complex operations within the inner query:
>> > > >
>> > > > SELECT STREAM amount,
>> > > > (SELECT id FROM  inputstream1) AS field1 FROM inputstream1
>> > > >
>> > > > Translated to
>> > > > LogicalProject(amount=[$1], c=[$4])
>> > > >     LogicalJoin(condition=[true], joinType=[left])
>> > > >       LogicalTableScan(table=[[inputstream1]])
>> > > >       LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
>> > > >         LogicalProject(user_id=[$0])
>> > > >           LogicalTableScan(table=[[inputstream1]])
>> > > >
>> > > > Or used to do the projection
>> > > > SELECT STREAM amount, c  FROM (SELECT *,id AS c FROM inputstream1)
>> > > >
>> > > > Translated to
>> > > >   LogicalProject(amount=[$1], c=[$5])
>> > > >     LogicalProject(time=[$0], amount =[$1], date=[$2], id =[$4],
>> > > >       c=[$5])
>> > > >       LogicalTableScan(table=[[inputstream1]])
>> > > >
>> > > >
>> > > > Or in the future even
>> > > > SELECT STREAM amount, myagg FROM  (SELECT STREAM *, SUM(amount)
>> > > > OVER window AS myagg FROM inputstream1)) ...
>> > > >
>> > > >
>> > > >
>> > > >
>> > >
>> >
>>
>
>
