[ https://issues.apache.org/jira/browse/FLINK-26498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

hehuiyuan updated FLINK-26498:
------------------------------
    Description: 
The SQL of the job:
{code:java}
CREATE TABLE tableSource(
    name string,
    age int not null,
    sex string,
    dt TIMESTAMP(3),
    WATERMARK FOR dt AS dt - INTERVAL '0' SECOND
) WITH (

);



CREATE TABLE tableSink(
    windowstart timestamp(3),
    windowend timestamp(3),
    name string,
    age int,
    cou bigint
)
WITH (

);
INSERT INTO tableSink
  SELECT
    TUMBLE_START(dt, INTERVAL '1' HOUR),
    TUMBLE_END(dt, INTERVAL '1' HOUR),
    name,
    age,
    count(sex)
FROM tableSource
GROUP BY TUMBLE(dt, INTERVAL '1' HOUR), name, age {code}
 

And the table config:
{code:java}
table.exec.emit.allow-lateness = 1 hour
table.exec.emit.late-fire.delay = 1 min
table.exec.emit.early-fire.delay = 1 min{code}
 

The data:
{code:java}
>hehuiyuan1,22,woman,2022-03-05 00:30:22.000
>hehuiyuan1,22,woman,2022-03-05 00:40:22.000
// pause: wait 1 min for the early-fire trigger
>hehuiyuan1,22,woman,2022-03-05 00:50:22.000
>hehuiyuan1,22,woman,2022-03-05 00:56:22.000
// pause: wait 1 min for the early-fire trigger
>hehuiyuan1,22,woman,2022-03-05 01:00:00.000
// pause: wait 1 min for the early-fire trigger
>hehuiyuan1,22,woman,2022-03-05 00:59:20.000 -- late data
// pause: wait 1 min for the late-fire trigger
>hehuiyuan1,22,woman,2022-03-05 00:59:20.100 -- late data
>hehuiyuan1,22,woman,2022-03-05 02:00:00.000 -- window state for [00:00:00, 01:00:00) is cleaned
>hehuiyuan1,22,woman,2022-03-05 02:10:00.000 {code}
 

The result:
{code:java}
> +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])

> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2]) 
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4]) 

> +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])

> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4]) 
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])

> +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
 

The window result for the record `hehuiyuan1,22,woman,2022-03-05 00:59:20.100` is lost: the late trigger does not fire before window [00:00:00, 01:00:00) is cleaned up, which happens when the record `hehuiyuan1,22,woman,2022-03-05 02:00:00.000` arrives and advances the watermark.
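To see why the record at 00:59:20.100 is still accepted and why the record at 02:00:00.000 wipes the window, the timing can be worked through in a small Java sketch. This is not Flink code: the class `WindowCleanupTiming` is hypothetical, and it assumes Flink-style semantics where the cleanup time is the window's max timestamp (end minus 1 ms) plus the allowed lateness, an event-time timer fires once the watermark reaches its timestamp, and the watermark equals the largest `dt` seen so far and advances right after each record (the pauses in the repro make that roughly true).

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical sketch, not Flink code. Assumptions:
//  * cleanup time = window max timestamp (end - 1 ms) + allowed lateness
//  * an event-time timer fires once watermark >= its timestamp
//  * watermark = max(dt) seen so far (WATERMARK FOR dt AS dt - INTERVAL '0' SECOND),
//    advancing right after each record
class WindowCleanupTiming {
    static long millis(String isoTimestamp) {
        return LocalDateTime.parse(isoTimestamp).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    static final long WINDOW_START = millis("2022-03-05T00:00:00");
    static final long WINDOW_END = millis("2022-03-05T01:00:00");
    static final long MAX_TIMESTAMP = WINDOW_END - 1; // last millisecond inside the window
    static final long ALLOWED_LATENESS = Duration.ofHours(1).toMillis(); // table.exec.emit.allow-lateness
    static final long CLEANUP_TIME = MAX_TIMESTAMP + ALLOWED_LATENESS;

    /** How window [00:00, 01:00) treats a record, given the watermark when the record arrives. */
    static String classify(long eventTime, long watermark) {
        if (eventTime < WINDOW_START || eventTime >= WINDOW_END) return "other window";
        if (watermark >= CLEANUP_TIME) return "dropped, window already cleaned";
        if (watermark >= MAX_TIMESTAMP) return "late but accepted";
        return "on time";
    }

    /** True once the watermark has reached the window's cleanup time. */
    static boolean cleanupFired(long watermark) {
        return watermark >= CLEANUP_TIME;
    }

    public static void main(String[] args) {
        String[] arrivals = {
            "2022-03-05T00:30:22", "2022-03-05T00:40:22", "2022-03-05T00:50:22",
            "2022-03-05T00:56:22", "2022-03-05T01:00:00", "2022-03-05T00:59:20",
            "2022-03-05T00:59:20.100", "2022-03-05T02:00:00", "2022-03-05T02:10:00"
        };
        long watermark = Long.MIN_VALUE;
        boolean cleaned = false;
        for (String ts : arrivals) {
            long t = millis(ts);
            System.out.printf("%-24s %s%n", ts, classify(t, watermark));
            watermark = Math.max(watermark, t); // watermark advances after the record
            if (!cleaned && cleanupFired(watermark)) {
                cleaned = true;
                System.out.println("  -> watermark reaches cleanup time; window state is cleared");
            }
        }
    }
}
```

Under these assumptions both 00:59:20 records are classified as late but accepted (the cleanup time is 01:59:59.999 while the watermark is only 01:00:00), and the watermark reaches the cleanup time as soon as the 02:00:00.000 record arrives, before the 1-minute late-fire delay for 00:59:20.100 has elapsed.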

 

Window [00:00:00, 01:00:00) received 6 records, but the last emitted count is 5.

The trigger in use is `AfterEndOfWindowEarlyAndLate`.

So `WindowOperator` may need to emit the window result when the cleanup timer fires in `onEventTime`.
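The proposed change can be illustrated with a toy model of window [00:00:00, 01:00:00) that replays the repro's order of events. `LateFireModel` and its methods are hypothetical and only mimic the behavior observed above (a 1-minute early/late fire delay, no output when the count is unchanged, state discarded at cleanup); they are not Flink's `WindowOperator` or trigger API. With `emitOnCleanup` set to `false` it reproduces the reported changelog; with `true` it flushes the pending count before clearing state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one window, not Flink's WindowOperator.
// It keeps only a count, whether a 1-minute fire timer is pending, and the last emitted count.
class LateFireModel {
    private final boolean emitOnCleanup; // true = proposed behavior: flush before clearing state
    private final List<String> changelog = new ArrayList<>();
    private long count = 0;
    private Long lastEmitted = null; // null until the first +I
    private boolean fireTimerPending = false;

    LateFireModel(boolean emitOnCleanup) {
        this.emitOnCleanup = emitOnCleanup;
    }

    // Early trigger (before window end) or late trigger (after it) arms a 1-minute timer.
    void onElement() {
        count++;
        fireTimerPending = true;
    }

    // A "pause" in the repro: the 1-minute processing-time delay elapses and the timer fires.
    void onFireDelayElapsed() {
        if (fireTimerPending) {
            fireTimerPending = false;
            emit();
        }
    }

    // Watermark passes the window end: the on-time firing.
    void onWatermarkPastWindowEnd() {
        fireTimerPending = false;
        emit();
    }

    // Watermark passes window end + allowed lateness: state and pending timers are discarded.
    void onCleanupTimer() {
        if (emitOnCleanup) {
            emit(); // proposed: emit the final result before the state is cleared
        }
        count = 0;
        fireTimerPending = false;
    }

    // Emits +I for the first result, -U/+U afterwards, and nothing if the count is unchanged.
    private void emit() {
        if (lastEmitted == null) {
            changelog.add("+I " + count);
        } else if (lastEmitted.longValue() != count) {
            changelog.add("-U " + lastEmitted);
            changelog.add("+U " + count);
        } else {
            return;
        }
        lastEmitted = count;
    }

    // Replays the repro's arrival order for window [00:00, 01:00).
    static List<String> run(boolean emitOnCleanup) {
        LateFireModel w = new LateFireModel(emitOnCleanup);
        w.onElement(); w.onElement();   // 00:30:22, 00:40:22
        w.onFireDelayElapsed();         // pause -> +I 2
        w.onElement(); w.onElement();   // 00:50:22, 00:56:22
        w.onFireDelayElapsed();         // pause -> -U 2, +U 4
        w.onWatermarkPastWindowEnd();   // 01:00:00 arrives; count unchanged, nothing emitted
        w.onElement();                  // late 00:59:20.000
        w.onFireDelayElapsed();         // pause -> -U 4, +U 5
        w.onElement();                  // late 00:59:20.100, no pause follows
        w.onCleanupTimer();             // 02:00:00 arrives; watermark reaches cleanup time
        return w.changelog;
    }

    public static void main(String[] args) {
        System.out.println("current:  " + run(false));
        System.out.println("proposed: " + run(true));
    }
}
```

`main` prints `current:  [+I 2, -U 2, +U 4, -U 4, +U 5]` and `proposed: [+I 2, -U 2, +U 4, -U 4, +U 5, -U 5, +U 6]`, matching the counts of the actual and expected results for this window. The model emits at cleanup whenever the count differs from the last emitted value; a real fix might instead emit only when a late firing is still pending, which this sketch does not distinguish.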

 

I think the correct result is as follows:
{code:java}
> +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])

> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])

> +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])

> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])

> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 6])
> +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
 

  was:
The SQL of the job:
{code:java}
CREATE TABLE tableSource(
    name string,
    age int not null,
    sex string,
    dt TIMESTAMP(3),
    WATERMARK FOR dt AS dt - INTERVAL '0' SECOND
) WITH (

);



CREATE TABLE tableSink(
    windowstart timestamp(3),
    windowend timestamp(3),
    name string,
    age int,
    cou bigint
)
WITH (

);
INSERT INTO tableSink
  SELECT
    TUMBLE_START(dt, INTERVAL '1' HOUR),
    TUMBLE_END(dt, INTERVAL '1' HOUR),
    name,
    age,
    count(sex)
FROM tableSource
GROUP BY TUMBLE(dt, INTERVAL '1' HOUR), name, age {code}
 

And the table config:
{code:java}
table.exec.emit.allow-lateness = 1 hour
table.exec.emit.late-fire.delay = 1 min
table.exec.emit.early-fire.delay = 1 min{code}
 

The data:
{code:java}
>hehuiyuan1,22,woman,2022-03-05 00:30:22.000
>hehuiyuan1,22,woman,2022-03-05 00:40:22.000
// pause: wait 1 min for the early-fire trigger
>hehuiyuan1,22,woman,2022-03-05 00:50:22.000
>hehuiyuan1,22,woman,2022-03-05 00:56:22.000
// pause: wait 1 min for the early-fire trigger
>hehuiyuan1,22,woman,2022-03-05 01:00:00.000
// pause: wait 1 min for the early-fire trigger
>hehuiyuan1,22,woman,2022-03-05 00:59:20.000 -- late data
// pause: wait 1 min for the late-fire trigger
>hehuiyuan1,22,woman,2022-03-05 00:59:20.100 -- late data
>hehuiyuan1,22,woman,2022-03-05 02:00:00.000 -- window state for [00:00:00, 01:00:00) is cleaned
>hehuiyuan1,22,woman,2022-03-05 02:10:00.000 {code}
 

The result:
{code:java}
> +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])

> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2]) 
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4]) 

> +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])

> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4]) 
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])

> +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
 

The record `hehuiyuan1,22,woman,2022-03-05 00:59:20.100` is lost: the late trigger does not fire before window [00:00:00, 01:00:00) is cleaned up, which happens when the record `hehuiyuan1,22,woman,2022-03-05 02:00:00.000` arrives and advances the watermark.

 

Window [00:00:00, 01:00:00) received 6 records, but the last emitted count is 5.

The trigger in use is `AfterEndOfWindowEarlyAndLate`.

So `WindowOperator` may need to emit the window result when the cleanup timer fires in `onEventTime`.

 

I think the correct result is as follows:
{code:java}
> +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])

> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])

> +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])

> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])

> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 6])
> +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
 


> The window result may not have been emitted when using the window emit feature with a late trigger.
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26498
>                 URL: https://issues.apache.org/jira/browse/FLINK-26498
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: hehuiyuan
>            Priority: Major
>         Attachments: image-2022-03-05-23-53-37-086.png, image-2022-03-05-23-53-44-196.png, image-2022-03-06-00-03-11-670.png
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
