Hi,

For upsert-type changelogs, the Flink SQL planner adds a ChangelogNormalize operator that keeps all incoming records in state in order to fill in the missing pre-image values, which results in additional state overhead.
When the MongoDB version is below 6.0, the oplog does not contain pre-images of changed records, so the change stream can only be converted into a Flink upsert-type changelog. In that case it is recommended to use a state backend that handles large state well, such as the RocksDB state backend.

MongoDB 6.0 introduces the pre- and post-images option, which allows the change stream to return the complete pre- and post-image values of each change. With the full changelog configuration enabled [1], the Flink SQL planner removes the ChangelogNormalize operator, significantly reducing state overhead and improving throughput. (A minimal DDL sketch illustrating this option is appended after the quoted message below.)

Best,
Jiabao

[1] https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/flink-sources/mongodb-cdc/#full-changeloga-namefull-changelog-id003-a

wangye...@yeah.net <wangye...@yeah.net> wrote on Sat, Nov 16, 2024 at 21:26:

> Hi all:
> While using Flink with MongoDB CDC, I've noticed that my Flink job
> causes MongoDB's memory usage to continuously increase. Below, I will
> detail the specific scenario to help identify where the issue lies.
>
> 1. MongoDB deployment architecture: sharded.
> 2. The memory usage of the MongoDB config server keeps increasing.
> 3. Flink SQL task: This task separately monitors the incremental data
>    of two documents, processes and aggregates them, and then writes the
>    results to MySQL.
>
> ------------------------------
> wangye...@yeah.net
>
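For reference, here is a minimal, untested Flink SQL DDL sketch of such a source table, following the full-changelog section of [1]. The hosts, credentials, database, collection, and column names below are placeholders, and MongoDB >= 6.0 with changeStreamPreAndPostImages enabled on the monitored collection is assumed:

CREATE TABLE orders_cdc (
  _id STRING,
  order_status STRING,
  order_amount DECIMAL(10, 2),
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'mongos-host:27017',   -- placeholder
  'username' = 'flink_user',       -- placeholder
  'password' = 'flink_pw',         -- placeholder
  'database' = 'mydb',             -- placeholder
  'collection' = 'orders',         -- placeholder
  -- emit complete update-before/update-after records built from the
  -- pre- and post-images, so the planner no longer needs ChangelogNormalize
  'scan.full-changelog' = 'true'
);

If the option takes effect, running EXPLAIN on the downstream aggregation query should no longer show a ChangelogNormalize node in the physical plan.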