??blink val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
------------------ ???????? ------------------ ??????: "Benchao Li"<[email protected]>; ????????: 2020??7??6??(??????) ????11:11 ??????: "user-zh"<[email protected]>; ????: Re: ??????FLINKSQL1.10????????????UV ????????????????????????????????blink planner???? x <[email protected]> ??2020??7??6?????? ????1:24?????? > sorry,????????????????????????group agg. > > ????????tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7))??????????????????????????????????????????????. > > > ------------------&nbsp;????????&nbsp;------------------ > ??????:&nbsp;"Benchao Li"<[email protected]&gt;; > ????????:&nbsp;2020??7??6??(??????) ????12:52 > ??????:&nbsp;"user-zh"<[email protected]&gt;; > > ????:&nbsp;Re: ??????FLINKSQL1.10????????????UV > > > > ????????SQL?????????????????????????????????????????? > ??????????????????????state retention[1]?????????????????????????????????????? > > [1] > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time > > x <[email protected]&gt; ??2020??7??6?????? ????11:15?????? > > &gt; ??????1.10.1??????sink????????????????window??????count > &gt; distinct??????????????????????????????????window??????count > &gt; > distinct??????????????????????????????????????????????window????????????????group&amp;nbsp;DATE_FORMAT(rowtm, > &gt; 'yyyy-MM-dd') ????sql?????????????????????????? 
> &gt; val rt_totaluv_view : Table = tabEnv.sqlQuery( > &gt;&nbsp;&nbsp; """ > &gt;&nbsp;&nbsp;&nbsp;&nbsp; SELECT MAX(DATE_FORMAT(rowtm, 'yyyy-MM-dd > HH:mm:00')) > &gt; time_str,COUNT(DISTINCT userkey) uv > &gt;&nbsp;&nbsp;&nbsp;&nbsp; FROM source > &gt;&nbsp;&nbsp;&nbsp;&nbsp; GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') > &gt;&nbsp;&nbsp;&nbsp;&nbsp; """) > &gt; tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view) > &gt; > &gt; val totaluvTmp = > tabEnv.toRetractStream[(String,Long)](rt_totaluv_view) > &gt;&nbsp;&nbsp; .filter( line =&amp;gt; line._1 == true ).map( line > =&amp;gt; line._2 ) > &gt; > &gt; val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp ) > &gt; > &gt; tabEnv.sqlUpdate( > &gt;&nbsp;&nbsp; s""" > &gt;&nbsp;&nbsp;&nbsp;&nbsp; INSERT INTO mysql_totaluv > &gt;&nbsp;&nbsp;&nbsp;&nbsp; SELECT _1,MAX(_2) > &gt;&nbsp;&nbsp;&nbsp;&nbsp; FROM $totaluvTabTmp > &gt;&nbsp;&nbsp;&nbsp;&nbsp; GROUP BY _1 > &gt;&nbsp;&nbsp;&nbsp;&nbsp; """) > &gt; ------------------&amp;nbsp;????????&amp;nbsp;------------------ > &gt; ??????:&amp;nbsp;"Benchao Li"<[email protected]&amp;gt;; > &gt; ????????:&amp;nbsp;2020??7??3??(??????) ????9:47 > &gt; ??????:&amp;nbsp;"user-zh"<[email protected]&amp;gt;; > &gt; > &gt; ????:&amp;nbsp;Re: ??????FLINKSQL1.10????????????UV > &gt; > &gt; > &gt; > &gt; ??????????????????????????????????????????[1]??????window??????count distinct?????????????? > &gt; ??????????1.11?????????? > &gt; > &gt; [1] https://issues.apache.org/jira/browse/FLINK-17942 > &gt; > &gt; x <[email protected]&amp;gt; ??2020??7??3?????? ????4:34?????? > &gt; > &gt; &amp;gt; ????????????????????????????????checkpoint?????????????????????????????????? > &gt; &amp;gt; > &gt; &amp;gt; > &gt; > ????????tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),??????????????????????key???????????????????????????? 
> &gt; &amp;gt; > &gt; &amp;gt; > &gt; &amp;gt; > &gt; &amp;gt; > &gt; &amp;gt; > ------------------&amp;amp;nbsp;????????&amp;amp;nbsp;------------------ > &gt; &amp;gt; ??????:&amp;amp;nbsp;"Jark Wu"<[email protected]&amp;amp;gt;; > &gt; &amp;gt; ????????:&amp;amp;nbsp;2020??6??18??(??????) ????12:16 > &gt; &amp;gt; ??????:&amp;amp;nbsp;"user-zh"<[email protected] > &amp;amp;gt;; > &gt; &amp;gt; > &gt; &amp;gt; ????:&amp;amp;nbsp;Re: ??????FLINKSQL1.10????????????UV > &gt; &amp;gt; > &gt; &amp;gt; > &gt; &amp;gt; > &gt; &amp;gt; ?????????????????????????????? > &gt; &amp;gt; > &gt; &amp;gt; On Thu, 18 Jun 2020 at 10:34, x <[email protected]&amp;amp;gt; > wrote: > &gt; &amp;gt; > &gt; &amp;gt; &amp;amp;gt; ??????1.10??????????????????,??????????????????????????????? > &gt; &amp;gt; &amp;amp;gt; val resTmpTab: Table = tabEnv.sqlQuery( > &gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp; """ > &gt; &amp;gt; > &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; SELECT > &gt; MAX(DATE_FORMAT(ts, 'yyyy-MM-dd > &gt; &amp;gt; HH:mm:00')) > &gt; &amp;gt; &amp;amp;gt; time_str,COUNT(DISTINCT userkey) uv > &gt; &amp;gt; > &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; FROM > &gt; user_behavior&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; GROUP BY > &gt; &amp;gt; DATE_FORMAT(ts, > 'yyyy-MM-dd')&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; """) > &gt; &amp;gt; &amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; val > &gt; resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab) > &gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp; > &gt; &amp;gt; > .filter(line=&amp;amp;amp;gt;line._1==true).map(line=&amp;amp;amp;gt;line._2) > &gt; &amp;gt; &amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; val res= tabEnv.fromDataStream(resTmpStream) > &gt; &amp;gt; &amp;amp;gt; tabEnv.sqlUpdate( > &gt; &amp;gt; &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp; s""" > &gt; &amp;gt; > &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; INSERT > INTO > &gt; 
rt_totaluv > &gt; &amp;gt; > &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; SELECT > _1,MAX(_2) > &gt; &amp;gt; > &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; FROM > $res > &gt; &amp;gt; > &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; GROUP > BY _1 > &gt; &amp;gt; > &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; """) > &gt; &amp;gt; &amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; > &gt; > ------------------&amp;amp;amp;nbsp;????????&amp;amp;amp;nbsp;------------------ > &gt; &amp;gt; &amp;amp;gt; ??????:&amp;amp;amp;nbsp;"Jark Wu"< > [email protected]&amp;amp;amp;gt;; > &gt; &amp;gt; &amp;amp;gt; ????????:&amp;amp;amp;nbsp;2020??6??17??(??????) ????1:55 > &gt; &amp;gt; &amp;amp;gt; ??????:&amp;amp;amp;nbsp;"user-zh"< > [email protected] > &gt; &amp;amp;amp;gt;; > &gt; &amp;gt; &amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; ????:&amp;amp;amp;nbsp;Re: ??????FLINKSQL1.10????????????UV > &gt; &amp;gt; &amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; ?? Flink 1.11 ???????????????????? 
> &gt; &amp;gt; &amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; CREATE TABLE mysql ( > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp; time_str > STRING, > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp; uv BIGINT, > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp; PRIMARY > KEY (ts) NOT ENFORCED > &gt; &amp;gt; &amp;amp;gt; ) WITH ( > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > 'connector' = 'jdbc', > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 'url' = > &gt; 'jdbc:mysql://localhost:3306/mydatabase', > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > 'table-name' = 'myuv' > &gt; &amp;gt; &amp;amp;gt; ); > &gt; &amp;gt; &amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; INSERT INTO mysql > &gt; &amp;gt; &amp;amp;gt; SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd > HH:mm:00')), > &gt; &amp;gt; COUNT(DISTINCT&amp;amp;amp;nbsp; > &gt; &amp;gt; &amp;amp;gt; user_id) > &gt; &amp;gt; &amp;amp;gt; FROM user_behavior; > &gt; &amp;gt; &amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; On Wed, 17 Jun 2020 at 13:49, x < > [email protected]&amp;amp;amp;gt; > &gt; wrote: > &gt; &amp;gt; &amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > ??????????????????????"??????"????????????????????????????????????????????UV?? > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; sink?????????? > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; tm uv > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 2020/06/17 13:46:00 10000 > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 2020/06/17 13:47:00 20000 > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 2020/06/17 13:48:00 30000 > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; group by ?????????????????????? 
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; > &gt; > ------------------&amp;amp;amp;amp;nbsp;????????&amp;amp;amp;amp;nbsp;------------------ > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > ??????:&amp;amp;amp;amp;nbsp;"Benchao Li"< > &gt; [email protected] > &gt; &amp;gt; &amp;amp;amp;amp;gt;; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > ????????:&amp;amp;amp;amp;nbsp;2020??6??17??(??????) ????11:46 > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > ??????:&amp;amp;amp;amp;nbsp;"user-zh"< > &gt; [email protected] > &gt; &amp;gt; &amp;amp;amp;amp;gt;; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; ????:&amp;amp;amp;amp;nbsp;Re: > ??????FLINKSQL1.10????????????UV > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; Hi?? > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; ?????????????????????????????? > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 1. ??????????group by + mini batch > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; 2. window???? + fast emit > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; ????#1??group > &gt; by????????????????????????????????????????????????DATE_FORMAT(rowtm, > &gt; &amp;gt; 'yyyy-MM-dd')?? > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; ??????????????????????????????state > retention??????????????????????[1] > &gt; ????????mini > &gt; &amp;gt; batch???????????? > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; ??????[2] ???????? > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > ????#2????????????????????????tumble???????????????????????????????????????????????????? 
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; fast > &gt; &amp;gt; > emit????????????????????experimental??feature???????????????????????????????????????????????????????????? > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > table.exec.emit.early-fire.enabled = true > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > table.exec.emit.early-fire.delay = 60 s > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; [1] > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; > &gt; &amp;gt; > &gt; > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; [2] > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; > &gt; &amp;gt; > &gt; > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; x <[email protected] > &amp;amp;amp;amp;gt; > &gt; ??2020??6??17?????? ????11:14?????? > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; > &gt; &amp;gt; ??????????????????????0??????????????UV??????????????????????????????????????UV?????????????????????? 
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; CREATE > VIEW uv_per_10min AS > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; > SELECT&amp;amp;amp;amp;amp;nbsp; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; > &amp;amp;amp;amp;amp;nbsp; > &gt; &amp;gt; MAX(DATE_FORMAT(proctime&amp;amp;amp;amp;amp;nbsp;, > &gt; &amp;gt; &amp;amp;gt; 'yyyy-MM-dd > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; > HH:mm:00'))&amp;amp;amp;amp;amp;nbsp;OVER w > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; AS > &gt; time_str,&amp;amp;amp;amp;amp;nbsp; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; > &amp;amp;amp;amp;amp;nbsp; > &gt; COUNT(DISTINCT user_id) OVER > &gt; &amp;gt; w AS uv > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; FROM > user_behavior > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; WINDOW w > AS (ORDER BY proctime > &gt; ROWS BETWEEN > &gt; &amp;gt; UNBOUNDED > &gt; &amp;gt; &amp;amp;gt; PRECEDING AND > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; CURRENT > ROW); > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; > ?????????????????????????? > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; PARTITION > BY > &gt; DATE_FORMAT(rowtm, 'yyyy-MM-dd') > &gt; &amp;gt; &amp;amp;gt; ?????????????????????????????????? > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; > PS??1.10??????????DDL??????????CREATE > &gt; VIEW?? > &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt; ???? > &gt; > &gt; > &gt; > &gt; -- > &gt; > &gt; Best, > &gt; Benchao Li > > > > -- > > Best, > Benchao Li -- Best, Benchao Li
