Dove created FLINK-32639:
----------------------------

             Summary: Filter and Limit exist at the same time, limit cannot take effect
                 Key: FLINK-32639
                 URL: https://issues.apache.org/jira/browse/FLINK-32639
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: Dove


The environment is set up in Flink batch mode.

The source uses the filesystem connector (FileSystemTableSource implements both SupportsLimitPushDown and SupportsFilterPushDown).

 
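{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Batch-mode table environment on top of a StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);

// CSV files under /tmp/file, read through the filesystem connector
tabEnv.executeSql(
        "CREATE TABLE source(uuid varchar, name varchar, age int, ts timestamp, `partition` varchar) "
                + "WITH ('connector' = 'filesystem', 'path' = 'file:///tmp/file', 'format' = 'csv')");{code}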
Case 1: Filter

 
{code:java}
tabEnv.executeSql("explain select * from source where name is null").print();

== Optimized Execution Plan ==
Calc(select=[uuid, null:VARCHAR(2147483647) AS name, age, ts, partition], where=[name IS NULL])
+- TableSourceScan(table=[[default_catalog, default_database, source, filter=[IS NULL(name)]]], fields=[uuid, name, age, ts, partition]){code}
 

Case 2: Limit 
{code:java}
tabEnv.executeSql("explain select * from source limit 10").print();

== Optimized Execution Plan ==
Limit(offset=[0], fetch=[10], global=[true])
+- Exchange(distribution=[single])
   +- Limit(offset=[0], fetch=[10], global=[false])
      +- TableSourceScan(table=[[default_catalog, default_database, source, limit=[10]]], fields=[uuid, name, age, ts, partition]) {code}
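The Case 2 plan applies the limit in two phases: each parallel instance is bounded by limit=[10] in the scan and by Limit(global=[false]), Exchange(distribution=[single]) gathers the rows, and Limit(global=[true]) trims them to the final fetch. The sketch below is a hypothetical plain-Java model of that shape (not Flink's runtime), with partitions standing in for parallel source instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the Case 2 plan; not Flink code.
// Local limit per partition, single-partition exchange, then a global limit.
class TwoPhaseLimitSketch {

    // Keep at most `fetch` rows; used for both the local and the global limit.
    static List<Integer> limit(List<Integer> rows, int fetch) {
        return new ArrayList<>(rows.subList(0, Math.min(fetch, rows.size())));
    }

    static List<Integer> run(List<List<Integer>> partitions, int fetch) {
        List<Integer> gathered = new ArrayList<>(); // Exchange(distribution=[single])
        for (List<Integer> partition : partitions) {
            gathered.addAll(limit(partition, fetch)); // Limit(global=[false]) per instance
        }
        return limit(gathered, fetch); // Limit(global=[true])
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2, 3, 4, 5),
                Arrays.asList(6, 7, 8, 9, 10, 11, 12));
        System.out.println(run(partitions, 3)); // [1, 2, 3]
    }
}
```

The local phase caps how many rows each instance ships through the exchange; the global phase is still needed because several instances may each contribute up to `fetch` rows.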
Case 3: Filter + Limit
{code:java}
tabEnv.executeSql("explain select * from source where name is null limit 10").print();

== Optimized Execution Plan ==
Limit(offset=[0], fetch=[10], global=[true])
+- Exchange(distribution=[single])
   +- Limit(offset=[0], fetch=[10], global=[false])
      +- Calc(select=[uuid, null:VARCHAR(2147483647) AS name, age, ts, partition], where=[name IS NULL])
         +- TableSourceScan(table=[[default_catalog, default_database, source, filter=[IS NULL(name)]]], fields=[uuid, name, age, ts, partition]) {code}
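When a filter condition is present, the limit does not appear to be pushed down to the source: in Case 3 the TableSourceScan carries filter=[IS NULL(name)] but no limit=[10], whereas in Case 2 it carries limit=[10].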
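In Case 3 the Calc with where=[name IS NULL] sits between the Limit and the scan, so whether the limit can move into the scan presumably depends on whether the source applies the filter completely. The plain-Java sketch below is a hypothetical illustration (not Flink code; rows reduced to the `name` column, names made up) that contrasts the operator order in the Case 3 plan with one where the scan stops after `fetch` rows and the predicate is only evaluated afterwards.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical illustration for Case 3; not Flink code.
class LimitFilterOrderSketch {

    // Order in the Case 3 plan: Calc(where=[name IS NULL]) below Limit(fetch).
    static List<String> filterThenLimit(List<String> names, int fetch) {
        return names.stream().filter(Objects::isNull).limit(fetch).collect(Collectors.toList());
    }

    // Alternative order: the scan stops after `fetch` rows, then the remaining
    // predicate is evaluated on what was read.
    static List<String> limitThenFilter(List<String> names, int fetch) {
        return names.stream().limit(fetch).filter(Objects::isNull).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("alice", null, "bob", null, null);
        System.out.println(filterThenLimit(names, 2).size()); // 2
        System.out.println(limitThenFilter(names, 2).size()); // 1
    }
}
```

With this data the second order returns fewer rows than requested, so pushing limit=[10] into the scan would likely only be safe if the source guaranteed that the filter leaves no remaining predicate.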



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
