Hi Sachin,

The incremental MongoDB source is under package `org.apache.flink.cdc.connectors.mongodb.source`, and the legacy source is under package `org.apache.flink.cdc.connectors.mongodb`.
Here's an example:
-----------------------------------------------------------------------------
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

MongoDBSource<String> mongoSource =
        MongoDBSource.<String>builder()
                .hosts("<hosts>")
                .databaseList("<database>")
                .collectionList("<database.collection>")
                .username("<username>")
                .password("<password>")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .closeIdleReaders(true)
                .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpointing every 3 seconds
env.enableCheckpointing(3000);
// set the source parallelism to 2
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB Incremental Source")
        .setParallelism(2)
        .print()
        .setParallelism(1);
env.execute("Print MongoDB Snapshot + Change Stream");
-----------------------------------------------------------------------------

Best,
Jiabao

On 2024/08/19 14:39:57 Sachin Mittal wrote:
> Thanks for the explanation.
>
> One quick question: how do I enable
> scan.incremental.snapshot.enabled = true
> for my MongoDBSource?
>
> I don't see any option in the builder for the same.
>
> Regards
> Sachin
>
>
> On Mon, Aug 19, 2024 at 8:00 PM Jiabao Sun <jiabao...@apache.org> wrote:
>
> > Sorry, in my previous reply, I mistakenly wrote Flink 2.0 instead of
> > Flink CDC 2.0. I am correcting it here to avoid any misunderstanding.
> >
> > Incremental snapshot reading is a new feature introduced in Flink CDC 2.0.
> >
> > The following table shows the version mapping between Flink CDC
> > Connectors and Flink [1]:
> >
> > Flink CDC Version    Flink Version
> > 1.0.0                1.11.*
> > 1.1.0                1.11.*
> > 1.2.0                1.12.*
> > 1.3.0                1.12.*
> > 1.4.0                1.13.*
> > 2.0.*                1.13.*
> > 2.1.*                1.13.*
> > 2.2.*                1.13.*, 1.14.*
> > 2.3.*                1.13.*, 1.14.*, 1.15.*, 1.16.*
> > 2.4.*                1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.*
> > 3.0.*                1.14.*, 1.15.*, 1.16.*, 1.17.*, 1.18.*
> >
> > Best,
> > Jiabao
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/overview/#supported-flink-versions
> >
> > On 2024/08/19 09:27:48 Sachin Mittal wrote:
> > > So basically if I set startupOptions(StartupOptions.initial())
> > > and also scan.incremental.snapshot.enabled = true,
> > > then it would read from the source in parallel, thereby reading the
> > > entire mongo collection faster.
> > >
> > > Am I understanding that correctly?
> > >
> > > Also I am using Flink 1.8, would it work with this version of flink ?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Mon, Aug 19, 2024 at 2:29 PM Jiabao Sun <jiabao...@apache.org> wrote:
> > >
> > > > Hi Sachin,
> > > >
> > > > Incremental snapshot reading is a new feature introduced in Flink 2.0.
> > > >
> > > > It has the following capabilities:
> > > > - Source can be parallel during snapshot reading to improve snapshot
> > > >   speed
> > > > - Source can perform checkpoints in the chunk granularity during
> > > >   snapshot reading
> > > >
> > > > Limitation:
> > > > - MongoDB version needs to be greater than 4.0
> > > >
> > > > Best,
> > > > Jiabao
> > > >
> > > > On 2024/08/19 06:48:39 Sachin Mittal wrote:
> > > > > Hi,
> > > > > I am using mongodb cdc connector version 3.1.1
> > > > > I am connecting to mongodb atlas, which uses mongodb version 7.0.
> > > > >
> > > > > In the cdc connector I find a property:
> > > > >
> > > > > scan.incremental.snapshot.enabled with default as false.
> > > > >
> > > > > I wanted to know in what cases we should set this as true and
> > > > > what does this property help with ?
> > > > >
> > > > > Please note that I am configuring my connector with:
> > > > >
> > > > > .startupOptions(StartupOptions.initial())
> > > > > .batchSize(2048)
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
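
For reference, the `scan.incremental.snapshot.enabled` option discussed in this thread is exposed directly as a connector option when the MongoDB CDC connector is used through Flink SQL (the DataStream builder above enables incremental mode implicitly by being the new `...mongodb.source` package). Below is a minimal, hypothetical sketch of a SQL table definition; the table name, schema, and all connection values are placeholders, not taken from the thread:
-----------------------------------------------------------------------------
-- Hypothetical mongodb-cdc table; host/credential values are placeholders.
CREATE TABLE orders (
  _id STRING,
  order_id INT,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '<hosts>',
  'username' = '<username>',
  'password' = '<password>',
  'database' = '<database>',
  'collection' = '<collection>',
  -- opt in to parallel, chunk-based snapshot reading
  'scan.incremental.snapshot.enabled' = 'true'
);
-----------------------------------------------------------------------------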