[ 
https://issues.apache.org/jira/browse/FLINK-24501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17435136#comment-17435136
 ] 

JING ZHANG edited comment on FLINK-24501 at 10/28/21, 3:13 AM:
---------------------------------------------------------------

[~wenlong.lwl] Thank a lot for your attention to this issue.

> add an operator state in watermark assigner, to avoid it producing wrong 
>watermark after restore?

It is a rejected alternatives because of the following reasons, please correct 
me if I'm wrong.
 # I think we should not use operator state to store watermark.  Because if the 
job parallelism is changed when restore from a checkpoint/savepoint, how to 
merge each old watermark to a new watermark. If takes min value,  watermark may 
be reduce again after restoring from a checkpoint/savepoint. If takes max 
value, some normal data events would be mistaken processed as late event.
 # So we need keyed state to store watermark because of the above stream. But 
the watermark assigner operator does not and should not require inputs data to 
be keyedStream. Besides, if use keyed state to store watermark, we need 
read/write this state frequently. I am very worried that update this general 
operator (watermark assigner operator) will cause bad effect on performance.

So I decide to temporarily correct this bug in a small scope until we have a 
better general solution. 

Any better idea?


was (Author: qingru zhang):
[~wenlong.lwl] Thank a lot for your attention to this issue.

> add an operator state in watermark assigner, to avoid it producing wrong 
>watermark after restore?

It is a rejected alternatives because of the following reasons, please correct 
me if I'm wrong.
 # I think we should not use operator state to store watermark.  Because if the 
job parallelism is changed when restore from a checkpoint/savepoint, how to 
merge each old watermark to a new watermark. If takes min value,  watermark may 
be reduce gain after restoring from a checkpoint/savepoint. If takes max value, 
some normal data events would be mistaken processed as late event.
 # So we need keyed state to store watermark because of the above stream. But 
the watermark assigner operator does not and should not require inputs data to 
be keyedStream. Besides, if use keyed state to store watermark, we need 
read/write this state frequently. I am very worried that update this general 
operator (watermark assigner operator) will cause bad effect on performance.

So I decide to temporarily correct this bug in a small scope until we have a 
better general solution. 

Any better idea?

> Unexpected behavior of cumulate window aggregate for late event after recover 
> from sp/cp
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-24501
>                 URL: https://issues.apache.org/jira/browse/FLINK-24501
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: JING ZHANG
>            Assignee: JING ZHANG
>            Priority: Major
>              Labels: pull-request-available
>
> *Problem description*
> After recover from savepoint or checkpoint, unexpected behavior of cumulate 
> window aggregate for late event may happened.
> *Bug analyze*
> Currently, for cumulate window aggregate, late events belongs to the cleaned 
> slice would be merged into the merged window state, and would be counted into 
> the later slice.
> For example, for a CUMULATE window, step is 1 minute, size is 1 day.
> {code:java}
> SELECT window_start, window_end, COUNT(USER_ID)
>   FROM TABLE(
>     CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '1' MINUTES, INTERVAL 
> '1' DAY))
>   GROUP BY window_start, window_end;{code}
> When the watermark already comes to 11:01, result of window [00:00, 11:01) 
> would be emitted. Let's assume the result is INSERT (00:00, 11:01, 4)
> Then if a late record which event time is 11:00 comes, it would be merged 
> into merged state, and would be counted into the later slice, for example, 
> for window [00:00, 11:02), [00:00, 11:03)... But the emitted window result 
> INSERT (00:00, 11:01, 4) would not be retracted and updated.
> The behavior would be different if the job recover from savepoint/checkpoint.
> Let's do a savepoint after watermark comes to 11:01 and emit (00:00, 11:01, 
> 4).
> Then recover the job from savepoint. Watermarks are not checkpointed and they 
> need to be repopulated again. So after recovered, the watermark may rollback 
> to 11:00, then if a record which event time is 11:00 comes, it would not be 
> processed as late event, after watermark comes to 11:01 again, a window 
> result INSERT (00:00, 11:01, 5)  would be emitted to downstream.
> So the downstream operator would receive two INSERT record for WINDOW (00:00, 
> 11:01) which may leads to wrong result.
>  
> *Solution*
> There are two solutions for the problem:
>  # save watermark to state in slice shared operator. (Prefered)
>  # update the behavior for late event. For example, retract the emitted 
> result and send the updated result. It needs to change the behavior of slice 
> state clean mechanism because we clean the slice state after watermark 
> exceeds the slice end currently.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to