Lilo created FLINK-36930:
----------------------------

             Summary: Lookup join with JDBC connector fails when join condition does not include fields from the probe/left table
                 Key: FLINK-36930
                 URL: https://issues.apache.org/jira/browse/FLINK-36930
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.19.1, 1.18.1, 1.17.0
         Environment:  * Flink versions affected: 1.17.0 and later (tested on 1.19.1)
 * Flink version where this worked: 1.16.2
 * Connector: JDBC (tested with Apache Derby and MySQL; likely reproducible with other JDBC drivers). Notably, this issue does not affect the Hive connector.

            Reporter: Lilo


I've encountered a regression in Flink 1.17.0 (and later versions) related to 
lookup joins when using the JDBC connector as the lookup source. The issue 
arises when the join condition does not include fields from the probe/left 
table. This scenario worked correctly in Flink 1.16.2 but now throws a 
`TableException`.

The following SQL code demonstrates the problem:
{code:sql}
SET 'execution.target' = 'local';

-- Create the lookup table (using Apache Derby as an example)
DROP TEMPORARY TABLE IF EXISTS lookup_table;
CREATE TEMPORARY TABLE lookup_table (
    id INT,
    sub_id INT,
    v STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:derby:memory:myInMemoryDB;create=true',
    'table-name' = 'lookup_table'
);

-- Create the main table (using datagen connector)
DROP TEMPORARY TABLE IF EXISTS main_table;
CREATE TEMPORARY TABLE main_table (
    v STRING,
    some_id INT,
    some_sub_id INT,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '2',
    'fields.some_id.kind' = 'random',
    'fields.some_id.min' = '1',
    'fields.some_id.max' = '5',
    'fields.some_sub_id.kind' = 'random',
    'fields.some_sub_id.min' = '1',
    'fields.some_sub_id.max' = '3',
    'fields.v.length' = '10'
);

-- This lookup join works correctly (join condition includes a field from main_table)
EXPLAIN PLAN FOR
SELECT
    t1.*,
    t2.v AS lookup_value
FROM main_table t1
INNER JOIN lookup_table FOR SYSTEM_TIME AS OF t1.proctime AS t2
    ON t2.id = 1
    AND t2.sub_id = t1.some_sub_id;

-- This lookup join FAILS in Flink 1.17.0+ (join condition only uses constants)
EXPLAIN PLAN FOR
SELECT
    t1.*,
    t2.v AS lookup_value
FROM main_table t1
INNER JOIN lookup_table FOR SYSTEM_TIME AS OF t1.proctime AS t2
    ON t2.id = 1
    AND t2.sub_id = 1;

-- Attempting to bypass the issue with a view also fails
DROP TEMPORARY VIEW IF EXISTS main_table_view;
CREATE TEMPORARY VIEW main_table_view AS SELECT *, 1 AS fake_id FROM main_table;
EXPLAIN PLAN FOR
SELECT
    t1.*,
    t2.v AS lookup_value
FROM main_table_view t1
INNER JOIN lookup_table FOR SYSTEM_TIME AS OF t1.proctime AS t2
    ON t2.id = 1
    AND t2.sub_id = t1.fake_id;
{code}

*Expected Behavior:*

These queries should execute successfully, even when the join condition does 
not involve fields from the main table. This was the behavior in Flink 1.16.2.

*Actual Behavior:*

In Flink 1.17.0 and later, the second query (where the join condition only uses 
constants) throws the following exception (from Flink 1.17.2):

{code:java}
Caused by: org.apache.flink.table.api.TableException: Temporal table join requires an equality condition on fields of table [default_catalog.default_database.lookup_table].
        at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.validate(CommonExecLookupJoin.java:687)
        at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.createJoinTransformation(CommonExecLookupJoin.java:249)
        at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.java:157)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257)
        at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
        at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
{code}


--
This message was sent by Atlassian Jira
(v8.20.10#820010)