This worked. Thanks
On Sat, 17 Aug 2024 at 6:02 PM, Jiabao Sun <jiabao...@apache.org> wrote:

> Hi Sachin,
>
> The 'collectionList' needs to be filled with fully qualified names.
>
> For example,
> database: test_db
> collection: test_collection
>
> MongoDBSource.<T>builder()
>     .hosts(HOSTNAME)
>     .scheme(SCHEME)
>     .databaseList("test_db")
>     .collectionList("test_db.test_collection")
>     ...
>
> Best,
> Jiabao
>
> On 2024/08/17 12:16:39 Sachin Mittal wrote:
> > Hi,
> > I have configured a MongoDB CDC source as:
> >
> > MongoDBSource.<T>builder()
> >     .hosts(HOSTNAME)
> >     .scheme(SCHEME)
> >     .databaseList(MONGO_DB)
> >     .collectionList(collectionName)
> >     .username(USERNAME)
> >     .password(PASSWORD)
> >     .startupOptions(StartupOptions.initial())
> >     .batchSize(2048)
> >     .deserializer(
> >         new DebeziumDeserializationSchema<T>() {
> >
> >           @Override
> >           public TypeInformation<T> getProducedType() {
> >             return Types.POJO(clazz);
> >           }
> >
> >           @Override
> >           public void deserialize(SourceRecord record, Collector<T> collector) {
> >             logger.info("Reading source record {}", record);
> >             ...
> >           }
> >         })
> >     .build();
> >
> > In the Flink task manager logs I see the following:
> >
> > 2024-08-17 17:30:29,134 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 discovers table schema for stream split stream-split success
> > 2024-08-17 17:30:29,134 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 received the stream split : StreamSplit{splitId='stream-split', offset={resumeToken=null, timestamp=7404077049079398401}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false}.
> > 2024-08-17 17:30:29,153 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split', offset={resumeToken=null, timestamp=7404077049079398401}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false}]
> > 2024-08-17 17:30:29,161 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
> > 2024-08-17 17:30:29,182 INFO org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils [] - Preparing change stream for database <db> with namespace regex filter ^(<collection name>)$
> > 2024-08-17 17:30:29,280 INFO org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask [] - Open the change stream at the timestamp: Timestamp{value=7404077049079398401, seconds=1723896025, inc=1}
> >
> > From the logs it seems that we are able to connect to the CDC stream, and since the startup option is set to initial, it should start by loading the existing records in the collection as a snapshot.
> >
> > However, I don't see any records being read, or even any error, in my Flink UI/logs.
> >
> > Any idea what may be going wrong?
> >
> > Thanks
> > Sachin
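
For reference, below is a minimal, self-contained sketch combining the original configuration with Jiabao's fix. It assumes the Flink CDC 3.x package layout (org.apache.flink.cdc.*, which matches the log lines above); the host, credentials, and database/collection names are placeholders, and the built-in JsonDebeziumDeserializationSchema stands in for the custom POJO deserializer from the original post to keep the example short:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MongoCdcExample {

  public static void main(String[] args) throws Exception {
    MongoDBSource<String> source =
        MongoDBSource.<String>builder()
            .hosts("localhost:27017")        // placeholder host
            .scheme("mongodb")               // or "mongodb+srv"
            .username("flink_user")          // placeholder credentials
            .password("flink_pw")
            .databaseList("test_db")         // database names are unqualified
            // Fully qualified "<database>.<collection>" -- the fix from this
            // thread. A bare collection name produces a namespace regex that
            // matches nothing, which appears to explain the "connected but no
            // records" symptom in the original post.
            .collectionList("test_db.test_collection")
            .startupOptions(StartupOptions.initial())  // snapshot existing docs first
            .batchSize(2048)
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source")
        .print();
    env.execute("mongodb-cdc-example");
  }
}

Note the asymmetry in Jiabao's example, preserved here: databaseList() takes bare database names, while collectionList() takes "database.collection" pairs.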