Hi Sachin,

The entries in 'collectionList' need to be fully qualified names of the form database.collection.
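A minimal sketch of why a bare collection name silently matches nothing (the class and helper below are hypothetical illustrations, not the connector's actual code): the connector filters change events by the full namespace "db.collection" with a regex of the form ^(<entry>)$, as the "namespace regex filter" log line in your mail shows.

```java
import java.util.regex.Pattern;

// Hypothetical illustration (not the connector's internals): change events carry
// the full namespace "db.collection", and the filter built from collectionList
// must match that whole string, so a bare collection name can never match.
public class NamespaceFilterSketch {

    static boolean namespaceMatches(String collectionListEntry, String namespace) {
        // Treat the entry as a literal name, e.g. "test_db.test_collection".
        return namespace.matches("^(" + Pattern.quote(collectionListEntry) + ")$");
    }

    public static void main(String[] args) {
        String namespace = "test_db.test_collection"; // what a change event carries

        System.out.println(namespaceMatches("test_collection", namespace));         // false: bare name
        System.out.println(namespaceMatches("test_db.test_collection", namespace)); // true: fully qualified
    }
}
```

Under that assumption, the source connects fine but every event is filtered out, which matches the symptom of no records and no errors.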
For example, with database "test_db" and collection "test_collection":

    MongoDBSource.<T>builder()
        .hosts(HOSTNAME)
        .scheme(SCHEME)
        .databaseList("test_db")
        .collectionList("test_db.test_collection")
        ...

Best,
Jiabao

On 2024/08/17 12:16:39 Sachin Mittal wrote:
> Hi,
> I have configured a MongoDB CDC source as:
>
> MongoDBSource.<T>builder()
>     .hosts(HOSTNAME)
>     .scheme(SCHEME)
>     .databaseList(MONGO_DB)
>     .collectionList(collectionName)
>     .username(USERNAME)
>     .password(PASSWORD)
>     .startupOptions(StartupOptions.initial())
>     .batchSize(2048)
>     .deserializer(
>         new DebeziumDeserializationSchema<T>() {
>
>             @Override
>             public TypeInformation<T> getProducedType() {
>                 return Types.POJO(clazz);
>             }
>
>             @Override
>             public void deserialize(SourceRecord record, Collector<T> collector) {
>                 logger.info("Reading source record {}", record);
>                 ...
>             }
>         })
>     .build();
>
> In the Flink task manager logs I see the following:
>
> 2024-08-17 17:30:29,134 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 discovers table schema for stream split stream-split success
> 2024-08-17 17:30:29,134 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 received the stream split : StreamSplit{splitId='stream-split', offset={resumeToken=null, timestamp=7404077049079398401}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false}.
> 2024-08-17 17:30:29,153 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split', offset={resumeToken=null, timestamp=7404077049079398401}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false}]
> 2024-08-17 17:30:29,161 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
> 2024-08-17 17:30:29,182 INFO org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils [] - Preparing change stream for database <db> with namespace regex filter ^(<collection name>)$
> 2024-08-17 17:30:29,280 INFO org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask [] - Open the change stream at the timestamp: Timestamp{value=7404077049079398401, seconds=1723896025, inc=1}
>
> From the logs it seems that we are able to connect to the CDC stream, and since the startup mode is set to initial, it should start by loading the existing records in the collection as a snapshot.
>
> However, I don't see any records being read, nor any errors in my Flink UI/logs.
>
> Any idea what may be going wrong?
>
> Thanks,
> Sachin