[
https://issues.apache.org/jira/browse/FLINK-38678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18039082#comment-18039082
]
Feng Jin commented on FLINK-38678:
----------------------------------
[~ruanhang1993] Please assign this Jira to me and I will follow it up.
> Release Testing: Validate FLIP-551: Make FRESHNESS Optional for Materialized
> Tables
> -----------------------------------------------------------------------------------
>
> Key: FLINK-38678
> URL: https://issues.apache.org/jira/browse/FLINK-38678
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Reporter: Ramin Gharib
> Priority: Blocker
> Labels: release-testing
> Fix For: 2.2.0
>
>
> You can use the Flink SQL CLI to test the queries:
> 1. Create Source table
> {code:java}
> CREATE TABLE datagenSource (
> order_id BIGINT,
> order_number VARCHAR(20),
> user_id BIGINT,
> shop_id BIGINT,
> product_id BIGINT,
> status BIGINT,
> order_type BIGINT,
> order_created_at TIMESTAMP(3),
> payment_amount_cents BIGINT
> )
> WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10'
> ); {code}
> 2. Create the materialized table (without freshness)
> {code:java}
> CREATE MATERIALIZED TABLE users_shops
> PARTITIONED BY (ds)
> WITH(
> 'connector' = 'blackhole'
> )
> REFRESH_MODE = CONTINUOUS
> AS SELECT
> user_id,
> shop_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS pv
> FROM (
> SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS
> ds, payment_amount_cents FROM datagenSource
> ) AS tmp
> GROUP BY user_id, shop_id, ds; {code}
> 3. run
> {noformat}
> SHOW CREATE MATERIALIZED TABLE users_shops;{noformat}
> And you should see the default FRESHNESS value (3 MINUTE)
> {code:java}
> CREATE MATERIALIZED TABLE `default_catalog`.`default_database`.`users_shops`
> PARTITIONED BY (`ds`)
> WITH (
> 'connector' = 'blackhole'
> )
> FRESHNESS = INTERVAL '3' MINUTE
> REFRESH_MODE = CONTINUOUS
> AS SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`,
> SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`
> FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`,
> DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`,
> `datagenSource`.`payment_amount_cents`
> FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`)
> AS `tmp`
> GROUP BY `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds` {code}
> 4. create materialized table with freshness clause
> {code:java}
> CREATE MATERIALIZED TABLE users_shops_explicit
> PARTITIONED BY (ds)
> WITH(
> 'connector' = 'blackhole'
> )
> FRESHNESS = INTERVAL '10' SECOND
> REFRESH_MODE = CONTINUOUS
> AS SELECT
> user_id,
> shop_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS pv
> FROM (
> SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS
> ds, payment_amount_cents FROM datagenSource
> ) AS tmp
> GROUP BY user_id, shop_id, ds; {code}
>
> 5. run
> {noformat}
> SHOW CREATE MATERIALIZED TABLE users_shops_explicit;{noformat}
> should yield
> {code:java}
> CREATE MATERIALIZED TABLE
> `default_catalog`.`default_database`.`users_shops_explicit`
> PARTITIONED BY (`ds`)
> WITH (
> 'connector' = 'blackhole'
> )
> FRESHNESS = INTERVAL '10' SECOND
> REFRESH_MODE = CONTINUOUS
> AS SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`,
> SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`
> FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`,
> DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`,
> `datagenSource`.`payment_amount_cents`
> FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`)
> AS `tmp`
> GROUP BY `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`{code}
> 6. create materialized table without FRESHNESS and REFRESH_MODE
> {code:java}
> CREATE MATERIALIZED TABLE users_shops_continuous
> WITH(
> 'connector' = 'blackhole'
> )
> AS SELECT
> user_id,
> shop_id,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS pv
> FROM datagenSource
> GROUP BY user_id, shop_id; {code}
> 7. run
> {code:java}
> SHOW CREATE MATERIALIZED TABLE users_shops_continuous{code}
> should include the FRESHNESS and REFRESH_MODE
> {code:java}
> CREATE MATERIALIZED TABLE
> `default_catalog`.`default_database`.`users_shops_continuous`
> WITH (
> 'connector' = 'blackhole'
> )
> FRESHNESS = INTERVAL '3' MINUTE
> REFRESH_MODE = CONTINUOUS
> AS SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`,
> SUM(`datagenSource`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS
> `pv`
> FROM `default_catalog`.`default_database`.`datagenSource` AS `datagenSource`
> GROUP BY `datagenSource`.`user_id`, `datagenSource`.`shop_id`{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)