Yang Li edited comment on FLINK-37134 at 2/27/25 11:50 AM:
-----------------------------------------------------------

h1. Conclusion

The overall behavior meets expectations.

h1. Environment Preparation

./bin/start-cluster.sh
./bin/sql-gateway.sh start
./bin/sql-client.sh gateway --endpoint [|]

Flink SQL> CREATE CATALOG mt_cat WITH (
>   'type' = 'test-filesystem',
>   'path' = '/home/feng/flink-liyang-test/catalog_path',
>   'default-database' = 'mydb');
[INFO] Execute statement succeeded.

Flink SQL> use catalog mt_cat;
[INFO] Execute statement succeeded

h1. Prepare Source Table

Flink SQL> CREATE TABLE json_source (
>   order_id BIGINT,
>   user_id BIGINT,
>   user_name STRING,
>   order_created_at STRING,
>   payment_amount_cents BIGINT
> ) WITH (
>   'format' = 'json',
>   'source.monitor-interval' = '10s'
> );
[INFO] Execute statement succeeded.

Flink SQL> INSERT INTO json_source VALUES
>   (1001, 1, 'user1', '2024-06-19', 10),
>   (1002, 2, 'user2', '2024-06-19', 20),
>   (1003, 3, 'user3', '2024-06-19', 30),
>   (1004, 4, 'user4', '2024-06-19', 40),
>   (1005, 1, 'user1', '2024-06-20', 10),
>   (1006, 2, 'user2', '2024-06-20', 20),
>   (1007, 3, 'user3', '2024-06-20', 30),
>   (1008, 4, 'user4', '2024-06-20', 40);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: eeedf09027a4b6643b7dcbf0ed345803

h1. Continuous Mode Materialized Table

CREATE MATERIALIZED TABLE continuous_users_shops
PARTITIONED BY (ds)
WITH (
  'format' = 'debezium-json',
  'sink.rolling-policy.rollover-interval' = '10s',
  'sink.rolling-policy.check-interval' = '10s'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT
  user_id,
  ds,
  SUM (payment_amount_cents) AS payed_buy_fee_sum,
  SUM (1) AS PV
FROM (
  SELECT user_id, order_created_at AS ds, payment_amount_cents
  FROM json_source
) AS tmp
GROUP BY user_id, ds;

Before alter

!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=NmQ0NTI5NjNhYjlkMTM3NjQ3YjlkMTE0N2VmMjdlYzVfQUpmY3p5emx4NEJpRDFlOE0yWXdyVHN0V1NJM2k0cGhfVG9rZW46Ym94azRDR0UxTTZYbFpxeWhTa2VrdXJ0WFFmXzE3NDA2NTY2OTI6MTc0MDY2MDI5Ml9WNA|width=729,height=177!

Add a field

Flink SQL> Alter MATERIALIZED TABLE continuous_users_shops
> AS SELECT
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS PV,
>   LAST_VALUE(user_name)
> FROM (
>   SELECT user_id, user_name, order_created_at AS ds, payment_amount_cents
>   FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;
[INFO] Execute statement succeeded.

The result is as expected

!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=NDhkNWM5ZjgwZWRhYzMzYmM1MjQ4NTg0NTRkNzc0OGFfQmM0MTg1WnJKbnRFS1A2ZXdub1AxTVo0azQwUVRkZ0xfVG9rZW46Ym94azRSUUUwZ0c0R05IVU1pWFp2dnNuR0doXzE3NDA2NTY2MTY6MTc0MDY2MDIxNl9WNA|width=752,height=252!

h1. Full Mode Materialized Table With Partition

CREATE MATERIALIZED TABLE full_users_shops
PARTITIONED BY (ds)
WITH (
  'format' = 'json',
  'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '1' MINUTE
REFRESH_MODE = FULL
AS SELECT
  user_id,
  ds,
  SUM (payment_amount_cents) AS payed_buy_fee_sum,
  SUM (1) AS PV
FROM (
  SELECT user_id, order_created_at AS ds, payment_amount_cents
  FROM json_source
) AS tmp
GROUP BY user_id, ds;

Insert data

Flink SQL> INSERT INTO json_source VALUES
>   (1001, 1, 'user1', CAST(CURRENT_DATE AS STRING), 10),
>   (1002, 2, 'user2', CAST(CURRENT_DATE AS STRING), 20),
>   (1003, 3, 'user3', CAST(CURRENT_DATE AS STRING), 30),
>   (1004, 4, 'user4', CAST(CURRENT_DATE AS STRING), 40);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 654cf5d89d1f01ed509ad5578ebf6a8e

Before alter

!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=YzI5Nzc5OTkyNGJjNjE2ZDcwMmFiMmIzNTIwN2E4MGNfeThSUG1vbmJoTkRDaFZwTXg3cmZaZENvNmZRcExDT0ZfVG9rZW46Ym94azR2MjdoVXhZWGtiaU1ad2tqQzBORmdjXzE3NDA2NTY3MjU6MTc0MDY2MDMyNV9WNA|width=738,height=105!

Alter table

Flink SQL> Alter MATERIALIZED TABLE full_users_shops
> AS SELECT
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS PV,
>   LAST_VALUE(user_name)
> FROM (
>   SELECT user_id, user_name, order_created_at AS ds, payment_amount_cents
>   FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;

The query result is as expected

Flink SQL> select * from full_users_shops;
[INFO] Result retrieval cancelled.

!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=MzBlZGFhNjBlMTk4NDNiMGYzOTRhMzA4NTY4YzAwNzJfY1paSW9RVWNidlZLaHdrTlBrYnhWUkFrd2VqYlZZWDZfVG9rZW46Ym94azRyT2hnUHlpMnZZR005QUFkeGJ1ekFnXzE3NDA2NTY3NDI6MTc0MDY2MDM0Ml9WNA|width=728,height=81!

Manually Refresh Historical Partition

Flink SQL> ALTER MATERIALIZED TABLE full_users_shops REFRESH PARTITION(ds='2024-06-20');
+----------------------------------+---------------------------+
|                           job id |              cluster info |
+----------------------------------+---------------------------+
| 6311c78b97c3e1d83c56d4a35612743b | {execution.target=remote} |
+----------------------------------+---------------------------+
1 row in set

The data in partition '2024-06-20' has been updated in the materialized table

!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=Yzc5NzNkMGE1NmFlMThjZjAyYjZkYmJhZDJmZWFhZDVfZDBOclZyTk5US2xtNUxNdHdDN1prTEVvdG9Pdkh6dFBfVG9rZW46Ym94azRzcmhVYnpUbWI4cHUySnl2TUFzVXdjXzE3NDA2NTY3NjA6MTc0MDY2MDM2MF9WNA|width=732,height=124!

h1. Full Mode Materialized Table Without Partition

Flink SQL> CREATE MATERIALIZED TABLE full_users_shops3
> WITH (
>   'format' = 'json'
> )
> FRESHNESS = INTERVAL '1' MINUTE
> REFRESH_MODE = FULL
> AS SELECT
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS PV
> FROM (
>   SELECT user_id, order_created_at AS ds, payment_amount_cents
>   FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;

!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=YTI3ZmNjYzk5YjgxOGU2MDk2MzU5NTM4NDk0OGQ3MzFfVU5nQnBVSzJUZjRwajJ0ZWYxbHU3bTlyMTd5M05IREpfVG9rZW46Ym94azRFd3M5c3Z0eWNLR3hmWWprb2dOQVp0XzE3NDA2NTY4OTA6MTc0MDY2MDQ5MF9WNA|width=750,height=182!

Flink SQL> select * from full_users_shops3;
[INFO] Result retrieval cancelled.
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=ODg4MTRjMWE0MGJlODUyYzBlYjMwZmFmYTcwM2FlYmJfNTVRaEdnSDJ0SGx0OFJ4cDgyRHppT01sZUgwaWhqZmZfVG9rZW46Ym94azRxTnR5WWtNZG1BT2Q1aXVKekt5RW5oXzE3NDA2NTY5MDU6MTc0MDY2MDUwNV9WNA|width=743,height=147!

Cross-team verification for FLIP-492
------------------------------------

Key: FLINK-37134
URL: https://issues.apache.org/jira/browse/FLINK-37134
Project: Flink
Issue Type: Sub-task
Reporter: Feng Jin
Assignee: Yang Li
Priority: Major
Fix For: 2.0.0

This is for cross-team verification of the release 2.0 work item:
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-492%3A+Support+Query+Modifications+for+Materialized+Tables]
As only the Alter Query capability has been implemented so far, we only need
to verify https://issues.apache.org/jira/browse/FLINK-36994.

*Operation steps:*
1. Refer to the quickstart
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/quickstart/)
to prepare the environment required for Materialized Tables, including the
catalog store, test-filesystem plugin, standalone cluster, and SQL gateway.
2. Create Materialized Tables in two modes (Continuous and Full).
3. Modify the AS query of the Materialized Tables in both modes.

*Verify the result:*
In Full mode, we need to verify the behavior of partitioned and
non-partitioned tables:
1. For non-partitioned tables, after waiting for the next refresh task to
complete, verify that the result reflects the modified query.
2. For partitioned tables, verify that only the latest partition is
consistent with the modified query and that the historical partitions remain
unchanged. Then, by manually refreshing a historical partition, confirm again
that the refreshed partition is generated by the modified query.
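
The partitioned Full-mode check above could be sketched in the SQL client
roughly as follows. This is only a sketch under assumptions: the table name
full_users_shops and the partition value '2024-06-20' are borrowed from the
verification comment on this issue, and the statements are meant to be run
after the AS query has been altered. Substitute your own names and dates.

```sql
-- Assumed names: full_users_shops and ds = '2024-06-20' come from the
-- verification comment; they are illustrative, not required.

-- 1. Historical partition, before any manual refresh. It is expected to
--    still hold the result produced by the original query.
SELECT * FROM full_users_shops WHERE ds = '2024-06-20';

-- 2. Latest partition, after the next scheduled refresh has completed.
--    It is expected to reflect the modified query.
SELECT * FROM full_users_shops WHERE ds = CAST(CURRENT_DATE AS STRING);

-- 3. Manually refresh the historical partition.
ALTER MATERIALIZED TABLE full_users_shops REFRESH PARTITION (ds = '2024-06-20');

-- 4. Once the refresh job has finished, the historical partition should
--    now also reflect the modified query.
SELECT * FROM full_users_shops WHERE ds = '2024-06-20';
```

Comparing the output of steps 1 and 4 is what distinguishes "historical
partitions remain unchanged" from "the refreshed partition is generated by
the modified query".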

In Continuous mode, we need to verify the execution of the background
Continuous job after modification.
1. After modifying the AS query, the new job is generated by the new query.
2. The new job does not resume from the state of historical jobs.
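
One possible way to observe the job switch in Continuous mode is to list the
jobs before and after the alter statement. This is a hedged sketch, not a
prescribed procedure: it assumes the table continuous_users_shops and the
modified query from the verification comment, and it assumes the SQL client
is connected to the cluster that runs the refresh job.

```sql
-- Assumed table name: continuous_users_shops, from the verification comment.

-- 1. Note the job ID of the running continuous refresh job.
SHOW JOBS;

-- 2. Modify the AS query (here: add LAST_VALUE(user_name)).
ALTER MATERIALIZED TABLE continuous_users_shops
AS SELECT
  user_id,
  ds,
  SUM (payment_amount_cents) AS payed_buy_fee_sum,
  SUM (1) AS PV,
  LAST_VALUE(user_name)
FROM (
  SELECT user_id, user_name, order_created_at AS ds, payment_amount_cents
  FROM json_source
) AS tmp
GROUP BY user_id, ds;

-- 3. List the jobs again. The expectation is that the earlier job is no
--    longer running and that a job with a different ID has been started.
SHOW JOBS;
```

SHOW JOBS only confirms that a different job is running; whether that job
started without the state of the previous one still has to be checked
separately, for example in the job details of the Flink web UI.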