Hi Sachin,

The incremental MongoDB source is under package
`org.apache.flink.cdc.connectors.mongodb.source`, and the legacy source is
under package `org.apache.flink.cdc.connectors.mongodb`. The incremental
source always uses incremental snapshot reading, so there is no separate
builder option to turn it on.
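
Both sources ship in the same connector artifact. With Maven, the dependency
should look like this (using the 3.1.1 version you mentioned; please
double-check the version against your setup):

-----------------------------------------------------------------------------
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-mongodb-cdc</artifactId>
      <version>3.1.1</version>
  </dependency>
-----------------------------------------------------------------------------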

Here's an example:

-----------------------------------------------------------------------------
  import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
  import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  MongoDBSource<String> mongoSource =
          MongoDBSource.<String>builder()
                  .hosts("<hosts>")
                  .databaseList("<database>")
                  .collectionList("<database.collection>")
                  .username("<username>")
                  .password("<password>")
                  .deserializer(new JsonDebeziumDeserializationSchema())
                  .closeIdleReaders(true)
                  .build();

  StreamExecutionEnvironment env =
          StreamExecutionEnvironment.getExecutionEnvironment();
  // enable checkpointing (required for the incremental snapshot source)
  env.enableCheckpointing(3000);
  // read the snapshot + change stream with source parallelism 2
  env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(),
                  "MongoDB Incremental Source")
          .setParallelism(2)
          .print()
          .setParallelism(1);

  env.execute("Print MongoDB Snapshot + Change Stream");
-----------------------------------------------------------------------------
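
By the way, if you use the SQL connector instead of the DataStream API,
incremental snapshot reading is switched on through the table option you
mentioned. A minimal sketch (the schema and connection values below are
just placeholders):

-----------------------------------------------------------------------------
  CREATE TABLE mongodb_source (
      _id STRING,
      -- ... columns matching your collection ...
      PRIMARY KEY (_id) NOT ENFORCED
  ) WITH (
      'connector' = 'mongodb-cdc',
      'hosts' = '<hosts>',
      'username' = '<username>',
      'password' = '<password>',
      'database' = '<database>',
      'collection' = '<collection>',
      'scan.incremental.snapshot.enabled' = 'true'
  );
-----------------------------------------------------------------------------

Note that closeIdleReaders(true) relies on
'execution.checkpointing.checkpoints-after-tasks-finish.enabled', which
requires Flink 1.14 or later.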

Best,
Jiabao

On 2024/08/19 14:39:57 Sachin Mittal wrote:
> Thanks for the explanation.
> 
> One quick question how do I enable:
> scan.incremental.snapshot.enabled = true
> 
> for my MongoDBSource ?
> 
> I don't see any option in the builder for the same.
> 
> Regards
> Sachin
> 
> 
> 
> On Mon, Aug 19, 2024 at 8:00 PM Jiabao Sun <jiabao...@apache.org> wrote:
> 
> > Sorry, in my previous reply, I mistakenly wrote Flink 2.0 instead of Flink
> > CDC 2.0.
> > I am correcting it here to avoid any misunderstanding.
> >
> > Incremental snapshot reading is a new feature introduced in Flink CDC 2.0.
> >
> > The following table shows the version mapping between Flink CDC Connectors
> > and Flink[1]:
> >
> > Flink CDC Version    Flink Version
> > 1.0.0                1.11.*
> > 1.1.0                1.11.*
> > 1.2.0                1.12.*
> > 1.3.0                1.12.*
> > 1.4.0                1.13.*
> > 2.0.*                1.13.*
> > 2.1.*                1.13.*
> > 2.2.*                1.13.*, 1.14.*
> > 2.3.*                1.13.*, 1.14.*, 1.15.*, 1.16.*
> > 2.4.*                1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.*
> > 3.0.*                1.14.*, 1.15.*, 1.16.*, 1.17.*, 1.18.*
> >
> > Best,
> > Jiabao
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/overview/#supported-flink-versions
> >
> > On 2024/08/19 09:27:48 Sachin Mittal wrote:
> > > So basically if I set startupOptions(StartupOptions.initial())
> > > and also scan.incremental.snapshot.enabled = true
> > > Then it would read from the source in parallel, thereby reading the
> > entire
> > > mongo collection faster.
> > >
> > > Am I understanding that correctly?
> > >
> > > Also I am using Flink 1.8, would it work with this version of flink ?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Mon, Aug 19, 2024 at 2:29 PM Jiabao Sun <jiabao...@apache.org> wrote:
> > >
> > > > Hi Sachin,
> > > >
> > > > Incremental snapshot reading is a new feature introduced in Flink 2.0.
> > > >
> > > > It has the following capabilities:
> > > > - Source can be parallel during snapshot reading to improve snapshot
> > speed
> > > > - Source can perform checkpoints in the chunk granularity during
> > snapshot
> > > > reading
> > > >
> > > > Limitation:
> > > > - MongoDB version needs to be greater than 4.0
> > > >
> > > > Best,
> > > > Jiabao
> > > >
> > > > On 2024/08/19 06:48:39 Sachin Mittal wrote:
> > > > > Hi,
> > > > > I am using mongodb cdc connector version 3.1.1
> > > > > I am connecting to mongodb atlas, which uses mongodb version 7.0.
> > > > >
> > > > > In the cdc connector I find a property:
> > > > >
> > > > > scan.incremental.snapshot.enabled with default as false.
> > > > >
> > > > > I wanted to know in what cases we should set this as true and what
> > does
> > > > > this property help with ?
> > > > >
> > > > > Please note that I am configuring my connector with:
> > > > >
> > > > > .startupOptions(StartupOptions.initial())
> > > > > .batchSize(2048)
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > >
> > >
> >
> 
