Hi,

I have configured a MongoDB CDC source as:

    MongoDBSource.<T>builder()
            .hosts(HOSTNAME)
            .scheme(SCHEME)
            .databaseList(MONGO_DB)
            .collectionList(collectionName)
            .username(USERNAME)
            .password(PASSWORD)
            .startupOptions(StartupOptions.initial())
            .batchSize(2048)
            .deserializer(new DebeziumDeserializationSchema<T>() {
                @Override
                public TypeInformation<T> getProducedType() {
                    return Types.POJO(clazz);
                }

                @Override
                public void deserialize(SourceRecord record, Collector<T> collector) {
                    logger.info("Reading source record {}", record);
                    ...
                }
            })
            .build();

In the Flink task manager logs I see the following:

    2024-08-17 17:30:29,134 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 discovers table schema for stream split stream-split success
    2024-08-17 17:30:29,134 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 received the stream split : StreamSplit{splitId='stream-split', offset={resumeToken=null, timestamp=7404077049079398401}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false}.
    2024-08-17 17:30:29,153 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split', offset={resumeToken=null, timestamp=7404077049079398401}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false}]
    2024-08-17 17:30:29,161 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
    2024-08-17 17:30:29,182 INFO org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils [] - Preparing change stream for database <db> with namespace regex filter ^(<collection name>)$
    2024-08-17 17:30:29,280 INFO org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask [] - Open the change stream at the timestamp: Timestamp{value=7404077049079398401, seconds=1723896025, inc=1}

From the logs it seems that we are able to connect to the CDC stream, and since the startup option is StartupOptions.initial(), it should start by loading the existing records in the collection as a snapshot. However, I don't see any records being read, nor any error, in the Flink UI/logs. Any idea what may be going wrong?

Thanks,
Sachin
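
P.S. For a concrete picture, here is a minimal, self-contained sketch of the kind of deserializer I am describing. MyPojo, its fromJson() factory, and the "fullDocument" handling are simplified stand-ins for my actual conversion logic, not the exact code:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;

    public class MyPojoDeserializationSchema implements DebeziumDeserializationSchema<MyPojo> {

        @Override
        public void deserialize(SourceRecord record, Collector<MyPojo> out) throws Exception {
            // Every event that should reach downstream operators must be passed
            // to the collector; records that are never collected simply disappear
            // without any error in the logs.
            Struct value = (Struct) record.value();
            // As far as I understand, MongoDB CDC events carry the document as an
            // extended-JSON string in the "fullDocument" field (please correct me
            // if that assumption is wrong).
            String fullDocument = value.getString("fullDocument");
            if (fullDocument != null) {
                out.collect(MyPojo.fromJson(fullDocument)); // fromJson() is a placeholder
            }
        }

        @Override
        public TypeInformation<MyPojo> getProducedType() {
            return Types.POJO(MyPojo.class);
        }
    }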