[ 
https://issues.apache.org/jira/browse/FLINK-38678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18039384#comment-18039384
 ] 

Ramin Gharib commented on FLINK-38678:
--------------------------------------

[~hackergin] Thanks for testing this out!

> Release Testing: Validate FLIP-551: Make FRESHNESS Optional for Materialized 
> Tables
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-38678
>                 URL: https://issues.apache.org/jira/browse/FLINK-38678
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>            Reporter: Ramin Gharib
>            Assignee: Feng Jin
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 2.2.0
>
>         Attachments: 截屏2025-11-19 21.11.03.png, 截屏2025-11-19 21.11.14.png, 
> 截屏2025-11-19 21.11.49.png
>
>
> You can use the Flink SQL CLI to test the queries:
> 1. Create Source table
> {code:java}
> CREATE TABLE datagenSource (
>   order_id BIGINT,
>   order_number VARCHAR(20),
>   user_id BIGINT,
>   shop_id BIGINT,
>   product_id BIGINT,
>   status BIGINT,
>   order_type BIGINT,
>   order_created_at TIMESTAMP(3),
>   payment_amount_cents BIGINT
> )
> WITH (
>   'connector' = 'datagen',
>   'rows-per-second' = '10'
> ); {code}
> 2. Create the materialized table (without freshness)
> {code:java}
> CREATE MATERIALIZED TABLE users_shops
> PARTITIONED BY (ds)
> WITH(
>    'connector' = 'blackhole'
> )
> REFRESH_MODE = CONTINUOUS
> AS SELECT
>   user_id,
>   shop_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS pv
>  FROM (
>     SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS 
> ds, payment_amount_cents FROM datagenSource
>  ) AS tmp
>  GROUP BY user_id, shop_id, ds; {code}
> 3. run
> {noformat}
> SHOW CREATE MATERIALIZED TABLE users_shops;{noformat}
> And you should see the default FRESHNESS value (3 MINUTE)
> {code:java}
> CREATE MATERIALIZED TABLE `default_catalog`.`default_database`.`users_shops`
> PARTITIONED BY (`ds`)
> WITH (
>   'connector' = 'blackhole'
> )
> FRESHNESS = INTERVAL '3' MINUTE
> REFRESH_MODE = CONTINUOUS
> AS SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, 
> SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`
> FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, 
> DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, 
> `datagenSource`.`payment_amount_cents`
> FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`) 
> AS `tmp`
> GROUP BY `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds` {code}
> 4. create materialized table with freshness clause
> {code:java}
> CREATE MATERIALIZED TABLE users_shops_explicit
> PARTITIONED BY (ds)
> WITH(
>    'connector' = 'blackhole'
> )
> FRESHNESS = INTERVAL '10' SECOND
> REFRESH_MODE = CONTINUOUS
> AS SELECT
>   user_id,
>   shop_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS pv
>  FROM (
>     SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS 
> ds, payment_amount_cents FROM datagenSource
>  ) AS tmp
>  GROUP BY user_id, shop_id, ds; {code}
>  
> 5. run
> {noformat}
> SHOW CREATE MATERIALIZED TABLE users_shops_explicit;{noformat}
> should yield
> {code:java}
> CREATE MATERIALIZED TABLE 
> `default_catalog`.`default_database`.`users_shops_explicit`
> PARTITIONED BY (`ds`)
> WITH (
>   'connector' = 'blackhole'
> )
> FRESHNESS = INTERVAL '10' SECOND
> REFRESH_MODE = CONTINUOUS
> AS SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, 
> SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`
> FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, 
> DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, 
> `datagenSource`.`payment_amount_cents`
> FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`) 
> AS `tmp`
> GROUP BY `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`{code}
> 6. create materialized table without FRESHNESS and REFRESH_MODE
> {code:java}
> CREATE MATERIALIZED TABLE users_shops_continuous
> WITH(
>    'connector' = 'blackhole'
> )
> AS SELECT
>   user_id,
>   shop_id,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS pv
>  FROM datagenSource
>  GROUP BY user_id, shop_id; {code}
> 7. run
> {code:java}
> SHOW CREATE MATERIALIZED TABLE users_shops_continuous{code}
> should include the FRESHNESS and REFRESH_MODE
> {code:java}
> CREATE MATERIALIZED TABLE 
> `default_catalog`.`default_database`.`users_shops_continuous`
> WITH (
>   'connector' = 'blackhole'
> )
> FRESHNESS = INTERVAL '3' MINUTE
> REFRESH_MODE = CONTINUOUS
> AS SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, 
> SUM(`datagenSource`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS 
> `pv`
> FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`
> GROUP BY `datagenSource`.`user_id`, `datagenSource`.`shop_id`{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to