[ https://issues.apache.org/jira/browse/FLINK-34874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Leonard Xu reassigned FLINK-34874:
----------------------------------

    Assignee: Runkang He

> [MongoDB] Support initial.snapshotting.pipeline related configs in table api
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-34874
>                 URL: https://issues.apache.org/jira/browse/FLINK-34874
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>            Reporter: Flink CDC Issue Import
>            Assignee: Runkang He
>            Priority: Major
>              Labels: github-import, pull-request-available
>
> ### Search before asking
> - [X] I searched in the
> [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found
> nothing similar.
>
> ### Motivation
> MongoDB's startup.mode.copy.existing.pipeline (aka
> initial.snapshotting.pipeline in mongo-cdc) is an array of JSON objects
> describing the pipeline operations to run when copying existing data, see
> [link|https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/startup/#std-label-source-configuration-startup].
> This can improve the use of indexes by the copying manager and make copying
> more efficient, which is very important in some user scenarios. There are
> also some related configs, such as startup.mode.copy.existing.queue.size
> and startup.mode.copy.existing.max.threads.
> Currently these configs are supported only in the DataStream API; for the
> convenience of users, we should also support them in the Table API.
>
> ### Solution
> Support initial.snapshotting.pipeline related configs in the Table API.
>
> ### Alternatives
> _No response_
>
> ### Anything else?
> Note that in 2.3.0 we removed these configs from the Table API when adding
> support for incremental snapshot mode for MongoDB in this
> [commit|https://github.com/ververica/flink-cdc-connectors/commit/301e5a8ab08f7b6c4414c0a81561b9a1bf7fab19],
> because in incremental snapshot mode the semantics are inconsistent when the
> pipeline operations are used.
> The reason is that in the snapshot phase of incremental snapshot mode, the
> oplog is played back after each snapshot to compensate for changes, but the
> pipeline operations in copy.existing.pipeline are not applied to the
> played-back oplog, so the semantics of this config are inconsistent.
> In legacy Debezium mode, however, the behaviour is correct, so we add these
> configs back in Debezium mode for better forward compatibility, and notify
> users not to use them in incremental snapshot mode for the reason above.
>
> ### Are you willing to submit a PR?
> - [X] I'm willing to submit a PR!
>
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/apache/flink-cdc/issues/3069
> Created by: [herunkang2018|https://github.com/herunkang2018]
> Labels: enhancement,
> Assignee: [herunkang2018|https://github.com/herunkang2018]
> Created at: Tue Feb 20 10:28:11 CST 2024
> State: open

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
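
Illustration of the inconsistency described under "Anything else?": the following is a toy simulation, not Flink CDC or MongoDB code. Every function name, the equality-only filter, and the sample documents are hypothetical and exist only to show why a snapshot filtered by a pipeline can disagree with an oplog playback that ignores the pipeline.

```python
# Toy model of the issue; none of these names come from Flink CDC or MongoDB.

def matches(doc, match_filter):
    """Return True if doc satisfies a simple equality-only filter."""
    return all(doc.get(key) == value for key, value in match_filter.items())


def snapshot(collection, match_filter):
    """Copy existing documents, keeping only those that pass the filter."""
    return {doc["_id"]: dict(doc) for doc in collection if matches(doc, match_filter)}


def replay(state, oplog, match_filter=None):
    """Apply change events to a copy of state.

    Events are filtered only when match_filter is given; passing None models
    a playback that does not apply the pipeline operations.
    """
    result = dict(state)
    for event in oplog:
        doc = event["doc"]
        if match_filter is not None and not matches(doc, match_filter):
            continue
        result[doc["_id"]] = dict(doc)
    return result


collection = [
    {"_id": 1, "region": "eu", "qty": 5},
    {"_id": 2, "region": "us", "qty": 7},
]
pipeline_match = {"region": "eu"}  # stands in for a $match stage
oplog = [{"op": "insert", "doc": {"_id": 3, "region": "us", "qty": 1}}]

snap = snapshot(collection, pipeline_match)
unfiltered = replay(snap, oplog)               # pipeline not applied to playback
filtered = replay(snap, oplog, pipeline_match)  # pipeline applied to playback

print(sorted(unfiltered))  # [1, 3]: a "us" document leaks in during playback
print(sorted(filtered))    # [1]
```

In this sketch the snapshot excludes document 2 because of the filter, yet the unfiltered playback admits document 3 from the same excluded region, which is the kind of mismatch the issue warns about for incremental snapshot mode.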