[ https://issues.apache.org/jira/browse/FLINK-35869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Grzegorz Kołakowski updated FLINK-35869:
----------------------------------------
    Description: 
Background:

I have a stream of user-information updates in a Kafka topic, but those updates 
are partial, that is, each event contains only the fields that have changed 
("untouched" fields are empty strings in this example), for instance:
{noformat}
{"user_id":1}   {"ts":"2024-07-18 
12:00:00","user_id":1,"city":"Warszawa","phone_number":""}
{"user_id":1}   {"ts":"2024-07-18 
12:01:00","user_id":1,"city":"","phone_number":"+48 123456789"}{noformat}
In the main query, for each element on the left side I want to find the latest 
user location (the city column). To be able to create a correct versioned table 
for the temporal join, I tried to filter the right side:
{noformat}
location_changes AS (SELECT * FROM user_data WHERE city <> ''){noformat}
Full query:
{code:sql}
WITH password_logins AS (SELECT * FROM events WHERE `action` = 'login'),
location_changes AS (SELECT * FROM user_data WHERE city <> '')
SELECT *
FROM password_logins
LEFT JOIN location_changes
FOR SYSTEM_TIME AS OF password_logins.`ts`
ON password_logins.`user_id` = location_changes.`user_id`;{code}
If the *WHERE city <> ''* filter is present, all right-side columns are 
always null:
{noformat}
+----+-------------------------+-------------+--------------------------------+-------------------------+-------------+--------------------------------+--------------------------------+
| op |                      ts |     user_id |                         action |                     ts0 |    user_id0 |                           city |                   phone_number |
+----+-------------------------+-------------+--------------------------------+-------------------------+-------------+--------------------------------+--------------------------------+
| +I | 2024-07-18 11:59:59.000 |           1 |                          login |                  <NULL> |      <NULL> |                         <NULL> |                         <NULL> |
| +I | 2024-07-18 12:00:01.000 |           1 |                          login |                  <NULL> |      <NULL> |                         <NULL> |                         <NULL> |
| +I | 2024-07-18 12:01:01.000 |           1 |                          login |                  <NULL> |      <NULL> |                         <NULL> |                         <NULL> |
| +I | 2024-07-18 12:02:01.000 |           1 |                          login |                  <NULL> |      <NULL> |                         <NULL> |                         <NULL> |
| +I | 2024-07-18 12:03:01.000 |           1 |                          login |                  <NULL> |      <NULL> |                         <NULL> |                         <NULL> |
| +I | 2024-07-18 12:04:01.000 |           1 |                          login |                  <NULL> |      <NULL> |                         <NULL> |                         <NULL> |{noformat}
If the *WHERE city <> ''* filter is NOT present, the right-side columns are not 
always null (but obviously the results are not what I wanted).
{noformat}
+----+-------------------------+-------------+--------------------------------+-------------------------+-------------+--------------------------------+--------------------------------+
| op |                      ts |     user_id |                         action |                     ts0 |    user_id0 |                           city |                   phone_number |
+----+-------------------------+-------------+--------------------------------+-------------------------+-------------+--------------------------------+--------------------------------+
| +I | 2024-07-18 11:59:59.000 |           1 |                          login |                  <NULL> |      <NULL> |                         <NULL> |                         <NULL> |
| +I | 2024-07-18 12:00:01.000 |           1 |                          login | 2024-07-18 12:00:00.000 |           1 |                       Warszawa |                                |
| +I | 2024-07-18 12:01:01.000 |           1 |                          login | 2024-07-18 12:01:00.000 |           1 |                                |                  +48 123456789 |
| +I | 2024-07-18 12:02:01.000 |           1 |                          login | 2024-07-18 12:02:00.000 |           1 |                         Kraków |                                |
| +I | 2024-07-18 12:03:01.000 |           1 |                          login | 2024-07-18 12:03:00.000 |           1 |                                |                  +48 987654321 |
| +I | 2024-07-18 12:04:01.000 |           1 |                          login | 2024-07-18 12:04:00.000 |           1 |                         Gdańsk |                                |{noformat}
----
 

I ran the job with a debugger and noticed that in 
{{org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator#processElement2}} 
the UPDATE_AFTER event usually comes before the corresponding UPDATE_BEFORE, 
which I think is weird. As a consequence, rightState usually ends up containing 
only the UPDATE_BEFORE event for a given timestamp.
{code:java}
@Override
public void processElement2(StreamRecord<RowData> element) throws Exception {
    RowData row = element.getValue();
    long rowTime = getRightTime(row);
    // rightState ends up holding the UPDATE_BEFORE row for timestamp=rowTime,
    // because the UPDATE_BEFORE arrives after the UPDATE_AFTER and overwrites it
    rightState.put(rowTime, row);
    registerSmallestTimer(rowTime); // timer to clean up the state
    registerProcessingCleanupTimer();
} {code}
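To make the overwrite concrete, here is a minimal standalone sketch (not the 
operator code itself) that replays the observed ordering against a plain map 
standing in for the rowtime-keyed {{rightState}}; the timestamp and field 
values are illustrative only:
{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

public class RightStateOverwriteSketch {
    public static void main(String[] args) {
        // Stand-in for the MapState<Long, RowData> keyed by rowtime.
        Map<Long, RowData> rightState = new HashMap<>();
        long rowTime = 1721304000000L; // 2024-07-18 12:00:00, illustrative

        // Observed ordering after the filter: +U (UPDATE_AFTER) arrives first...
        RowData updateAfter = GenericRowData.ofKind(
                RowKind.UPDATE_AFTER, 1, StringData.fromString("Warszawa"));
        rightState.put(rowTime, updateAfter);

        // ...then -U (UPDATE_BEFORE) with the same rowtime overwrites it.
        RowData updateBefore = GenericRowData.ofKind(
                RowKind.UPDATE_BEFORE, 1, StringData.fromString("Warszawa"));
        rightState.put(rowTime, updateBefore);

        // The surviving row is a retraction, so the join later emits nulls.
        System.out.println(rightState.get(rowTime).getRowKind()); // UPDATE_BEFORE
    }
} {code}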
Then, in 
{{org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator#emitResultAndCleanUpState}}, 
the right-side columns are always null because 
{{RowDataUtil.isAccumulateMsg(rightRow.get())}} is false for UPDATE_BEFORE rows 
({{rightRow.isPresent()}} -> true; 
{{RowDataUtil.isAccumulateMsg(rightRow.get())}} -> false; so 
{{collectJoinedRow(leftRow, rightNullRow)}} is emitted).

 
{code:java}
orderedLeftRecords.forEach(
        (leftSeq, leftRow) -> {
            long leftTime = getLeftTime(leftRow);
            Optional<RowData> rightRow = latestRightRowToJoin(rightRowsSorted, leftTime);
            if (rightRow.isPresent() && RowDataUtil.isAccumulateMsg(rightRow.get())) {
                if (joinCondition.apply(leftRow, rightRow.get())) {
                    collectJoinedRow(leftRow, rightRow.get());
                } else {
                    if (isLeftOuterJoin) {
                        collectJoinedRow(leftRow, rightNullRow);
                    }
                }
            } else {
                if (isLeftOuterJoin) {
                    collectJoinedRow(leftRow, rightNullRow);
                }
            }
        }); {code}
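For reference, a minimal sketch of the check that fails here, relying only on 
{{RowDataUtil.isAccumulateMsg}} treating INSERT/UPDATE_AFTER as accumulate 
messages and UPDATE_BEFORE/DELETE as retractions:
{code:java}
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.util.RowDataUtil;
import org.apache.flink.types.RowKind;

public class AccumulateCheckSketch {
    public static void main(String[] args) {
        // The row that survives in rightState is a retraction...
        RowData updateBefore = GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 1);
        // ...so the accumulate check fails and, because isLeftOuterJoin is
        // true, collectJoinedRow(leftRow, rightNullRow) is emitted instead.
        System.out.println(RowDataUtil.isAccumulateMsg(updateBefore)); // false

        RowData updateAfter = GenericRowData.ofKind(RowKind.UPDATE_AFTER, 1);
        System.out.println(RowDataUtil.isAccumulateMsg(updateAfter));  // true
    }
} {code}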
 
----
Flink job and docker-compose.yaml to reproduce the issue:
{code:java}
package com.merck.flink.udf;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Main {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("create table user_data (\n" +
                "    `ts` TIMESTAMP(3),\n" +
                "    `user_id` INTEGER,\n" +
                "    `city` VARCHAR,\n" +
                "    `phone_number` VARCHAR,\n" +
                "    watermark for `ts` AS `ts` - INTERVAL '1' SECONDS,\n" +
                "    PRIMARY KEY(`user_id`) NOT ENFORCED\n" +
                ") with (\n" +
                "    'connector' = 'upsert-kafka',\n" +
                "    'value.format' = 'json',\n" +
                "    'key.format' = 'raw',\n" +
                "    'properties.bootstrap.servers' = 'localhost:29092',\n" +
                "    'properties.group.id' = 'test-group-id-2',\n" +
                "    'topic' = 'user-cdc'\n" +
                ");");
        tableEnv.executeSql("INSERT INTO user_data VALUES\n" +
                "    (CAST('2024-07-18 12:00:00.000' AS TIMESTAMP(3)), 1, 'Warszawa', ''),\n" +
                "    (CAST('2024-07-18 12:01:00.000' AS TIMESTAMP(3)), 1, '', '+48 123456789'),\n" +
                "    (CAST('2024-07-18 12:02:00.000' AS TIMESTAMP(3)), 1, 'Kraków', ''),\n" +
                "    (CAST('2024-07-18 12:03:00.000' AS TIMESTAMP(3)), 1, '', '+48 987654321'),\n" +
                "    (CAST('2024-07-18 12:04:00.000' AS TIMESTAMP(3)), 1, 'Gdańsk', ''),\n" +
                "    (CAST('2024-07-18 12:05:00.000' AS TIMESTAMP(3)), 1, 'Poznań', '');");

        tableEnv.executeSql("create table events (\n" +
                "    `ts` TIMESTAMP(3),\n" +
                "    `user_id` INTEGER,\n" +
                "    `action` VARCHAR,\n" +
                "    watermark for `ts` AS `ts` - INTERVAL '1' SECONDS\n" +
                ") with (\n" +
                "    'connector' = 'kafka',\n" +
                "    'format' = 'json',\n" +
                "    'properties.bootstrap.servers' = 'localhost:29092',\n" +
                "    'properties.group.id' = 'test-group-id-2',\n" +
                "    'scan.startup.mode' = 'earliest-offset',\n" +
                "    'topic' = 'events'\n" +
                ");");
        tableEnv.executeSql("INSERT INTO events VALUES\n" +
                "    (CAST('2024-07-18 11:59:59.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
                "    (CAST('2024-07-18 12:00:01.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
                "    (CAST('2024-07-18 12:01:01.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
                "    (CAST('2024-07-18 12:02:01.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
                "    (CAST('2024-07-18 12:03:01.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
                "    (CAST('2024-07-18 12:04:01.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
                "    (CAST('2024-07-18 12:05:01.000' AS TIMESTAMP(3)), 1, 'login');");

        Table table = tableEnv.sqlQuery("WITH password_logins AS (SELECT * FROM events WHERE `action` = 'login'),\n" +
                // version with filtering: right-side columns are always null
                "location_changes AS (SELECT * FROM user_data WHERE city <> '')\n" +
                // version without filtering: right-side columns are not always null
                //"location_changes AS (SELECT * FROM user_data)\n" +
                "SELECT *\n" +
                "FROM password_logins\n" +
                "LEFT JOIN location_changes\n" +
                "FOR SYSTEM_TIME AS OF password_logins.`ts`\n" +
                "ON password_logins.`user_id` = location_changes.`user_id`;");
        table.execute().print();
    }
} {code}
 
{code:yaml}
---
services:
  zookeeper:
    image: zookeeper:3.8.0
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka:2.13-2.8.1
    ports:
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
      KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,INTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "events:1:1,user-cdc:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock {code}
----
I'm not sure what the expected behaviour should be. I also saw 
{{EventTimeTemporalJoinRewriteRule}}, which throws the error below in similar 
cases.
{noformat}
Filter is not allowed for right changelog input of event time temporal join, it 
will corrupt the versioning of data. Please consider removing the filter before 
joining.{noformat}
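A possible workaround sketch on my side (untested, and only an assumption): 
build the versioned table from an append-only reading of the topic using the 
documented versioned-view deduplication pattern, applying the filter before 
deduplication. {{user_data_raw}} below is a hypothetical append-only table 
('kafka' connector rather than 'upsert-kafka') over the same user-cdc topic:
{code:sql}
-- Hedged sketch: derive a versioned view from an append-only source
-- instead of filtering the upsert changelog directly.
-- user_data_raw is a hypothetical append-only table over the user-cdc topic.
CREATE TEMPORARY VIEW location_changes AS
SELECT ts, user_id, city
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) AS rownum
    FROM user_data_raw
    WHERE city <> ''
)
WHERE rownum = 1;{code}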
 


> Right side columns of temporal left join are always null when right side is filtered
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-35869
>                 URL: https://issues.apache.org/jira/browse/FLINK-35869
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.19.1
>            Reporter: Grzegorz Kołakowski
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
