Dove created FLINK-32639: ---------------------------- Summary: Filter and Limit exist at the same time, limit cannot take effect Key: FLINK-32639 URL: https://issues.apache.org/jira/browse/FLINK-32639 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Dove
Define the environment using Flink Batch. The Source connector uses filesystem(FileSystemTableSource implements SupportsLimitPushDown/SupportsFilterPushDown) {code:java} EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings); tabEnv.executeSql( "CREATE TABLE source(uuid varchar, name varchar, age int, ts timestamp,`partition` varchar) " + "WITH ( 'connector' = 'filesystem', 'path'='file:///tmp/file', 'format'='csv' " + ")");{code} Case 1: Filter {code:java} tabEnv.executeSql("explain select * from source where name is null").print(); == Optimized Execution Plan == Calc(select=[uuid, null:VARCHAR(2147483647) AS name, age, ts, partition], where=[name IS NULL]) +- TableSourceScan(table=[[default_catalog, default_database, source, filter=[IS NULL(name)]]], fields=[uuid, name, age, ts, partition]){code} Case 2: Limit {code:java} tabEnv.executeSql("explain select * from source limit 10").print(); == Optimized Execution Plan == Limit(offset=[0], fetch=[10], global=[true]) +- Exchange(distribution=[single]) +- Limit(offset=[0], fetch=[10], global=[false]) +- TableSourceScan(table=[[default_catalog, default_database, source, limit=[10]]], fields=[uuid, name, age, ts, partition]) {code} Case 3: Filter + Limit {code:java} tabEnv.executeSql("explain select * from source where name is null limit 10").print(); == Optimized Execution Plan == Limit(offset=[0], fetch=[10], global=[true]) +- Exchange(distribution=[single]) +- Limit(offset=[0], fetch=[10], global=[false]) +- Calc(select=[uuid, null:VARCHAR(2147483647) AS name, age, ts, partition], where=[name IS NULL]) +- TableSourceScan(table=[[default_catalog, default_database, source, filter=[IS NULL(name)]]], fields=[uuid, name, age, ts, partition]) {code} When the Filter condition is in effect, Limit does not appear to be able to be pushed down to Source. -- This message was sent by Atlassian Jira (v8.20.10#820010)