[ 
https://issues.apache.org/jira/browse/FLINK-37134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931128#comment-17931128
 ] 

Yang Li commented on FLINK-37134:
---------------------------------

h1. Conclusion
The overall behavior meets expectations.
h1. Environment Preparation
./bin/start-cluster.sh
./bin/sql-gateway.sh start
./bin/sql-client.sh gateway --endpoint http://127.0.0.1:8083

Flink SQL> CREATE CATALOG mt_cat WITH (
>   'type' = 'test-filesystem',
>   'path' = '/home/feng/flink-liyang-test/catalog_path',
>   'default-database' = 'mydb');
[INFO] Execute statement succeeded.

Flink SQL> use catalog mt_cat;
[INFO] Execute statement succeeded
h1. Prepare Source Table
Flink SQL> CREATE TABLE json_source (
>   order_id BIGINT,
>   user_id BIGINT,
>   user_name STRING,
>   order_created_at STRING,
>   payment_amount_cents BIGINT
> ) WITH (
>   'format' = 'json',
>   'source.monitor-interval' = '10s'
> );
[INFO] Execute statement succeeded.

Flink SQL> INSERT INTO json_source VALUES 
>   (1001, 1, 'user1', '2024-06-19', 10),
>   (1002, 2, 'user2', '2024-06-19', 20),
>   (1003, 3, 'user3', '2024-06-19', 30),
>   (1004, 4, 'user4', '2024-06-19', 40),
>   (1005, 1, 'user1', '2024-06-20', 10),
>   (1006, 2, 'user2', '2024-06-20', 20),
>   (1007, 3, 'user3', '2024-06-20', 30),
>   (1008, 4, 'user4', '2024-06-20', 40);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: eeedf09027a4b6643b7dcbf0ed345803 
 
h1. Continuous Mode Materialized Table
CREATE MATERIALIZED TABLE continuous_users_shops
PARTITIONED BY (ds)
WITH (
  'format' = 'debezium-json',
  'sink.rolling-policy.rollover-interval' = '10s',
  'sink.rolling-policy.check-interval' = '10s'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT
  user_id,
  ds,
  SUM (payment_amount_cents) AS payed_buy_fee_sum,
  SUM (1) AS PV
FROM (
  SELECT user_id, order_created_at AS ds, payment_amount_cents
    FROM json_source
  ) AS tmp
GROUP BY user_id, ds;
Before altering the query:
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=NmQ0NTI5NjNhYjlkMTM3NjQ3YjlkMTE0N2VmMjdlYzVfQUpmY3p5emx4NEJpRDFlOE0yWXdyVHN0V1NJM2k0cGhfVG9rZW46Ym94azRDR0UxTTZYbFpxeWhTa2VrdXJ0WFFmXzE3NDA2NTY2OTI6MTc0MDY2MDI5Ml9WNA|width=729,height=177!
Add a field
Flink SQL> Alter MATERIALIZED TABLE continuous_users_shops
> AS SELECT
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS PV,
>   LAST_VALUE(user_name)
> FROM (
>   SELECT user_id, user_name, order_created_at AS ds, payment_amount_cents
>     FROM json_source
>   ) AS tmp
> GROUP BY user_id, ds;
[INFO] Execute statement succeeded.
The result is as expected:
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=NDhkNWM5ZjgwZWRhYzMzYmM1MjQ4NTg0NTRkNzc0OGFfQmM0MTg1WnJKbnRFS1A2ZXdub1AxTVo0azQwUVRkZ0xfVG9rZW46Ym94azRSUUUwZ0c0R05IVU1pWFp2dnNuR0doXzE3NDA2NTY2MTY6MTc0MDY2MDIxNl9WNA|width=752,height=252!
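The Continuous-mode criteria in the issue description (a new job generated by the new query) could also be checked from the same SQL client session. The statements below are a sketch and were not part of the recorded run, so no output is shown. SHOW JOBS lists the jobs known to the cluster, and the SELECT simply reads back the table created above.

```sql
-- List the jobs on the cluster. After the ALTER ... AS statement, a newly
-- started job for continuous_users_shops would be expected here, and the
-- job that ran the original query should no longer be running
-- (assumption based on the issue description, not on captured output).
SHOW JOBS;

-- Read the materialized table back. The fifth column is produced by
-- LAST_VALUE(user_name) in the modified query.
SELECT * FROM continuous_users_shops;
```

In streaming mode the SELECT keeps running until it is cancelled in the client.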
h1. Full Mode Materialized Table With Partition
CREATE MATERIALIZED TABLE full_users_shops
PARTITIONED BY (ds)
WITH (
  'format' = 'json',
  'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '1' MINUTE
REFRESH_MODE = FULL
AS SELECT
  user_id,
  ds,
  SUM (payment_amount_cents) AS payed_buy_fee_sum,
  SUM (1) AS PV
FROM (
  SELECT user_id, order_created_at AS ds, payment_amount_cents
  FROM json_source
) AS tmp
GROUP BY user_id, ds;
Insert data
Flink SQL> INSERT INTO json_source VALUES 
>   (1001, 1, 'user1', CAST(CURRENT_DATE AS STRING), 10),
>   (1002, 2, 'user2', CAST(CURRENT_DATE AS STRING), 20),
>   (1003, 3, 'user3', CAST(CURRENT_DATE AS STRING), 30),
>   (1004, 4, 'user4', CAST(CURRENT_DATE AS STRING), 40);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 654cf5d89d1f01ed509ad5578ebf6a8e
Before altering the query:
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=YzI5Nzc5OTkyNGJjNjE2ZDcwMmFiMmIzNTIwN2E4MGNfeThSUG1vbmJoTkRDaFZwTXg3cmZaZENvNmZRcExDT0ZfVG9rZW46Ym94azR2MjdoVXhZWGtiaU1ad2tqQzBORmdjXzE3NDA2NTY3MjU6MTc0MDY2MDMyNV9WNA|width=738,height=105!
Alter table
Flink SQL> Alter MATERIALIZED TABLE full_users_shops
> AS SELECT
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS PV,
>   LAST_VALUE(user_name)
> FROM (
>   SELECT user_id, user_name, order_created_at AS ds, payment_amount_cents
>     FROM json_source
>   ) AS tmp
> GROUP BY user_id, ds;
 
The query result is as expected:
Flink SQL> select * from full_users_shops;
[INFO] Result retrieval cancelled.
 
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=MzBlZGFhNjBlMTk4NDNiMGYzOTRhMzA4NTY4YzAwNzJfY1paSW9RVWNidlZLaHdrTlBrYnhWUkFrd2VqYlZZWDZfVG9rZW46Ym94azRyT2hnUHlpMnZZR005QUFkeGJ1ekFnXzE3NDA2NTY3NDI6MTc0MDY2MDM0Ml9WNA|width=728,height=81!
Manually Refresh Historical Partition
Flink SQL> ALTER MATERIALIZED TABLE full_users_shops REFRESH PARTITION(ds='2024-06-20');
+----------------------------------+---------------------------+
|                           job id |              cluster info |
+----------------------------------+---------------------------+
| 6311c78b97c3e1d83c56d4a35612743b | {execution.target=remote} |
+----------------------------------+---------------------------+
1 row in set
The data in partition '2024-06-20' has been updated in the materialized table:
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=Yzc5NzNkMGE1NmFlMThjZjAyYjZkYmJhZDJmZWFhZDVfZDBOclZyTk5US2xtNUxNdHdDN1prTEVvdG9Pdkh6dFBfVG9rZW46Ym94azRzcmhVYnpUbWI4cHUySnl2TUFzVXdjXzE3NDA2NTY3NjA6MTc0MDY2MDM2MF9WNA|width=732,height=124!
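To inspect the manually refreshed partition separately from the partition maintained by the scheduled refresh, each can be queried on its own. This is a sketch rather than part of the recorded run: the partition value '2024-06-20' comes from the REFRESH statement above, and the current-date filter assumes the rows inserted with CAST(CURRENT_DATE AS STRING) are queried on the same day.

```sql
-- Partition refreshed manually above; it should have been produced by the
-- modified query (assumption based on the issue description).
SELECT * FROM full_users_shops WHERE ds = '2024-06-20';

-- Partition for the current date, which the scheduled refresh maintains.
SELECT * FROM full_users_shops WHERE ds = CAST(CURRENT_DATE AS STRING);
```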
h1. Full Mode Materialized Table Without Partition
 
Flink SQL> CREATE MATERIALIZED TABLE full_users_shops3
> WITH (
>   'format' = 'json'
> )
> FRESHNESS = INTERVAL '1' MINUTE
> REFRESH_MODE = FULL
> AS SELECT
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS PV
> FROM (
>   SELECT user_id, order_created_at AS ds, payment_amount_cents
>   FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;
 
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=YTI3ZmNjYzk5YjgxOGU2MDk2MzU5NTM4NDk0OGQ3MzFfVU5nQnBVSzJUZjRwajJ0ZWYxbHU3bTlyMTd5M05IREpfVG9rZW46Ym94azRFd3M5c3Z0eWNLR3hmWWprb2dOQVp0XzE3NDA2NTY4OTA6MTc0MDY2MDQ5MF9WNA|width=750,height=182!
Alter table
Flink SQL> Alter MATERIALIZED TABLE full_users_shops3
> AS SELECT
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS PV,
>   LAST_VALUE(user_name)
> FROM (
>   SELECT user_id, user_name, order_created_at AS ds, payment_amount_cents
>     FROM json_source
>   ) AS tmp
> GROUP BY user_id, ds;
The query result meets expectations:
Flink SQL> select * from full_users_shops3;
[INFO] Result retrieval cancelled.
 
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=ODg4MTRjMWE0MGJlODUyYzBlYjMwZmFmYTcwM2FlYmJfNTVRaEdnSDJ0SGx0OFJ4cDgyRHppT01sZUgwaWhqZmZfVG9rZW46Ym94azRxTnR5WWtNZG1BT2Q1aXVKekt5RW5oXzE3NDA2NTY5MDU6MTc0MDY2MDUwNV9WNA|width=743,height=147!

> Cross-team verification for FLIP-492
> ------------------------------------
>
>                 Key: FLINK-37134
>                 URL: https://issues.apache.org/jira/browse/FLINK-37134
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Feng Jin
>            Assignee: Yang Li
>            Priority: Major
>             Fix For: 2.0.0
>
>
> This is for cross-team verification of the release 2.0 work item:
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-492%3A+Support+Query+Modifications+for+Materialized+Tables]
> As only the Alter Query capability has been achieved at present, we only need 
> to verify https://issues.apache.org/jira/browse/FLINK-36994.
>  
> *Operation steps:*
> 1. Refer to the quickstart
> (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/quickstart/)
> to prepare the environment required for Materialized Tables, including the
> catalog store, test-filesystem plugin, standalone cluster, and SQL gateway.
> 2. Create Materialized Tables in two modes (Continuous and Full).
> 3. Modify the AS query of the Materialized Tables in both modes.
>  
> *Verify the result:*
> In Full mode, we need to verify the behavior of partitioned and 
> non-partitioned tables:
> 1. For non-partitioned tables, after waiting for the next refresh task to
> complete, verify whether the result after modifying the query meets
> expectations.
> 2. For partitioned tables, we need to verify that only the result of the 
> latest partition is consistent with the result after modifying the query, and 
> the historical partitions remain unchanged. Then, by manually refreshing, 
> confirm again that the partition after refreshing is generated by the 
> modified query.
>  
> In Continuous mode, we need to verify the execution of the background 
> Continuous job after modification.
> 1. After modifying the AS query, the new job is generated by the new query.
> 2. The new job does not resume from the state of historical jobs.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
