Hi,
I have configured a MongoDB CDC source as follows:

MongoDBSource.<T>builder()
    .hosts(HOSTNAME)
    .scheme(SCHEME)
    .databaseList(MONGO_DB)
    .collectionList(collectionName)
    .username(USERNAME)
    .password(PASSWORD)
    .startupOptions(StartupOptions.initial())
    .batchSize(2048)
    .deserializer(
        new DebeziumDeserializationSchema<T>() {

          @Override
          public TypeInformation<T> getProducedType() {
            return Types.POJO(clazz);
          }

          @Override
          public void deserialize(SourceRecord record, Collector<T> collector) {
            logger.info("Reading source record {}", record);
            ...
          }
        })
    .build();
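One thing worth double-checking, since the body of `deserialize` is elided above: downstream operators only see records that the deserializer explicitly emits via `collector.collect(...)`. Below is a minimal, hypothetical sketch of a deserializer that does emit records. The class name `MyDocDeserializer` and the use of `String` as the output type are my own illustrative choices, not taken from your code; the `"fullDocument"` field name is an assumption based on how the MongoDB CDC connector typically carries the change document as an extended-JSON string in the record value.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

// Hypothetical sketch: a deserializer that actually forwards records downstream.
public class MyDocDeserializer implements DebeziumDeserializationSchema<String> {

  @Override
  public void deserialize(SourceRecord record, Collector<String> collector) {
    // For the MongoDB CDC connector, the change document is usually an
    // extended-JSON string under "fullDocument" in the value Struct.
    Struct value = (Struct) record.value();
    String fullDocument = value.getString("fullDocument");
    if (fullDocument != null) {
      // If collect() is never called, the job runs but no records ever
      // appear in the Flink UI or downstream operators.
      collector.collect(fullDocument);
    }
  }

  @Override
  public TypeInformation<String> getProducedType() {
    return Types.STRING;
  }
}
```

If your real `deserialize` only logs and then throws (or silently drops) for some record shapes, that would match the symptom of a healthy change stream with no visible output.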


In the Flink TaskManager logs I see the following:

2024-08-17 17:30:29,134 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 discovers table schema for stream split stream-split success
2024-08-17 17:30:29,134 INFO org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader [] - Source reader 0 received the stream split : StreamSplit{splitId='stream-split', offset={resumeToken=null, timestamp=7404077049079398401}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false}.
2024-08-17 17:30:29,153 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split', offset={resumeToken=null, timestamp=7404077049079398401}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false}]
2024-08-17 17:30:29,161 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
2024-08-17 17:30:29,182 INFO org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils [] - Preparing change stream for database <db> with namespace regex filter ^(<collection name>)$
2024-08-17 17:30:29,280 INFO org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask [] - Open the change stream at the timestamp: Timestamp{value=7404077049079398401, seconds=1723896025, inc=1}


From the logs it seems that we are able to connect to the CDC stream, and since the startup mode is set to initial, it should start by loading the existing records in the collection as a snapshot.

However, I don't see any records being read, or any errors, in my Flink UI or logs.

Any idea what may be going wrong?

Thanks
Sachin
