Hi Radu,

I was a bit surprised that Calcite's parser accepted your query.
Hence, I checked the Calcite plan and had a look at the documentation of
Calcite's SqlSingleValueAggFunction:

> SINGLE_VALUE aggregate function returns the input value if there is only
> one value in the input; Otherwise it triggers a run-time error.

So, the query would only pass if the inner query returns a single value
(which would usually not be the case for a stream).
The SINGLE_VALUE check is not added to the plan if the inner query is
guaranteed to return a single value (LIMIT 1, global aggregation).
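
To make the SINGLE_VALUE contract quoted above concrete, here is a minimal
sketch in Python. It is illustrative only and is not Calcite's code: the
function name single_value is hypothetical, and returning None for an empty
input is my assumption, mirroring how a standard SQL scalar subquery yields
NULL when it returns no rows.

```python
def single_value(rows):
    """Return the only element of rows, or fail if there is more than one.

    Assumption: an empty input yields None, like a SQL scalar subquery
    that returns no rows.
    """
    result = None
    seen = False
    for row in rows:
        if seen:
            # A second value arrived: this is the run-time error case.
            raise RuntimeError("SINGLE_VALUE input has more than one value")
        result = row
        seen = True
    return result


print(single_value([42]))   # exactly one value: it is passed through
print(single_value([]))     # no value: None (assumed NULL semantics)
```

On an unbounded stream the second-value branch would be hit almost
immediately, which is why the query only makes sense when the inner query is
guaranteed to produce one row.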

Anyway, I agree that we could start to add the simple cases of these joins
for processing time.
For event-time, I think we need to consider late arriving data and support
retraction values.
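
As a rough illustration of the simple processing-time case, the sketch below
replays the interleaved events from Radu's MIN example (quoted further down)
in arrival order. It is plain Python, not Flink code, and the names
filter_by_running_min and events are hypothetical. It keeps a single running
minimum for inputstream2 and filters inputstream against it. Emitting a row
while no minimum exists yet follows Radu's table; under standard SQL
semantics the comparison with NULL would be unknown and the row would be
dropped, so treat that branch as an assumption.

```python
def filter_by_running_min(events):
    """Replay events in arrival (processing-time) order.

    Each event is (source, payload):
      ("inputstream2", amount2)       updates the running minimum
      ("inputstream", (user, amount)) is emitted if amount > current minimum
    """
    current_min = None  # the "single value", updated as inputstream2 advances
    emitted = []
    for source, payload in events:
        if source == "inputstream2":
            current_min = payload if current_min is None else min(current_min, payload)
        else:
            user, amount = payload
            # Assumption taken from Radu's table: with no minimum yet,
            # the left join does not restrict the output.
            if current_min is None or amount > current_min:
                emitted.append(user)
    return emitted


events = [
    ("inputstream", ("user1", 100)),
    ("inputstream2", 10),
    ("inputstream", ("user2", 20)),
    ("inputstream2", 20),
    ("inputstream2", 5),
    ("inputstream", ("user3", 8)),
]
print(filter_by_running_min(events))  # ['user1', 'user2', 'user3']
```

Note that the only state is one value, so no window or buffering is needed.
With event time, a late inputstream2 record could change the minimum that
earlier rows were compared against, which is where retraction comes in.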

Best, Fabian

2017-01-27 10:43 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi all,
>
> Thanks for the feedback!
>
> I agree that we should get the semantics right and then we can implement
> it. I think it would be quite useful. Now, regarding the remarks you made:
>
>
> "> SELECT STREAM amount, (SELECT id FROM  inputstream1) AS field1 FROM
> inputstream2
> you are suggesting that the subquery (SELECT id FROM  inputstream1)
> returns a single value (I assume the last received value). I would
> interpret the query differently and expect it to return the values of all
> rows of the inputstream1 up to the current point in time.
> "
>
> It is a good point. It is not so much that I wanted to suggest that this
> should be the syntax to use - I just relied basically on the logical
> operators that calcite has parsed the query into (JOIN + SINGLE VALUE).
> Based on this logical translation I would say the correct implementation
> for this translation is to return one value not necessarily the whole
> content of the stream. Anyway, we are not restricted to this as we could
> potentially use different rules in calcite to alter the resulting plan.
>
> However, if we decide that such queries should return the whole stream
> rather than a single value, we are indeed tapping into the problem of
> potentially unbounded cases. For this I do agree that the approach you
> proposed to rely on dynamic tables is very good. In such a case we would
> just pass to the upper operators the entire content of the dynamic table.
> For that matter it works also for the single value (as the table would
> contain only one value). However, for the simple case of returning a single
> value we can provide even now an implementation and we do not need to wait
> until the full functionality of dynamic tables is provided.
>
> At the same time I also agree that the syntax "a FROM inputstream ORDER
> BY time LIMIT 1" is elegant. I have no issue with translating inner
> queries like this only when they specify "LIMIT 1", or only when they
> are provided directly in such a form.
>
> I will wait for additional remarks so that we can all agree on specific
> semantics, and then I will push this into a jira issue to be further
> reviewed and validated.
>
> Best regards,
>
>
>
> Dr. Radu Tudoran
> Senior Research Engineer - Big Data Expert
> IT R&D Division
>
> -----Original Message-----
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Thursday, January 26, 2017 4:51 PM
> To: dev@flink.apache.org
> Subject: Re: STREAM SQL inner queries
>
> Hi everybody,
>
> thanks for the proposal Radu.
> If I understood it correctly, you are proposing a left join between a
> stream and a single value (which is computed from a stream).
> This makes sense and should be a common use case.
>
> However, I think some of your example queries do not return a single value
> as required for the join.
>
> In your example:
> > SELECT STREAM amount, (SELECT id FROM  inputstream1) AS field1 FROM
> inputstream2
>
> you are suggesting that the subquery (SELECT id FROM  inputstream1)
> returns a single value (I assume the last received value).
> I would interpret the query differently and expect it to return the values
> of all rows of the inputstream1 up to the current point in time.
> IMO, a query like "SELECT a FROM inputstream ORDER BY time LIMIT 1" would
> capture the semantics better.
>
> The subquery
> > (SELECT AVERAGE(amount) OVER (ORDER BY timestamp RANGE INTERVAL 1 HOUR
> PRECEDING) AS hour_sum FROM inputstream)
>
> has a similar problem and would return one row for each record of
> inputstream, i.e., not a single value.
>
> Anyway, if we get the semantics of the query that computes the single
> value right, I think this type of join should be well covered by the
> dynamic table proposal.
> The single value input will be a dynamic table (of constant size = 1)
> which is continuously updated by the engine.
> Joining this table to a dynamic (append) table will result in a
> continuously growing dynamic table, which can be emitted as a stream.
>
> This would look very similar to what you proposed, but we would need to
> make sure that the single value query actually returns a single value.
>
> @Xingcan Thanks for your feedback.
> I would suggest to move the general discussion about the dynamic table
> proposal to the thread that Radu started (I responded there a few minutes
> ago).
>
> Just a few comments here: By logically converting a stream into a dynamic
> table we have well-defined semantics for operations such as aggregations
> and joins.
> However, you are right that this does not mean that we can efficiently
> apply all operations on dynamic tables that we can apply on an actual batch
> table. Some operations are just too expensive or require too much state to
> be performed in a streaming fashion. So yes, there will be some
> restrictions, but those are due to the nature of stream processing rather
> than to the idea of dynamic tables, IMO.
>
> Best,
> Fabian
>
>
> 2017-01-26 11:33 GMT+01:00 Xingcan <xingc...@gmail.com>:
>
> > Hi all,
> >
> > I've read the document about dynamic tables. Honestly, I think it's
> > well-defined and strikes an ingenious compromise between batch and
> > stream. I have two questions about the design.
> >
> > 1) Though it's fine to take the stream as a snapshot of a dynamic
> > table, a table is essentially a set while a stream is essentially an
> > ordered list (with xxTime). I'm not sure if the operations on a set
> > will all suit a list (e.g., union or merge?). Of course, we can add
> > an "order by time" to all SQL instances, but will it be suitable?
> >
> > 2) As Radu said, I also think inner queries are essential for a query
> > language. (I didn't see any select from (select) in the document.) The
> > problem is that SQL is based on a closure theory, while we cannot
> > prove that for a stream. Can the result of a stream operation be
> > another input?
> > It depends. The window operator will convert "point of time" events to
> > "period of time" events and I don't know if the nature of the data has
> > changed. Also, the partial emission will lead to heterogeneous results.
> >
> > BTW, the "Emission of dynamic tables" section seems to be a little
> > incompatible with the whole document...
> >
> > Best,
> > Xingcan
> >
> > On Thu, Jan 26, 2017 at 6:13 PM, Radu Tudoran
> > <radu.tudo...@huawei.com>
> > wrote:
> >
> > > Hi Shaoxuan,
> > >
> > > Thanks for the feedback!
> > > Regarding the proposal for relational queries that you referenced, I
> > > am a bit confused about its purpose and evolution with respect to the
> > > current implementation of stream SQL: is it supposed to replace this
> > > implementation, or to complement it? I will send another email
> > > about this, as I guess this can be a standalone discussion thread.
> > >
> > > Also, regarding the stream-to-stream join, I intend to start another
> > > discussion so that we can decide all together whether we can start
> > > some implementation/design now or need to wait.
> > >
> > > Now, regarding the inner queries and the points you raised. It is
> > > true that in general an inner join would work like any other join
> > > (which obviously requires some buffering capabilities and mechanisms
> > > to restrict the infinite growth of the join state).
> > > However, at least for some cases of inner queries we can support them
> > > without the need for buffering mechanisms or full support for inner
> > > join / left join. Basically, the logical operator into which an inner
> > > query is translated (a left join with an always-true condition) is to
> > > some extent more similar to UNION, and to the union implementation,
> > > than to the implementation we will have for the joins. This is why I
> > > believe we can already provide support for this (I also tested a PoC
> > > implementation internally and it works).
> > > In terms of examples of when we could use this, please see the next 2
> > > examples. Please let me know what you think and whether it is
> > > worth designing the jira issue, perhaps with some more details
> > > (including the technical details).
> > >
> > > Consider the example below:
> > >
> > > SELECT STREAM user
> > >         FROM inputstream
> > >         WHERE amount > (SELECT STREAM Min(amount2) FROM inputstream2)
> > >
> > > The point of this is to restrict the values you are selecting based
> > > on some value that you have from the other stream. Consider the
> > > values below that come in each stream
> > >
> > > Inputstream   inputstream2   Result
> > > User1, 100                   user1 (because there is no value in
> > >                              inputstream2 and the left join should
> > >                              not restrict the output in this case)
> > >               X,x,10         nothing, as there is no event in
> > >                              inputstream to be output. Min is 10
> > >                              from now on.
> > > User2, 20                    user2 (because 20 is greater than 10,
> > >                              which is the minimum retained in
> > >                              inputstream2)
> > >               X,x,20         nothing, as there is no event in
> > >                              inputstream to be output. Min remains
> > >                              10.
> > >               X,x,5          nothing, as there is no event in
> > >                              inputstream to be output. Min is 5
> > >                              from now on.
> > > User3, 8                     User3 (because 8 is greater than 5)
> > > ....
> > >
> > >
> > > The eventual goal is, among other things, to be able to define
> > > multiple window computations on the same input stream. Consider:
> > >
> > > SELECT STREAM user
> > >         FROM inputstream
> > >         WHERE (SELECT STREAM AVERAGE(amount) OVER (ORDER BY timestamp
> > >                RANGE INTERVAL 1 HOUR PRECEDING) AS hour_sum
> > >                FROM inputstream) < amount
> > >
> > >
> > > Assume you have the following events, each arriving 30 minutes apart:
> > > User1, 100   -> Average is 100 and the output of the topology that
> > > implements the query is NULL (no output as 100 is not greater than 100)
> > > User2, 10    -> Average is 55 and the output of the topology that
> > > implements the query is NULL (no output as 10 is not greater than 55)
> > > User3, 40    -> Average is 25 ((10+40)/2) and the output of the
> > > topology that implements the query is User3 (40 is greater than 25) ....
> > > Although the query as it is depends on aggregates and windows, the
> > > operator that implements the inner query can be implemented
> > > independently of the functions contained in the query. Also,
> > > there is no need for a window or buffering to implement the logic
> > > for assembling the results from the inner query.
> > >
> > >
> > > Best regards,
> > >
> > > -----Original Message-----
> > > From: Shaoxuan Wang [mailto:wshaox...@gmail.com]
> > > Sent: Thursday, January 26, 2017 4:36 AM
> > > To: dev@flink.apache.org
> > > Subject: Re: STREAM SQL inner queries
> > >
> > >  Hi Radu,
> > > Similar to the stream-stream join, this stream-stream inner query
> > > does not seem to be well defined. It needs to provide at least some
> > > kind of window bounds to complete the streaming SQL semantics. If this
> > > is an unbounded join/select, a mechanism for storing the infinite data
> > > has to be considered. I may not fully understand your proposal.
> > > Could you please provide more details about this inner query, say by
> > > giving some examples of input and output? It would also be great if
> > > you could explain the use case of this inner query. This would help us
> > > understand the semantics.
> > >
> > > It should also be noted that we have recently decided to unify
> > > stream and batch queries with the same regular (batch) SQL. Therefore
> > > we have removed the support for the STREAM keyword in Flink Streaming
> > > SQL. In the past several months, Fabian and Xiaowei Jiang have started
> > > to work on the future Relational Queries on Flink streaming. Fabian
> > > has drafted a very good design doc, https://goo.gl/m31kkE. The design
> > > is based on a new concept of a dynamic table whose content changes
> > > over time and can thereby be derived from streams. With this dynamic
> > > table, stream queries can be done via regular (batch) SQL. Besides
> > > some syntactic sugar, there is not much difference between a batch
> > > query and a stream query (in terms of what is executed and where). A
> > > stream query has additional characteristics in terms of when to emit a
> > > result and how to refine the result considering retraction.
> > >
> > > Hope this helps and look forward to working with you on streaming SQL.
> > >
> > > Regards,
> > > Shaoxuan
> > >
> > >
> > > On Wed, Jan 25, 2017 at 9:49 PM, Radu Tudoran
> > > <radu.tudo...@huawei.com>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I would like to open a jira issue (and then provide the
> > > > implementation) for supporting inner queries. The idea is to be
> > > > able to support SQL queries such as the ones presented in the
> > > > scenarios below. The key idea is that supporting inner queries would
> > > > require an implementation of:
> > > >
> > > > -> JOIN (type = left and condition = true) - Basically this is a
> > > > simple implementation for a join function between 2 streams that
> > > > does not require any window support behind the scenes as there is
> > > > no condition on which to perform the join
> > > >
> > > > -> SINGLE_VALUE - this operator would need to provide one value
> > > > to be further joined. In the context of streaming this value
> > > > should basically evolve with the contents of the window. This
> > > > could be implemented with a flatmap function, as left joins would
> > > > also allow mapping with null values
> > > >
> > > > We can then extend this initial and simple implementation to
> > > > provide support for joins in general (conditional joins, right
> > > > joins, ...) or we can isolate this implementation for this specific
> > > > case of inner queries and go with a totally new design for stream
> > > > to stream joins (might be needed depending on the decision
> > > > on how to support the conditional mapping)
> > > >
> > > > What do you think about this?
> > > >
> > > > Examples of scenarios to apply
> > > >
> > > > SELECT STREAM amount,
> > > > (SELECT id FROM  inputstream1) AS field1 FROM inputstream2
> > > >
> > > > Translated to
> > > > LogicalProject(amount=[$1], c=[$4])
> > > >     LogicalJoin(condition=[true], joinType=[left])
> > > >       LogicalTableScan(table=[[inputstream1]])
> > > >       LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
> > > >         LogicalProject(user_id=[$0])
> > > >           LogicalTableScan(table=[[inputstream2]])
> > > >
> > > > Or from the same stream - perhaps interesting for applying some
> > > > more complex operations within the inner query
> > > > SELECT STREAM amount,
> > > > (SELECT id FROM  inputstream1) AS field1 FROM inputstream1
> > > >
> > > > Translated to
> > > > LogicalProject(amount=[$1], c=[$4])
> > > >     LogicalJoin(condition=[true], joinType=[left])
> > > >       LogicalTableScan(table=[[inputstream1]])
> > > >       LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
> > > >         LogicalProject(user_id=[$0])
> > > >           LogicalTableScan(table=[[inputstream1]])
> > > >
> > > > Or used to do the projection
> > > > SELECT STREAM amount, c  FROM (SELECT *,id AS c FROM inputstream1)
> > > >
> > > > Translated to
> > > >   LogicalProject(amount=[$1], c=[$5])
> > > >     LogicalProject(time=[$0], amount =[$1], date=[$2], id =[$4], c=[$5])
> > > >       LogicalTableScan(table=[[inputstream1]])
> > > >
> > > >
> > > > Or in the future even
> > > > SELECT STREAM amount, myagg FROM  (SELECT STREAM *, SUM(amount)
> > > > OVER window AS myagg FROM inputstream1)) ...
> > > >
> > > >
> > > >
> > > >
> > >
> >
>
