This worked.

Thanks


On Sat, 17 Aug 2024 at 6:02 PM, Jiabao Sun <jiabao...@apache.org> wrote:

> Hi Sachin,
>
> The 'collectionList' needs to be filled with fully qualified names.
>
> For example,
> database: test_db
> collection: test_collection
>
> MongoDBSource.<T>builder()
>      .hosts(HOSTNAME)
>      .scheme(SCHEME)
>      .databaseList("test_db")
>      .collectionList("test_db.test_collection")
>      ...
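>
> For completeness, a minimal end-to-end sketch (host, scheme, and the import
> paths assume Flink CDC 3.x; JsonDebeziumDeserializationSchema is just one
> stock deserializer choice, used here so the example is self-contained):
>
> import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
> import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
>
> // Emits each change event as a raw JSON string.
> MongoDBSource<String> source =
>     MongoDBSource.<String>builder()
>         .hosts("localhost:27017")   // placeholder host
>         .scheme("mongodb")          // placeholder scheme
>         .databaseList("test_db")
>         // fully qualified "database.collection", not just the collection name
>         .collectionList("test_db.test_collection")
>         .deserializer(new JsonDebeziumDeserializationSchema())
>         .build();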
>
> Best,
> Jiabao
>
> On 2024/08/17 12:16:39 Sachin Mittal wrote:
> > Hi,
> > I have configured a MongoDB CDC source as:
> >
> > MongoDBSource.<T>builder()
> >     .hosts(HOSTNAME)
> >     .scheme(SCHEME)
> >     .databaseList(MONGO_DB)
> >     .collectionList(collectionName)
> >     .username(USERNAME)
> >     .password(PASSWORD)
> >     .startupOptions(StartupOptions.initial())
> >     .batchSize(2048)
> >     .deserializer(
> >         new DebeziumDeserializationSchema<T>() {
> >
> >           @Override
> >           public TypeInformation<T> getProducedType() {
> >             return Types.POJO(clazz);
> >           }
> >
> >           @Override
> >           public void deserialize(SourceRecord record, Collector<T> collector) {
> >             logger.info("Reading source record {}", record);
> >             ...
> >           }
> >         })
> >     .build();
> >
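> > (The elided deserialize body roughly follows the sketch below; it assumes
> > the record value is the MongoDB Kafka connector envelope, with the document
> > as a JSON string under a "fullDocument" field, and maps it onto the POJO
> > with Jackson. The field name and the mapping are illustrative assumptions:)
> >
> > import com.fasterxml.jackson.databind.ObjectMapper;
> > import org.apache.kafka.connect.data.Struct;
> >
> > private final ObjectMapper mapper = new ObjectMapper();
> >
> > @Override
> > public void deserialize(SourceRecord record, Collector<T> collector) throws Exception {
> >   // Assumption: the value is a Struct carrying the document as a JSON string.
> >   Struct value = (Struct) record.value();
> >   String fullDocument = value.getString("fullDocument"); // may be null for deletes
> >   if (fullDocument != null) {
> >     collector.collect(mapper.readValue(fullDocument, clazz));
> >   }
> > }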
> >
> > In the Flink task manager logs I see the following:
> >
> > 2024-08-17 17:30:29,134 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 discovers table schema for stream split stream-split success
> > 2024-08-17 17:30:29,134 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 received the stream split : StreamSplit{splitId='stream-split', offset={resumeToken=null, timestamp=7404077049079398401}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false}.
> > 2024-08-17 17:30:29,153 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split', offset={resumeToken=null, timestamp=7404077049079398401}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false}]
> > 2024-08-17 17:30:29,161 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
> > 2024-08-17 17:30:29,182 INFO org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils [] - Preparing change stream for database <db> with namespace regex filter ^(<collection name>)$
> > 2024-08-17 17:30:29,280 INFO org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask [] - Open the change stream at the timestamp: Timestamp{value=7404077049079398401, seconds=1723896025, inc=1}
> >
> >
> > From the logs it seems that we are able to connect to the CDC stream, and since the startup option is set to initial, it should start by loading the existing records in the collections as a snapshot.
> >
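> > (As a quick reference on the startup option semantics, a sketch;
> > StartupOptions here is the flink-cdc base connector class, and latest()
> > is named only as the contrasting alternative, not what I use:)
> >
> > // initial(): snapshot the existing data first, then switch to streaming changes.
> > .startupOptions(StartupOptions.initial())
> > // latest(): skip the snapshot and read only changes from now on.
> > // .startupOptions(StartupOptions.latest())
> >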
> > However, I don't see any records being read, or even any error, in my Flink UI/logs.
> >
> > Any idea what may be going wrong?
> >
> > Thanks
> > Sachin
> >
>
