Hi,

If I understand the code correctly, the only option is to implement a custom SplitFetcherManager. There, you can either:
1) override maybeShutdownFinishedFetchers(), or
2) override createSplitFetcher() to return a custom fetcher; that fetcher would override isIdle() and return true only after it has stayed idle for some delay.
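To make option 2 more concrete, here is a minimal sketch of just the "idle only after a delay" logic, written without any Flink dependency. The class name IdleGracePeriod, the injected clock, and the way it would be wired into a fetcher are all my own assumptions, not part of the Flink API. I have also not verified that createSplitFetcher() and isIdle() can be overridden from outside Flink's own package in your version, so please check that first.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not a Flink class): turns a raw "idle right now" signal
 * into "idle continuously for at least a grace period".
 */
final class IdleGracePeriod {
    private final long gracePeriodMillis;
    private final LongSupplier clockMillis; // injected so the logic can be tested
    private boolean idleObserved = false;   // true while the raw signal stays idle
    private long idleSinceMillis = 0L;      // valid only while idleObserved is true

    IdleGracePeriod(long gracePeriodMillis, LongSupplier clockMillis) {
        if (gracePeriodMillis < 0) {
            throw new IllegalArgumentException("gracePeriodMillis must be >= 0");
        }
        this.gracePeriodMillis = gracePeriodMillis;
        this.clockMillis = clockMillis;
    }

    /**
     * @param rawIdle what the fetcher would report on its own
     * @return true only once rawIdle has been true for the whole grace period
     */
    synchronized boolean isIdle(boolean rawIdle) {
        if (!rawIdle) {
            // Work arrived: forget the previous idle period entirely.
            idleObserved = false;
            return false;
        }
        long now = clockMillis.getAsLong();
        if (!idleObserved) {
            // First idle observation: start the grace period now.
            idleObserved = true;
            idleSinceMillis = now;
        }
        return now - idleSinceMillis >= gracePeriodMillis;
    }
}
```

Assuming isIdle() is overridable, a custom fetcher could hold one IdleGracePeriod (for example with a 10 second grace period and System::currentTimeMillis as the clock) and return grace.isIdle(super.isIdle()). With splits arriving every 5 seconds, the fetcher would then never be reported as idle and would not be shut down in between.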
I don't see any configuration options.

I'm also pulling in Becket who could probably suggest a better solution.

Regards,
Roman

On Fri, Mar 4, 2022 at 6:21 PM Jonathan Weaver <myordinat...@gmail.com> wrote:
>
> I am working on developing a custom source with the new Source API.
>
> What I'm noticing is that during periods of low incoming data it will
> repeatedly shut down and restart the fetchers when the split assignments
> are empty and splits are only added periodically.
>
> I get log messages such as
>
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished running task FetchTask
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Cleaned wakeup flag.
> org.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [2022-03-04T17:09:29.000Z - 2022-03-04T17:09:34.000Z]
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager - Closing splitFetcher 6412 because it is idle.
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting down split fetcher 6412
>
> But then right at the 5 second mark, when a new split is available to be
> assigned, it will create a new fetcher to read it.
>
> The fetchers in this instance are not terribly lightweight, with some spin-up
> time, and the constant churn of creation/dropping/recreation is an unnecessary
> waste of CPU time and adds latency.
>
> Is there something I am missing in the new Source API to prevent this fetcher
> churn? It doesn't occur under periods of high activity (splits constantly
> available to be assigned), but only when the incoming data is just a single
> split here and there. (Think IoT sensor data that arrives in chunks of
> records when connectivity is available.)
>
> Any ideas?
>
> Thanks!
> Jonathan