[ https://issues.apache.org/jira/browse/FLINK-35869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18087911#comment-18087911 ]
Qiu Yanjun commented on FLINK-35869:
------------------------------------
Hi, I would like to work on this bug. I can investigate the temporal left join
rewrite path around the filtered versioned table / watermark pushdown case and
add a focused regression test. Could a committer please assign this issue to me
if that sounds reasonable? Thanks!
> Right side columns of temporal left join are always null when right side is
> filtered
> ------------------------------------------------------------------------------------
>
> Key: FLINK-35869
> URL: https://issues.apache.org/jira/browse/FLINK-35869
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.19.1
> Reporter: Grzegorz Kołakowski
> Priority: Major
>
> Background:
> I have a stream of user information updates in a Kafka topic, but those updates
> are partial, that is, each event contains only the fields that have changed
> ("untouched" fields are empty strings in this example), for instance:
> {noformat}
> {"user_id":1} {"ts":"2024-07-18 12:00:00","user_id":1,"city":"Warszawa","phone_number":""}
> {"user_id":1} {"ts":"2024-07-18 12:01:00","user_id":1,"city":"","phone_number":"+48 123456789"}
> {noformat}
> In the main query, for each element on the left side, I want to find the
> latest user location (city column). To build a correct versioned table for
> the temporal join, I tried to filter the right side:
> {noformat}
> location_changes AS (SELECT * FROM user_data WHERE city <> ''){noformat}
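To make the intended result concrete: for each login, the reporter wants the most recent non-empty {{city}} whose {{ts}} is at or before the login's {{ts}}. The plain-Java sketch below involves no Flink classes ({{Update}}, {{cityHistory}} and {{cityAsOf}} are hypothetical names) and computes that lookup for the sample data used in the reproduction job further down. It only models what the reporter describes wanting; whether a filtered changelog should produce these rows under temporal-join semantics is the open question raised at the end of the report.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (no Flink classes) of "latest non-empty city as of the login time". */
public class LatestCityLookup {

    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** One partial update for user 1; an empty string means "field not touched". */
    static final class Update {
        final LocalDateTime ts;
        final String city;
        final String phoneNumber;

        Update(String ts, String city, String phoneNumber) {
            this.ts = parse(ts);
            this.city = city;
            this.phoneNumber = phoneNumber;
        }
    }

    static LocalDateTime parse(String ts) {
        return LocalDateTime.parse(ts, FMT);
    }

    /** The user 1 rows inserted into user_data by the reproduction job. */
    static List<Update> sampleUpdates() {
        return List.of(
                new Update("2024-07-18 12:00:00", "Warszawa", ""),
                new Update("2024-07-18 12:01:00", "", "+48 123456789"),
                new Update("2024-07-18 12:02:00", "Kraków", ""),
                new Update("2024-07-18 12:03:00", "", "+48 987654321"),
                new Update("2024-07-18 12:04:00", "Gdańsk", ""),
                new Update("2024-07-18 12:05:00", "Poznań", ""));
    }

    /** Keeps only updates passing the same predicate as WHERE city <> '', keyed by ts. */
    static TreeMap<LocalDateTime, String> cityHistory(List<Update> updates) {
        TreeMap<LocalDateTime, String> history = new TreeMap<>();
        for (Update u : updates) {
            if (!u.city.isEmpty()) {
                history.put(u.ts, u.city);
            }
        }
        return history;
    }

    /** Latest city with ts <= loginTs, or null when no version exists yet (LEFT JOIN padding). */
    static String cityAsOf(TreeMap<LocalDateTime, String> history, LocalDateTime loginTs) {
        Map.Entry<LocalDateTime, String> entry = history.floorEntry(loginTs);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        TreeMap<LocalDateTime, String> history = cityHistory(sampleUpdates());
        for (String login : List.of("2024-07-18 11:59:59", "2024-07-18 12:00:01",
                "2024-07-18 12:01:01", "2024-07-18 12:02:01", "2024-07-18 12:03:01",
                "2024-07-18 12:04:01", "2024-07-18 12:05:01")) {
            System.out.println(login + " -> " + cityAsOf(history, parse(login)));
        }
    }
}
```

Running {{main}} prints {{null}} for the 11:59:59 login, {{Warszawa}} for 12:00:01 and 12:01:01, {{Kraków}} for 12:02:01 and 12:03:01, {{Gdańsk}} for 12:04:01 and {{Poznań}} for 12:05:01.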
> Full query:
> {code:sql}
> WITH password_logins AS (SELECT * FROM events WHERE `action` = 'login'),
> location_changes AS (SELECT * FROM user_data WHERE city <> '')
> SELECT *
> FROM password_logins
> LEFT JOIN location_changes
> FOR SYSTEM_TIME AS OF password_logins.`ts`
> ON password_logins.`user_id` = location_changes.`user_id`;{code}
> If the *WHERE city <> ''* filter is present, all columns from the right side are
> always null:
> {noformat}
> +----+-------------------------+---------+--------+-------------------------+----------+----------+---------------+
> | op |                      ts | user_id | action |                     ts0 | user_id0 |     city |  phone_number |
> +----+-------------------------+---------+--------+-------------------------+----------+----------+---------------+
> | +I | 2024-07-18 11:59:59.000 |       1 |  login |                  <NULL> |   <NULL> |   <NULL> |        <NULL> |
> | +I | 2024-07-18 12:00:01.000 |       1 |  login |                  <NULL> |   <NULL> |   <NULL> |        <NULL> |
> | +I | 2024-07-18 12:01:01.000 |       1 |  login |                  <NULL> |   <NULL> |   <NULL> |        <NULL> |
> | +I | 2024-07-18 12:02:01.000 |       1 |  login |                  <NULL> |   <NULL> |   <NULL> |        <NULL> |
> | +I | 2024-07-18 12:03:01.000 |       1 |  login |                  <NULL> |   <NULL> |   <NULL> |        <NULL> |
> | +I | 2024-07-18 12:04:01.000 |       1 |  login |                  <NULL> |   <NULL> |   <NULL> |        <NULL> |
> {noformat}
> If the *WHERE city <> ''* filter is NOT present, the right side columns are not
> always null (but, as expected, the results are not what I want, since the
> updates with an empty city are joined as well):
> {noformat}
> +----+-------------------------+---------+--------+-------------------------+----------+----------+---------------+
> | op |                      ts | user_id | action |                     ts0 | user_id0 |     city |  phone_number |
> +----+-------------------------+---------+--------+-------------------------+----------+----------+---------------+
> | +I | 2024-07-18 11:59:59.000 |       1 |  login |                  <NULL> |   <NULL> |   <NULL> |        <NULL> |
> | +I | 2024-07-18 12:00:01.000 |       1 |  login | 2024-07-18 12:00:00.000 |        1 | Warszawa |               |
> | +I | 2024-07-18 12:01:01.000 |       1 |  login | 2024-07-18 12:01:00.000 |        1 |          | +48 123456789 |
> | +I | 2024-07-18 12:02:01.000 |       1 |  login | 2024-07-18 12:02:00.000 |        1 |   Kraków |               |
> | +I | 2024-07-18 12:03:01.000 |       1 |  login | 2024-07-18 12:03:00.000 |        1 |          | +48 987654321 |
> | +I | 2024-07-18 12:04:01.000 |       1 |  login | 2024-07-18 12:04:00.000 |        1 |   Gdańsk |               |
> {noformat}
> ----
>
> I ran the job with a debugger and noticed that in
> {{org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator#processElement2}}
> the UPDATE_AFTER event usually arrives before the corresponding UPDATE_BEFORE,
> which I find surprising. As a consequence, rightState usually contains only the
> UPDATE_BEFORE event for a given timestamp.
> {code:java}
> @Override
> public void processElement2(StreamRecord<RowData> element) throws Exception {
>     RowData row = element.getValue();
>     long rowTime = getRightTime(row);
>     // rightState contains UPDATE_BEFORE with timestamp=rowTime
>     // because UPDATE_BEFORE comes after UPDATE_AFTER
>     rightState.put(rowTime, row);
>     registerSmallestTimer(rowTime); // Timer to clean up the state
>     registerProcessingCleanupTimer();
> }
> {code}
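The overwrite follows from the state being keyed by rowtime alone: a second record with the same rowtime replaces the first one. The plain-Java sketch below models only that aspect with a {{TreeMap}}. {{Kind}}, {{Row}} and {{applyAll}} are hypothetical stand-ins, not Flink classes, and the sketch assumes, as observed in the debugger, that the UPDATE_BEFORE carries the same rowtime as the UPDATE_AFTER and arrives after it.

```java
import java.util.TreeMap;

/** Plain-Java model of a right-side state keyed only by rowtime; names are stand-ins, not Flink classes. */
public class RightStateOverwrite {

    /** Mirrors the names of Flink's RowKind values. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Row {
        final Kind kind;
        final long rowTime;
        final String city;

        Row(Kind kind, long rowTime, String city) {
            this.kind = kind;
            this.rowTime = rowTime;
            this.city = city;
        }

        @Override
        public String toString() {
            return kind + "(" + rowTime + ", " + city + ")";
        }
    }

    /** Applies rows in arrival order, like rightState.put(rowTime, row) in processElement2. */
    static TreeMap<Long, Row> applyAll(Row... rows) {
        TreeMap<Long, Row> rightState = new TreeMap<>();
        for (Row row : rows) {
            rightState.put(row.rowTime, row); // same rowtime: the later row replaces the earlier one
        }
        return rightState;
    }

    public static void main(String[] args) {
        long t = 1_000L; // hypothetical rowtime shared by both change records
        TreeMap<Long, Row> state = applyAll(
                new Row(Kind.UPDATE_AFTER, t, "Warszawa"),
                new Row(Kind.UPDATE_BEFORE, t, "Warszawa"));
        System.out.println(state); // {1000=UPDATE_BEFORE(1000, Warszawa)}
    }
}
```

Swapping the two arguments leaves the UPDATE_AFTER in the map, which is the ordering under which the join would find a valid version at that rowtime.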
> Then, in
> {{org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator#emitResultAndCleanUpState}},
> the right side columns are always null because
> {{RowDataUtil.isAccumulateMsg(rightRow.get())}} is false for UPDATE_BEFORE
> ({{rightRow.isPresent()}} -> true; {{RowDataUtil.isAccumulateMsg(rightRow.get())}} -> false;
> so {{collectJoinedRow(leftRow, rightNullRow)}} is called).
>
> {code:java}
> orderedLeftRecords.forEach(
>         (leftSeq, leftRow) -> {
>             long leftTime = getLeftTime(leftRow);
>             Optional<RowData> rightRow = latestRightRowToJoin(rightRowsSorted, leftTime);
>             if (rightRow.isPresent() && RowDataUtil.isAccumulateMsg(rightRow.get())) {
>                 if (joinCondition.apply(leftRow, rightRow.get())) {
>                     collectJoinedRow(leftRow, rightRow.get());
>                 } else {
>                     if (isLeftOuterJoin) {
>                         collectJoinedRow(leftRow, rightNullRow);
>                     }
>                 }
>             } else {
>                 if (isLeftOuterJoin) {
>                     collectJoinedRow(leftRow, rightNullRow);
>                 }
>             }
>         });
> {code}
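The per-left-row decision in {{emitResultAndCleanUpState}} can be modelled the same way. The sketch below assumes that {{latestRightRowToJoin}} returns the newest right row whose rowtime is at or before the left rowtime (approximated here with {{TreeMap.floorEntry}}) and reproduces the {{RowDataUtil.isAccumulateMsg}} rule, which accepts only INSERT and UPDATE_AFTER. The join condition is left out, and {{Kind}}, {{joinsRightRow}} and the other names are hypothetical stand-ins rather than Flink classes.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Plain-Java model of the per-left-row emit decision; names are stand-ins, not Flink classes. */
public class EmitDecision {

    /** Mirrors the names of Flink's RowKind values. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Same rule as RowDataUtil.isAccumulateMsg: only INSERT and UPDATE_AFTER accumulate. */
    static boolean isAccumulateMsg(Kind kind) {
        return kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
    }

    /** Newest right version with rowtime <= leftTime (assumed behaviour of latestRightRowToJoin). */
    static Optional<Map.Entry<Long, Kind>> latestRightRowToJoin(
            TreeMap<Long, Kind> rightRowsSorted, long leftTime) {
        return Optional.ofNullable(rightRowsSorted.floorEntry(leftTime));
    }

    /** true: the left row is joined with the right row; false: a LEFT JOIN pads it with rightNullRow. */
    static boolean joinsRightRow(TreeMap<Long, Kind> rightRowsSorted, long leftTime) {
        Optional<Map.Entry<Long, Kind>> rightRow = latestRightRowToJoin(rightRowsSorted, leftTime);
        return rightRow.isPresent() && isAccumulateMsg(rightRow.get().getValue());
    }

    public static void main(String[] args) {
        TreeMap<Long, Kind> rightRowsSorted = new TreeMap<>();
        // The version at rowtime 1000 was overwritten by its UPDATE_BEFORE.
        rightRowsSorted.put(1_000L, Kind.UPDATE_BEFORE);
        System.out.println(joinsRightRow(rightRowsSorted, 1_001L)); // false -> right side null
        // Had the UPDATE_AFTER been kept, the same left row would be joined.
        rightRowsSorted.put(1_000L, Kind.UPDATE_AFTER);
        System.out.println(joinsRightRow(rightRowsSorted, 1_001L)); // true
        // A left row older than every version is null-padded either way.
        System.out.println(joinsRightRow(rightRowsSorted, 999L)); // false
    }
}
```

The first call corresponds to the observed behaviour: a version is found, but because its kind is UPDATE_BEFORE the left row is emitted with null right side columns.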
>
> ----
> Flink job and docker-compose.yaml to recreate the issue:
> {code:java}
> package com.merck.flink.udf;
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> public class Main {
>     public static void main(String[] args) {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>         // Right side: partial user updates, upserted by user_id
>         tableEnv.executeSql("create table user_data (\n" +
>                 "  `ts` TIMESTAMP(3),\n" +
>                 "  `user_id` INTEGER,\n" +
>                 "  `city` VARCHAR,\n" +
>                 "  `phone_number` VARCHAR,\n" +
>                 "  watermark for `ts` AS `ts` - INTERVAL '1' SECONDS,\n" +
>                 "  PRIMARY KEY(`user_id`) NOT ENFORCED\n" +
>                 ") with (\n" +
>                 "  'connector' = 'upsert-kafka',\n" +
>                 "  'value.format' = 'json',\n" +
>                 "  'key.format' = 'raw',\n" +
>                 "  'properties.bootstrap.servers' = 'localhost:29092',\n" +
>                 "  'properties.group.id' = 'test-group-id-2',\n" +
>                 "  'topic' = 'user-cdc'\n" +
>                 ");");
>         tableEnv.executeSql("INSERT INTO user_data VALUES\n" +
>                 "  (CAST('2024-07-18 12:00:00.000' AS TIMESTAMP(3)), 1, 'Warszawa', ''),\n" +
>                 "  (CAST('2024-07-18 12:01:00.000' AS TIMESTAMP(3)), 1, '', '+48 123456789'),\n" +
>                 "  (CAST('2024-07-18 12:02:00.000' AS TIMESTAMP(3)), 1, 'Kraków', ''),\n" +
>                 "  (CAST('2024-07-18 12:03:00.000' AS TIMESTAMP(3)), 1, '', '+48 987654321'),\n" +
>                 "  (CAST('2024-07-18 12:04:00.000' AS TIMESTAMP(3)), 1, 'Gdańsk', ''),\n" +
>                 "  (CAST('2024-07-18 12:05:00.000' AS TIMESTAMP(3)), 1, 'Poznań', '');");
>
>         // Left side: append-only stream of user actions
>         tableEnv.executeSql("create table events (\n" +
>                 "  `ts` TIMESTAMP(3),\n" +
>                 "  `user_id` INTEGER,\n" +
>                 "  `action` VARCHAR,\n" +
>                 "  watermark for `ts` AS `ts` - INTERVAL '1' SECONDS\n" +
>                 ") with (\n" +
>                 "  'connector' = 'kafka',\n" +
>                 "  'format' = 'json',\n" +
>                 "  'properties.bootstrap.servers' = 'localhost:29092',\n" +
>                 "  'properties.group.id' = 'test-group-id-2',\n" +
>                 "  'scan.startup.mode' = 'earliest-offset',\n" +
>                 "  'topic' = 'events'\n" +
>                 ");");
>         tableEnv.executeSql("INSERT INTO events VALUES\n" +
>                 "  (CAST('2024-07-18 11:59:59.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
>                 "  (CAST('2024-07-18 12:00:01.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
>                 "  (CAST('2024-07-18 12:01:01.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
>                 "  (CAST('2024-07-18 12:02:01.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
>                 "  (CAST('2024-07-18 12:03:01.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
>                 "  (CAST('2024-07-18 12:04:01.000' AS TIMESTAMP(3)), 1, 'login'),\n" +
>                 "  (CAST('2024-07-18 12:05:01.000' AS TIMESTAMP(3)), 1, 'login');");
>
>         Table table = tableEnv.sqlQuery(
>                 "WITH password_logins AS (SELECT * FROM events WHERE `action` = 'login'),\n" +
>                 // version with filtering returns weird results: right side columns are always null
>                 "location_changes AS (SELECT * FROM user_data WHERE city <> '')\n" +
>                 // version without filtering: right side columns are not always null
>                 //"location_changes AS (SELECT * FROM user_data)\n" +
>                 "SELECT *\n" +
>                 "FROM password_logins\n" +
>                 "LEFT JOIN location_changes\n" +
>                 "FOR SYSTEM_TIME AS OF password_logins.`ts`\n" +
>                 "ON password_logins.`user_id` = location_changes.`user_id`;");
>         table.execute().print();
>     }
> }
> {code}
>
> {code:yaml}
> ---
> services:
>   zookeeper:
>     image: zookeeper:3.8.0
>     ports:
>       - "2181:2181"
>   kafka:
>     image: wurstmeister/kafka:2.13-2.8.1
>     ports:
>       - "29092:29092"
>     depends_on:
>       - zookeeper
>     environment:
>       HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
>       KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,INTERNAL://:9092
>       KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092
>       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT
>       KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
>       KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
>       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
>       KAFKA_CREATE_TOPICS: "events:1:1,user-cdc:1:1"
>     volumes:
>       - /var/run/docker.sock:/var/run/docker.sock
> {code}
> ----
> I'm not sure what the expected behaviour should be. I also saw
> {{EventTimeTemporalJoinRewriteRule}}, which throws the error below in
> similar cases.
> {noformat}
> Filter is not allowed for right changelog input of event time temporal join,
> it will corrupt the versioning of data. Please consider removing the filter
> before joining.{noformat}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)