Hi,
I am doing a Flink lookup join with a view based on an Apache Flink Kafka table
and a GetInData Flink HTTP connector table. The lookup succeeds for many types
of lookup keys, but it fails for arrays of primitives and for Booleans when the
value is specified as a literal in a view.

For example, given this view:

CREATE TEMPORARY VIEW orders_view AS
SELECT
    *,
    CAST(ARRAY['red','green'] AS ARRAY<STRING>) AS `stringArray`,
    PROCTIME()                                  AS `proc_time`
FROM
    orders;

and this table for the HTTP connector:

CREATE TEMPORARY TABLE api_table_array
  (
    customerId  STRING,
    str1        STRING,
    int1        INTEGER,
    arr1        ARRAY<STRING>

Then I issue a lookup join:

SELECT * FROM orders_view AS o
  JOIN api_table_array FOR SYSTEM_TIME AS OF o.proc_time AS a
  ON o.const_requestBody_stringArray = a.arr1;

This fails with:

org.apache.flink.table.api.TableException: Temporal table join requires an equality condition on fields of table [default_catalog.default_database.api_table_array].

FlinkFilterJoinRule calls joinRel.analyzeCondition() [1]. This ends up in
Calcite code that sets the joinInfo to have no leftKeys or rightKeys, and a
nonEquiConditions of:

=(CAST(ARRAY(_UTF-16LE'red':VARCHAR(5) CHARACTER SET "UTF-16LE", _UTF-16LE'green':VARCHAR(5) CHARACTER SET "UTF-16LE")):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL, $9)

It looks like, because this code classifies the condition as a non-equality
condition, the Flink lookup join cannot proceed, since it needs an equality
condition.
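To make that classification concrete, here is a toy Java sketch of how an "="
conjunct might be split into equi-join key pairs versus non-equi conditions.
It is a simplified model, not Calcite's code: it assumes a key pair is recorded
only when one operand is a field of the left input and the other a field of the
right input. The Operand, Equals and Split types and the left field count of 6
are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (NOT Calcite's real classes) of splitting a join condition into
 * equi-join key pairs and leftover non-equi conditions.
 * Assumption: fields 0..leftCount-1 belong to the left input, the rest to the right.
 */
public class EquiConditionSketch {

    /** Hypothetical operand: a field reference ($n), or a constant expression when fieldIndex is null. */
    record Operand(String text, Integer fieldIndex) {}

    /** Hypothetical "=(op0, op1)" conjunct. */
    record Equals(Operand op0, Operand op1) {}

    /** Key pairs as {leftIndex, indexWithinRightInput}, plus everything that is not a key. */
    record Split(List<int[]> keyPairs, List<Equals> nonEqui) {}

    static Split split(List<Equals> conjuncts, int leftCount) {
        List<int[]> keys = new ArrayList<>();
        List<Equals> rest = new ArrayList<>();
        for (Equals eq : conjuncts) {
            Integer a = eq.op0().fieldIndex();
            Integer b = eq.op1().fieldIndex();
            if (a != null && b != null) {
                if (a < leftCount && b >= leftCount) {
                    keys.add(new int[] {a, b - leftCount});
                    continue;
                }
                if (b < leftCount && a >= leftCount) {
                    keys.add(new int[] {b, a - leftCount});
                    continue;
                }
            }
            // A constant operand (no field reference) cannot pair a left field
            // with a right field, so the whole "=" is kept as a non-equi condition.
            rest.add(eq);
        }
        return new Split(keys, rest);
    }

    public static void main(String[] args) {
        int leftCount = 6; // hypothetical: left has $0..$5, so $9 is right field 3 (arr1)
        Operand rightArr1 = new Operand("$9", 9);
        Operand leftArrayColumn = new Operand("$3", 3); // hypothetical array column from Kafka/file
        Operand constantArray = new Operand("CAST(ARRAY('red', 'green'))", null);

        Split fromColumn = split(List.of(new Equals(leftArrayColumn, rightArr1)), leftCount);
        Split fromLiteral = split(List.of(new Equals(constantArray, rightArr1)), leftCount);

        System.out.println("column:  keys=" + fromColumn.keyPairs().size()
                + " nonEqui=" + fromColumn.nonEqui().size());
        System.out.println("literal: keys=" + fromLiteral.keyPairs().size()
                + " nonEqui=" + fromLiteral.nonEqui().size());
    }
}
```

In this model the column case yields one key pair and no non-equi conditions,
while the constant-array case yields no key pairs and one non-equi condition,
which mirrors the joinInfo seen in the debugger.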

If I define the array as a column in a file or a Kafka table, the join works.
It is only when I define the array literal in the view that it fails. It also
fails for Booleans. All the other primitive types (String, int, etc.) work.
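One hypothesis I would like to check (an assumption on my part, not confirmed
against the Flink source): a constant can only serve as a lookup key if the
planner sees it as a plain literal. A scalar such as 'red' could be a literal,
whereas the array value arrives as a function call, CAST(ARRAY(...)), as in the
condition above. This toy Java sketch models that assumption; Node, Field,
Literal, Call, Equals and constantLookupKeys are hypothetical names, not Flink
classes.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (NOT Flink's real classes) of extracting constant lookup keys.
 * Assumption: "=(lookupField, x)" yields a constant key only when x is a plain literal.
 */
public class LookupKeySketch {

    interface Node {}

    /** A field of the lookup table, by its position in that table. */
    record Field(int index) implements Node {}

    /** A plain literal value such as 'red' or 42. */
    record Literal(Object value) implements Node {}

    /** Any other expression, e.g. CAST(ARRAY['red','green'] AS ARRAY<STRING>). */
    record Call(String text) implements Node {}

    record Equals(Node left, Node right) {}

    /** Returns the lookup-table field indexes that get a constant lookup key. */
    static List<Integer> constantLookupKeys(List<Equals> conjuncts) {
        List<Integer> keys = new ArrayList<>();
        for (Equals eq : conjuncts) {
            if (eq.left() instanceof Field fl && eq.right() instanceof Literal) {
                keys.add(fl.index());
            } else if (eq.right() instanceof Field fr && eq.left() instanceof Literal) {
                keys.add(fr.index());
            }
            // A Call operand is not recognised as a constant in this model,
            // so no lookup key is produced for it.
        }
        return keys;
    }

    public static void main(String[] args) {
        // Field positions follow api_table_array: customerId=0, str1=1, int1=2, arr1=3.
        List<Equals> stringCase = List.of(new Equals(new Literal("red"), new Field(1)));
        List<Equals> arrayCase = List.of(
                new Equals(new Call("CAST(ARRAY('red', 'green'))"), new Field(3)));

        System.out.println("string literal -> keys " + constantLookupKeys(stringCase));
        System.out.println("array expression -> keys " + constantLookupKeys(arrayCase));
    }
}
```

Under this assumption the string case produces a key for str1 and the array
case produces none, which would leave the join with no lookup keys.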

I am trying to find the code that causes this, and to understand why, given
that all the other types work. We are hoping to make this work for Booleans and
arrays, and I am happy to contribute a change given a pointer as to what might
need changing.

I am reproducing this with Flink 1.20.1, which uses Calcite 1.32. I can see
where the non-equality condition is set [2]. With a breakpoint on that line,
the condition object has:

op = "="
operand 0 = CAST(ARRAY(_UTF-16LE'red':VARCHAR(5) CHARACTER SET "UTF-16LE", _UTF-16LE'green':VARCHAR(5) CHARACTER SET "UTF-16LE")):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL
operand 1 = $9

It goes through this code a few times. In the failing case, the debugger shows
that op0 and op1 are not resolved, and execution ends up at [2].

Any thoughts on why this is happening and how to resolve it would be greatly
appreciated. I am happy to code a fix, but would need some pointers.

Kind regards, David.

[1] https://github.com/apache/flink/blob/8616d6d811e73979328607db03028ae0220d8491/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java#L349

[2] https://github.com/apache/calcite/blob/597b1fd54fe5b8586525aed2bc4518ca54a25523/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java#L1709

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: Building C, IBM Hursley Office, Hursley Park Road, 
Winchester, Hampshire SO21 2JN
