[ https://issues.apache.org/jira/browse/FLINK-25205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17531875#comment-17531875 ]
Yaroslav Tkachenko commented on FLINK-25205:
--------------------------------------------

Hey everyone, I was going to open a similar issue, but I think this one is generic enough to cover it. I wanted to share that the current implementation of SinkUpsertMaterializer drastically reduces my application's throughput.

First, my application with upsert-materialize disabled:

!without-SinkUpsertMaterializer.png!

Then, my application with upsert-materialize enabled:

!with-SinkUpsertMaterializer.png!

When profiling, I see that it mostly spends time reading and updating RocksDB state (attached flamegraph: [^flamegraph-final.html]). After reading SinkUpsertMaterializer's source code, I realized that it performs a state read and a state update for every single record. This looks especially bad when a table performs many upserts for the same key: the state list keeps growing with every record.

I was thinking about some optimizations, e.g.:
* Using ListState<RowData> instead of ValueState<List<RowData>>
* Something more radical, like keeping the list of records only in memory and syncing it to state on every checkpoint?

> Optimize SinkUpsertMaterializer
> -------------------------------
>
>                 Key: FLINK-25205
>                 URL: https://issues.apache.org/jira/browse/FLINK-25205
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>            Reporter: Jingsong Lee
>            Priority: Major
>         Attachments: flamegraph-final.html, with-SinkUpsertMaterializer.png, without-SinkUpsertMaterializer.png
>
>
> SinkUpsertMaterializer maintains incoming records in state corresponding to
> the upsert keys and generates an upsert view for the downstream operator.
> It is intended to solve the out-of-order problem caused by upstream
> computation, but it stores the data in state, which keeps growing.
> If we can assume that disorder only occurs within a single checkpoint
> interval, we can consider cleaning up the state at each checkpoint, which
> would keep the size of the state under control.
> We could start by adding a config option for this optimization.

-- 
This message was sent by Atlassian Jira
(v8.20.7#820007)
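A rough, hypothetical model of the two access patterns discussed in the comment may help make the cost concrete. It is a plain-Java sketch, not Flink code: a `HashMap` stands in for keyed `ValueState<List<RowData>>`, rows are strings, retractions (UPDATE_BEFORE/DELETE) are left out, and `elementsSerialized` is an assumed proxy for the (de)serialization work done when the whole list is read and rewritten. The names `UpsertMaterializerModel`, `upsert`, `snapshot` and `feed` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model; NOT Flink's SinkUpsertMaterializer.
// A HashMap stands in for keyed ValueState<List<RowData>>, and rows are plain strings.
public class UpsertMaterializerModel {

    /** Pattern described in the comment: every record reads and rewrites the whole list. */
    static final class PerRecordState {
        final Map<String, List<String>> backend = new HashMap<>(); // stand-in for RocksDB
        long reads = 0;
        long writes = 0;
        long elementsSerialized = 0; // rough proxy for (de)serialization work

        void upsert(String key, String row) {
            List<String> values = read(key);
            values.add(row); // the list grows with every upsert for the same key
            write(key, values);
        }

        private List<String> read(String key) {
            reads++;
            List<String> values = new ArrayList<>(backend.getOrDefault(key, new ArrayList<>()));
            elementsSerialized += values.size(); // whole list is deserialized
            return values;
        }

        private void write(String key, List<String> values) {
            writes++;
            elementsSerialized += values.size(); // whole list is serialized again
            backend.put(key, new ArrayList<>(values));
        }
    }

    /** Proposed alternative: keep lists in memory and sync changed keys only on checkpoint. */
    static final class CheckpointSyncedState {
        final Map<String, List<String>> backend = new HashMap<>();
        private final Map<String, List<String>> cache = new HashMap<>();
        private final Set<String> dirty = new HashSet<>();
        long reads = 0;
        long writes = 0;
        long elementsSerialized = 0;

        void upsert(String key, String row) {
            // The backend is read only on a cache miss (e.g. first access after a restore).
            List<String> values = cache.computeIfAbsent(key, k -> {
                reads++;
                List<String> loaded = new ArrayList<>(backend.getOrDefault(k, new ArrayList<>()));
                elementsSerialized += loaded.size();
                return loaded;
            });
            values.add(row);
            dirty.add(key);
        }

        /** Called when a checkpoint is taken: flush only the keys changed since the last one. */
        void snapshot() {
            for (String key : dirty) {
                List<String> values = cache.get(key);
                writes++;
                elementsSerialized += values.size();
                backend.put(key, new ArrayList<>(values));
            }
            dirty.clear();
        }
    }

    /** Feeds `records` upserts for a single key into both models, checkpointing every `interval`. */
    static void feed(PerRecordState perRecord, CheckpointSyncedState buffered, int records, int interval) {
        for (int i = 1; i <= records; i++) {
            String row = "v" + i;
            perRecord.upsert("k", row);
            buffered.upsert("k", row);
            if (i % interval == 0) {
                buffered.snapshot();
            }
        }
    }

    public static void main(String[] args) {
        PerRecordState perRecord = new PerRecordState();
        CheckpointSyncedState buffered = new CheckpointSyncedState();
        feed(perRecord, buffered, 1000, 100);
        System.out.printf("per-record: reads=%d writes=%d elementsSerialized=%d%n",
                perRecord.reads, perRecord.writes, perRecord.elementsSerialized);
        System.out.printf("buffered:   reads=%d writes=%d elementsSerialized=%d%n",
                buffered.reads, buffered.writes, buffered.elementsSerialized);
    }
}
```

With 1,000 upserts for one key and a checkpoint every 100 records, the per-record model does 1,000 reads, 1,000 writes and 1,000,000 serialized elements (quadratic in the list length), while the buffered model does 1 read, 10 writes and 5,500 serialized elements. The buffered variant trades backend I/O for heap memory: every key's list has to fit in memory, and it presumably relies on Flink restoring the last completed checkpoint and replaying input after a failure, so changes held only in the cache can be recomputed. Neither model addresses the growth of the list itself, which is what the issue's per-checkpoint cleanup idea targets.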