[ https://issues.apache.org/jira/browse/FLINK-26498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503348#comment-17503348 ]
hehuiyuan edited comment on FLINK-26498 at 3/9/22, 7:50 AM:
------------------------------------------------------------

[~jark] , the cleanupTimer is due, but the lateTrigger is not yet due. The time registered by the lateTrigger is later than the cleanup time. The cleanup time arrives first, so the window and the trigger are cleaned up.

For event time, there are only 2 timers: window.maxTimestamp and window.maxTimestamp + allow-lateness (the cleanup timer).

So we can only change the AfterEndOfWindowEarlyAndLate trigger.
{code:java}
public boolean onEventTime(long time, W window) throws Exception {
    ValueState<Boolean> hasFiredState = ctx.getPartitionedState(hasFiredOnTimeStateDesc);
    Boolean hasFired = hasFiredState.value();
    if (hasFired != null && hasFired) {
        // late fire
        return lateTrigger != null && lateTrigger.onEventTime(time, window);
    } else {
        if (time == window.maxTimestamp()) {
            // fire on time and update state
            hasFiredState.update(true);
            return true;
        } else {
            // early fire
            return earlyTrigger != null && earlyTrigger.onEventTime(time, window);
        }
    }
}
{code}


was (Author: hehuiyuan):

[~jark] , the cleanupTimer is due, but the lateTrigger has not been triggered. The time registered by the lateTrigger is later than the cleanup time. The cleanup time arrives first, so the window and the trigger are cleaned up.
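
To make the timing in the comment above concrete, here is a minimal sketch in plain Java. It is not Flink code: the class WindowTimersSketch and its methods are hypothetical names used only for illustration. It assumes what the comment states, namely that the cleanup timer is window.maxTimestamp + allow-lateness, that maxTimestamp is the last millisecond inside the window, and that an event-time timer is due once the watermark reaches it. The values are the window [00:00:00, 01:00:00] and the record with dt 02:00:00.000 from the issue description.

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Illustrative only; hypothetical helper names, not part of Flink. */
class WindowTimersSketch {

    /** Last timestamp that still belongs to the window (window end minus 1 ms). */
    public static LocalDateTime maxTimestamp(LocalDateTime windowEnd) {
        return windowEnd.minus(Duration.ofMillis(1));
    }

    /** Cleanup timer as described in the comment: maxTimestamp + allow-lateness. */
    public static LocalDateTime cleanupTime(LocalDateTime windowEnd, Duration allowLateness) {
        return maxTimestamp(windowEnd).plus(allowLateness);
    }

    /** An event-time timer is due once the watermark is at or past its timestamp. */
    public static boolean isDue(LocalDateTime timer, LocalDateTime watermark) {
        return !watermark.isBefore(timer);
    }

    public static void main(String[] args) {
        LocalDateTime windowEnd = LocalDateTime.of(2022, 3, 5, 1, 0);
        Duration allowLateness = Duration.ofHours(1); // table.exec.emit.allow-lateness

        LocalDateTime cleanup = cleanupTime(windowEnd, allowLateness);

        // WATERMARK FOR dt AS dt - INTERVAL '0' SECOND, so the watermark equals dt.
        LocalDateTime watermark = LocalDateTime.of(2022, 3, 5, 2, 0);

        System.out.println("maxTimestamp = " + maxTimestamp(windowEnd));
        System.out.println("cleanupTime  = " + cleanup);
        System.out.println("cleanup due  = " + isDue(cleanup, watermark));
    }
}
```

With these inputs the record at 02:00:00.000 moves the watermark past the cleanup time of the first window. According to the comment, the lateTrigger's firing for the last late record is not yet due at that point, which is why the window state is cleared before the count of 6 is emitted.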
> The window result may not have been emitted when use window emit feature and 
> set allow-latency
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26498
>                 URL: https://issues.apache.org/jira/browse/FLINK-26498
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: hehuiyuan
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2022-03-05-23-53-37-086.png, 
> image-2022-03-05-23-53-44-196.png, image-2022-03-06-00-03-11-670.png
>
>
> The SQL of the job:
> {code:java}
> CREATE TABLE tableSource(
>     name string,
>     age int not null,
>     sex string,
>     dt TIMESTAMP(3),
>     WATERMARK FOR dt AS dt - INTERVAL '0' SECOND
> ) WITH (
> );
> CREATE TABLE tableSink(
>     windowstart timestamp(3),
>     windowend timestamp(3),
>     name string,
>     age int,
>     cou bigint
> )
> WITH (
> );
> INSERT INTO tablesink
>   SELECT
>     TUMBLE_START(dt, INTERVAL '1' HOUR),
>     TUMBLE_END(dt, INTERVAL '1' HOUR),
>     name,
>     age,
>     count(sex)
> FROM tableSource
> GROUP BY TUMBLE(dt, INTERVAL '1' HOUR), name,age {code}
>
> and the table config:
> {code:java}
> table.exec.emit.allow-lateness = 1 hour
> table.exec.emit.late-fire.delay = 1 min
> table.exec.emit.early-fire.delay = 1 min{code}
>
> The data:
> {code:java}
> >hehuiyuan1,22,woman,2022-03-05 00:30:22.000
> >hehuiyuan1,22,woman,2022-03-05 00:40:22.000
> // pause, wait for the window trigger for earlyTrigger 1 min
> >hehuiyuan1,22,woman,2022-03-05 00:50:22.000
> >hehuiyuan1,22,woman,2022-03-05 00:56:22.000
> // pause, wait for the window trigger for earlyTrigger 1 min
> >hehuiyuan1,22,woman,2022-03-05 01:00:00.000
> // pause, wait for the window trigger for earlyTrigger 1 min
> >hehuiyuan1,22,woman,2022-03-05 00:59:20.000 -- late data
> // pause, wait for the window trigger for earlyTrigger 1 min
> >hehuiyuan1,22,woman,2022-03-05 00:59:20.100 -- late data
> >hehuiyuan1,22,woman,2022-03-05 02:00:00.000 -- window state cleaned for [0:00:00 1:00:00]
> >hehuiyuan1,22,woman,2022-03-05 02:10:00.000 {code}
>
> The result:
> {code:java}
> > +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> >
> > +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
>
> The window result is lost: when `hehuiyuan1,22,woman,2022-03-05 00:59:20.100` 
> arrives, the lateTrigger does not fire, and the window [0:00:00, 1:00:00] is 
> cleaned when the record `hehuiyuan1,22,woman,2022-03-05 02:00:00.000` arrives 
> and advances the watermark.
>
> The window [0:00:00, 1:00:00] has 6 records, but the emitted count is 5.
> The trigger is AfterEndOfWindowEarlyAndLate.
> So WindowOperator may need to emit the result when the window's cleanupTimer 
> calls onEventTime.
>
> I think the correct result is as follows:
> {code:java}
> > +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 6])
> > +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)