[ https://issues.apache.org/jira/browse/FLINK-37134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931128#comment-17931128 ]
Yang Li edited comment on FLINK-37134 at 2/27/25 11:50 AM:
-----------------------------------------------------------

h1. Conclusion

The overall behavior meets expectations.

h1. Environment Preparation

./bin/start-cluster.sh
./bin/sql-gateway.sh start
./bin/sql-client.sh gateway --endpoint http://127.0.0.1:8083

Flink SQL> CREATE CATALOG mt_cat WITH (
> 'type' = 'test-filesystem',
> 'path' = '/home/feng/flink-liyang-test/catalog_path',
> 'default-database' = 'mydb');
[INFO] Execute statement succeeded.

Flink SQL> use catalog mt_cat;
[INFO] Execute statement succeeded

h1. Prepare Source Table

Flink SQL> CREATE TABLE json_source (
> order_id BIGINT,
> user_id BIGINT,
> user_name STRING,
> order_created_at STRING,
> payment_amount_cents BIGINT
> ) WITH (
> 'format' = 'json',
> 'source.monitor-interval' = '10s'
> );
[INFO] Execute statement succeeded.

Flink SQL> INSERT INTO json_source VALUES
> (1001, 1, 'user1', '2024-06-19', 10),
> (1002, 2, 'user2', '2024-06-19', 20),
> (1003, 3, 'user3', '2024-06-19', 30),
> (1004, 4, 'user4', '2024-06-19', 40),
> (1005, 1, 'user1', '2024-06-20', 10),
> (1006, 2, 'user2', '2024-06-20', 20),
> (1007, 3, 'user3', '2024-06-20', 30),
> (1008, 4, 'user4', '2024-06-20', 40);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: eeedf09027a4b6643b7dcbf0ed345803

h1. Continuous Mode Materialized Table

CREATE MATERIALIZED TABLE continuous_users_shops
PARTITIONED BY (ds)
WITH (
  'format' = 'debezium-json',
  'sink.rolling-policy.rollover-interval' = '10s',
  'sink.rolling-policy.check-interval' = '10s'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT
  user_id,
  ds,
  SUM (payment_amount_cents) AS payed_buy_fee_sum,
  SUM (1) AS PV
FROM (
  SELECT user_id, order_created_at AS ds, payment_amount_cents
  FROM json_source
) AS tmp
GROUP BY user_id, ds;

Before alter

!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=NmQ0NTI5NjNhYjlkMTM3NjQ3YjlkMTE0N2VmMjdlYzVfQUpmY3p5emx4NEJpRDFlOE0yWXdyVHN0V1NJM2k0cGhfVG9rZW46Ym94azRDR0UxTTZYbFpxeWhTa2VrdXJ0WFFmXzE3NDA2NTY2OTI6MTc0MDY2MDI5Ml9WNA|width=729,height=177!

Add a field

Flink SQL> Alter MATERIALIZED TABLE continuous_users_shops
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS PV,
> LAST_VALUE(user_name)
> FROM (
> SELECT user_id, user_name, order_created_at AS ds, payment_amount_cents
> FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;
[INFO] Execute statement succeeded.

The result is as expected.

!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=NDhkNWM5ZjgwZWRhYzMzYmM1MjQ4NTg0NTRkNzc0OGFfQmM0MTg1WnJKbnRFS1A2ZXdub1AxTVo0azQwUVRkZ0xfVG9rZW46Ym94azRSUUUwZ0c0R05IVU1pWFp2dnNuR0doXzE3NDA2NTY2MTY6MTc0MDY2MDIxNl9WNA|width=752,height=252!

h1. Full Mode Materialized Table With Partition

CREATE MATERIALIZED TABLE full_users_shops
PARTITIONED BY (ds)
WITH (
  'format' = 'json',
  'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '1' MINUTE
REFRESH_MODE = FULL
AS SELECT
  user_id,
  ds,
  SUM (payment_amount_cents) AS payed_buy_fee_sum,
  SUM (1) AS PV
FROM (
  SELECT user_id, order_created_at AS ds, payment_amount_cents
  FROM json_source
) AS tmp
GROUP BY user_id, ds;

Insert data

Flink SQL> INSERT INTO json_source VALUES
> (1001, 1, 'user1', CAST(CURRENT_DATE AS STRING), 10),
> (1002, 2, 'user2', CAST(CURRENT_DATE AS STRING), 20),
> (1003, 3, 'user3', CAST(CURRENT_DATE AS STRING), 30),
> (1004, 4, 'user4', CAST(CURRENT_DATE AS STRING), 40);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 654cf5d89d1f01ed509ad5578ebf6a8e

Before alter

!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=YzI5Nzc5OTkyNGJjNjE2ZDcwMmFiMmIzNTIwN2E4MGNfeThSUG1vbmJoTkRDaFZwTXg3cmZaZENvNmZRcExDT0ZfVG9rZW46Ym94azR2MjdoVXhZWGtiaU1ad2tqQzBORmdjXzE3NDA2NTY3MjU6MTc0MDY2MDMyNV9WNA|width=738,height=105!

Alter table

Flink SQL> Alter MATERIALIZED TABLE full_users_shops
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS PV,
> LAST_VALUE(user_name)
> FROM (
> SELECT user_id, user_name, order_created_at AS ds, payment_amount_cents
> FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;

The query result is as expected.

Flink SQL> select * from full_users_shops;
[INFO] Result retrieval cancelled.

!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=MzBlZGFhNjBlMTk4NDNiMGYzOTRhMzA4NTY4YzAwNzJfY1paSW9RVWNidlZLaHdrTlBrYnhWUkFrd2VqYlZZWDZfVG9rZW46Ym94azRyT2hnUHlpMnZZR005QUFkeGJ1ekFnXzE3NDA2NTY3NDI6MTc0MDY2MDM0Ml9WNA|width=728,height=81!

Manually Refresh Historical Partition

Flink SQL> ALTER MATERIALIZED TABLE full_users_shops REFRESH PARTITION(ds='2024-06-20');
+----------------------------------+---------------------------+
| job id                           | cluster info              |
+----------------------------------+---------------------------+
| 6311c78b97c3e1d83c56d4a35612743b | {execution.target=remote} |
+----------------------------------+---------------------------+
1 row in set

The data in partition '2024-06-20' has been updated in the materialized table.

!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=Yzc5NzNkMGE1NmFlMThjZjAyYjZkYmJhZDJmZWFhZDVfZDBOclZyTk5US2xtNUxNdHdDN1prTEVvdG9Pdkh6dFBfVG9rZW46Ym94azRzcmhVYnpUbWI4cHUySnl2TUFzVXdjXzE3NDA2NTY3NjA6MTc0MDY2MDM2MF9WNA|width=732,height=124!

h1. Full Mode Materialized Table Without Partition

Flink SQL> CREATE MATERIALIZED TABLE full_users_shops3
> WITH (
> 'format' = 'json'
> )
> FRESHNESS = INTERVAL '1' MINUTE
> REFRESH_MODE = FULL
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS PV
> FROM (
> SELECT user_id, order_created_at AS ds, payment_amount_cents
> FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;

!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=YTI3ZmNjYzk5YjgxOGU2MDk2MzU5NTM4NDk0OGQ3MzFfVU5nQnBVSzJUZjRwajJ0ZWYxbHU3bTlyMTd5M05IREpfVG9rZW46Ym94azRFd3M5c3Z0eWNLR3hmWWprb2dOQVp0XzE3NDA2NTY4OTA6MTc0MDY2MDQ5MF9WNA|width=750,height=182!

Alter table

Flink SQL> Alter MATERIALIZED TABLE full_users_shops3
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS PV,
> LAST_VALUE(user_name)
> FROM (
> SELECT user_id, user_name, order_created_at AS ds, payment_amount_cents
> FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;

The query result meets expectations.

Flink SQL> select * from full_users_shops3;
[INFO] Result retrieval cancelled.
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=ODg4MTRjMWE0MGJlODUyYzBlYjMwZmFmYTcwM2FlYmJfNTVRaEdnSDJ0SGx0OFJ4cDgyRHppT01sZUgwaWhqZmZfVG9rZW46Ym94azRxTnR5WWtNZG1BT2Q1aXVKekt5RW5oXzE3NDA2NTY5MDU6MTc0MDY2MDUwNV9WNA|width=571,height=113!
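For readers who cannot open the screenshots, the rows that the modified query should yield for the eight rows first inserted into json_source can be worked out by hand. The following is only a rough Python sketch of that arithmetic, not Flink code and not copied from the screenshots. The function name expected_rows and the tuple layout are made up for illustration, and the later CURRENT_DATE rows are left out because their partition value depends on the day of the run. LAST_VALUE is modeled as "last value in input order", which is an assumption; it makes no difference here because every (user_id, ds) group contains a single row.

```python
from collections import defaultdict

# The eight rows first inserted into json_source:
# (order_id, user_id, user_name, order_created_at, payment_amount_cents)
ROWS = [
    (1001, 1, "user1", "2024-06-19", 10),
    (1002, 2, "user2", "2024-06-19", 20),
    (1003, 3, "user3", "2024-06-19", 30),
    (1004, 4, "user4", "2024-06-19", 40),
    (1005, 1, "user1", "2024-06-20", 10),
    (1006, 2, "user2", "2024-06-20", 20),
    (1007, 3, "user3", "2024-06-20", 30),
    (1008, 4, "user4", "2024-06-20", 40),
]


def expected_rows(rows):
    """Hypothetical helper: aggregate per (user_id, ds) like the altered query."""
    groups = defaultdict(lambda: {"fee_sum": 0, "pv": 0, "last_name": None})
    for _order_id, user_id, user_name, ds, cents in rows:
        group = groups[(user_id, ds)]
        group["fee_sum"] += cents          # SUM(payment_amount_cents)
        group["pv"] += 1                   # SUM(1)
        group["last_name"] = user_name     # assumed LAST_VALUE semantics
    return [
        (user_id, ds, g["fee_sum"], g["pv"], g["last_name"])
        for (user_id, ds), g in sorted(groups.items())
    ]


if __name__ == "__main__":
    for row in expected_rows(ROWS):
        print(row)
```

Each tuple is (user_id, ds, payed_buy_fee_sum, PV, last user_name). A partition should only match these values once it has been computed by the modified query, that is, after the next scheduled refresh or after a manual REFRESH PARTITION.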

> Cross-team verification for FLIP-492
> ------------------------------------
>
> Key: FLINK-37134
> URL: https://issues.apache.org/jira/browse/FLINK-37134
> Project: Flink
> Issue Type: Sub-task
> Reporter: Feng Jin
> Assignee: Yang Li
> Priority: Major
> Fix For: 2.0.0
>
>
> This is for cross-team verification of the release 2.0 work item:
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-492%3A+Support+Query+Modifications+for+Materialized+Tables]
> As only the Alter Query capability has been implemented at present, we only need
> to verify https://issues.apache.org/jira/browse/FLINK-36994.
>
> *Operation steps:*
> 1. Refer to the quickstart
> (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/quickstart/)
> to prepare the environment required for Materialized Tables, including the
> catalog store, the test-filesystem plugin, a standalone cluster, and the SQL gateway.
> 2. Create Materialized Tables in two modes (Continuous and Full).
> 3. Modify the AS query of the Materialized Tables in both modes.
>
> *Verify the result:*
> In Full mode, we need to verify the behavior of partitioned and
> non-partitioned tables:
> 1. For non-partitioned tables, after waiting for the next refresh task to
> complete, verify whether the result after modifying the query meets
> expectations.
> 2. For partitioned tables, we need to verify that only the result of the
> latest partition is consistent with the result after modifying the query, and
> that the historical partitions remain unchanged. Then, by manually refreshing,
> confirm again that the refreshed partition is generated by the
> modified query.
>
> In Continuous mode, we need to verify the execution of the background
> Continuous job after the modification:
> 1. After modifying the AS query, the new job is generated from the new query.
> 2. The new job does not resume from the state of historical jobs.
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)