I am developing a custom source with the new Source API. During periods of low incoming data, it repeatedly shuts down and restarts the fetchers whenever the split assignments become empty and new splits are then added periodically.
I get log messages such as:

    org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished running task FetchTask
    org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Cleaned wakeup flag.
    org.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [2022-03-04T17:09:29.000Z - 2022-03-04T17:09:34.000Z]
    org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager - Closing splitFetcher 6412 because it is idle.
    org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting down split fetcher 6412

Then, right at the 5-second mark, when a new split becomes available for assignment, it creates a new fetcher to read it. The fetchers in this case are not lightweight: they have some spin-up time, and the constant cycle of creating, dropping, and recreating them wastes CPU time and adds latency.

Is there something I am missing in the new Source API that would prevent this fetcher churn? It doesn't happen during periods of high activity (when splits are constantly available to be assigned), only when the incoming data arrives as a single split here and there. (Think IoT sensor data that arrives in chunks of records whenever connectivity is available.)

Any ideas?

Thanks!
Jonathan
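Independent of whether the idle-fetcher shutdown itself can be tuned, one general way to soften the cost of this churn is to move the slow-to-start resource out of the per-fetcher object, so that recreating a fetcher becomes cheap. Below is a minimal, hypothetical sketch of that idea in plain Java. `SharedClientHolder`, `ExpensiveClient`, and `CheapReader` are illustrative names, not Flink APIs, and the sketch assumes the expensive part is a client that is safe to share between fetchers (thread-safe and not tied to a particular split). In a real connector, the reader's constructor would presumably call `acquire()` and its `close()` would call `release()`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch: keep the slow-to-start resource outside the per-fetcher
 * object, so closing an idle fetcher and recreating it later is cheap.
 */
public final class SharedClientHolder {

    /** Hypothetical stand-in for the heavyweight client a fetcher needs. */
    public static final class ExpensiveClient {
        // Counts real constructions, to show that churn no longer recreates it.
        private static final AtomicInteger CREATED = new AtomicInteger();

        private ExpensiveClient() {
            CREATED.incrementAndGet();
            // Connection setup, authentication, cache warm-up, etc. would go here.
        }
    }

    /** Hypothetical per-fetcher reader: cheap to build, borrows the shared client. */
    public static final class CheapReader implements AutoCloseable {
        private final ExpensiveClient borrowed = acquire();

        public ExpensiveClient client() {
            return borrowed;
        }

        @Override
        public void close() {
            release();
        }
    }

    private static ExpensiveClient shared;
    private static int refCount;

    private SharedClientHolder() {}

    /** Returns the shared client, creating it only on first use. */
    public static synchronized ExpensiveClient acquire() {
        if (shared == null) {
            shared = new ExpensiveClient();
        }
        refCount++;
        return shared;
    }

    /** Drops one reference but deliberately keeps the client alive at zero. */
    public static synchronized void release() {
        if (refCount > 0) {
            refCount--;
        }
    }

    public static synchronized int refCount() {
        return refCount;
    }

    public static int createdCount() {
        return ExpensiveClient.CREATED.get();
    }

    public static void main(String[] args) {
        // Simulate three idle-close / recreate cycles of a fetcher.
        for (int i = 0; i < 3; i++) {
            try (CheapReader reader = new CheapReader()) {
                // Read one small split with reader.client() here.
            }
        }
        System.out.println(createdCount()); // prints 1
    }
}
```

The trade-off is that the shared client outlives every individual fetcher and, as written, is never closed, so it stays alive until its class is unloaded or the JVM exits. A production version would likely want an idle timeout or an explicit shutdown hook instead.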