[ 
https://issues.apache.org/jira/browse/FLINK-34559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823012#comment-17823012
 ] 

xuyang commented on FLINK-34559:
--------------------------------

Hi, [~roman], I'm a little interested in how you would solve this problem. 

> limit the amount of data buffered in Global Aggregation nodes

IIUC, a non-session global window aggregation stores only the aggregated 
results in state, so I'm curious what "buffering the data" means here (maybe 
you mean the input network buffers at the operator level?).

> disable two-phase aggregations

Even if two-phase optimization is disabled, the state in the global window 
aggregation node will still be updated frequently if the user's watermark 
interval is very long or the window is very large.
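
For reference, a minimal sketch of what disabling it could look like for a 
single job, assuming the planner option table.optimizer.agg-phase-strategy 
applies to window TVF aggregations as well:

```sql
-- Assumption: ONE_PHASE asks the planner to skip the local (pre-)aggregation,
-- leaving only the global window aggregation node in the plan.
SET 'table.optimizer.agg-phase-strategy' = 'ONE_PHASE';
```

This only removes the local aggregation step; it does not change how often 
the global node updates its state.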

As you mentioned, I believe it is currently difficult to tie the availability 
of network buffers to plan optimizations such as two-phase aggregation. So I'm 
looking forward to seeing your solution. :)

 

> TVF Window Aggregations might get stuck
> ---------------------------------------
>
>                 Key: FLINK-34559
>                 URL: https://issues.apache.org/jira/browse/FLINK-34559
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.19.0, 1.18.1
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>             Fix For: 1.19.0
>
>
> RecordsWindowBuffer flushes buffered records in the following cases:
>  * watermark
>  * checkpoint barrier
>  * buffer overflow
>  
> In two-phase aggregations, this creates the following problems:
> 1) Local aggregation: enters hard backpressure because, on flush, it emits 
> the data downstream without checking network buffer availability.
> This already disrupts normal checkpointing and watermark progression.
>  
> 2) Global aggregation: 
> When the window is large enough and/or the watermark is lagging, lots of data 
> is flushed to the state backend (and the state is updated) in the checkpoint 
> SYNC phase.
>  
> All this eventually causes checkpoint timeouts (10 minutes in our env).
>  
> Example query
> {code:sql}
> INSERT INTO `target_table`
> SELECT window_start, window_end, some, attributes,
>        SUM(view_time) AS total_view_time,
>        COUNT(*) AS num,
>        LISTAGG(DISTINCT page_url) AS pages
> FROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR($rowtime), INTERVAL '1' HOUR))
> GROUP BY window_start, window_end, some, attributes;
> {code}
> In our setup, the issue can be reproduced deterministically.
>  
> As a quick fix, we might want to:
>  # limit the amount of data buffered in Global Aggregation nodes
>  # disable two-phase aggregations, i.e. Local Aggregations (we can try to 
> limit buffering there too, but network buffer availability cannot be easily 
> checked from the operator)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
