Hi,

If I understand the code correctly, the only option is to implement a
custom SplitFetcherManager. There, you can either:
1) override maybeShutdownFinishedFetchers(), or
2) override createSplitFetcher() to return a custom fetcher that
overrides isIdle() and returns true only once some delay has passed
since the fetcher first ran out of work.
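Here is a minimal sketch of the "idle only after a delay" logic from option 2. It does not depend on Flink. DelayedIdleTracker and its isIdle(boolean) method are hypothetical names and are not part of Flink's API.

A custom fetcher's isIdle(), or a custom maybeShutdownFinishedFetchers(), would pass in the original idleness result. The tracker then reports idle only after that result has held continuously for a grace period. The clock is injectable so the behavior can be tested without sleeping.

The visibility of SplitFetcher's members may differ between Flink versions, so please check how (and whether) they can be overridden in yours.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/** Hypothetical helper (not part of Flink): debounces a fetcher's idleness signal. */
final class DelayedIdleTracker {
    private final long graceNanos;
    private final LongSupplier nanoClock;
    private boolean idle = false;   // whether the previous check saw no work
    private long idleSinceNanos;    // clock reading when the current idle stretch began

    DelayedIdleTracker(Duration gracePeriod, LongSupplier nanoClock) {
        this.graceNanos = gracePeriod.toNanos();
        this.nanoClock = nanoClock;
    }

    DelayedIdleTracker(Duration gracePeriod) {
        this(gracePeriod, System::nanoTime);
    }

    /**
     * @param hasNoWork the fetcher's own idleness check right now
     *                  (e.g. no assigned splits and no pending tasks)
     * @return true only if the fetcher has had no work continuously
     *         for at least the grace period
     */
    synchronized boolean isIdle(boolean hasNoWork) {
        if (!hasNoWork) {
            idle = false; // new work arrived: reset the idle timer
            return false;
        }
        long now = nanoClock.getAsLong();
        if (!idle) {
            idle = true;  // first check without work: start the timer
            idleSinceNanos = now;
        }
        return now - idleSinceNanos >= graceNanos;
    }
}
```

The grace period should be longer than the typical gap between splits (about 5 seconds in your case). Otherwise the fetcher will still be closed between them.

The tracker only does anything when it is asked. So an idle fetcher is closed no earlier than the first idleness check after the grace period has elapsed.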

I don't see any configuration options.

I'm also pulling in Becket, who could probably suggest a better solution.

Regards,
Roman

On Fri, Mar 4, 2022 at 6:21 PM Jonathan Weaver <myordinat...@gmail.com> wrote:
>
> I am working on developing a custom source with the new Source api.
>
> What I'm noticing is that during periods of low incoming data, it repeatedly
> shuts down and restarts the fetchers when the split assignments become empty
> and new splits are only added periodically.
>
> I get log messages such as:
>
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished running task FetchTask
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Cleaned wakeup flag.
> org.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [2022-03-04T17:09:29.000Z - 2022-03-04T17:09:34.000Z]
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager - Closing splitFetcher 6412 because it is idle.
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting down split fetcher 6412
>
> But then, right at the 5-second mark, when a new split becomes available for
> assignment, it creates a new fetcher to read it.
>
> The fetchers in this instance are not terribly lightweight (they have some
> spin-up time). The constant churn of creation, shutdown, and recreation wastes
> CPU time and adds latency.
>
> Is there something I am missing in the new Source API to prevent this fetcher
> churn? It doesn't occur during periods of high activity (splits constantly
> available to be assigned), only when the incoming data is a single split here
> and there. (Think IoT sensor data that arrives in chunks of records when
> connectivity is available.)
>
> Any ideas?
>
> Thanks!
> Jonathan
>
