[ 
https://issues.apache.org/jira/browse/FLINK-34874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu reassigned FLINK-34874:
----------------------------------

    Assignee: Runkang He

> [MongoDB] Support initial.snapshotting.pipeline related configs in table api
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-34874
>                 URL: https://issues.apache.org/jira/browse/FLINK-34874
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>            Reporter: Flink CDC Issue Import
>            Assignee: Runkang He
>            Priority: Major
>              Labels: github-import, pull-request-available
>
> ### Search before asking
> - [X] I searched in the 
> [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found 
> nothing similar.
> ### Motivation
> MongoDB's startup.mode.copy.existing.pipeline (aka 
> initial.snapshotting.pipeline in mongo-cdc) is an array of JSON objects 
> describing the pipeline operations to run when copying existing data, see 
> [link|https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/startup/#std-label-source-configuration-startup].
> This can improve the use of indexes by the copying manager and make copying 
> more efficient, which is very important in some user scenarios. Besides, 
> there are also some related configs, such as 
> startup.mode.copy.existing.queue.size and startup.mode.copy.existing.max.threads.
> Currently these configs are supported only in the DataStream API; for the 
> convenience of users, the Table API should support them as well.
> ### Solution
> Support the initial.snapshotting.pipeline related configs in the Table API.
> ### Alternatives
> _No response_
> ### Anything else?
> Note that in 2.3.0, we removed these configs from the Table API when adding 
> support for incremental snapshot mode for MongoDB in this 
> [commit|https://github.com/ververica/flink-cdc-connectors/commit/301e5a8ab08f7b6c4414c0a81561b9a1bf7fab19],
> since in incremental snapshot mode the semantics are inconsistent when 
> pipeline operations are used. The reason is that in the snapshot phase of 
> incremental snapshot mode, the oplog is played back after each snapshot to 
> compensate for changes, but the pipeline operations in copy.existing.pipeline 
> are not applied to the replayed oplog, so the semantics of this config are 
> inconsistent.
> But in legacy Debezium mode, the behaviour is correct, so we add these 
> configs back in Debezium mode for better forward compatibility, and notify 
> users not to use them in incremental snapshot mode for the reason above.
> ### Are you willing to submit a PR?
> - [X] I'm willing to submit a PR!
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/apache/flink-cdc/issues/3069
> Created by: [herunkang2018|https://github.com/herunkang2018]
> Labels: enhancement
> Assignee: [herunkang2018|https://github.com/herunkang2018]
> Created at: Tue Feb 20 10:28:11 CST 2024
> State: open

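The inconsistency described under "Anything else?" in the quoted issue can be illustrated with a minimal, self-contained sketch. It is not Flink CDC or MongoDB code: the pipeline value, the helper names (`matches`, `snapshot`, `replay`) and the equality-only `$match` handling are all assumptions made for illustration. The sketch only shows why a filter applied while copying existing data, but not to the replayed changes, can let through documents that the pipeline was meant to exclude.

```python
import json

# Hypothetical pipeline value: an array of JSON objects, here a single
# $match stage that keeps only documents whose status is "active".
PIPELINE_JSON = '[{"$match": {"status": "active"}}]'


def matches(doc, pipeline):
    """Toy evaluation of a pipeline: supports $match with equality only."""
    for stage in pipeline:
        for field, expected in stage.get("$match", {}).items():
            if doc.get(field) != expected:
                return False
    return True


def snapshot(collection, pipeline):
    """Copy existing documents, keeping only those the pipeline accepts."""
    return {doc["_id"]: doc for doc in collection if matches(doc, pipeline)}


def replay(state, changes):
    """Apply change events on top of the snapshot WITHOUT the pipeline,
    mirroring the behaviour the issue describes for the replayed oplog."""
    for event in changes:
        state[event["_id"]] = event
    return state


pipeline = json.loads(PIPELINE_JSON)
existing = [
    {"_id": 1, "status": "active"},
    {"_id": 2, "status": "inactive"},  # dropped by the pipeline
]
changes = [
    {"_id": 3, "status": "inactive"},  # arrives via replay, never filtered
]
result = replay(snapshot(existing, pipeline), changes)
```

In this toy run, document 2 is excluded by the filter while document 3, which has the same status, ends up in the result because it arrived through the unfiltered replay.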


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
