[ https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xingbo Huang reassigned FLINK-28988:
------------------------------------

    Assignee: Shuiqiang Chen  (was: chenshuiqiang)

> Incorrect result for filter after temporal join
> -----------------------------------------------
>
>                 Key: FLINK-28988
>                 URL: https://issues.apache.org/jira/browse/FLINK-28988
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.15.1
>            Reporter: Xuannan Su
>            Assignee: Shuiqiang Chen
>            Priority: Major
>
> The following code can reproduce the case
>
> {code:java}
> public class TemporalJoinSQLExample1 {
>     public static void main(String[] args) throws Exception {
>         // set up the Java DataStream API
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         // set up the Java Table API
>         final StreamTableEnvironment tableEnv =
>                 StreamTableEnvironment.create(env);
>         final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
>                 env.fromElements(
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
>                         new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
>         final Table table =
>                 tableEnv.fromDataStream(
>                                 ds,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.STRING())
>                                         .column("f2", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f2", "f2 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "state", "ts");
>         tableEnv.createTemporaryView("source_table", table);
>         final Table dedupeTable =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM ("
>                                 + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table"
>                                 + ") WHERE row_num = 1");
>         tableEnv.createTemporaryView("versioned_table", dedupeTable);
>         DataStreamSource<Tuple2<Integer, Instant>> event =
>                 env.fromElements(
>                         new Tuple2<>(0, Instant.ofEpochMilli(0)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(5)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(10)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(15)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(20)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(25)));
>         final Table eventTable =
>                 tableEnv.fromDataStream(
>                                 event,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f1", "f1 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "ts");
>         tableEnv.createTemporaryView("event_table", eventTable);
>         final Table result =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM event_table"
>                                 + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
>                                 + " ON event_table.id = versioned_table.id");
>         result.execute().print();
>         result.filter($("state").isEqual("online")).execute().print();
>     }
> } {code}
>
> The result of temporal join is the following:
> |op| id|                      ts| id0|   state|                     ts0| row_num|
> |+I|  0| 1970-01-01 08:00:00.000|   0|  online| 1970-01-01 08:00:00.000|       1|
> |+I|  0| 1970-01-01 08:00:00.005|   0|  online| 1970-01-01 08:00:00.000|       1|
> |+I|  0| 1970-01-01 08:00:00.010|   0| offline| 1970-01-01 08:00:00.010|       1|
> |+I|  0| 1970-01-01 08:00:00.015|   0| offline| 1970-01-01 08:00:00.010|       1|
> |+I|  0| 1970-01-01 08:00:00.020|   0|  online| 1970-01-01 08:00:00.020|       1|
> |+I|  0| 1970-01-01 08:00:00.025|   0|  online| 1970-01-01 08:00:00.020|       1|
>
> After filtering with predicate state = 'online', I expect only the two rows
> with state offline will be filtered out. But I got the following result:
> |op| id|                      ts| id0|   state|                     ts0| row_num|
> |+I|  0| 1970-01-01 08:00:00.020|   0|  online| 1970-01-01 08:00:00.020|       1|
> |+I|  0| 1970-01-01 08:00:00.025|   0|  online| 1970-01-01 08:00:00.020|       1|
>
>
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)