[ https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xuannan Su updated FLINK-28988: ------------------------------- Description: The following code can reproduce the case {code:java} public class TemporalJoinSQLExample1 { public static void main(String[] args) throws Exception { // set up the Java DataStream API final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // set up the Java Table API final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); final DataStreamSource<Tuple3<Integer, String, Instant>> ds = env.fromElements( new Tuple3<>(0, "online", Instant.ofEpochMilli(0)), new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)), new Tuple3<>(0, "online", Instant.ofEpochMilli(20))); final Table table = tableEnv.fromDataStream( ds, Schema.newBuilder() .column("f0", DataTypes.INT()) .column("f1", DataTypes.STRING()) .column("f2", DataTypes.TIMESTAMP_LTZ(3)) .watermark("f2", "f2 - INTERVAL '2' SECONDS") .build()) .as("id", "state", "ts"); tableEnv.createTemporaryView("source_table", table); final Table dedupeTable = tableEnv.sqlQuery( "SELECT * FROM (" + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table" + ") WHERE row_num = 1"); tableEnv.createTemporaryView("versioned_table", dedupeTable); DataStreamSource<Tuple2<Integer, Instant>> event = env.fromElements( new Tuple2<>(0, Instant.ofEpochMilli(0)), new Tuple2<>(0, Instant.ofEpochMilli(5)), new Tuple2<>(0, Instant.ofEpochMilli(10)), new Tuple2<>(0, Instant.ofEpochMilli(15)), new Tuple2<>(0, Instant.ofEpochMilli(20)), new Tuple2<>(0, Instant.ofEpochMilli(25))); final Table eventTable = tableEnv.fromDataStream( event, Schema.newBuilder() .column("f0", DataTypes.INT()) .column("f1", DataTypes.TIMESTAMP_LTZ(3)) .watermark("f1", "f1 - INTERVAL '2' SECONDS") .build()) .as("id", "ts"); tableEnv.createTemporaryView("event_table", eventTable); final Table result = tableEnv.sqlQuery( "SELECT * FROM event_table" + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF 
event_table.ts" + " ON event_table.id = versioned_table.id"); result.execute().print(); result.filter($("state").isEqual("online")).execute().print(); } } {code} The result of the temporal join is the following: |op| id| ts| id0| state| ts0| row_num| |+I| 0|1970-01-01 08:00:00.000| 0| online|1970-01-01 08:00:00.000| 1| |+I| 0|1970-01-01 08:00:00.005| 0| online|1970-01-01 08:00:00.000| 1| |+I| 0|1970-01-01 08:00:00.010| 0| offline|1970-01-01 08:00:00.010| 1| |+I| 0|1970-01-01 08:00:00.015| 0| offline|1970-01-01 08:00:00.010| 1| |+I| 0|1970-01-01 08:00:00.020| 0| online|1970-01-01 08:00:00.020| 1| |+I| 0|1970-01-01 08:00:00.025| 0| online|1970-01-01 08:00:00.020| 1| After filtering with the predicate state = 'online', I expect only the two rows with state 'offline' to be filtered out. However, I got the following result: |op| id| ts| id0| state| ts0| row_num| |+I| 0|1970-01-01 08:00:00.020| 0| online|1970-01-01 08:00:00.020| 1| |+I| 0|1970-01-01 08:00:00.025| 0| online|1970-01-01 08:00:00.020| 1| was: The following code can reproduce the case {code:java} public class TemporalJoinSQLExample1 { public static void main(String[] args) throws Exception { // set up the Java DataStream API final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // set up the Java Table API final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); final DataStreamSource<Tuple3<Integer, String, Instant>> ds = env.fromElements( new Tuple3<>(0, "online", Instant.ofEpochMilli(0)), new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)), new Tuple3<>(0, "online", Instant.ofEpochMilli(20))); final Table table = tableEnv.fromDataStream( ds, Schema.newBuilder() .column("f0", DataTypes.INT()) .column("f1", DataTypes.STRING()) .column("f2", DataTypes.TIMESTAMP_LTZ(3)) .watermark("f2", "f2 - INTERVAL '2' SECONDS") .build()) .as("id", "state", "ts"); tableEnv.createTemporaryView("source_table", table); final Table dedupeTable = tableEnv.sqlQuery( "SELECT * FROM (" 
+ " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table" + ") WHERE row_num = 1"); tableEnv.createTemporaryView("versioned_table", dedupeTable); DataStreamSource<Tuple2<Integer, Instant>> event = env.fromElements( new Tuple2<>(0, Instant.ofEpochMilli(0)), new Tuple2<>(0, Instant.ofEpochMilli(5)), new Tuple2<>(0, Instant.ofEpochMilli(10)), new Tuple2<>(0, Instant.ofEpochMilli(15)), new Tuple2<>(0, Instant.ofEpochMilli(20)), new Tuple2<>(0, Instant.ofEpochMilli(25))); final Table eventTable = tableEnv.fromDataStream( event, Schema.newBuilder() .column("f0", DataTypes.INT()) .column("f1", DataTypes.TIMESTAMP_LTZ(3)) .watermark("f1", "f1 - INTERVAL '2' SECONDS") .build()) .as("id", "ts"); tableEnv.createTemporaryView("event_table", eventTable); final Table result = tableEnv.sqlQuery( "SELECT * FROM event_table" + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts" + " ON event_table.id = versioned_table.id"); result.execute().print(); result.filter($("state").isEqual("online")).execute().print(); } } {code} The result of temporal join is the following: +----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+ | op | id | ts | id0 | state | ts0 | row_num | +----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+ | +I | 0 | 1970-01-01 08:00:00.000 | 0 | online | 1970-01-01 08:00:00.000 | 1 | | +I | 0 | 1970-01-01 08:00:00.005 | 0 | online | 1970-01-01 08:00:00.000 | 1 | | +I | 0 | 1970-01-01 08:00:00.010 | 0 | offline | 1970-01-01 08:00:00.010 | 1 | | +I | 0 | 1970-01-01 08:00:00.015 | 0 | offline | 1970-01-01 08:00:00.010 | 1 | | +I | 0 | 1970-01-01 08:00:00.020 | 0 | online | 1970-01-01 08:00:00.020 | 1 | | +I | 0 | 1970-01-01 08:00:00.025 | 0 | online | 1970-01-01 08:00:00.020 | 1 | 
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+ After filtering with predicate state = 'online', I expect only the two rows with state offline will be filtered out. But I got the following result: +----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+ | op | id | ts | id0 | state | ts0 | row_num | +----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+ | +I | 0 | 1970-01-01 08:00:00.020 | 0 | online | 1970-01-01 08:00:00.020 | 1 | | +I | 0 | 1970-01-01 08:00:00.025 | 0 | online | 1970-01-01 08:00:00.020 | 1 | +----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+ > Incorrect result for filter after temporal join > ----------------------------------------------- > > Key: FLINK-28988 > URL: https://issues.apache.org/jira/browse/FLINK-28988 > Project: Flink > Issue Type: Bug > Components: Table SQL / API > Affects Versions: 1.15.1 > Reporter: Xuannan Su > Priority: Major > > The following code can reproduce the case > > {code:java} > public class TemporalJoinSQLExample1 { > public static void main(String[] args) throws Exception { > // set up the Java DataStream API > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > // set up the Java Table API > final StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(env); > final DataStreamSource<Tuple3<Integer, String, Instant>> ds = > env.fromElements( > new Tuple3<>(0, "online", Instant.ofEpochMilli(0)), > new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)), > new Tuple3<>(0, "online", Instant.ofEpochMilli(20))); > final Table table = > tableEnv.fromDataStream( > ds, > Schema.newBuilder() > .column("f0", 
DataTypes.INT()) > .column("f1", DataTypes.STRING()) > .column("f2", > DataTypes.TIMESTAMP_LTZ(3)) > .watermark("f2", "f2 - INTERVAL '2' > SECONDS") > .build()) > .as("id", "state", "ts"); > tableEnv.createTemporaryView("source_table", table); > final Table dedupeTable = > tableEnv.sqlQuery( > "SELECT * FROM (" > + " SELECT *, ROW_NUMBER() OVER (PARTITION BY > id ORDER BY ts DESC) AS row_num FROM source_table" > + ") WHERE row_num = 1"); > tableEnv.createTemporaryView("versioned_table", dedupeTable); > DataStreamSource<Tuple2<Integer, Instant>> event = > env.fromElements( > new Tuple2<>(0, Instant.ofEpochMilli(0)), > new Tuple2<>(0, Instant.ofEpochMilli(5)), > new Tuple2<>(0, Instant.ofEpochMilli(10)), > new Tuple2<>(0, Instant.ofEpochMilli(15)), > new Tuple2<>(0, Instant.ofEpochMilli(20)), > new Tuple2<>(0, Instant.ofEpochMilli(25))); > final Table eventTable = > tableEnv.fromDataStream( > event, > Schema.newBuilder() > .column("f0", DataTypes.INT()) > .column("f1", > DataTypes.TIMESTAMP_LTZ(3)) > .watermark("f1", "f1 - INTERVAL '2' > SECONDS") > .build()) > .as("id", "ts"); > tableEnv.createTemporaryView("event_table", eventTable); > final Table result = > tableEnv.sqlQuery( > "SELECT * FROM event_table" > + " LEFT JOIN versioned_table FOR SYSTEM_TIME > AS OF event_table.ts" > + " ON event_table.id = versioned_table.id"); > result.execute().print(); > result.filter($("state").isEqual("online")).execute().print(); > } > } {code} > > The result of temporal join is the following: > |op| id| ts| id0| > state| ts0| row_num| > |+I| 0|1970-01-01 08:00:00.000| 0| > online|1970-01-01 08:00:00.000| 1| > |+I| 0|1970-01-01 08:00:00.005| 0| > online|1970-01-01 08:00:00.000| 1| > |+I| 0|1970-01-01 08:00:00.010| 0| > offline|1970-01-01 08:00:00.010| 1| > |+I| 0|1970-01-01 08:00:00.015| 0| > offline|1970-01-01 08:00:00.010| 1| > |+I| 0|1970-01-01 08:00:00.020| 0| > online|1970-01-01 08:00:00.020| 1| > |+I| 0|1970-01-01 08:00:00.025| 0| > online|1970-01-01 08:00:00.020| 1| > > 
After filtering with the predicate state = 'online', I expect only the two rows > with state 'offline' to be filtered out. However, I got the following result: > |op| id| ts| id0| > state| ts0| row_num| > |+I| 0|1970-01-01 08:00:00.020| 0| > online|1970-01-01 08:00:00.020| 1| > |+I| 0|1970-01-01 08:00:00.025| 0| > online|1970-01-01 08:00:00.020| 1| > > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)