[ https://issues.apache.org/jira/browse/FLINK-25205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17531875#comment-17531875 ]

Yaroslav Tkachenko commented on FLINK-25205:
--------------------------------------------

Hey everyone, I was going to open a similar issue, but I think this one is 
generic enough to cover it. 

I wanted to share that the current implementation of SinkUpsertMaterializer 
kills my application throughput.

First, my application when upsert-materialize is disabled:

!without-SinkUpsertMaterializer.png!

Then, my application with upsert-materialize enabled:

!with-SinkUpsertMaterializer.png!

When profiling, I see that it spends most of its time reading and updating 
RocksDB state (attached flamegraph: [^flamegraph-final.html]).

After reading SinkUpsertMaterializer's source code, I realized that it 
performs a state read and a state update for every single record. Because the 
records for a key are kept in a single ValueState<List<RowData>>, with RocksDB 
each of those reads and updates (de)serializes the whole list. This is 
especially bad for a table that receives many upserts for the same key: the 
state list keeps growing with every record.
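The following is a minimal Java model of that per-record read-modify-write pattern. It is a sketch under stated assumptions, not Flink code. `CopyingValueState` is a hypothetical stand-in for a RocksDB-backed `ValueState<List<RowData>>` that copies the entire list on every read and write, as (de)serialization would. Rows are plain strings, and the output logic is left out. With 1,000 upserts for one key and no retractions, the copied-element count is 1 + 3 + ... + 1999 = 1,000,000, so the cost is quadratic in the number of records per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the per-record pattern, not Flink code: the whole
// per-key list is read and rewritten on every record.
public class UpsertMaterializerCostSketch {

    // Stand-in for a RocksDB-backed ValueState<List<String>>: every value()
    // and update() copies the entire list, the way (de)serialization would.
    static final class CopyingValueState {
        private final Map<String, List<String>> store = new HashMap<>();
        long elementsCopied = 0;

        List<String> value(String key) {
            List<String> stored = store.get(key);
            if (stored == null) {
                return null;
            }
            elementsCopied += stored.size();
            return new ArrayList<>(stored);
        }

        void update(String key, List<String> values) {
            elementsCopied += values.size();
            store.put(key, new ArrayList<>(values));
        }

        void clear(String key) {
            store.remove(key);
        }
    }

    enum Kind { INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE }

    final CopyingValueState state = new CopyingValueState();

    // Simplified processElement: one full read and one full write per record
    // (emitting rows downstream is omitted).
    void processElement(String key, Kind kind, String row) {
        List<String> values = state.value(key);
        if (values == null) {
            values = new ArrayList<>();
        }
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                values.add(row); // upserts without a retraction only append
                break;
            case UPDATE_BEFORE:
            case DELETE:
                values.remove(row); // removes the first equal row, if any
                break;
        }
        if (values.isEmpty()) {
            state.clear(key);
        } else {
            state.update(key, values);
        }
    }

    public static void main(String[] args) {
        UpsertMaterializerCostSketch op = new UpsertMaterializerCostSketch();
        for (int i = 0; i < 1000; i++) {
            op.processElement("k1", Kind.UPDATE_AFTER, "v" + i);
        }
        // 1 + 3 + 5 + ... + 1999 = 1000^2
        System.out.println(op.state.elementsCopied); // prints 1000000
    }
}
```

The model only shows the shape of the cost curve. Real RocksDB costs also include serializer overhead, so the absolute numbers are illustrative.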

I was thinking about some optimizations, e.g.:
 * Using ListState<RowData> instead of ValueState<List<RowData>>
 * Something more radical, like only keeping the list with records in memory 
and syncing it to state on every checkpoint?
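The first idea can be sketched the same way. This is again a hypothetical Java model rather than Flink code. `AppendableListState` mimics how `ListState#add` can append without reading existing entries; the RocksDB backend implements this with a merge. The per-key `counts` map is an assumed extra one-element state. It lets an upsert choose between INSERT and UPDATE_AFTER without reading the list. The emission rules are a simplified assumption of the operator's behaviour, and rows are emitted as strings such as `+U v1`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the ListState idea, not Flink code.
public class ListStateSketch {

    enum Kind { INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE }

    // Cost model: each state access is charged one unit per element it touches.
    long elementsTouched = 0;
    final List<String> emitted = new ArrayList<>();

    // Stand-in for ListState<String>: add() appends without reading the
    // existing entries, while get() and update() touch the whole list.
    final class AppendableListState {
        private final Map<String, List<String>> store = new HashMap<>();

        void add(String key, String row) {
            elementsTouched += 1;
            store.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }

        List<String> get(String key) {
            List<String> stored = store.getOrDefault(key, new ArrayList<>());
            elementsTouched += stored.size();
            return new ArrayList<>(stored);
        }

        void update(String key, List<String> rows) {
            elementsTouched += rows.size();
            if (rows.isEmpty()) {
                store.remove(key);
            } else {
                store.put(key, new ArrayList<>(rows));
            }
        }
    }

    final AppendableListState rows = new AppendableListState();
    // Hypothetical one-element per-key count (assumption, see lead-in).
    final Map<String, Integer> counts = new HashMap<>();

    void processElement(String key, Kind kind, String row) {
        int count = counts.getOrDefault(key, 0);
        elementsTouched += 1; // read of the count
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                rows.add(key, row); // constant cost, independent of list length
                counts.put(key, count + 1);
                elementsTouched += 1; // write of the count
                emitted.add((count == 0 ? "+I " : "+U ") + row);
                break;
            case UPDATE_BEFORE:
            case DELETE: {
                List<String> values = rows.get(key); // retractions still read the whole list
                int lastIndex = values.size() - 1;
                int index = values.indexOf(row);
                if (index == -1) {
                    return; // nothing to retract
                }
                values.remove(index);
                rows.update(key, values); // ...and rewrite it
                counts.put(key, values.size());
                elementsTouched += 1; // write of the count
                if (values.isEmpty()) {
                    emitted.add("-D " + row);
                } else if (index == lastIndex) {
                    emitted.add("+U " + values.get(values.size() - 1));
                }
                break;
            }
        }
    }
}
```

In this model, upserts cost a constant amount each: 1,000 upserts touch 3,000 elements instead of 1,000,000. Retractions (UPDATE_BEFORE/DELETE) still read and rewrite the whole list, so the gain depends on how many retractions the upstream produces. The second, in-memory idea would trade state I/O for heap usage and would need the list rebuilt from state on restore.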

 

 

> Optimize SinkUpsertMaterializer
> -------------------------------
>
>                 Key: FLINK-25205
>                 URL: https://issues.apache.org/jira/browse/FLINK-25205
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>            Reporter: Jingsong Lee
>            Priority: Major
>         Attachments: flamegraph-final.html, with-SinkUpsertMaterializer.png, 
> without-SinkUpsertMaterializer.png
>
>
> SinkUpsertMaterializer maintains incoming records in state, keyed by the 
> upsert keys, and generates an upsert view for the downstream operator.
> It is intended to fix the out-of-order ("messy order") changes produced by 
> upstream computation, but it keeps the data in state, which grows larger and 
> larger.
> If we can assume that the disorder only occurs within a single checkpoint, we 
> could clean up the state at each checkpoint, which would keep the state size 
> under control.
> We can consider adding a config option for this optimization first.
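The checkpoint-scoped cleanup proposed in the description can be sketched under one possible reading. At each checkpoint boundary, each key's list is compacted down to its latest row, which is the row currently visible downstream. Like the description, this assumes that out-of-order changes never span a checkpoint. The `compactOnCheckpoint` flag is a hypothetical stand-in for the suggested config option, not an existing Flink setting. The sketch handles upserts only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of checkpoint-scoped cleanup; not Flink code, and the
// flag below is not an existing Flink config option.
public class CheckpointCompactionSketch {

    private final boolean compactOnCheckpoint;
    final Map<String, List<String>> state = new HashMap<>();

    CheckpointCompactionSketch(boolean compactOnCheckpoint) {
        this.compactOnCheckpoint = compactOnCheckpoint;
    }

    // Upserts only (no retractions), to keep the example short.
    void upsert(String key, String row) {
        state.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
    }

    // Called at each checkpoint boundary. Under the assumption that disorder
    // never spans checkpoints, only the row currently visible downstream
    // (the last one) needs to survive, so each list is compacted to it.
    void onCheckpoint() {
        if (!compactOnCheckpoint) {
            return;
        }
        for (List<String> rows : state.values()) {
            String latest = rows.get(rows.size() - 1);
            rows.clear();
            rows.add(latest);
        }
    }

    int storedRows() {
        int total = 0;
        for (List<String> rows : state.values()) {
            total += rows.size();
        }
        return total;
    }
}
```

The test case runs 10 checkpoints of 100 upserts each on one key. The compacting instance ends with one stored row (`v999`), while the non-compacting one keeps all 1,000. Hooking the compaction into the operator's checkpoint lifecycle is left open here.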



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
