[ 
https://issues.apache.org/jira/browse/FLINK-22160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17325479#comment-17325479
 ] 

Leonard Xu commented on FLINK-22160:
------------------------------------

* Tested TUMBLE/HOP/CUMULATE TVFs in SQL Client; the results are as expected.
* Tested TUMBLE/HOP/CUMULATE TVFs with a TIMESTAMP_LTZ rowtime in SQL Client; the results are as expected.
* Tested PROCTIME TUMBLE/HOP/CUMULATE TVFs in different time zones in SQL Client; the results are as expected.
* Tested rowtime/proctime TopN on the TUMBLE TVF; the results are as expected.
* Tested failure and recovery by killing one TM; the job recovers and continues running as expected.

1. Found that the session window table-valued function fails to parse. Since the session window is not supported yet in the new syntax, I think we can ignore this:
{code:java}
 Flink SQL> SELECT bidder, window_start, window_end, SUM(price)
 > FROM TABLE(
 > SESSION(TABLE BidSession PARTITION BY bidder, DESCRIPTOR(bidtime), 
 > DESCRIPTOR(bidder), INTERVAL '5' MINUTES)
 > GROUP BY bidder, window_start, window_end;
 [ERROR] Could not execute SQL statement. Reason:
 org.apache.flink.sql.parser.impl.ParseException: Encountered "PARTITION" at 
line 3, column 30.
 Was expecting one of:
 "EXCEPT" ...
 "FETCH" ...
 "INTERSECT" ...
 "LIMIT" ...
 "OFFSET" ...
 "ORDER" ...
 "MINUS" ...
 "UNION" ...
{code}
2. Found some minor issues in the doc PR; I left comments there.
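The TUMBLE sums in the transcript below can be sanity-checked outside Flink by modeling the window assignment directly. A minimal Python sketch (`tumble_start`/`tumble_sums` are my own helper names, not Flink API; the late row at 08:30:01 is omitted because in the test it only advances the watermark past both windows):

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Bid rows from the test data: (bidtime, price)
bids = [
    ("2020-04-20 08:07:00", 2.0),
    ("2020-04-20 08:11:00", 3.0),
    ("2020-04-20 08:05:00", 4.0),
    ("2020-04-20 08:09:00", 5.0),
    ("2020-04-20 08:13:00", 1.0),
    ("2020-04-20 08:17:00", 6.0),
]

EPOCH = datetime(1970, 1, 1)

def tumble_start(ts: datetime, size: timedelta) -> datetime:
    """Align ts down to the start of its tumbling window."""
    return EPOCH + ((ts - EPOCH) // size) * size

def tumble_sums(rows, size):
    """SUM(price) grouped by (window_start, window_end)."""
    sums = defaultdict(float)
    for ts_str, price in rows:
        ts = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
        start = tumble_start(ts, size)
        sums[(start, start + size)] += price
    return dict(sums)

result = tumble_sums(bids, timedelta(minutes=10))
# -> [08:00, 08:10): 11.0 and [08:10, 08:20): 10.0, matching the transcript
```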

 

The detailed tests are listed below:
{code:java}
In FLINK-19604 (FLIP-145), we introduced a new syntax to express Window 
Aggregate and Window TopN. For Window Aggregate, we have also introduced a new 
window kind: cumulate windows.

The scope of this task is to make sure:
1. The old window aggregate syntax (GROUP BY TUMBLE(...)) can be rewritten 
using the new syntax and gets the same results. Note: the session window is 
not supported yet in the new syntax.
2. Verify the new CUMULATE window works as expected.
3. Verify the new Window TopN works as expected.
4. Failure, recovery, and rescale cases: results are still correct.
5. Window emitting: a window should be fired once the watermark advances past 
the window end (we can manually generate source data with monotonically and 
slowly increasing timestamps).
6. The feature is well documented.
Note: the documentation for this feature is still in progress (FLINK-22159); 
for testing the feature, we can use the FLIP documentation as an instruction 
for now.
## Test ROWTIME WINDOW TVF 
### Test TUMBLE WINDOW
create table Bid (
     bidtime TIMESTAMP(3),
     bidtime_ltz TIMESTAMP_LTZ(3),
     price DOUBLE,
     item STRING,
     proctime AS PROCTIME(),
     watermark for bidtime as bidtime - interval '10' MINUTES 
 ) WITH (
     'connector' = 'socket',
     'hostname' = '127.0.0.1',
     'port' = '9999',
     'format' = 'csv'
 );
2020-04-20 08:07:00,2020-04-20 08:07:00Z,2,A
2020-04-20 08:11:00,2020-04-20 08:11:00Z,3,B
2020-04-20 08:05:00,2020-04-20 08:05:00Z,4,C
2020-04-20 08:09:00,2020-04-20 08:09:00Z,5,D
2020-04-20 08:13:00,2020-04-20 08:13:00Z,1,E 
2020-04-20 08:17:00,2020-04-20 08:17:00Z,6,F
2020-04-20 08:30:01,2020-04-20 08:17:00Z,6,G
Note: Flink doesn't support querying an individual window table-valued function on its own yet:
 SELECT * FROM TABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2020-04-20 08:00:00.000 | 2020-04-20 08:10:00.000 |                           11.0 |
| +I | 2020-04-20 08:10:00.000 | 2020-04-20 08:20:00.000 |                           10.0 |
2020-04-20 08:07:00,2020-04-20 08:07:00Z,2,A
2020-04-20 08:11:00,2020-04-20 08:11:00Z,3,B
2020-04-20 08:05:00,2020-04-20 08:05:00Z,4,C
2020-04-20 08:09:00,2020-04-20 08:09:00Z,5,D
2020-04-20 08:13:00,2020-04-20 08:13:00Z,1,E 
2020-04-20 08:17:00,2020-04-20 08:17:00Z,6,F
2020-04-20 08:35:01,2020-04-20 08:17:00Z,6,G 
### Test HOP WINDOW
Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' 
> MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2020-04-20 08:00:00.000 | 2020-04-20 08:10:00.000 |                           11.0 |
| +I | 2020-04-20 08:05:00.000 | 2020-04-20 08:15:00.000 |                           15.0 |
| +I | 2020-04-20 08:10:00.000 | 2020-04-20 08:20:00.000 |                           10.0 |
| +I | 2020-04-20 08:15:00.000 | 2020-04-20 08:25:00.000 |                            6.0 |
### Test CUMULATE WINDOW
Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL 
> '10' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2020-04-20 08:00:00.000 | 2020-04-20 08:06:00.000 |                            4.0 |
| +I | 2020-04-20 08:00:00.000 | 2020-04-20 08:08:00.000 |                            6.0 |
| +I | 2020-04-20 08:00:00.000 | 2020-04-20 08:10:00.000 |                           11.0 |
| +I | 2020-04-20 08:10:00.000 | 2020-04-20 08:12:00.000 |                            3.0 |
| +I | 2020-04-20 08:10:00.000 | 2020-04-20 08:14:00.000 |                            4.0 |
| +I | 2020-04-20 08:10:00.000 | 2020-04-20 08:16:00.000 |                            4.0 |
| +I | 2020-04-20 08:10:00.000 | 2020-04-20 08:18:00.000 |                           10.0 |
| +I | 2020-04-20 08:10:00.000 | 2020-04-20 08:20:00.000 |                           10.0 |
### Test SESSION WINDOW TVF
create table BidSession (
     bidtime TIMESTAMP(3),
     bidtime_ltz TIMESTAMP_LTZ(3),
     price DOUBLE,
     item STRING,
     bidder STRING,
     proctime AS PROCTIME(),
     watermark for bidtime as bidtime - interval '0.001' SECOND 
 ) WITH (
     'connector' = 'socket',
     'hostname' = '127.0.0.1',
     'port' = '9988',
     'format' = 'csv'
 );
Flink SQL> create table BidSession (
>      bidtime TIMESTAMP(3),
>      bidtime_ltz TIMESTAMP_LTZ(3),
>      price DOUBLE,
>      item STRING,
>      bidder STRING,
>      proctime AS PROCTIME(),
>      watermark for bidtime as bidtime - interval '0.001' SECOND
>  ) WITH (
>      'connector' = 'socket',
>      'hostname' = '127.0.0.1',
>      'port' = '9988',
>      'format' = 'csv'
>  );
>
[INFO] Execute statement succeed.
Flink SQL> SELECT bidder, window_start, window_end, SUM(price)
>   FROM TABLE(
>     SESSION(TABLE BidSession PARTITION BY bidder, DESCRIPTOR(bidtime), 
> DESCRIPTOR(bidder), INTERVAL '5' MINUTES)
>   GROUP BY bidder, window_start, window_end;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "PARTITION" at 
line 3, column 30.
Was expecting one of:
    "EXCEPT" ...
    "FETCH" ...
    "INTERSECT" ...
    "LIMIT" ...
    "OFFSET" ...
    "ORDER" ...
    "MINUS" ...
    "UNION" ...
### Test TIMESTAMP_LTZ ROWTIME WINDOW TVF
create table Bid (
     bidtime TIMESTAMP(3),
     bidtime_ltz TIMESTAMP_LTZ(3),
     price DOUBLE,
     item STRING,
     proctime AS PROCTIME(),
     watermark for bidtime_ltz as bidtime_ltz - interval '10' MINUTES 
 ) WITH (
     'connector' = 'socket',
     'hostname' = '127.0.0.1',
     'port' = '9999',
     'format' = 'csv'
 );
2020-04-20 08:07:00,2020-04-20 08:07:00Z,2,A
2020-04-20 08:11:00,2020-04-20 08:11:00Z,3,B
2020-04-20 08:05:00,2020-04-20 08:05:00Z,4,C
2020-04-20 08:09:00,2020-04-20 08:09:00Z,5,D
2020-04-20 08:13:00,2020-04-20 08:13:00Z,1,E 
2020-04-20 08:17:00,2020-04-20 08:17:00Z,6,F
2020-04-20 08:30:01,2020-04-20 08:30:01Z,6,G
SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime_ltz), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;   
Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     TUMBLE(TABLE Bid, DESCRIPTOR(bidtime_ltz), INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2020-04-20 16:00:00.000 | 2020-04-20 16:10:00.000 |                           11.0 |
| +I | 2020-04-20 16:10:00.000 | 2020-04-20 16:20:00.000 |                           10.0 |
Flink SQL> SET table.local-time-zone=UTC;
[INFO] Session property has been set.
Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     TUMBLE(TABLE Bid, DESCRIPTOR(bidtime_ltz), INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2020-04-20 08:00:00.000 | 2020-04-20 08:10:00.000 |                           11.0 |
| +I | 2020-04-20 08:10:00.000 | 2020-04-20 08:20:00.000 |                           10.0 |
Flink SQL> SET table.local-time-zone='GMT-08:00';
[INFO] Session property has been set.
Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     TUMBLE(TABLE Bid, DESCRIPTOR(bidtime_ltz), INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2020-04-20 00:00:00.000 | 2020-04-20 00:10:00.000 |                           11.0 |
| +I | 2020-04-20 00:10:00.000 | 2020-04-20 00:20:00.000 |                           10.0 |
2020-04-20 08:07:00,2020-04-20 08:07:00Z,2,A
2020-04-20 08:11:00,2020-04-20 08:11:00Z,3,B
2020-04-20 08:05:00,2020-04-20 08:05:00Z,4,C
2020-04-20 08:09:00,2020-04-20 08:09:00Z,5,D
2020-04-20 08:13:00,2020-04-20 08:13:00Z,1,E
2020-04-20 08:17:00,2020-04-20 08:17:00Z,6,F
2020-04-20 08:30:01,2020-04-20 08:30:01Z,6,G
2020-04-20 08:35:01,2020-04-20 08:35:00Z,6,G 
Flink SQL>  SELECT window_start, window_end, SUM(price)
> FROM TABLE(
>      HOP(TABLE Bid, DESCRIPTOR(bidtime_ltz), INTERVAL '5' MINUTES, INTERVAL 
> '10' MINUTES))
>    GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2020-04-20 00:00:00.000 | 2020-04-20 00:10:00.000 |                           11.0 |
| +I | 2020-04-20 00:05:00.000 | 2020-04-20 00:15:00.000 |                           15.0 |
| +I | 2020-04-20 00:10:00.000 | 2020-04-20 00:20:00.000 |                           10.0 |
| +I | 2020-04-20 00:15:00.000 | 2020-04-20 00:25:00.000 |                            6.0 |
Flink SQL>
> SELECT window_start, window_end, SUM(price)
>  FROM TABLE(
>      CUMULATE(TABLE Bid, DESCRIPTOR(bidtime_ltz), INTERVAL '2' MINUTES, 
> INTERVAL '10' MINUTES))
>    GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2020-04-20 00:00:00.000 | 2020-04-20 00:06:00.000 |                            4.0 |
| +I | 2020-04-20 00:00:00.000 | 2020-04-20 00:08:00.000 |                            6.0 |
| +I | 2020-04-20 00:00:00.000 | 2020-04-20 00:10:00.000 |                           11.0 |
| +I | 2020-04-20 00:10:00.000 | 2020-04-20 00:12:00.000 |                            3.0 |
| +I | 2020-04-20 00:10:00.000 | 2020-04-20 00:14:00.000 |                            4.0 |
| +I | 2020-04-20 00:10:00.000 | 2020-04-20 00:16:00.000 |                            4.0 |
| +I | 2020-04-20 00:10:00.000 | 2020-04-20 00:18:00.000 |                           10.0 |
| +I | 2020-04-20 00:10:00.000 | 2020-04-20 00:20:00.000 |                           10.0 |
## Test PROCTIME WINDOW TVF
create table Bid (
     bidtime TIMESTAMP(3),
     bidtime_ltz TIMESTAMP_LTZ(3),
     price DOUBLE,
     item STRING,
     supplier_id STRING,
     proctime AS PROCTIME()
      ) WITH (
     'connector' = 'socket',
     'hostname' = '127.0.0.1',
     'port' = '9999',
     'format' = 'csv'
 );
Manually input data via `nc -l 9999`:
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:06:00,2021-04-20 08:06:00Z,4,C,supplier2
2021-04-20 08:07:00,2021-04-20 08:07:00Z,2,G,supplier1
2021-04-20 08:09:00,2021-04-20 08:09:00Z,5,D,supplier4
2021-04-20 08:30:00,2021-04-20 08:30:00Z,6,F,supplier6
2021-04-20 08:30:00,2021-04-20 08:30:00Z,6,F,supplier6
2021-04-20 08:30:00,2021-04-20 08:30:00Z,6,F,supplier6
Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     TUMBLE(TABLE Bid, DESCRIPTOR(proctime), INTERVAL '5' SECONDS))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2021-04-19 20:33:20.000 | 2021-04-19 20:33:25.000 |                            8.0 |
| +I | 2021-04-19 20:33:35.000 | 2021-04-19 20:33:40.000 |                            2.0 |
| +I | 2021-04-19 20:33:40.000 | 2021-04-19 20:33:45.000 |                            5.0 |
| +I | 2021-04-19 20:34:00.000 | 2021-04-19 20:34:05.000 |                            6.0 |
| +I | 2021-04-19 20:34:05.000 | 2021-04-19 20:34:10.000 |                           12.0 |
Flink SQL> SET table.local-time-zone='Asia/Shanghai';
[INFO] Session property has been set.
Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     TUMBLE(TABLE Bid, DESCRIPTOR(proctime), INTERVAL '5' SECONDS))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2021-04-20 12:39:10.000 | 2021-04-20 12:39:15.000 |                            8.0 |
| +I | 2021-04-20 12:39:20.000 | 2021-04-20 12:39:25.000 |                            8.0 |
| +I | 2021-04-20 12:39:30.000 | 2021-04-20 12:39:35.000 |                           12.0 |
## Test TopN on WINDOW TVF
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:06:00,2021-04-20 08:06:00Z,4,C,supplier2
2021-04-20 08:07:00,2021-04-20 08:07:00Z,2,G,supplier1
2021-04-20 08:08:00,2021-04-20 08:08:00Z,2,B,supplier3
2021-04-20 08:09:00,2021-04-20 08:09:00Z,5,D,supplier4
2021-04-20 08:11:00,2021-04-20 08:11:00Z,2,B,supplier3
2021-04-20 08:13:00,2021-04-20 08:13:00Z,1,E,supplier1
2021-04-20 08:15:00,2021-04-20 08:15:00Z,3,H,supplier2
2021-04-20 08:17:00,2021-04-20 08:17:00Z,6,F,supplier5
2021-04-20 08:20:00,2021-04-20 08:20:00Z,6,F,supplier6
2021-04-20 08:30:00,2021-04-20 08:30:00Z,6,F,supplier6
create table Bid (
     bidtime TIMESTAMP(3),
     bidtime_ltz TIMESTAMP_LTZ(3),
     price DOUBLE,
     item STRING,
     supplier_id STRING,
     proctime AS PROCTIME(),
     watermark for bidtime as bidtime - interval '10' MINUTES 
 ) WITH (
     'connector' = 'socket',
     'hostname' = '127.0.0.1',
     'port' = '9999',
     'format' = 'csv'
 );
Flink SQL> SELECT *
>   FROM (
>     SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER 
> BY price DESC) as rownum
>     FROM (
>       SELECT window_start, window_end, supplier_id, SUM(price) as price, 
> COUNT(*) as cnt
>       FROM TABLE(
>         TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
>       GROUP BY window_start, window_end, supplier_id
>     )
>   ) WHERE rownum <= 3;
+----+-------------------------+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+
| op |            window_start |              window_end |                    supplier_id |                          price |                  cnt |               rownum |
+----+-------------------------+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+
| +I | 2021-04-20 08:00:00.000 | 2021-04-20 08:10:00.000 |                      supplier1 |                            6.0 |                    2 |                    1 |
| +I | 2021-04-20 08:00:00.000 | 2021-04-20 08:10:00.000 |                      supplier4 |                            5.0 |                    1 |                    2 |
| +I | 2021-04-20 08:00:00.000 | 2021-04-20 08:10:00.000 |                      supplier2 |                            4.0 |                    1 |                    3 |
| +I | 2021-04-20 08:10:00.000 | 2021-04-20 08:20:00.000 |                      supplier5 |                            6.0 |                    1 |                    1 |
| +I | 2021-04-20 08:10:00.000 | 2021-04-20 08:20:00.000 |                      supplier2 |                            3.0 |                    1 |                    2 |
| +I | 2021-04-20 08:10:00.000 | 2021-04-20 08:20:00.000 |                      supplier3 |                            2.0 |                    1 |                    3 |
create table Bid (
     bidtime TIMESTAMP(3),
     bidtime_ltz TIMESTAMP_LTZ(3),
     price DOUBLE,
     item STRING,
     supplier_id STRING,
     proctime AS PROCTIME(),
     watermark for bidtime_ltz as bidtime_ltz - interval '10' MINUTES 
 ) WITH (
     'connector' = 'socket',
     'hostname' = '127.0.0.1',
     'port' = '9999',
     'format' = 'csv'
 );
SELECT *
   FROM (
     SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER 
BY price DESC) as rownum
     FROM (
       SELECT window_start, window_end, supplier_id, SUM(price) as price, 
COUNT(*) as cnt
       FROM TABLE(
         TUMBLE(TABLE Bid, DESCRIPTOR(bidtime_ltz), INTERVAL '10' MINUTES))
       GROUP BY window_start, window_end, supplier_id
     )
   ) WHERE rownum <= 3;
Flink SQL> SELECT *
>    FROM (
>      SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER 
> BY price DESC) as rownum
>      FROM (
>        SELECT window_start, window_end, supplier_id, SUM(price) as price, 
> COUNT(*) as cnt
>        FROM TABLE(
>          TUMBLE(TABLE Bid, DESCRIPTOR(bidtime_ltz), INTERVAL '10' MINUTES))
>        GROUP BY window_start, window_end, supplier_id
>      )
>    ) WHERE rownum <= 3;
+----+-------------------------+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+
| op |            window_start |              window_end |                    supplier_id |                          price |                  cnt |               rownum |
+----+-------------------------+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+
| +I | 2021-04-20 00:00:00.000 | 2021-04-20 00:10:00.000 |                      supplier1 |                            6.0 |                    2 |                    1 |
| +I | 2021-04-20 00:00:00.000 | 2021-04-20 00:10:00.000 |                      supplier4 |                            5.0 |                    1 |                    2 |
| +I | 2021-04-20 00:00:00.000 | 2021-04-20 00:10:00.000 |                      supplier2 |                            4.0 |                    1 |                    3 |
| +I | 2021-04-20 00:10:00.000 | 2021-04-20 00:20:00.000 |                      supplier5 |                            6.0 |                    1 |                    1 |
| +I | 2021-04-20 00:10:00.000 | 2021-04-20 00:20:00.000 |                      supplier2 |                            3.0 |                    1 |                    2 |
| +I | 2021-04-20 00:10:00.000 | 2021-04-20 00:20:00.000 |                      supplier3 |                            2.0 |                    1 |                    3 |
SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(proctime), INTERVAL '5' SECONDS))
  GROUP BY window_start, window_end;
## Test Failure, recovery, and rescale cases
Start 2 TMs:
bang@mac build-target $ jps
8706 TaskManagerRunner
8459 StandaloneSessionClusterEntrypoint
8943 TaskManagerRunner
create table Bid (
     bidtime TIMESTAMP(3),
     bidtime_ltz TIMESTAMP_LTZ(3),
     price DOUBLE,
     item STRING,
     supplier_id STRING,
     proctime AS PROCTIME()
      ) WITH (
     'connector' = 'socket',
     'hostname' = '127.0.0.1',
     'port' = '9999',
     'format' = 'csv'
 );
Manually input data via `nc -l 9999`:
Flink SQL> insert into filetest SELECT window_start, window_end, SUM(price)
>    FROM TABLE(
>     TUMBLE(TABLE Bid, DESCRIPTOR(proctime), INTERVAL '5' SECONDS))
>    GROUP BY window_start, window_end;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 26867a8d5cedc42c734814ae120055ae
Kill one TM; the job keeps running normally:
 11093 StandaloneSessionClusterEntrypoint
 11580 TaskManagerRunner
11342 TaskManagerRunner
bang@mac sink.csv $kill -9 11580
bang@mac sink.csv $cat 
.part-6b7a7828-8d43-42ba-aaa0-244b9c94b5e6-0-0.inprogress.6d6e5635-2c1c-4ccb-ba0d-88deb5015959
"2021-04-20 13:13:05","2021-04-20 13:13:10",8.0
"2021-04-20 13:13:10","2021-04-20 13:13:15",4.0
"2021-04-20 13:13:25","2021-04-20 13:13:30",4.0
bang@mac sink.csv $cat 
.part-6b7a7828-8d43-42ba-aaa0-244b9c94b5e6-0-0.inprogress.6d6e5635-2c1c-4ccb-ba0d-88deb5015959
"2021-04-20 13:13:05","2021-04-20 13:13:10",8.0
"2021-04-20 13:13:10","2021-04-20 13:13:15",4.0
"2021-04-20 13:13:25","2021-04-20 13:13:30",4.0
"2021-04-20 13:13:50","2021-04-20 13:13:55",8.0
bang@mac sink.csv $cat 
.part-6b7a7828-8d43-42ba-aaa0-244b9c94b5e6-0-0.inprogress.6d6e5635-2c1c-4ccb-ba0d-88deb5015959
"2021-04-20 13:13:05","2021-04-20 13:13:10",8.0
"2021-04-20 13:13:10","2021-04-20 13:13:15",4.0
"2021-04-20 13:13:25","2021-04-20 13:13:30",4.0
"2021-04-20 13:13:50","2021-04-20 13:13:55",8.0
"2021-04-20 13:13:55","2021-04-20 13:14:00",4.0
bang@mac sink.csv $cat 
.part-6b7a7828-8d43-42ba-aaa0-244b9c94b5e6-0-0.inprogress.6d6e5635-2c1c-4ccb-ba0d-88deb5015959
"2021-04-20 13:13:05","2021-04-20 13:13:10",8.0
"2021-04-20 13:13:10","2021-04-20 13:13:15",4.0
"2021-04-20 13:13:25","2021-04-20 13:13:30",4.0
"2021-04-20 13:13:50","2021-04-20 13:13:55",8.0
"2021-04-20 13:13:55","2021-04-20 13:14:00",4.0
"2021-04-20 13:14:30","2021-04-20 13:14:35",100.0
bang@mac sink.csv $ ^X
bang@mac docker-hive (master) $ nc -l 9999
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:05:00,2021-04-20 08:05:00Z,100,A,test

{code}
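The HOP sums in the transcript above can be cross-checked the same way as the TUMBLE case. A minimal Python sketch of sliding-window assignment (window size 10 minutes, slide 5 minutes, as in the query; `hop_windows` is my own helper name, not Flink API, and the late row that fires the windows is omitted):

```python
from collections import defaultdict
from datetime import datetime, timedelta

# (bidtime, price) rows from the test data
bids = [
    ("2020-04-20 08:07:00", 2.0),
    ("2020-04-20 08:11:00", 3.0),
    ("2020-04-20 08:05:00", 4.0),
    ("2020-04-20 08:09:00", 5.0),
    ("2020-04-20 08:13:00", 1.0),
    ("2020-04-20 08:17:00", 6.0),
]

EPOCH = datetime(1970, 1, 1)

def hop_windows(ts, size, slide):
    """All [start, start + size) windows, aligned to `slide`, that contain ts."""
    start = EPOCH + ((ts - EPOCH) // slide) * slide  # latest window start <= ts
    wins = []
    while start + size > ts:
        wins.append((start, start + size))
        start -= slide
    return wins

sums = defaultdict(float)
for ts_str, price in bids:
    ts = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
    for win in hop_windows(ts, timedelta(minutes=10), timedelta(minutes=5)):
        sums[win] += price
# -> the four windows from the transcript: 11.0, 15.0, 10.0, 6.0
```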
 
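The CUMULATE results above (step 2 minutes, max size 10 minutes) follow the same pattern: each row lands in every not-yet-closed expanding window of its 10-minute bucket. A sketch under the same assumptions (`cumulate_windows` is my own helper name):

```python
from collections import defaultdict
from datetime import datetime, timedelta

# (bidtime, price) rows from the test data; late row omitted
bids = [
    ("2020-04-20 08:07:00", 2.0),
    ("2020-04-20 08:11:00", 3.0),
    ("2020-04-20 08:05:00", 4.0),
    ("2020-04-20 08:09:00", 5.0),
    ("2020-04-20 08:13:00", 1.0),
    ("2020-04-20 08:17:00", 6.0),
]

EPOCH = datetime(1970, 1, 1)

def cumulate_windows(ts, step, max_size):
    """Expanding windows [base, base + k*step) of ts's max_size bucket with end > ts."""
    base = EPOCH + ((ts - EPOCH) // max_size) * max_size
    return [
        (base, base + step * k)
        for k in range(1, max_size // step + 1)
        if base + step * k > ts
    ]

sums = defaultdict(float)
for ts_str, price in bids:
    ts = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
    for win in cumulate_windows(ts, timedelta(minutes=2), timedelta(minutes=10)):
        sums[win] += price
# -> the eight windows from the transcript (4.0, 6.0, 11.0, 3.0, 4.0, 4.0, 10.0, 10.0)
```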

 
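The Window TopN result can likewise be modeled as a per-window ROW_NUMBER over the tumbling aggregates. A small Python sketch (helper names are mine; only the rows whose windows were fired in the transcript are included — the rows at/after 08:20 only advance the watermark):

```python
from collections import defaultdict
from datetime import datetime, timedelta

# (bidtime, price, supplier_id) rows from the test data
rows = [
    ("2021-04-20 08:05:00", 4.0, "supplier1"),
    ("2021-04-20 08:06:00", 4.0, "supplier2"),
    ("2021-04-20 08:07:00", 2.0, "supplier1"),
    ("2021-04-20 08:08:00", 2.0, "supplier3"),
    ("2021-04-20 08:09:00", 5.0, "supplier4"),
    ("2021-04-20 08:11:00", 2.0, "supplier3"),
    ("2021-04-20 08:13:00", 1.0, "supplier1"),
    ("2021-04-20 08:15:00", 3.0, "supplier2"),
    ("2021-04-20 08:17:00", 6.0, "supplier5"),
]

EPOCH = datetime(1970, 1, 1)
SIZE = timedelta(minutes=10)

# step 1: window aggregate -- SUM(price), COUNT(*) per (window, supplier_id)
agg = defaultdict(lambda: [0.0, 0])
for ts_str, price, supplier in rows:
    ts = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
    start = EPOCH + ((ts - EPOCH) // SIZE) * SIZE
    cell = agg[((start, start + SIZE), supplier)]
    cell[0] += price
    cell[1] += 1

# step 2: ROW_NUMBER() OVER (PARTITION BY window ORDER BY price DESC), rownum <= 3
per_window = defaultdict(list)
for (win, supplier), (price, cnt) in agg.items():
    per_window[win].append((price, supplier, cnt))

top3 = {
    win: [(s, p, c, i + 1)
          for i, (p, s, c) in enumerate(sorted(vals, reverse=True)[:3])]
    for win, vals in per_window.items()
}
# -> supplier1/supplier4/supplier2 in the first window, supplier5/supplier2/supplier3 in the second
```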

> Test Window TVF based aggregation and TopN
> ------------------------------------------
>
>                 Key: FLINK-22160
>                 URL: https://issues.apache.org/jira/browse/FLINK-22160
>             Project: Flink
>          Issue Type: Test
>          Components: Table SQL / API
>            Reporter: Jark Wu
>            Assignee: Leonard Xu
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.13.0
>
>
> In FLINK-19604 
> ([FLIP-145|https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function]),
>  we introduced a new syntax to express Window Aggregate and Window TopN. For 
> Window Aggregate, we have also introduced a new window kind: cumulate 
> windows. 
> The scope of this task is to make sure:
> 1. The old window aggregate syntax ({{GROUP BY TUMBLE(...)}}) can be 
> rewritten using the new syntax and get the same results. Note, session window 
> is not supported yet in the new syntax.
> 2. Verify the new CUMULATE window works as expected
> 3. Verify the new Window TopN works as expected
> 4. Failure and recovery and rescale case: results are still correct.
> 5. Window emitting: window should be fired once watermark advances window end 
> (we can manually generate source data with monotonically and slowly 
> increasing timestamp)
> 6. The feature is well-documented
> Note: the documentation for this feature is still going on (FLINK-22159), for 
> testing the feature, we can use the FLIP documentation as an instruction for 
> now. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
