Hi Sachin,
The incremental MongoDB source is under the package
`org.apache.flink.cdc.connectors.mongodb.source`, and the legacy source is
under the package `org.apache.flink.cdc.connectors.mongodb`.
Here's an example:
-----------------------------------------------------------------------------
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

MongoDBSource<String> mongoSource =
        MongoDBSource.<String>builder()
                .hosts("<hosts>")
                .databaseList("<database>")
                .collectionList("<database.collection>")
                .username("<username>")
                .password("<password>")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .closeIdleReaders(true)
                .build();

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

// enable checkpointing
env.enableCheckpointing(3000);

// set the source parallelism to 2
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB Incremental Source")
        .setParallelism(2)
        .print()
        .setParallelism(1);

env.execute("Print MongoDB Snapshot + Change Stream");
-----------------------------------------------------------------------------
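
If you also want to keep the options from your current setup,
`startupOptions(StartupOptions.initial())` and `batchSize(2048)`, the
incremental builder exposes methods with the same names. Below is a minimal
sketch; the import path for `StartupOptions` (from the incremental source
framework under `org.apache.flink.cdc.connectors.base.options`) is my
assumption, so please double-check it against your connector version.

-----------------------------------------------------------------------------
// assumed import path for the incremental source framework's StartupOptions
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

MongoDBSource<String> mongoSource =
        MongoDBSource.<String>builder()
                .hosts("<hosts>")
                .databaseList("<database>")
                .collectionList("<database.collection>")
                .username("<username>")
                .password("<password>")
                // take a full snapshot first, then continue with the change stream
                .startupOptions(StartupOptions.initial())
                // same fetch size you use today
                .batchSize(2048)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .closeIdleReaders(true)
                .build();
-----------------------------------------------------------------------------

For the DataStream API there is no separate `scan.incremental.snapshot.enabled`
flag to set in the builder; using the source from the
`org.apache.flink.cdc.connectors.mongodb.source` package is what enables
incremental snapshot reading.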
Best,
Jiabao
On 2024/08/19 14:39:57 Sachin Mittal wrote:
> Thanks for the explanation.
>
> One quick question how do I enable:
> scan.incremental.snapshot.enabled = true
>
> for my MongoDBSource ?
>
> I don't see any option in the builder for the same.
>
> Regards
> Sachin
>
>
>
> On Mon, Aug 19, 2024 at 8:00 PM Jiabao Sun <[email protected]> wrote:
>
> > Sorry, in my previous reply, I mistakenly wrote Flink 2.0 instead of Flink
> > CDC 2.0.
> > I am correcting it here to avoid any misunderstanding.
> >
> > Incremental snapshot reading is a new feature introduced in Flink CDC 2.0.
> >
> > The following table shows the version mapping between Flink CDC Connectors
> > and Flink[1]:
> >
> > Flink CDC Version Flink Version
> > 1.0.0 1.11.*
> > 1.1.0 1.11.*
> > 1.2.0 1.12.*
> > 1.3.0 1.12.*
> > 1.4.0 1.13.*
> > 2.0.* 1.13.*
> > 2.1.* 1.13.*
> > 2.2.* 1.13.*, 1.14.*
> > 2.3.* 1.13.*, 1.14.*, 1.15.*, 1.16.*
> > 2.4.* 1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.*
> > 3.0.* 1.14.*, 1.15.*, 1.16.*, 1.17.*, 1.18.*
> >
> > Best,
> > Jiabao
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/overview/#supported-flink-versions
> >
> > On 2024/08/19 09:27:48 Sachin Mittal wrote:
> > > So basically if I set startupOptions(StartupOptions.initial())
> > > and also scan.incremental.snapshot.enabled = true
> > > Then it would read from the source in parallel, thereby reading the
> > entire
> > > mongo collection faster.
> > >
> > > Am I understanding that correctly?
> > >
> > > Also I am using Flink 1.8, would it work with this version of flink ?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Mon, Aug 19, 2024 at 2:29 PM Jiabao Sun <[email protected]> wrote:
> > >
> > > > Hi Sachin,
> > > >
> > > > Incremental snapshot reading is a new feature introduced in Flink 2.0.
> > > >
> > > > It has the following capabilities:
> > > > - Source can be parallel during snapshot reading to improve snapshot
> > speed
> > > > - Source can perform checkpoints in the chunk granularity during
> > snapshot
> > > > reading
> > > >
> > > > Limitation:
> > > > - MongoDB version needs to be greater than 4.0
> > > >
> > > > Best,
> > > > Jiabao
> > > >
> > > > On 2024/08/19 06:48:39 Sachin Mittal wrote:
> > > > > Hi,
> > > > > I am using mongodb cdc connector version 3.1.1
> > > > > I am connecting to mongodb atlas, which uses mongodb version 7.0.
> > > > >
> > > > > In the cdc connector I find a property:
> > > > >
> > > > > scan.incremental.snapshot.enabled with default as false.
> > > > >
> > > > > I wanted to know in what cases we should set this as true and what
> > does
> > > > > this property help with ?
> > > > >
> > > > > Please note that I am configuring my connector with:
> > > > >
> > > > > .startupOptions(StartupOptions.initial())
> > > > > .batchSize(2048)
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > >
> > >
> >
>