[ https://issues.apache.org/jira/browse/FLINK-26498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503393#comment-17503393 ]
Jark Wu edited comment on FLINK-26498 at 3/9/22, 8:42 AM:
----------------------------------------------------------

Thanks [~hehuiyuan], I see what you mean. The window had already fired when the "01:00:00.000" record arrived, so {{hasFired}} is true when the cleanup timer is triggered. I think the result is as expected. The records are out-of-order, and you declared the watermark as "dt - INTERVAL '0' SECOND". That definition expects strictly ascending timestamps, so out-of-order records may be dropped (the "00:59:20.100" record in your case). If you don't want out-of-order records to be dropped, please declare a watermark that allows out-of-orderness. For example, "dt - INTERVAL '5' MINUTE" allows 5 minutes of out-of-orderness.
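The following sketch shows the reporter's source DDL with the watermark declared as suggested above, allowing 5 minutes of out-of-orderness. The connector options are elided in the original report and stay elided here. The 5-minute bound is only the example value from the comment. It is an assumption that it fits the real data, so it would need to be sized to how far out of order the actual records can be.

{code:sql}
CREATE TABLE tableSource(
    name string,
    age int not null,
    sex string,
    dt TIMESTAMP(3),
    -- the watermark trails the largest dt seen so far by 5 minutes, so records
    -- up to 5 minutes out of order are still treated as on time
    WATERMARK FOR dt AS dt - INTERVAL '5' MINUTE
) WITH (
    -- connector options elided, as in the original report
);
{code}

With this bound, the watermark after the "01:00:00.000" record is only 00:55:00. Both "00:59:20" records therefore arrive before the [00:00, 01:00) window fires on time, and they are counted as regular input rather than as late data. The trade-off is that every window's on-time result waits for an extra 5 minutes of event time.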

> The window result may not have been emitted when use window emit feature and set allow-latency
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26498
>                 URL: https://issues.apache.org/jira/browse/FLINK-26498
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: hehuiyuan
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2022-03-05-23-53-37-086.png, image-2022-03-05-23-53-44-196.png, image-2022-03-06-00-03-11-670.png
>
>
> The SQL of the job:
> {code:sql}
> CREATE TABLE tableSource(
>     name string,
>     age int not null,
>     sex string,
>     dt TIMESTAMP(3),
>     WATERMARK FOR dt AS dt - INTERVAL '0' SECOND
> ) WITH (
> );
> CREATE TABLE tableSink(
>     windowstart timestamp(3),
>     windowend timestamp(3),
>     name string,
>     age int,
>     cou bigint
> ) WITH (
> );
> INSERT INTO tableSink
> SELECT
>     TUMBLE_START(dt, INTERVAL '1' HOUR),
>     TUMBLE_END(dt, INTERVAL '1' HOUR),
>     name,
>     age,
>     count(sex)
> FROM tableSource
> GROUP BY TUMBLE(dt, INTERVAL '1' HOUR), name, age
> {code}
>
> And the table config:
> {code}
> table.exec.emit.allow-lateness = 1 hour
> table.exec.emit.late-fire.delay = 1 min
> table.exec.emit.early-fire.delay = 1 min
> {code}
>
> The data:
> {code}
> >hehuiyuan1,22,woman,2022-03-05 00:30:22.000
> >hehuiyuan1,22,woman,2022-03-05 00:40:22.000
> // pause: wait 1 min for the early trigger to fire
> >hehuiyuan1,22,woman,2022-03-05 00:50:22.000
> >hehuiyuan1,22,woman,2022-03-05 00:56:22.000
> // pause: wait 1 min for the early trigger to fire
> >hehuiyuan1,22,woman,2022-03-05 01:00:00.000
> // pause: wait 1 min for the early trigger to fire
> >hehuiyuan1,22,woman,2022-03-05 00:59:20.000 -- late data
> // pause: wait 1 min for the late trigger to fire
> >hehuiyuan1,22,woman,2022-03-05 00:59:20.100 -- late data
> >hehuiyuan1,22,woman,2022-03-05 02:00:00.000 -- window state cleaned for [0:00:00, 1:00:00)
> >hehuiyuan1,22,woman,2022-03-05 02:10:00.000
> {code}
>
> The result:
> {code}
> > +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> >
> > +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2])
> {code}
>
> The window result is lost after `hehuiyuan1,22,woman,2022-03-05 00:59:20.100`
> arrives. The late trigger has not fired yet when the window [0:00:00, 1:00:00)
> is cleaned up. The cleanup happens when `hehuiyuan1,22,woman,2022-03-05 02:00:00.000`
> arrives and advances the watermark.
>
> The window [0:00:00, 1:00:00) contains 6 records, but the last emitted count is 5.
> The trigger is AfterEndOfWindowEarlyAndLate.
> So WindowOperator may need to emit the result when the window's cleanup timer
> fires in onEventTime.
>
> I think the correct result is as follows:
> {code}
> > +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 6])
> > +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2])
> {code}

--
This message was sent by Atlassian Jira (v8.20.1#820001)