kimgr opened a new pull request, #25340:
URL: https://github.com/apache/flink/pull/25340
This issue was triggered when running a highly parallel batch job with `FileSource` pointed at a large S3 bucket. The visible symptom was:

    java.util.NoSuchElementException
        at java.base/java.util.concurrent.ConcurrentHashMap$ValueIterator.next(ConcurrentHashMap.java:3471)
        at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.getRunningFetcher(SingleThreadFetcherManager.java:94)
        at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:82)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.addSplits(SourceReaderBase.java:242)
        at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:428)
        ...

While the exact sequence of events is unclear, there is an obvious time-of-check/time-of-use bug in `getRunningFetcher`: the `fetchers` map may be emptied between the moment it is checked for emptiness and the moment we try to take its first element.

Use a single iterator instead. `ConcurrentHashMap` iterators are weakly consistent and never throw `ConcurrentModificationException`, so once `hasNext()` returns true, the following `next()` on the same iterator returns an element even if the map changes concurrently.

## What is the purpose of the change

There is a tiny race condition in `SingleThreadFetcherManager` that can cause `java.util.NoSuchElementException` on reader close. This patch fixes the race condition.

## Brief change log

In `getRunningFetcher`, obtain a single iterator over `ConcurrentHashMap`'s `ValuesView` and use its weakly consistent `hasNext()`/`next()` pair instead of a separate emptiness check.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

(Caveat: it would be possible to verify this with a multithreaded stress test, but such tests usually need to run for a long time to be useful. I did run a reduced test case against `ConcurrentHashMap` and saw that the fix behaves better than the original code.)

## Does this pull request potentially affect one of the following parts:

- The S3 file system connector: actually, yes! But not in a way that should be noticeable, except for better stability.
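For illustration, the time-of-check/time-of-use race and the single-iterator fix can be reproduced outside Flink. This is a minimal sketch, not Flink's code: it assumes the original check resembles `fetchers.isEmpty() ? null : fetchers.values().iterator().next()`, and the class `RunningFetcherRace`, its methods, and the use of a `String` in place of a `SplitFetcher` are all hypothetical. A background thread repeatedly adds and removes a single entry while the calling thread performs the lookup, in the spirit of the reduced `ConcurrentHashMap` test case mentioned above.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-alone illustration; not Flink code. A String plays the role of a SplitFetcher.
final class RunningFetcherRace {

    // Racy (assumed original shape): isEmpty() and iterator().next() are two separate
    // reads, so another thread can empty the map in between and next() throws.
    static String getRunningFetcherRacy(Map<Integer, String> fetchers) {
        return fetchers.isEmpty() ? null : fetchers.values().iterator().next();
    }

    // Fixed: a single weakly consistent iterator answers both questions. Once hasNext()
    // returns true, next() on the same iterator returns an element, even if the map
    // is modified concurrently.
    static String getRunningFetcherFixed(Map<Integer, String> fetchers) {
        Iterator<String> iter = fetchers.values().iterator();
        return iter.hasNext() ? iter.next() : null;
    }

    // Stress harness: a background thread keeps adding and removing one entry while the
    // calling thread runs the chosen lookup. Returns the number of NoSuchElementExceptions.
    static long countFailures(boolean useFixed, long iterations) {
        ConcurrentHashMap<Integer, String> fetchers = new ConcurrentHashMap<>();
        AtomicBoolean stop = new AtomicBoolean(false);
        Thread mutator = new Thread(() -> {
            while (!stop.get()) {
                fetchers.put(0, "fetcher-0");
                fetchers.remove(0);
            }
        });
        mutator.setDaemon(true);
        mutator.start();

        long failures = 0;
        try {
            for (long i = 0; i < iterations; i++) {
                try {
                    if (useFixed) {
                        getRunningFetcherFixed(fetchers);
                    } else {
                        getRunningFetcherRacy(fetchers);
                    }
                } catch (NoSuchElementException e) {
                    failures++;
                }
            }
        } finally {
            // Stop the mutator and wait for it; restore the interrupt flag if interrupted.
            stop.set(true);
            try {
                mutator.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        long iterations = 2_000_000L;
        System.out.println("racy failures:  " + countFailures(false, iterations));
        System.out.println("fixed failures: " + countFailures(true, iterations));
    }
}
```

The fixed variant should always report zero failures, because a `true` from `hasNext()` means `next()` on the same iterator returns an element rather than throwing. How often the racy variant fails depends on thread scheduling, so a short run may also report zero, which is why such stress tests need long runs to be useful.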
## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable