> Right side columns of temporal left join are always null when right side is 
> filtered
> ------------------------------------------------------------------------------------
>                 Key: FLINK-35869
>                 URL: https://issues.apache.org/jira/browse/FLINK-35869
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.19.1
>            Reporter: Grzegorz Kołakowski
>            Priority: Major
> Background:
> I have a stream of user information updates in a Kafka topic, but those 
> updates are partial, that is, each event contains only the fields that have 
> changed ("untouched" fields are empty strings in this example), for instance:
> {noformat}
> {"user_id":1} {"ts":"2024-07-18 12:00:00","user_id":1,"city":"Warszawa","phone_number":""}
> {"user_id":1} {"ts":"2024-07-18 12:01:00","user_id":1,"city":"","phone_number":"+48 123456789"}{noformat}
> In the main query, for each element on the left side, I want to find the 
> latest user location (city column). To create a correct versioned table for 
> the temporal join, I tried to filter the right side:
> {noformat}
> location_changes AS (SELECT * FROM user_data WHERE city <> ''){noformat}
> Full query:
> {code:sql}
> WITH password_logins AS (SELECT * FROM events WHERE `action` = 'login'),
> location_changes AS (SELECT * FROM user_data WHERE city <> '')
> SELECT *
> FROM password_logins
> LEFT JOIN location_changes
> FOR SYSTEM_TIME AS OF password_logins.`ts`
> ON password_logins.`user_id` = location_changes.`user_id`;{code}
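> The lookup I am after can be sketched in plain Java, without Flink. This is 
> only an illustration of the intended result, assuming "latest location" means 
> the most recent non-empty city at or before the login time. The names 
> IntendedLookup, UserUpdate, latestCityPerLogin, sampleUpdates and sampleLogins 
> are hypothetical; the data is the sample data from this report.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class IntendedLookup {
    // Hypothetical stand-in for one user_data row (single user, so user_id is omitted).
    static final class UserUpdate {
        final LocalDateTime ts;
        final String city;
        final String phoneNumber;

        UserUpdate(String ts, String city, String phoneNumber) {
            this.ts = LocalDateTime.parse(ts);
            this.city = city;
            this.phoneNumber = phoneNumber;
        }
    }

    // For each login time, returns the latest non-empty city at or before it,
    // or null when no city version exists yet.
    static List<String> latestCityPerLogin(List<UserUpdate> updates, List<LocalDateTime> logins) {
        TreeMap<LocalDateTime, String> cityVersions = new TreeMap<>();
        for (UserUpdate u : updates) {
            if (!u.city.isEmpty()) { // plays the role of WHERE city <> ''
                cityVersions.put(u.ts, u.city);
            }
        }
        List<String> result = new ArrayList<>();
        for (LocalDateTime login : logins) {
            Map.Entry<LocalDateTime, String> version = cityVersions.floorEntry(login);
            result.add(version == null ? null : version.getValue());
        }
        return result;
    }

    // Sample rows taken from the INSERT INTO user_data statement in this report.
    static List<UserUpdate> sampleUpdates() {
        return Arrays.asList(
                new UserUpdate("2024-07-18T12:00:00", "Warszawa", ""),
                new UserUpdate("2024-07-18T12:01:00", "", "+48 123456789"),
                new UserUpdate("2024-07-18T12:02:00", "Kraków", ""),
                new UserUpdate("2024-07-18T12:03:00", "", "+48 987654321"),
                new UserUpdate("2024-07-18T12:04:00", "Gdańsk", ""),
                new UserUpdate("2024-07-18T12:05:00", "Poznań", ""));
    }

    // Login times of the six rows shown in the printed result tables.
    static List<LocalDateTime> sampleLogins() {
        return Arrays.asList(
                LocalDateTime.parse("2024-07-18T11:59:59"),
                LocalDateTime.parse("2024-07-18T12:00:01"),
                LocalDateTime.parse("2024-07-18T12:01:01"),
                LocalDateTime.parse("2024-07-18T12:02:01"),
                LocalDateTime.parse("2024-07-18T12:03:01"),
                LocalDateTime.parse("2024-07-18T12:04:01"));
    }

    public static void main(String[] args) {
        System.out.println(latestCityPerLogin(sampleUpdates(), sampleLogins()));
    }
}
```

> Under that assumption the first login has no city yet (null), and the 
> following logins resolve to Warszawa, Warszawa, Kraków, Kraków and Gdańsk.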
> If the *WHERE city <> ''* filter is present, all columns from the right side 
> are always null:
> {noformat}
> +----+-------------------------+-------------+--------------------------------+-------------------------+-------------+--------------------------------+--------------------------------+
> | op |                      ts |     user_id |                         action |                     ts0 |    user_id0 |                           city |                   phone_number |
> +----+-------------------------+-------------+--------------------------------+-------------------------+-------------+--------------------------------+--------------------------------+
> | +I | 2024-07-18 11:59:59.000 |           1 |                          login |                  <NULL> |      <NULL> |                         <NULL> |                         <NULL> |
> | +I | 2024-07-18 12:00:01.000 |           1 |                          login |                  <NULL> |      <NULL> |                         <NULL> |                         <NULL> |
> | +I | 2024-07-18 12:01:01.000 |           1 |                          login |                  <NULL> |      <NULL> |                         <NULL> |                         <NULL> |
> | +I | 2024-07-18 12:02:01.000 |           1 |                          login |                  <NULL> |      <NULL> |                         <NULL> |                         <NULL> |
> | +I | 2024-07-18 12:03:01.000 |           1 |                          login |                  <NULL> |      <NULL> |                         <NULL> |                         <NULL> |
> | +I | 2024-07-18 12:04:01.000 |           1 |                          login |                  <NULL> |      <NULL> |                         <NULL> |                         <NULL> |{noformat}
> If the *WHERE city <> ''* filter is NOT present, the right side columns are 
> not always null (but obviously the results are not what I wanted).
> {noformat}
> +----+-------------------------+-------------+--------------------------------+-------------------------+-------------+--------------------------------+--------------------------------+
> | op |                      ts |     user_id |                         action |                     ts0 |    user_id0 |                           city |                   phone_number |
> +----+-------------------------+-------------+--------------------------------+-------------------------+-------------+--------------------------------+--------------------------------+
> | +I | 2024-07-18 11:59:59.000 |           1 |                          login |                  <NULL> |      <NULL> |                         <NULL> |                         <NULL> |
> | +I | 2024-07-18 12:00:01.000 |           1 |                          login | 2024-07-18 12:00:00.000 |           1 |                       Warszawa |                                |
> | +I | 2024-07-18 12:01:01.000 |           1 |                          login | 2024-07-18 12:01:00.000 |           1 |                                |                  +48 123456789 |
> | +I | 2024-07-18 12:02:01.000 |           1 |                          login | 2024-07-18 12:02:00.000 |           1 |                         Kraków |                                |
> | +I | 2024-07-18 12:03:01.000 |           1 |                          login | 2024-07-18 12:03:00.000 |           1 |                                |                  +48 987654321 |
> | +I | 2024-07-18 12:04:01.000 |           1 |                          login | 2024-07-18 12:04:00.000 |           1 |                         Gdańsk |                                |{noformat}
> ----
> I ran the job with a debugger and noticed that in 
> {{org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator#processElement2}}
>  the UPDATE_AFTER event usually arrives before the corresponding 
> UPDATE_BEFORE, which I think is weird. As a consequence, rightState usually 
> contains only the UPDATE_BEFORE event for a given timestamp.
> {code:java}
> @Override
> public void processElement2(StreamRecord<RowData> element) throws Exception {
>     RowData row = element.getValue();
>     long rowTime = getRightTime(row);
>     // rightState ends up holding UPDATE_BEFORE for timestamp=rowTime,
>     // because UPDATE_BEFORE comes after UPDATE_AFTER
>     rightState.put(rowTime, row);
>     registerSmallestTimer(rowTime); // Timer to clean up the state
>     registerProcessingCleanupTimer();
> } {code}
> Then in 
> {{org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator#emitResultAndCleanUpState}}
>  the right side is always null because 
> {{RowDataUtil.isAccumulateMsg(rightRow.get())}} is false for UPDATE_BEFORE 
> ({{rightRow.isPresent()}} -> true; 
> {{RowDataUtil.isAccumulateMsg(rightRow.get())}} -> false; so 
> {{collectJoinedRow(leftRow, rightNullRow);}} is called).
> {code:java}
> orderedLeftRecords.forEach(
>         (leftSeq, leftRow) -> {
>             long leftTime = getLeftTime(leftRow);
>             Optional<RowData> rightRow =
>                     latestRightRowToJoin(rightRowsSorted, leftTime);
>             if (rightRow.isPresent()
>                     && RowDataUtil.isAccumulateMsg(rightRow.get())) {
>                 if (joinCondition.apply(leftRow, rightRow.get())) {
>                     collectJoinedRow(leftRow, rightRow.get());
>                 } else {
>                     if (isLeftOuterJoin) {
>                         collectJoinedRow(leftRow, rightNullRow);
>                     }
>                 }
>             } else {
>                 if (isLeftOuterJoin) {
>                     collectJoinedRow(leftRow, rightNullRow);
>                 }
>             }
>         }); {code}
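> The effect described above can be reproduced without Flink. The following is 
> a simplified sketch, not the operator's actual code: RightStateSketch, Kind, 
> Row and joinCity are hypothetical stand-ins, a TreeMap keyed by row time 
> replaces rightState, floorEntry approximates latestRightRowToJoin, the join 
> condition is omitted, and the row contents are made up for illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

public class RightStateSketch {
    // Simplified stand-in for Flink's RowKind; only the kinds relevant here.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER }

    // Hypothetical minimal right-side row: change kind plus the city value.
    static final class Row {
        final Kind kind;
        final String city;

        Row(Kind kind, String city) {
            this.kind = kind;
            this.city = city;
        }
    }

    // Mirrors the accumulate check: INSERT and UPDATE_AFTER pass, UPDATE_BEFORE does not.
    static boolean isAccumulateMsg(Row row) {
        return row.kind == Kind.INSERT || row.kind == Kind.UPDATE_AFTER;
    }

    // One slot per row time, so a later put() with the same time replaces the earlier row.
    private final TreeMap<Long, Row> rightState = new TreeMap<>();

    void processElement2(long rowTime, Row row) {
        rightState.put(rowTime, row);
    }

    // Returns the joined city for a left row, or null where rightNullRow would be emitted.
    String joinCity(long leftTime) {
        Map.Entry<Long, Row> latest = rightState.floorEntry(leftTime);
        Optional<Row> rightRow = Optional.ofNullable(latest).map(Map.Entry::getValue);
        if (rightRow.isPresent() && isAccumulateMsg(rightRow.get())) {
            return rightRow.get().city;
        }
        return null;
    }

    public static void main(String[] args) {
        // Observed order: UPDATE_AFTER first, then UPDATE_BEFORE with the same row time.
        RightStateSketch observed = new RightStateSketch();
        observed.processElement2(1000L, new Row(Kind.UPDATE_AFTER, "Warszawa"));
        observed.processElement2(1000L, new Row(Kind.UPDATE_BEFORE, "Warszawa"));
        System.out.println("observed order: " + observed.joinCity(1001L));

        // Opposite order, shown for contrast: UPDATE_AFTER is the row that stays in state.
        RightStateSketch reversed = new RightStateSketch();
        reversed.processElement2(1000L, new Row(Kind.UPDATE_BEFORE, "Warszawa"));
        reversed.processElement2(1000L, new Row(Kind.UPDATE_AFTER, "Warszawa"));
        System.out.println("opposite order: " + reversed.joinCity(1001L));
    }
}
```

> In this sketch the observed arrival order leaves only the UPDATE_BEFORE row 
> in the map, so the lookup falls through to the null branch, while the 
> opposite order returns the city.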
> ----
> Flink job and docker-compose.yaml to recreate the issue:
> {code:java}
> package com.merck.flink.udf;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> public class Main {
>     public static void main(String[] args) {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>         tableEnv.executeSql("create table user_data (\n" +
>                 "    `ts` TIMESTAMP(3),\n" +
>                 "    `user_id` INTEGER,\n" +
>                 "    `city` VARCHAR,\n" +
>                 "    `phone_number` VARCHAR,\n" +
>                 "    watermark for `ts` AS `ts` - INTERVAL '1' SECONDS,\n" +
>                 "    PRIMARY KEY(`user_id`) NOT ENFORCED\n" +
>                 ") with (\n" +
>                 "    'connector' = 'upsert-kafka',\n" +
>                 "    'value.format' = 'json',\n" +
>                 "    'key.format' = 'raw',\n" +
>                 "    'properties.bootstrap.servers' = 'localhost:29092',\n" +
>                 "    'properties.group.id' = 'test-group-id-2',\n" +
>                 "    'topic' = 'user-cdc'\n" +
>                 ");");
>         tableEnv.executeSql("INSERT INTO user_data VALUES\n" +
>                 "    (CAST('2024-07-18 12:00:00.000' AS TIMESTAMP(3)), 1, 'Warszawa', ''),\n" +
>                 "    (CAST('2024-07-18 12:01:00.000' AS TIMESTAMP(3)), 1, '', '+48 123456789'),\n" +
>                 "    (CAST('2024-07-18 12:02:00.000' AS TIMESTAMP(3)), 1, 'Kraków', ''),\n" +
>                 "    (CAST('2024-07-18 12:03:00.000' AS TIMESTAMP(3)), 1, '', '+48 987654321'),\n" +
>                 "    (CAST('2024-07-18 12:04:00.000' AS TIMESTAMP(3)), 1, 'Gdańsk', ''),\n" +
>                 "    (CAST('2024-07-18 12:05:00.000' AS TIMESTAMP(3)), 1, 'Poznań', '');");
>         tableEnv.executeSql("create table events (\n" +
>                 "    `ts` TIMESTAMP(3),\n" +
>                 "    `user_id` INTEGER,\n" +
>                 "    `action` VARCHAR,\n" +
>                 "    watermark for `ts` AS `ts` - INTERVAL '1' SECONDS\n" +
>                 ") with (\n" +
>                 "    'connector' = 'kafka',\n" +
>                 "    'format' = 'json',\n" +
>                 "    'properties.bootstrap.servers' = 'localhost:29092',\n" +
>                 "    'properties.group.id' = 'test-group-id-2',\n" +
>                 "    'scan.startup.mode' = 'earliest-offset',\n" +
>                 "    'topic' = 'events'\n" +
>                 ");");
>         tableEnv.executeSql("INSERT INTO events VALUES\n" +
>                 "    (CAST('2024-07-18 11:59:59.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
>                 "    (CAST('2024-07-18 12:00:01.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
>                 "    (CAST('2024-07-18 12:01:01.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
>                 "    (CAST('2024-07-18 12:02:01.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
>                 "    (CAST('2024-07-18 12:03:01.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
>                 "    (CAST('2024-07-18 12:04:01.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
>                 "    (CAST('2024-07-18 12:05:01.000' AS TIMESTAMP(3)), 1, 'login');");
>         Table table = tableEnv.sqlQuery(
>                 "WITH password_logins AS (SELECT * FROM events WHERE `action` = 'login'),\n" +
>                 // version with filtering: right side columns are always null
>                 "location_changes AS (SELECT * FROM user_data WHERE city <> '')\n" +
>                 // version without filtering: right side columns are not always null
>                 //"location_changes AS (SELECT * FROM user_data)\n" +
>                 "SELECT *\n" +
>                 "FROM password_logins\n" +
>                 "LEFT JOIN location_changes\n" +
>                 "FOR SYSTEM_TIME AS OF password_logins.`ts`\n" +
>                 "ON password_logins.`user_id` = location_changes.`user_id`;");
>         table.execute().print();
>     }
> }
>  {code}
> {code:yaml}
> ---
> services:
>   zookeeper:
>     image: zookeeper:3.8.0
>     ports:
>       - "2181:2181"
>   kafka:
>     image: wurstmeister/kafka:2.13-2.8.1
>     ports:
>       - "29092:29092"
>     depends_on:
>       - zookeeper
>     environment:
>       HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
> INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092
>       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
>       KAFKA_CREATE_TOPICS: "events:1:1,user-cdc:1:1"
>     volumes:
>       - /var/run/docker.sock:/var/run/docker.sock {code}
> ----
> I'm not sure what the expected behaviour should be. I also saw 
> {{EventTimeTemporalJoinRewriteRule}}, which throws the error below in 
> similar cases.
> {noformat}
> Filter is not allowed for right changelog input of event time temporal join, 
> it will corrupt the versioning of data. Please consider removing the filter 
> before joining.{noformat}

