sorry,????????????????????????group agg.
????????tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7))??????????????????????????????????????????????.


------------------ 原始邮件 ------------------
发件人: "Benchao Li"<[email protected]>;
发送时间: 2020年7月6日(星期一) 中午12:52
收件人: "user-zh"<[email protected]>;

主题: Re: 回复:FLINKSQL1.10实时统计累计UV



????????SQL??????????????????????????????????????????
??????????????????????state retention[1]??????????????????????????????????????

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time

x <[email protected]> 于2020年7月6日周一 上午11:15写道:

&gt; ??????1.10.1??????sink????????????????window??????count
&gt; distinct??????????????????????????????????window??????count
&gt; 
distinct??????????????????????????????????????????????window????????????????group&amp;nbsp;DATE_FORMAT(rowtm,
&gt; 'yyyy-MM-dd') ????sql??????????????????????????
> val rt_totaluv_view : Table = tabEnv.sqlQuery(
>   """
>     SELECT MAX(DATE_FORMAT(rowtm, 'yyyy-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT userkey) uv
>     FROM source
>     GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
>     """)
> tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)
&gt;
> val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
>   .filter( line => line._1 == true ).map( line => line._2 )
>
> val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )
&gt;
> tabEnv.sqlUpdate(
>   s"""
>     INSERT INTO mysql_totaluv
>     SELECT _1,MAX(_2)
>     FROM $totaluvTabTmp
>     GROUP BY _1
>     """)
&gt; ------------------&amp;nbsp;????????&amp;nbsp;------------------
&gt; ??????:&amp;nbsp;"Benchao Li"<[email protected]&amp;gt;;
&gt; ????????:&amp;nbsp;2020??7??3??(??????) ????9:47
&gt; ??????:&amp;nbsp;"user-zh"<[email protected]&amp;gt;;
&gt;
&gt; ????:&amp;nbsp;Re: ??????FLINKSQL1.10????????????UV
&gt;
&gt;
&gt;
&gt; ??????????????????????????????????????????[1]??????window??????count 
distinct??????????????
&gt; ??????????1.11??????????
&gt;
&gt; [1] https://issues.apache.org/jira/browse/FLINK-17942
&gt;
&gt; x <[email protected]&amp;gt; ??2020??7??3?????? ????4:34??????
&gt;
&gt; &amp;gt; 
????????????????????????????????checkpoint??????????????????????????????????
&gt; &amp;gt;
&gt; &amp;gt;
&gt; 
????????tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),??????????????????????key????????????????????????????
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; 
------------------&amp;amp;nbsp;????????&amp;amp;nbsp;------------------
&gt; &amp;gt; ??????:&amp;amp;nbsp;"Jark Wu"<[email protected]&amp;amp;gt;;
&gt; &amp;gt; ????????:&amp;amp;nbsp;2020??6??18??(??????) ????12:16
&gt; &amp;gt; 
??????:&amp;amp;nbsp;"user-zh"<[email protected]&amp;amp;gt;;
&gt; &amp;gt;
&gt; &amp;gt; ????:&amp;amp;nbsp;Re: ??????FLINKSQL1.10????????????UV
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; ??????????????????????????????
&gt; &amp;gt;
> > On Thu, 18 Jun 2020 at 10:34, x <[email protected]> wrote:
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;gt; 
??????1.10??????????????????,???????????????????????????????
&gt; &amp;gt; &amp;amp;gt; val resTmpTab: Table = tabEnv.sqlQuery(
&gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp; """
&gt; &amp;gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; SELECT
&gt; MAX(DATE_FORMAT(ts, 'yyyy-MM-dd
&gt; &amp;gt; HH:mm:00'))
&gt; &amp;gt; &amp;amp;gt; time_str,COUNT(DISTINCT userkey) uv
&gt; &amp;gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; FROM
&gt; user_behavior&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; GROUP BY
&gt; &amp;gt; DATE_FORMAT(ts, 
'yyyy-MM-dd')&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; """)
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; val
&gt; resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
&gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; &amp;gt; 
.filter(line=&amp;amp;amp;gt;line._1==true).map(line=&amp;amp;amp;gt;line._2)
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; val res= tabEnv.fromDataStream(resTmpStream)
&gt; &amp;gt; &amp;amp;gt; tabEnv.sqlUpdate(
&gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp; s"""
&gt; &amp;gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; INSERT INTO
&gt; rt_totaluv
&gt; &amp;gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; SELECT 
_1,MAX(_2)
&gt; &amp;gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; FROM $res
&gt; &amp;gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; GROUP BY _1
&gt; &amp;gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; """)
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; 
------------------&amp;amp;amp;nbsp;????????&amp;amp;amp;nbsp;------------------
&gt; &amp;gt; &amp;amp;gt; ??????:&amp;amp;amp;nbsp;"Jark 
Wu"<[email protected]&amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;gt; ????????:&amp;amp;amp;nbsp;2020??6??17??(??????) 
????1:55
&gt; &amp;gt; &amp;amp;gt; 
??????:&amp;amp;amp;nbsp;"user-zh"<[email protected]
&gt; &amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; ????:&amp;amp;amp;nbsp;Re: 
??????FLINKSQL1.10????????????UV
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; ?? Flink 1.11 ????????????????????
&gt; &amp;gt; &amp;amp;gt;
> > > CREATE TABLE mysql (
> > >    time_str STRING,
> > >    uv BIGINT,
> > >    PRIMARY KEY (ts) NOT ENFORCED
> > > ) WITH (
> > >    'connector' = 'jdbc',
> > >    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
> > >    'table-name' = 'myuv'
> > > );
> > >
> > > INSERT INTO mysql
> > > SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT user_id)
> > > FROM user_behavior;
&gt; &amp;gt; &amp;amp;gt;
> > > On Wed, 17 Jun 2020 at 13:49, x <[email protected]> wrote:
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 
??????????????????????"??????"????????????????????????????????????????????UV??
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; sink??????????
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; tm uv
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 2020/06/17 13:46:00 10000
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 2020/06/17 13:47:00 20000
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 2020/06/17 13:48:00 30000
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; group by ??????????????????????
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt;
&gt; 
------------------&amp;amp;amp;amp;nbsp;????????&amp;amp;amp;amp;nbsp;------------------
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 
??????:&amp;amp;amp;amp;nbsp;"Benchao Li"<
&gt; [email protected]
&gt; &amp;gt; &amp;amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 
????????:&amp;amp;amp;amp;nbsp;2020??6??17??(??????) ????11:46
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 
??????:&amp;amp;amp;amp;nbsp;"user-zh"<
&gt; [email protected]
&gt; &amp;gt; &amp;amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; ????:&amp;amp;amp;amp;nbsp;Re: 
??????FLINKSQL1.10????????????UV
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; Hi??
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; ??????????????????????????????
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 1. ??????????group by + mini batch
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 2. window???? + fast emit
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; ????#1??group
&gt; by????????????????????????????????????????????????DATE_FORMAT(rowtm,
&gt; &amp;gt; 'yyyy-MM-dd')??
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; ??????????????????????????????state 
retention??????????????????????[1]
&gt; ????????mini
&gt; &amp;gt; batch????????????
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; ??????[2] ????????
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 
????#2????????????????????????tumble????????????????????????????????????????????????????
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; fast
&gt; &amp;gt; 
emit????????????????????experimental??feature????????????????????????????????????????????????????????????
> > > > table.exec.emit.early-fire.enabled = true
> > > > table.exec.emit.early-fire.delay = 60 s
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; [1]
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; [2]
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; x 
<[email protected]&amp;amp;amp;amp;gt;
&gt; ??2020??6??17?????? ????11:14??????
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; 
??????????????????????0??????????????UV??????????????????????????????????????UV??????????????????????
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; CREATE VIEW 
uv_per_10min AS
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 
SELECT&amp;amp;amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 
&amp;amp;amp;amp;amp;nbsp;
&gt; &amp;gt; MAX(DATE_FORMAT(proctime&amp;amp;amp;amp;amp;nbsp;,
&gt; &amp;gt; &amp;amp;gt; 'yyyy-MM-dd
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 
HH:mm:00'))&amp;amp;amp;amp;amp;nbsp;OVER w
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; AS
&gt; time_str,&amp;amp;amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 
&amp;amp;amp;amp;amp;nbsp;
&gt; COUNT(DISTINCT user_id) OVER
&gt; &amp;gt; w AS uv
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; FROM 
user_behavior
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; WINDOW w AS 
(ORDER BY proctime
&gt; ROWS BETWEEN
&gt; &amp;gt; UNBOUNDED
&gt; &amp;gt; &amp;amp;gt; PRECEDING AND
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; CURRENT ROW);
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 
??????????????????????????
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; PARTITION BY
&gt; DATE_FORMAT(rowtm, 'yyyy-MM-dd')
&gt; &amp;gt; &amp;amp;gt; ??????????????????????????????????
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; 
PS??1.10??????????DDL??????????CREATE
&gt; VIEW??
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; ????
&gt;
&gt;
&gt;
> --
>
> Best,
> Benchao Li



-- 

Best,
Benchao Li

回复