Hi Radu,
Similar as the stream-stream join, this stream-stream inner query does not
seem to be well defined. It needs provide at least some kind of window
bounds to complete the streaming SQL semantics. If this is an unbounded
join/select, a mechanism of how to store the infinite date has to be
considered. I may not fully understand your proposal. Could you
please provide more details about this inner query, say giving some
examples of input and output. It would be also great if you can explain the
use case of this inner query. This helps us to understand the semantics.

It should also be noted that, we have recently decided to unify stream and
batch query with the same regular (batch) SQL. Therefore we have removed
the support for STREAM keyword in flink Streaming SQL. In the past several
months, Fabian and Xiaowei Jiang have started to work on the future Relational
Queries on flink streaming. Fabian has drafted a very good design doc,
https://goo.gl/m31kkE. The design is based on a new concept of dynamic
table whose content changes over time, thereby can be derived from
streams. With this dynamic table, stream query can be done via regular
(batch) SQL. Besides some syntax sugar, there is not too much difference
between batch query and stream query (in terms of what and where of a query
is executed). Stream query has addition characters in the manners of when
to emit a result and how to refine the result considering the retraction.

Hope this helps and look forward to working with you on streaming SQL.

Regards,
Shaoxuan


On Wed, Jan 25, 2017 at 9:49 PM, Radu Tudoran <radu.tudo...@huawei.com>
wrote:

> Hi all,
>
> I would like to open a jira issue (and then provide the implementation)
> for supporting inner queries. The idea is to be able to support SQL queries
> as the ones presented in the scenarios below. The key idea is that
> supporting inner queries would require to have the implementation for:
>
> è JOIN (type = left and condition = true) - Basically this is a simple
> implementation for a join function between 2 streams that does not require
> any window support behind the scenes as there is no condition on which to
> perform the join
>
> è SINGLE_VALUE - this operator would require to provide one value to be
> furthered joined. In the context of streaming this value should basically
> evolve with the contents of the window. This could be implemented with a
> flatmap function as left joins would allow also to do the mapping with null
> values
>
> We can then extend this initial and simple implementation to provide
> support for joins in general (conditional joins, right joins..) or we can
> isolate this implementation for this specific case of inner queries and go
> with a totally new design for stream to stream joins (might be needed
> depending on what is the decision behind on how to support the conditional
> mapping)
>
> What do you think about this?
>
> Examples of scenarios to apply
>
> SELECT STREAM amount,
> (SELECT id FROM  inputstream1) AS field1
> FROM inputstream2
>
> Translated to
> LogicalProject(amount=[$1], c=[$4])
>     LogicalJoin(condition=[true], joinType=[left])
>       LogicalTableScan(table=[[inputstream1]])
>       LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
>         LogicalProject(user_id=[$0])
>           LogicalTableScan(table=[[inputstream2]])
>
> Or from the same stream - perhaps interesting for applying some more
> complex operations within the inner query
> SELECT STREAM amount,
> (SELECT id FROM  inputstream1) AS field1
> FROM inputstream1
>
> Translated to
> LogicalProject(amount=[$1], c=[$4])
>     LogicalJoin(condition=[true], joinType=[left])
>       LogicalTableScan(table=[[inputstream1]])
>       LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
>         LogicalProject(user_id=[$0])
>           LogicalTableScan(table=[[inputstream1]])
>
> Or used to do the projection
> SELECT STREAM amount, c  FROM (SELECT *,id AS c FROM inputstream1)
>
> Translated to
>   LogicalProject(amount=[$1], c=[$5])
>     LogicalProject(time=[$0], amount =[$1], date=[$2], id =[$4], c=[$5])
>       LogicalTableScan(table=[[inputstream1]])
>
>
> Or in the future even
> SELECT STREAM amount, myagg FROM  (SELECT STREAM *, SUM(amount) OVER
> window AS myagg FROM inputstream1))
> ...
>
>
>
>

Reply via email to