Hi Sachin,

The 'collectionList' must be filled with fully qualified collection names, i.e. "<database>.<collection>". If only the bare collection name is given, the namespace regex filter matches nothing, which would explain why the change stream opens but no records are ever read.

For example,
database: test_db
collection: test_collection

MongoDBSource.<T>builder()
     .hosts(HOSTNAME)
     .scheme(SCHEME)
     .databaseList("test_db")
     .collectionList("test_db.test_collection")
     ...
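
Applied to your snippet below, that means prefixing the collection name with its database. A minimal sketch using the placeholders from your own code (everything else unchanged):

MongoDBSource.<T>builder()
    .hosts(HOSTNAME)
    .scheme(SCHEME)
    .databaseList(MONGO_DB)
    // fully qualified "<database>.<collection>", not the bare collection name
    .collectionList(MONGO_DB + "." + collectionName)
    .username(USERNAME)
    .password(PASSWORD)
    .startupOptions(StartupOptions.initial())
    .batchSize(2048)
    ...

If you read from several collections, I believe you can pass multiple fully qualified names to collectionList in the same way.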

Best,
Jiabao

On 2024/08/17 12:16:39 Sachin Mittal wrote:
> Hi,
> I have configured a MongoDB CDC source as:
> 
> MongoDBSource.<T>builder()
>     .hosts(HOSTNAME)
>     .scheme(SCHEME)
>     .databaseList(MONGO_DB)
>     .collectionList(collectionName)
>     .username(USERNAME)
>     .password(PASSWORD)
>     .startupOptions(StartupOptions.initial())
>     .batchSize(2048)
>     .deserializer(
>         new DebeziumDeserializationSchema<T>() {
> 
>           @Override
>           public TypeInformation<T> getProducedType() {
>             return Types.POJO(clazz);
>           }
> 
>           @Override
>           public void deserialize(SourceRecord record, Collector<T> collector) {
>             logger.info("Reading source record {}", record);
>             ...
>           }
>         })
>     .build();
> 
> 
> In the Flink task manager logs I see the following:
> 
> 2024-08-17 17:30:29,134 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 discovers table schema for stream split stream-split success
> 2024-08-17 17:30:29,134 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 received the stream split : StreamSplit{splitId='stream-split', offset={resumeToken=null, timestamp=7404077049079398401}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false}.
> 2024-08-17 17:30:29,153 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split', offset={resumeToken=null, timestamp=7404077049079398401}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false}]
> 2024-08-17 17:30:29,161 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
> 2024-08-17 17:30:29,182 INFO org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils [] - Preparing change stream for database <db> with namespace regex filter ^(<collection name>)$
> 2024-08-17 17:30:29,280 INFO org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask [] - Open the change stream at the timestamp: Timestamp{value=7404077049079398401, seconds=1723896025, inc=1}
> 
> 
> From the logs it seems that we are able to connect to the CDC stream,
> and it should start by loading the existing records in the collection
> as a snapshot, since the startup mode is set to initial.
> 
> However, I don't see any records being read, nor any errors, in my
> Flink UI/logs.
> 
> Any idea what may be going wrong?
> 
> Thanks
> Sachin
> 
