I am developing a custom source with the new Source API.

What I'm noticing is that during periods of low incoming data, it
repeatedly shuts down and restarts the fetchers, because the split
assignments are usually empty and new splits are only added periodically.

I get log messages such as:

org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished running task FetchTask
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Cleaned wakeup flag.
org.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [2022-03-04T17:09:29.000Z - 2022-03-04T17:09:34.000Z]
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager - Closing splitFetcher 6412 because it is idle.
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting down split fetcher 6412

But then, right at the 5-second mark, when a new split becomes available to
be assigned, it creates a brand-new fetcher to read it.
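
To make the pattern concrete, here is a toy model of what I believe is
happening. It is plain Java with no Flink dependency: ToyFetcherManager and
the two helper methods are hypothetical names of my own, not Flink classes,
and the model only assumes that a fetcher is closed as soon as it has no
splits left and that a new one is created on the next assignment.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model only: none of these classes are part of Flink.
class FetcherChurnSketch {

    /** Hypothetical manager that closes its single fetcher whenever it goes idle. */
    static final class ToyFetcherManager {
        private final Queue<String> assignedSplits = new ArrayDeque<>();
        private boolean fetcherRunning = false;
        int fetchersCreated = 0;
        int fetchersClosed = 0;

        /** Assigning a split starts a fetcher if none is currently running. */
        void addSplit(String split) {
            if (!fetcherRunning) {
                fetcherRunning = true; // stands in for the costly spin-up
                fetchersCreated++;
            }
            assignedSplits.add(split);
        }

        /** Reads every assigned split, then closes the fetcher because it is idle. */
        void readAllThenCloseIdleFetcher() {
            while (!assignedSplits.isEmpty()) {
                assignedSplits.poll();
            }
            if (fetcherRunning) {
                fetcherRunning = false;
                fetchersClosed++;
            }
        }
    }

    /** Sparse input: each split is fully read before the next one arrives. */
    static int fetchersCreatedForSparseInput(int splitCount) {
        ToyFetcherManager manager = new ToyFetcherManager();
        for (int i = 0; i < splitCount; i++) {
            manager.addSplit("split-" + i);
            manager.readAllThenCloseIdleFetcher();
        }
        return manager.fetchersCreated;
    }

    /** Busy input: all splits are assigned before the fetcher ever goes idle. */
    static int fetchersCreatedForBusyInput(int splitCount) {
        ToyFetcherManager manager = new ToyFetcherManager();
        for (int i = 0; i < splitCount; i++) {
            manager.addSplit("split-" + i);
        }
        manager.readAllThenCloseIdleFetcher();
        return manager.fetchersCreated;
    }

    public static void main(String[] args) {
        System.out.println("sparse: " + fetchersCreatedForSparseInput(5) + " fetchers created");
        System.out.println("busy: " + fetchersCreatedForBusyInput(5) + " fetchers created");
    }
}
```

With five splits, the sparse case creates five fetchers while the busy case
creates one, which matches the difference I see between quiet and busy
periods.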

The fetchers in this instance are not terribly lightweight and have some
spin-up time, so the constant churn of creation, teardown, and recreation
wastes CPU time and adds unnecessary latency.

Is there something I am missing in the new Source API to prevent this
fetcher churn? It doesn't occur during periods of high activity (splits
constantly available to be assigned), only when the incoming data is
just a single split here and there. (Think IoT sensor data that arrives in
chunks of records when connectivity is available.)

Any ideas?

Thanks!
Jonathan
