Hi all,

I would like to open a JIRA issue (and then provide the implementation) for supporting inner queries. The idea is to support SQL queries such as the ones in the scenarios below. Supporting inner queries would require implementing:

-> JOIN (type = left, condition = true): This is a simple join between two streams. It does not require any window support behind the scenes, because there is no condition on which to perform the join.

-> SINGLE_VALUE: This operator has to provide a single value to be joined further. In the context of streaming, this value should evolve with the contents of the window. It could be implemented with a flatMap function, since a left join also allows mapping with null values.

We can then extend this initial, simple implementation to support joins in general (conditional joins, right joins, ...), or we can keep this implementation isolated for the specific case of inner queries and go with a totally new design for stream-to-stream joins (which might be needed, depending on the decision on how to support the conditional mapping).

What do you think about this?

Example scenarios

An inner query over a second stream:

    SELECT STREAM amount, (SELECT id FROM inputstream1) AS field1 FROM inputstream2

Translated to:

    LogicalProject(amount=[$1], c=[$4])
      LogicalJoin(condition=[true], joinType=[left])
        LogicalTableScan(table=[[inputstream1]])
        LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
          LogicalProject(user_id=[$0])
            LogicalTableScan(table=[[inputstream2]])

Or an inner query over the same stream, which is perhaps interesting for applying more complex operations within the inner query:

    SELECT STREAM amount, (SELECT id FROM inputstream1) AS field1 FROM inputstream1

Translated to:

    LogicalProject(amount=[$1], c=[$4])
      LogicalJoin(condition=[true], joinType=[left])
        LogicalTableScan(table=[[inputstream1]])
        LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
          LogicalProject(user_id=[$0])
            LogicalTableScan(table=[[inputstream1]])

Or an inner query used to do the projection:

    SELECT STREAM amount, c FROM (SELECT *, id AS c FROM inputstream1)

Translated to:

    LogicalProject(amount=[$1], c=[$5])
      LogicalProject(time=[$0], amount=[$1], date=[$2], id=[$4], c=[$5])
        LogicalTableScan(table=[[inputstream1]])

Or, in the future, even:

    SELECT STREAM amount, myagg FROM (SELECT STREAM *, SUM(amount) OVER window AS myagg FROM inputstream1)) ...
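
To illustrate the two operators described above (the left join with condition = true, and SINGLE_VALUE), here is a minimal sketch in plain Java with no streaming library. It is not a proposed implementation. All names (SingleValueLeftJoin, Event, Row, flatMap, run) are hypothetical. It also makes a simplifying assumption: SINGLE_VALUE is reduced to "the most recent value seen from the inner query", and the window handling that would make the value evolve with the window contents is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: none of these names come from an existing API.
class SingleValueLeftJoin {

    /** An event from one of the two inputs, merged into one sequence. */
    static final class Event {
        final boolean fromInner; // true: produced by the inner query
        final Object value;      // the id (inner) or the amount (outer)

        private Event(boolean fromInner, Object value) {
            this.fromInner = fromInner;
            this.value = value;
        }

        static Event inner(Object id) { return new Event(true, id); }
        static Event outer(Object amount) { return new Event(false, amount); }
    }

    /** Output row (amount, field1); field1 may be null, as in a left join. */
    static final class Row {
        final Object amount;
        final Object field1;

        Row(Object amount, Object field1) {
            this.amount = amount;
            this.field1 = field1;
        }

        @Override
        public String toString() { return "(" + amount + ", " + field1 + ")"; }
    }

    // Assumption: SINGLE_VALUE is simplified to the latest inner value.
    private Object singleValue = null;

    /** flatMap-style step: emits zero or one rows per input event. */
    List<Row> flatMap(Event event) {
        if (event.fromInner) {
            singleValue = event.value;      // update state, emit nothing
            return Collections.emptyList();
        }
        // condition = true: every outer event is joined unconditionally;
        // joinType = left: emitted even if no inner value exists yet (null).
        return Collections.singletonList(new Row(event.value, singleValue));
    }

    /** Runs the step over a finite list of events, for illustration only. */
    static List<Row> run(List<Event> events) {
        SingleValueLeftJoin join = new SingleValueLeftJoin();
        List<Row> out = new ArrayList<>();
        for (Event e : events) {
            out.addAll(join.flatMap(e));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            Event.outer(10), Event.inner("a"), Event.outer(20),
            Event.inner("b"), Event.outer(30));
        for (Row row : run(events)) {
            System.out.println(row);
        }
    }
}
```

In this sketch the first outer event is emitted with a null field1, which is the left-join behavior mentioned above. A real operator would also have to decide how the single value is scoped to a window and what happens when the inner query yields more than one row.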