[ https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17600135#comment-17600135 ]
Shuiqiang Chen commented on FLINK-28988:
----------------------------------------

TL;DR: The filters in {{aboveFilter}} should not be pushed down into the right table when the join is a temporal join. When there is no filter after the temporal join, the query is explained as below:
{code:xml}
== Abstract Syntax Tree ==
LogicalProject(id=[$0], ts=[$1], id0=[$2], state=[$3], ts0=[$4], row_num=[$5])
+- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}])
   :- LogicalProject(id=[AS($0, _UTF-16LE'id')], ts=[AS($1, _UTF-16LE'ts')])
   :  +- LogicalWatermarkAssigner(rowtime=[f1], watermark=[-($1, 2000:INTERVAL SECOND)])
   :     +- LogicalTableScan(table=[[*anonymous_datastream_source$2*]])
   +- LogicalFilter(condition=[=($cor0.id, $0)])
      +- LogicalSnapshot(period=[$cor0.ts])
         +- LogicalProject(id=[$0], state=[$1], ts=[$2], row_num=[$3])
            +- LogicalFilter(condition=[=($3, 1)])
               +- LogicalProject(id=[AS($0, _UTF-16LE'id')], state=[AS($1, _UTF-16LE'state')], ts=[AS($2, _UTF-16LE'ts')], row_num=[ROW_NUMBER() OVER (PARTITION BY AS($0, _UTF-16LE'id') ORDER BY AS($2, _UTF-16LE'ts') DESC NULLS LAST)])
                  +- LogicalWatermarkAssigner(rowtime=[f2], watermark=[-($2, 2000:INTERVAL SECOND)])
                     +- LogicalTableScan(table=[[*anonymous_datastream_source$1*]])

== Optimized Physical Plan ==
Calc(select=[id, ts, id0, state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, row_num])
+- TemporalJoin(joinType=[LeftOuterJoin], where=[AND(=(id, id0), __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, state, ts0, row_num])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[f0 AS id, f1 AS ts])
   :     +- WatermarkAssigner(rowtime=[f1], watermark=[-(f1, 2000:INTERVAL SECOND)])
   :        +- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[$0 AS id, $1 AS state, $2 AS ts, 1:BIGINT AS row_num])
         +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME])
            +- Exchange(distribution=[hash[$0]])
               +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2])
                  +- WatermarkAssigner(rowtime=[f2], watermark=[-(f2, 2000:INTERVAL SECOND)])
                     +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2])

== Optimized Execution Plan ==
Calc(select=[id, ts, id0, state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, row_num])
+- TemporalJoin(joinType=[LeftOuterJoin], where=[((id = id0) AND __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, state, ts0, row_num])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[f0 AS id, f1 AS ts])
   :     +- WatermarkAssigner(rowtime=[f1], watermark=[(f1 - 2000:INTERVAL SECOND)])
   :        +- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[$0 AS id, $1 AS state, $2 AS ts, 1 AS row_num])
         +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME])
            +- Exchange(distribution=[hash[$0]])
               +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2])
                  +- WatermarkAssigner(rowtime=[f2], watermark=[(f2 - 2000:INTERVAL SECOND)])
                     +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2])
{code}
After the FlinkChangelogModeInferenceProgram runs, the UpdateKindTrait of Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) will become [ONLY_UPDATE_AFTER].
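To illustrate what ONLY_UPDATE_AFTER means for the rows flowing out of the Deduplicate node, here is a minimal plain-Java sketch (not Flink internals; class and method names are made up): with this trait the operator emits +I for the first row of a key and +U for each later row, but never the -U retraction of the previous version, so the downstream temporal join sees every version of the record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an ONLY_UPDATE_AFTER keep-last-row deduplication:
// +I on the first row per key, +U (without a preceding -U) on later rows.
public class DedupChangelogSketch {

    /** One input row: key, state, event timestamp in ms. */
    record Row(int key, String state, long ts) {}

    static List<String> changelog(List<Row> input) {
        Map<Integer, Row> lastSeen = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Row r : input) {
            // +I for an unseen key, +U (update-after only) for a known key.
            String kind = lastSeen.containsKey(r.key()) ? "+U" : "+I";
            lastSeen.put(r.key(), r);
            out.add(kind + "[" + r.key() + ", " + r.state() + ", " + r.ts() + "]");
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row(0, "online", 0), new Row(0, "offline", 10), new Row(0, "online", 20));
        changelog(rows).forEach(System.out::println);
        // +I[0, online, 0]
        // +U[0, offline, 10]
        // +U[0, online, 20]
    }
}
```

Because no retractions are emitted, all three versions end up in the temporal join's right-side state, keyed by event time.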
Therefore, at execution time, the rightSortedState in TemporalRowTimeJoinOperator contains the following rows:
{code}
[+I, 0, online, 1970-01-01 08:00:00.000]
[+I, 0, offline, 1970-01-01 08:00:00.010]
[+I, 0, online, 1970-01-01 08:00:00.020]
{code}
so we get the expected temporal join result:
{code}
[+I, 0, 1970-01-01 08:00:00.000, 0, online, 1970-01-01 08:00:00.000, 1]
[+I, 0, 1970-01-01 08:00:00.005, 0, online, 1970-01-01 08:00:00.000, 1]
[+I, 0, 1970-01-01 08:00:00.010, 0, offline, 1970-01-01 08:00:00.010, 1]
[+I, 0, 1970-01-01 08:00:00.015, 0, offline, 1970-01-01 08:00:00.010, 1]
[+I, 0, 1970-01-01 08:00:00.020, 0, online, 1970-01-01 08:00:00.020, 1]
[+I, 0, 1970-01-01 08:00:00.025, 0, online, 1970-01-01 08:00:00.020, 1]
{code}
However, if the filter is pushed down into the right table, the right sorted state becomes:
{code}
[+I, 0, online, 1970-01-01 08:00:00.000]
[+I, 0, online, 1970-01-01 08:00:00.020]
{code}
and the temporal join result becomes:
{code}
[+I, 0, 1970-01-01 08:00:00.000, 0, online, 1970-01-01 08:00:00.000, 1]
[+I, 0, 1970-01-01 08:00:00.005, 0, online, 1970-01-01 08:00:00.000, 1]
[+I, 0, 1970-01-01 08:00:00.010, 0, online, 1970-01-01 08:00:00.000, 1]
[+I, 0, 1970-01-01 08:00:00.015, 0, online, 1970-01-01 08:00:00.000, 1]
[+I, 0, 1970-01-01 08:00:00.020, 0, online, 1970-01-01 08:00:00.020, 1]
[+I, 0, 1970-01-01 08:00:00.025, 0, online, 1970-01-01 08:00:00.020, 1]
{code}
while the expected result (with the filter applied on top of the join) is:
{code}
[+I, 0, 1970-01-01 08:00:00.000, 0, online, 1970-01-01 08:00:00.000, 1]
[+I, 0, 1970-01-01 08:00:00.005, 0, online, 1970-01-01 08:00:00.000, 1]
[+I, 0, 1970-01-01 08:00:00.020, 0, online, 1970-01-01 08:00:00.020, 1]
[+I, 0, 1970-01-01 08:00:00.025, 0, online, 1970-01-01 08:00:00.020, 1]
{code}
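The effect of the pushed-down filter can be reproduced with a minimal plain-Java sketch of the lookup the temporal join performs (this is not the actual TemporalRowTimeJoinOperator code; names are illustrative): each left row joins the latest right-side version whose timestamp is not after the left row's event time, so removing the offline version from the state silently changes which version the rows at 10 ms and 15 ms match.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of an event-time temporal join lookup:
// for each left timestamp, pick the latest right version with ts <= left ts.
public class TemporalJoinSketch {

    /** One right-side version: state value and its event timestamp in ms. */
    record Version(String state, long ts) {}

    static List<String> join(long[] leftTs, List<Version> rightSortedState) {
        List<String> out = new ArrayList<>();
        for (long ts : leftTs) {
            Version match = null;
            // rightSortedState is ordered by ts ascending, as in rightSortedState.
            for (Version v : rightSortedState) {
                if (v.ts() <= ts) match = v;
            }
            out.add(ts + "->" + (match == null ? "null" : match.state()));
        }
        return out;
    }

    public static void main(String[] args) {
        long[] left = {0, 5, 10, 15, 20, 25};
        // Full versioned state: the filter is applied AFTER the join (correct).
        List<Version> full = List.of(
                new Version("online", 0), new Version("offline", 10), new Version("online", 20));
        // State with the filter wrongly pushed below the temporal join.
        List<Version> filtered = List.of(new Version("online", 0), new Version("online", 20));

        System.out.println(join(left, full));
        // [0->online, 5->online, 10->offline, 15->offline, 20->online, 25->online]
        System.out.println(join(left, filtered));
        // [0->online, 5->online, 10->online, 15->online, 20->online, 25->online]
    }
}
```

Filtering state = 'online' after the first result drops the two offline rows, as expected; filtering after the second result drops nothing, producing the six-row (wrong) output described above.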
> Incorrect result for filter after temporal join
> -----------------------------------------------
>
>                 Key: FLINK-28988
>                 URL: https://issues.apache.org/jira/browse/FLINK-28988
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.15.1
>            Reporter: Xuannan Su
>            Assignee: Shuiqiang Chen
>            Priority: Major
>
> The following code can reproduce the case
>
> {code:java}
> public class TemporalJoinSQLExample1 {
>     public static void main(String[] args) throws Exception {
>         // set up the Java DataStream API
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         // set up the Java Table API
>         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>         final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
>                 env.fromElements(
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
>                         new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
>         final Table table =
>                 tableEnv.fromDataStream(
>                                 ds,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.STRING())
>                                         .column("f2", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f2", "f2 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "state", "ts");
>         tableEnv.createTemporaryView("source_table", table);
>         final Table dedupeTable =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM ("
>                                 + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table"
>                                 + ") WHERE row_num = 1");
>         tableEnv.createTemporaryView("versioned_table", dedupeTable);
>         DataStreamSource<Tuple2<Integer, Instant>> event =
>                 env.fromElements(
>                         new Tuple2<>(0, Instant.ofEpochMilli(0)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(5)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(10)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(15)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(20)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(25)));
>         final Table eventTable =
>                 tableEnv.fromDataStream(
>                                 event,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f1", "f1 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "ts");
>         tableEnv.createTemporaryView("event_table", eventTable);
>         final Table result =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM event_table"
>                                 + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
>                                 + " ON event_table.id = versioned_table.id");
>         result.execute().print();
>         result.filter($("state").isEqual("online")).execute().print();
>     }
> }
> {code}
>
> The result of the temporal join is the following:
> |op|id|ts|id0|state|ts0|row_num|
> |+I|0|1970-01-01 08:00:00.000|0|online|1970-01-01 08:00:00.000|1|
> |+I|0|1970-01-01 08:00:00.005|0|online|1970-01-01 08:00:00.000|1|
> |+I|0|1970-01-01 08:00:00.010|0|offline|1970-01-01 08:00:00.010|1|
> |+I|0|1970-01-01 08:00:00.015|0|offline|1970-01-01 08:00:00.010|1|
> |+I|0|1970-01-01 08:00:00.020|0|online|1970-01-01 08:00:00.020|1|
> |+I|0|1970-01-01 08:00:00.025|0|online|1970-01-01 08:00:00.020|1|
>
> After filtering with the predicate state = 'online', I expect only the two rows with state offline to be filtered out. But I got the following result:
> |op|id|ts|id0|state|ts0|row_num|
> |+I|0|1970-01-01 08:00:00.020|0|online|1970-01-01 08:00:00.020|1|
> |+I|0|1970-01-01 08:00:00.025|0|online|1970-01-01 08:00:00.020|1|

--
This message was sent by Atlassian Jira
(v8.20.10#820010)