radu created FLINK-6077:
---------------------------

Summary: Support In/Exists/Except/Any /Some/All for Stream SQL
Key: FLINK-6077
URL: https://issues.apache.org/jira/browse/FLINK-6077
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: radu

Time target: Proc Time

SQL targeted query examples:
----------------------------

With inner query

Q1.
```
SELECT client FROM stream1 WHERE id IN ((SELECT id FROM stream2 WHERE salary > 4500 GROUP BY FLOOR(proctime TO HOUR)))
```

Comment: A concrete example for this query is selecting the customers whose country is in the list of supplier countries (`SELECT customer FROM customers WHERE Country IN (SELECT Country FROM suppliers)`).

Comment: This implementation depends on the implementation of the inner query. The structure can be the same as for inner query support, with the difference that the LogicalJoin between the main query and the inner query is conditional.

Comment: The inner query needs a bound, as otherwise it cannot be decided when to trigger.

Comment: If the result is not triggered by the grouping expression, then the inner query must be triggered based on when that expression changes value.

Comment: Boundaries should be supported over all options: GROUP BY clauses, windows, or time expressions (`[floor/ceil](rowtime/proctime TO HOUR)`).

With collection

Q2.
```
SELECT * FROM stream1 WHERE b IN (5000, 7000, 8000, 9000)
```

Comment: It should be checked whether this is already supported by the DataStreamCalc implementation. If not, it can be turned into a sub-JIRA task to extend DataStreamCalc to implement this conditional behavior.

Comment: Similar functionality can be provided if the collection is a table rather than a set of values.

With table

```
SELECT client FROM stream1 WHERE id IN ((SELECT id FROM table1 WHERE stream1.id = table1.id))
```

Comment: This can be a sub-JIRA issue, perhaps within the context of dynamic tables, to support joins with tables and filtering operations based on the contents of an external table.

General comments:

**Except** is similar in behavior to IN or EXISTS, as it filters the outputs of the main stream based on data from a secondary stream.
The implementation will follow exactly the same logic as for IN/EXISTS by filtering the outputs in the join function between the main stream and the secondary stream. Additionally, we apply the same restrictions to the secondary/inner queries.

```
SELECT ID, NAME FROM CUSTOMERS
LEFT JOIN ORDERS ON CUSTOMERS.ID = ORDERS.CUSTOMER_ID
EXCEPT
SELECT ID, NAME FROM CUSTOMERS
RIGHT JOIN ORDERS ON CUSTOMERS.ID = ORDERS.CUSTOMER_ID
GROUP BY FLOOR(procTime TO HOUR);
```

Description:
------------

The IN and EXISTS operators are conditional clauses in the WHERE clause that check for values in certain collections. The collections on which the restriction is based can be either static (values, tables) or parts of a stream. This JIRA issue is concerned with the latter case of checking for values over a stream. For this operation to work, the stream needs to be bounded so that the result can trigger and the collection can be formed. This points to using boundaries or groupings over the sub-query that forms the collection over which IN is applied. This should be supported via 3 options as shown below. Each of these options can be a sub-JIRA issue.

1) Group By clauses that are applied over some monotonic order of the stream, based on which ranges are defined.

` [...] GROUP BY prodId`

3) Window clauses to define rolling partitions of the data of the stream, which evolve in time.

` [...] WINDOW HOUR AS (RANGE INTERVAL '10' MINUTE TO SECOND(3) PRECEDING);`

Functionality example
---------------------

We exemplify below the functionality of IN/EXISTS when working with streams.

```
SELECT * FROM stream1 WHERE id IN ((SELECT id2 FROM stream2 WHERE b > 10 GROUP BY FLOOR(PROCTIME TO HOUR)))
```

Note: The inner query triggers only once an hour. During the next hour, the result of the inner query from the previous hour is the one used to filter the results of the main query as they arrive.
This is also consistent with how the inner queries are translated (see inner queries).

||IngestionTime(Event)||Stream1||Stream2||Output||
|10:00:01|Id1,10| |nil|
|10:02:00| |Id2,2| |
|11:25:00| |Id3,15| |
|12:03:00|Id2,15| |nil|
|12:05:00|Id3,11| |Id3,11|
|12:06:00| |Id2,30| |
|12:07:00| |Id3,2| |
|12:09:00|Id2,17| |nil|
|12:10:00|Id3,20| |Id3,20|
|...|

Implementation option
---------------------

Considering that the query only makes sense in the context of 1) window boundaries and 2) sub-queries that extract collections of data, the main design is based on the inner query implementation with the following modifications. (As a recap, the inner query is implemented with a special Join [left type with always true condition] between the main stream and the output of the inner query, which is passed through a single value selection aggregation.)

1) The condition for outputting a result by the LogicalJoin is no longer always true. Instead, the condition is evaluated within the window function by checking that the input from the main stream is within the collection from the inner query.

2) The check is done specifically based on the type of function used (IN, ANY, SOME, ...). The logic of each such function would need a direct implementation.

3) The filter on the inner query that keeps a single value is removed, and a collection is passed for evaluation in the join instead.

4) The boundaries of the SQL query are to be used as the boundaries that define the join window in which the verification is done.

5) From the condition point of view, the join behaves like an INNER JOIN (a value is emitted only if it exists on the other side).

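To make the intended semantics easier to check, the following is a minimal plain-Java sketch of modifications 1), 3) and 4), replaying the rows of the functionality example. It does not use any Flink classes. The class `HourlyInFilterSketch`, the `Event` record layout, and the integer hour bucket standing in for `FLOOR(PROCTIME TO HOUR)` are all assumptions made for illustration, not part of the proposed implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Plain-Java sketch of the hourly-bounded IN check; no Flink classes are used. */
public class HourlyInFilterSketch {

    /** One input record. All names here are hypothetical, chosen for illustration. */
    public static final class Event {
        final int hour;          // hour bucket, standing in for FLOOR(proctime TO HOUR)
        final boolean fromMain;  // true: stream1 (main query), false: stream2 (inner query)
        final String id;
        final int b;

        public Event(int hour, boolean fromMain, String id, int b) {
            this.hour = hour;
            this.fromMain = fromMain;
            this.id = id;
            this.b = b;
        }
    }

    /**
     * Replays events in arrival order. Inner-query records with b > 10 are collected
     * per hour; a main-stream record is emitted only if its id is contained in the
     * collection built during the previous hour.
     */
    public static List<String> run(List<Event> events) {
        Map<Integer, Set<String>> idsByHour = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (Event e : events) {
            if (!e.fromMain) {
                // Inner query side: keep the whole collection, not a single value.
                if (e.b > 10) {
                    idsByHour.computeIfAbsent(e.hour, h -> new HashSet<>()).add(e.id);
                }
            } else {
                // Main query side: conditional emit, checked against the previous hour.
                Set<String> previousHour =
                        idsByHour.getOrDefault(e.hour - 1, Collections.<String>emptySet());
                if (previousHour.contains(e.id)) {
                    output.add(e.id + "," + e.b);
                }
            }
        }
        return output;
    }

    /** The rows of the functionality example table, in arrival order. */
    public static List<Event> tableExample() {
        return Arrays.asList(
                new Event(10, true, "Id1", 10),
                new Event(10, false, "Id2", 2),
                new Event(11, false, "Id3", 15),
                new Event(12, true, "Id2", 15),
                new Event(12, true, "Id3", 11),
                new Event(12, false, "Id2", 30),
                new Event(12, false, "Id3", 2),
                new Event(12, true, "Id2", 17),
                new Event(12, true, "Id3", 20));
    }

    public static void main(String[] args) {
        System.out.println(run(tableExample()));
    }
}
```

With the table rows as input, `run` emits the two records `Id3,11` and `Id3,20`, matching the Output column. Supporting ANY, SOME or ALL would replace the `contains` check with the corresponding predicate over the same collection.
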
[See attached document for schema]

General logic of Join
---------------------

    leftDataStream.join(rightDataStream)
        .where(new ConstantConditionSelector())
        .equalTo(new ConstantConditionSelector())
        .window([TIME/COUNT][TUMBLE/SLIDE]window.create())
        //.trigger(new DefaultTrigger())
        //.evictor(new DefaultEvictor())
        .apply(new FlatJoinFunctionWithInSelection());

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)