[ https://issues.apache.org/jira/browse/FLINK-22160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17325479#comment-17325479 ]
Leonard Xu commented on FLINK-22160:
------------------------------------

* Tested the TUMBLE/HOP/CUMULATE TVF in SQL Client; the results are as expected.
* Tested the TUMBLE/HOP/CUMULATE TVF using a TIMESTAMP_LTZ rowtime in SQL Client; the results are as expected.
* Tested the PROCTIME TUMBLE/HOP/CUMULATE TVF in different time zones in SQL Client; the results are as expected.
* Tested rowtime/proctime TopN on the TUMBLE TVF; the results are as expected.
* Tested failure and recovery by killing one TM; the job recovered and continued running as expected.

1. Found that the SESSION window table-valued function fails to parse. Since the session window is not supported yet in the new syntax, I think we can ignore it:
{code:java}
Flink SQL> SELECT bidder, window_start, window_end, SUM(price)
>   FROM TABLE(
>     SESSION(TABLE BidSession PARTITION BY bidder, DESCRIPTOR(bidtime),
>             DESCRIPTOR(bidder), INTERVAL '5' MINUTES)
>   GROUP BY bidder, window_start, window_end;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "PARTITION" at line 3, column 30.
Was expecting one of:
    "EXCEPT" ...
    "FETCH" ...
    "INTERSECT" ...
    "LIMIT" ...
    "OFFSET" ...
    "ORDER" ...
    "MINUS" ...
    "UNION" ...
{code}
2. Found some minor improvements for the documentation; I left comments on the doc PR.

The detailed test log follows; it covers the test scope listed in the issue description below.
{code:java}
## Test ROWTIME WINDOW TVF

### Test TUMBLE WINDOW

create table Bid (
  bidtime TIMESTAMP(3),
  bidtime_ltz TIMESTAMP_LTZ(3),
  price DOUBLE,
  item STRING,
  proctime AS PROCTIME(),
  watermark for bidtime as bidtime - interval '10' MINUTES
) WITH (
  'connector' = 'socket',
  'hostname' = '127.0.0.1',
  'port' = '9999',
  'format' = 'csv'
);

-- input data fed through the socket source:
2020-04-20 08:07:00,2020-04-20 08:07:00Z,2,A
2020-04-20 08:11:00,2020-04-20 08:11:00Z,3,B
2020-04-20 08:05:00,2020-04-20 08:05:00Z,4,C
2020-04-20 08:09:00,2020-04-20 08:09:00Z,5,D
2020-04-20 08:13:00,2020-04-20 08:13:00Z,1,E
2020-04-20 08:17:00,2020-04-20 08:17:00Z,6,F
2020-04-20 08:30:01,2020-04-20 08:17:00Z,6,G
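-- Scope item 1 (rewrite of the old syntax): a minimal sketch, assuming the legacy
-- 1.12-style grouped-window syntax, of the same 10-minute tumbling aggregate on Bid.
-- It should return the same sums as the new TUMBLE TVF query below; its output was
-- not captured in this log.
SELECT
  TUMBLE_START(bidtime, INTERVAL '10' MINUTES) AS window_start,
  TUMBLE_END(bidtime, INTERVAL '10' MINUTES)   AS window_end,
  SUM(price)
FROM Bid
GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTES);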
Note: Flink does not support using a window table-valued function individually (without a window aggregation on top) for now:

SELECT * FROM TABLE(
  TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));

Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2020-04-20 08:00:00.000 | 2020-04-20 08:10:00.000 |                           11.0 |
| +I | 2020-04-20 08:10:00.000 | 2020-04-20 08:20:00.000 |                           10.0 |

-- re-fed input data for the following HOP/CUMULATE tests (note the late 08:35:01 row that advances the watermark):
2020-04-20 08:07:00,2020-04-20 08:07:00Z,2,A
2020-04-20 08:11:00,2020-04-20 08:11:00Z,3,B
2020-04-20 08:05:00,2020-04-20 08:05:00Z,4,C
2020-04-20 08:09:00,2020-04-20 08:09:00Z,5,D
2020-04-20 08:13:00,2020-04-20 08:13:00Z,1,E
2020-04-20 08:17:00,2020-04-20 08:17:00Z,6,F
2020-04-20 08:35:01,2020-04-20 08:17:00Z,6,G

### Test HOP WINDOW

Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2020-04-20 08:00:00.000 | 2020-04-20 08:10:00.000 |                           11.0 |
| +I | 2020-04-20 08:05:00.000 | 2020-04-20 08:15:00.000 |                           15.0 |
| +I | 2020-04-20 08:10:00.000 | 2020-04-20 08:20:00.000 |                           10.0 |
| +I | 2020-04-20 08:15:00.000 | 2020-04-20 08:25:00.000 |                            6.0 |

### Test CUMULATE WINDOW

Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2020-04-20 08:00:00.000 | 2020-04-20 08:06:00.000 |                            4.0 |
| +I | 2020-04-20 08:00:00.000 | 2020-04-20 08:08:00.000 |                            6.0 |
| +I | 2020-04-20 08:00:00.000 | 2020-04-20 08:10:00.000 |                           11.0 |
| +I | 2020-04-20 08:10:00.000 | 2020-04-20 08:12:00.000 |                            3.0 |
| +I | 2020-04-20 08:10:00.000 | 2020-04-20 08:14:00.000 |                            4.0 |
| +I | 2020-04-20 08:10:00.000 | 2020-04-20 08:16:00.000 |                            4.0 |
| +I | 2020-04-20 08:10:00.000 | 2020-04-20 08:18:00.000 |                           10.0 |
| +I | 2020-04-20 08:10:00.000 | 2020-04-20 08:20:00.000 |                           10.0 |

### Test SESSION WINDOW TVF

Flink SQL> create table BidSession (
>   bidtime TIMESTAMP(3),
>   bidtime_ltz TIMESTAMP_LTZ(3),
>   price DOUBLE,
>   item STRING,
>   bidder STRING,
>   proctime AS PROCTIME(),
>   watermark for bidtime as bidtime - interval '0.001' SECOND
> ) WITH (
>   'connector' = 'socket',
>   'hostname' = '127.0.0.1',
>   'port' = '9988',
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.

Flink SQL> SELECT bidder, window_start, window_end, SUM(price)
>   FROM TABLE(
>     SESSION(TABLE BidSession PARTITION BY bidder, DESCRIPTOR(bidtime),
>             DESCRIPTOR(bidder), INTERVAL '5' MINUTES)
>   GROUP BY bidder, window_start, window_end;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "PARTITION" at line 3, column 30.
Was expecting one of:
    "EXCEPT" ...
    "FETCH" ...
    "INTERSECT" ...
    "LIMIT" ...
    "OFFSET" ...
    "ORDER" ...
    "MINUS" ...
    "UNION" ...
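-- Since the SESSION window TVF cannot be parsed yet, a sketch (assuming the legacy
-- grouped session-window syntax that is still available) of an equivalent per-bidder
-- session aggregate over BidSession; its output was not captured in this log.
SELECT
  bidder,
  SESSION_START(bidtime, INTERVAL '5' MINUTES) AS window_start,
  SESSION_END(bidtime, INTERVAL '5' MINUTES)   AS window_end,
  SUM(price)
FROM BidSession
GROUP BY bidder, SESSION(bidtime, INTERVAL '5' MINUTES);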
## Test TIMESTAMP_LTZ ROWTIME WINDOW TVF

create table Bid (
  bidtime TIMESTAMP(3),
  bidtime_ltz TIMESTAMP_LTZ(3),
  price DOUBLE,
  item STRING,
  proctime AS PROCTIME(),
  watermark for bidtime_ltz as bidtime_ltz - interval '10' MINUTES
) WITH (
  'connector' = 'socket',
  'hostname' = '127.0.0.1',
  'port' = '9999',
  'format' = 'csv'
);

2020-04-20 08:07:00,2020-04-20 08:07:00Z,2,A
2020-04-20 08:11:00,2020-04-20 08:11:00Z,3,B
2020-04-20 08:05:00,2020-04-20 08:05:00Z,4,C
2020-04-20 08:09:00,2020-04-20 08:09:00Z,5,D
2020-04-20 08:13:00,2020-04-20 08:13:00Z,1,E
2020-04-20 08:17:00,2020-04-20 08:17:00Z,6,F
2020-04-20 08:30:01,2020-04-20 08:30:01Z,6,G

Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     TUMBLE(TABLE Bid, DESCRIPTOR(bidtime_ltz), INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2020-04-20 16:00:00.000 | 2020-04-20 16:10:00.000 |                           11.0 |
| +I | 2020-04-20 16:10:00.000 | 2020-04-20 16:20:00.000 |                           10.0 |
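-- The 16:00 window boundaries above suggest the default session time zone on this
-- machine is UTC+8. A small sketch to check how one epoch instant is rendered under
-- the current table.local-time-zone (1587369600000 ms = 2020-04-20 08:00:00 UTC);
-- its output was not captured in this log.
SELECT TO_TIMESTAMP_LTZ(1587369600000, 3) AS rendered_in_session_zone;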
Flink SQL> SET table.local-time-zone=UTC;
[INFO] Session property has been set.

Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     TUMBLE(TABLE Bid, DESCRIPTOR(bidtime_ltz), INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2020-04-20 08:00:00.000 | 2020-04-20 08:10:00.000 |                           11.0 |
| +I | 2020-04-20 08:10:00.000 | 2020-04-20 08:20:00.000 |                           10.0 |

Flink SQL> SET table.local-time-zone='GMT-08:00';
[INFO] Session property has been set.

Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     TUMBLE(TABLE Bid, DESCRIPTOR(bidtime_ltz), INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2020-04-20 00:00:00.000 | 2020-04-20 00:10:00.000 |                           11.0 |
| +I | 2020-04-20 00:10:00.000 | 2020-04-20 00:20:00.000 |                           10.0 |

2020-04-20 08:07:00,2020-04-20 08:07:00Z,2,A
2020-04-20 08:11:00,2020-04-20 08:11:00Z,3,B
2020-04-20 08:05:00,2020-04-20 08:05:00Z,4,C
2020-04-20 08:09:00,2020-04-20 08:09:00Z,5,D
2020-04-20 08:13:00,2020-04-20 08:13:00Z,1,E
2020-04-20 08:17:00,2020-04-20 08:17:00Z,6,F
2020-04-20 08:30:01,2020-04-20 08:30:01Z,6,G
2020-04-20 08:35:01,2020-04-20 08:35:00Z,6,G

Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     HOP(TABLE Bid, DESCRIPTOR(bidtime_ltz), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2020-04-20 00:00:00.000 | 2020-04-20 00:10:00.000 |                           11.0 |
| +I | 2020-04-20 00:05:00.000 | 2020-04-20 00:15:00.000 |                           15.0 |
| +I | 2020-04-20 00:10:00.000 | 2020-04-20 00:20:00.000 |                           10.0 |
| +I | 2020-04-20 00:15:00.000 | 2020-04-20 00:25:00.000 |                            6.0 |

Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     CUMULATE(TABLE Bid, DESCRIPTOR(bidtime_ltz), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2020-04-20 00:00:00.000 | 2020-04-20 00:06:00.000 |                            4.0 |
| +I | 2020-04-20 00:00:00.000 | 2020-04-20 00:08:00.000 |                            6.0 |
| +I | 2020-04-20 00:00:00.000 | 2020-04-20 00:10:00.000 |                           11.0 |
| +I | 2020-04-20 00:10:00.000 | 2020-04-20 00:12:00.000 |                            3.0 |
| +I | 2020-04-20 00:10:00.000 | 2020-04-20 00:14:00.000 |                            4.0 |
| +I | 2020-04-20 00:10:00.000 | 2020-04-20 00:16:00.000 |                            4.0 |
| +I | 2020-04-20 00:10:00.000 | 2020-04-20 00:18:00.000 |                           10.0 |
| +I | 2020-04-20 00:10:00.000 | 2020-04-20 00:20:00.000 |                           10.0 |

## Test PROCTIME WINDOW TVF

create table Bid (
  bidtime TIMESTAMP(3),
  bidtime_ltz TIMESTAMP_LTZ(3),
  price DOUBLE,
  item STRING,
  supplier_id STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'socket',
  'hostname' = '127.0.0.1',
  'port' = '9999',
  'format' = 'csv'
);

manually input data by `nc -l 9999`:
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:06:00,2021-04-20 08:06:00Z,4,C,supplier2
2021-04-20 08:07:00,2021-04-20 08:07:00Z,2,G,supplier1
2021-04-20 08:09:00,2021-04-20 08:09:00Z,5,D,supplier4
2021-04-20 08:30:00,2021-04-20 08:30:00Z,6,F,supplier6
2021-04-20 08:30:00,2021-04-20 08:30:00Z,6,F,supplier6
2021-04-20 08:30:00,2021-04-20 08:30:00Z,6,F,supplier6
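-- Besides the TUMBLE query below, the summary above also covers HOP/CUMULATE on
-- processing time. A sketch of the CUMULATE variant exercised the same way (step and
-- size here are illustrative; the results depend on arrival time and were not
-- captured in this log):
SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(proctime), INTERVAL '5' SECONDS, INTERVAL '20' SECONDS))
  GROUP BY window_start, window_end;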
Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     TUMBLE(TABLE Bid, DESCRIPTOR(proctime), INTERVAL '5' SECONDS))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2021-04-19 20:33:20.000 | 2021-04-19 20:33:25.000 |                            8.0 |
| +I | 2021-04-19 20:33:35.000 | 2021-04-19 20:33:40.000 |                            2.0 |
| +I | 2021-04-19 20:33:40.000 | 2021-04-19 20:33:45.000 |                            5.0 |
| +I | 2021-04-19 20:34:00.000 | 2021-04-19 20:34:05.000 |                            6.0 |
| +I | 2021-04-19 20:34:05.000 | 2021-04-19 20:34:10.000 |                           12.0 |

Flink SQL> SET table.local-time-zone='Asia/Shanghai';
[INFO] Session property has been set.

Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     TUMBLE(TABLE Bid, DESCRIPTOR(proctime), INTERVAL '5' SECONDS))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+--------------------------------+
| op |            window_start |              window_end |                         EXPR$2 |
+----+-------------------------+-------------------------+--------------------------------+
| +I | 2021-04-20 12:39:10.000 | 2021-04-20 12:39:15.000 |                            8.0 |
| +I | 2021-04-20 12:39:20.000 | 2021-04-20 12:39:25.000 |                            8.0 |
| +I | 2021-04-20 12:39:30.000 | 2021-04-20 12:39:35.000 |                           12.0 |

## Test TopN WINDOW TVF

2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:06:00,2021-04-20 08:06:00Z,4,C,supplier2
2021-04-20 08:07:00,2021-04-20 08:07:00Z,2,G,supplier1
2021-04-20 08:08:00,2021-04-20 08:08:00Z,2,B,supplier3
2021-04-20 08:09:00,2021-04-20 08:09:00Z,5,D,supplier4
2021-04-20 08:11:00,2021-04-20 08:11:00Z,2,B,supplier3
2021-04-20 08:13:00,2021-04-20 08:13:00Z,1,E,supplier1
2021-04-20 08:15:00,2021-04-20 08:15:00Z,3,H,supplier2
2021-04-20 08:17:00,2021-04-20 08:17:00Z,6,F,supplier5
2021-04-20 08:20:00,2021-04-20 08:20:00Z,6,F,supplier6
2021-04-20 08:30:00,2021-04-20 08:30:00Z,6,F,supplier6

create table Bid (
  bidtime TIMESTAMP(3),
  bidtime_ltz TIMESTAMP_LTZ(3),
  price DOUBLE,
  item STRING,
  supplier_id STRING,
  proctime AS PROCTIME(),
  watermark for bidtime as bidtime - interval '10' MINUTES
) WITH (
  'connector' = 'socket',
  'hostname' = '127.0.0.1',
  'port' = '9999',
  'format' = 'csv'
);

Flink SQL> SELECT *
>   FROM (
>     SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
>     FROM (
>       SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cnt
>       FROM TABLE(
>         TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
>       GROUP BY window_start, window_end, supplier_id
>     )
>   ) WHERE rownum <= 3;
+----+-------------------------+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+
| op |            window_start |              window_end |                    supplier_id |                          price |                  cnt |               rownum |
+----+-------------------------+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+
| +I | 2021-04-20 08:00:00.000 | 2021-04-20 08:10:00.000 |                      supplier1 |                            6.0 |                    2 |                    1 |
| +I | 2021-04-20 08:00:00.000 | 2021-04-20 08:10:00.000 |                      supplier4 |                            5.0 |                    1 |                    2 |
| +I | 2021-04-20 08:00:00.000 | 2021-04-20 08:10:00.000 |                      supplier2 |                            4.0 |                    1 |                    3 |
| +I | 2021-04-20 08:10:00.000 | 2021-04-20 08:20:00.000 |                      supplier5 |                            6.0 |                    1 |                    1 |
| +I | 2021-04-20 08:10:00.000 | 2021-04-20 08:20:00.000 |                      supplier2 |                            3.0 |                    1 |                    2 |
| +I | 2021-04-20 08:10:00.000 | 2021-04-20 08:20:00.000 |                      supplier3 |                            2.0 |                    1 |                    3 |
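-- The summary above also mentions a proctime TopN on the TUMBLE TVF. A sketch of that
-- variant (window boundaries depend on arrival time, so its output is not reproduced
-- in this log):
SELECT *
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) AS rownum
    FROM (
      SELECT window_start, window_end, supplier_id, SUM(price) AS price, COUNT(*) AS cnt
      FROM TABLE(
        TUMBLE(TABLE Bid, DESCRIPTOR(proctime), INTERVAL '5' SECONDS))
      GROUP BY window_start, window_end, supplier_id
    )
  ) WHERE rownum <= 3;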
create table Bid (
  bidtime TIMESTAMP(3),
  bidtime_ltz TIMESTAMP_LTZ(3),
  price DOUBLE,
  item STRING,
  supplier_id STRING,
  proctime AS PROCTIME(),
  watermark for bidtime_ltz as bidtime_ltz - interval '10' MINUTES
) WITH (
  'connector' = 'socket',
  'hostname' = '127.0.0.1',
  'port' = '9999',
  'format' = 'csv'
);

Flink SQL> SELECT *
>   FROM (
>     SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
>     FROM (
>       SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cnt
>       FROM TABLE(
>         TUMBLE(TABLE Bid, DESCRIPTOR(bidtime_ltz), INTERVAL '10' MINUTES))
>       GROUP BY window_start, window_end, supplier_id
>     )
>   ) WHERE rownum <= 3;
+----+-------------------------+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+
| op |            window_start |              window_end |                    supplier_id |                          price |                  cnt |               rownum |
+----+-------------------------+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+
| +I | 2021-04-20 00:00:00.000 | 2021-04-20 00:10:00.000 |                      supplier1 |                            6.0 |                    2 |                    1 |
| +I | 2021-04-20 00:00:00.000 | 2021-04-20 00:10:00.000 |                      supplier4 |                            5.0 |                    1 |                    2 |
| +I | 2021-04-20 00:00:00.000 | 2021-04-20 00:10:00.000 |                      supplier2 |                            4.0 |                    1 |                    3 |
| +I | 2021-04-20 00:10:00.000 | 2021-04-20 00:20:00.000 |                      supplier5 |                            6.0 |                    1 |                    1 |
| +I | 2021-04-20 00:10:00.000 | 2021-04-20 00:20:00.000 |                      supplier2 |                            3.0 |                    1 |                    2 |
| +I | 2021-04-20 00:10:00.000 | 2021-04-20 00:20:00.000 |                      supplier3 |                            2.0 |                    1 |                    3 |

## Test Failure and recovery and rescale case

Start 2 TMs:
bang@mac build-target $ jps
8706 TaskManagerRunner
8459 StandaloneSessionClusterEntrypoint
8943 TaskManagerRunner

create table Bid (
  bidtime TIMESTAMP(3),
  bidtime_ltz TIMESTAMP_LTZ(3),
  price DOUBLE,
  item STRING,
  supplier_id STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'socket',
  'hostname' = '127.0.0.1',
  'port' = '9999',
  'format' = 'csv'
);

manually input data by `nc -l 9999`
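-- The statement below writes into a sink table named filetest whose DDL was not
-- captured in this log. A sketch of what it might have looked like, assuming a
-- filesystem connector writing CSV into the sink.csv directory that is inspected
-- further down (path and column names here are hypothetical):
create table filetest (
  window_start TIMESTAMP(3),
  window_end TIMESTAMP(3),
  total_price DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/sink.csv',
  'format' = 'csv'
);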
Flink SQL> insert into filetest SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     TUMBLE(TABLE Bid, DESCRIPTOR(proctime), INTERVAL '5' SECONDS))
>   GROUP BY window_start, window_end;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 26867a8d5cedc42c734814ae120055ae

Kill one TM; the job keeps running normally:
11093 StandaloneSessionClusterEntrypoint
11580 TaskManagerRunner
11342 TaskManagerRunner
bang@mac sink.csv $ kill -9 11580

bang@mac sink.csv $ cat .part-6b7a7828-8d43-42ba-aaa0-244b9c94b5e6-0-0.inprogress.6d6e5635-2c1c-4ccb-ba0d-88deb5015959
"2021-04-20 13:13:05","2021-04-20 13:13:10",8.0
"2021-04-20 13:13:10","2021-04-20 13:13:15",4.0
"2021-04-20 13:13:25","2021-04-20 13:13:30",4.0

bang@mac sink.csv $ cat .part-6b7a7828-8d43-42ba-aaa0-244b9c94b5e6-0-0.inprogress.6d6e5635-2c1c-4ccb-ba0d-88deb5015959
"2021-04-20 13:13:05","2021-04-20 13:13:10",8.0
"2021-04-20 13:13:10","2021-04-20 13:13:15",4.0
"2021-04-20 13:13:25","2021-04-20 13:13:30",4.0
"2021-04-20 13:13:50","2021-04-20 13:13:55",8.0

bang@mac sink.csv $ cat .part-6b7a7828-8d43-42ba-aaa0-244b9c94b5e6-0-0.inprogress.6d6e5635-2c1c-4ccb-ba0d-88deb5015959
"2021-04-20 13:13:05","2021-04-20 13:13:10",8.0
"2021-04-20 13:13:10","2021-04-20 13:13:15",4.0
"2021-04-20 13:13:25","2021-04-20 13:13:30",4.0
"2021-04-20 13:13:50","2021-04-20 13:13:55",8.0
"2021-04-20 13:13:55","2021-04-20 13:14:00",4.0

bang@mac sink.csv $ cat .part-6b7a7828-8d43-42ba-aaa0-244b9c94b5e6-0-0.inprogress.6d6e5635-2c1c-4ccb-ba0d-88deb5015959
"2021-04-20 13:13:05","2021-04-20 13:13:10",8.0
"2021-04-20 13:13:10","2021-04-20 13:13:15",4.0
"2021-04-20 13:13:25","2021-04-20 13:13:30",4.0
"2021-04-20 13:13:50","2021-04-20 13:13:55",8.0
"2021-04-20 13:13:55","2021-04-20 13:14:00",4.0
"2021-04-20 13:14:30","2021-04-20 13:14:35",100.0

bang@mac sink.csv $ ^X
bang@mac docker-hive (master) $ nc -l 9999
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:05:00,2021-04-20 08:05:00Z,4,A,supplier1
2021-04-20 08:05:00,2021-04-20 08:05:00Z,100,A,test
{code}

> Test Window TVF based aggregation and TopN
> ------------------------------------------
>
>                 Key: FLINK-22160
>                 URL: https://issues.apache.org/jira/browse/FLINK-22160
>             Project: Flink
>          Issue Type: Test
>          Components: Table SQL / API
>            Reporter: Jark Wu
>            Assignee: Leonard Xu
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.13.0
>
>
> In FLINK-19604 ([FLIP-145|https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function]), we introduced a new syntax to express Window Aggregate and Window TopN. For Window Aggregate, we have also introduced a new window kind: cumulate windows.
> The scope of this task is to make sure:
> 1. The old window aggregate syntax ({{GROUP BY TUMBLE(...)}}) can be rewritten using the new syntax, and get the same results. Note, session window is not supported yet in the new syntax.
> 2. Verify the new CUMULATE window works as expected
> 3. Verify the new Window TopN works as expected
> 4. Failure and recovery and rescale case: results are still correct.
> 5. Window emitting: window should be fired once watermark advances window end (we can manually generate source data with monotonically and slowly increasing timestamp)
> 6. The feature is well-documented
> Note: the documentation for this feature is still going on (FLINK-22159), for testing the feature, we can use the FLIP documentation as an instruction for now.
-- This message was sent by Atlassian Jira (v8.3.4#803005)