Hi,

I am doing a Flink lookup join with a view based on an Apache Flink Kafka table and a GetInData Flink HTTP connector table. The lookup is successful for many types of lookup keys. It is not successful for arrays of primitives or for booleans when the value is specified in a view.

For example:

    CREATE TEMPORARY VIEW orders_view AS
    SELECT *,
           CAST(ARRAY['red','green'] AS ARRAY<STRING>) AS `stringArray`,
           PROCTIME() AS `proc_time`
    FROM orders;

And a table for the HTTP connector:

    CREATE TEMPORARY TABLE api_table_array (
        customerId STRING,
        str1 STRING,
        int1 INTEGER,
        arr1 array<string>

Then I issue a lookup join:

    SELECT * FROM orders_view AS o
    JOIN api_table_array FOR SYSTEM_TIME AS OF o.proc_time as a
    ON o.const_requestBody_stringArray = a.arr1;

This fails with:

    org.apache.flink.table.api.TableException: Temporal table join requires an equality condition on fields of table [default_catalog.default_database.api_table_array].

The FlinkFilterJoinRule issues a joinRel.analyzeCondition() [1]. This ends up in Calcite code that sets the joinInfo to have no leftKeys or rightKeys and a nonEquiConditions of:

    =(CAST(ARRAY(_UTF-16LE'red':VARCHAR(5) CHARACTER SET "UTF-16LE", _UTF-16LE'green':VARCHAR(5) CHARACTER SET "UTF-16LE")):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL, $9)

It looks like, because this code has identified a non-equality condition, the Flink lookup join cannot proceed, as it needs an equality condition.

If I define the array as a column in a file or a Kafka table, the join works. It is only when I define the array literal in the view that it fails. It also fails for Boolean. All the other primitive types (String, int etc.) work.

I am trying to find the code that causes this and why, given that all the other types work. We are hoping to make this work for Booleans and arrays. I am happy to contribute a change, given a pointer as to what might need changing. I am recreating this with Flink 1.20.1, which is using Calcite 1.32.
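
As a toy illustration of the outcome described above, the following Java sketch models how a single equality can end up with no join keys. It is not Calcite or Flink code: the `EquiKeyToy`, `Operand` and `Split` names are hypothetical, the left field count of 6 is made up, and the rule that only a pair of plain column references (one from each side) counts as a key is an assumption made for the sketch, not something confirmed against the planner source.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these types are Calcite or Flink classes.
class EquiKeyToy {

    /** One side of "op0 = op1": a plain column reference or any other expression. */
    static final class Operand {
        final boolean isColumnRef;
        final int index;   // field index across both join inputs; -1 for non-references
        final String text;

        private Operand(boolean isColumnRef, int index, String text) {
            this.isColumnRef = isColumnRef;
            this.index = index;
            this.text = text;
        }

        static Operand column(int index) {
            return new Operand(true, index, "$" + index);
        }

        static Operand expression(String text) {
            return new Operand(false, -1, text);
        }
    }

    /** Result of classifying a single equality condition. */
    static final class Split {
        final List<Integer> leftKeys = new ArrayList<>();
        final List<Integer> rightKeys = new ArrayList<>();
        final List<String> nonEquiConditions = new ArrayList<>();
    }

    /**
     * ASSUMPTION: an equality yields join keys only when both operands are
     * plain column references, one from the left input and one from the right.
     * Anything else is recorded as a non-equi condition.
     */
    static Split split(int leftFieldCount, Operand op0, Operand op1) {
        Split result = new Split();
        if (op0.isColumnRef && op1.isColumnRef) {
            Operand low = op0.index <= op1.index ? op0 : op1;
            Operand high = (low == op0) ? op1 : op0;
            if (low.index < leftFieldCount && high.index >= leftFieldCount) {
                result.leftKeys.add(low.index);
                result.rightKeys.add(high.index - leftFieldCount);
                return result;
            }
        }
        result.nonEquiConditions.add("=(" + op0.text + ", " + op1.text + ")");
        return result;
    }

    public static void main(String[] args) {
        int leftFieldCount = 6; // hypothetical width of the left input

        // Column on each side: keys are found.
        Split columnCase = split(leftFieldCount, Operand.column(5), Operand.column(9));
        System.out.println(columnCase.leftKeys + " " + columnCase.rightKeys
                + " " + columnCase.nonEquiConditions);

        // Literal expression on one side: no keys, one non-equi condition.
        Split literalCase = split(leftFieldCount,
                Operand.expression("CAST(ARRAY('red', 'green'))"), Operand.column(9));
        System.out.println(literalCase.leftKeys + " " + literalCase.rightKeys
                + " " + literalCase.nonEquiConditions);
    }
}
```

Under that assumption the literal case leaves both key lists empty, which matches the shape of the joinInfo seen in the failing join. It does not explain why other literal types succeed.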

I see that the non-equality condition is set at [2]. Breakpointing on this line, I see the condition object has:

    op: "="
    operand 0: CAST(ARRAY(_UTF-16LE'red':VARCHAR(5) CHARACTER SET "UTF-16LE", _UTF-16LE'green':VARCHAR(5) CHARACTER SET "UTF-16LE")):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL
    operand 1: $9

It goes through this code a few times. In the failing case the debugger does not resolve op0 or op1, and ends up at [2].

Any thoughts on why this is happening and how to resolve it would be greatly appreciated. I am happy to code a resolution but would need pointers.

Kind regards, David.

[1] https://github.com/apache/flink/blob/8616d6d811e73979328607db03028ae0220d8491/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java#L349
[2] https://github.com/apache/calcite/blob/597b1fd54fe5b8586525aed2bc4518ca54a25523/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java#L1709