Kim Gräsman created FLINK-36146:
-----------------------------------

             Summary: NoSuchElement exception from SingleThreadFetcherManager
                 Key: FLINK-36146
                 URL: https://issues.apache.org/jira/browse/FLINK-36146
             Project: Flink
          Issue Type: Bug
          Components: API / Core
         Environment: N/A
            Reporter: Kim Gräsman
We're running Flink 1.14.2, but this appears to still be an issue on mainline, so I thought I'd report it.

When running with high parallelism, we've noticed a spurious error triggered by a FileSource reader from S3:

{code:java}
2024-08-19 15:23:07,044 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [0000543131]
2024-08-19 15:23:07,044 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [0000543131]
2024-08-19 15:23:07,044 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 157 because it is idle.
2024-08-19 15:23:07,045 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 157
2024-08-19 15:23:07,045 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 157 exited.
2024-08-19 15:23:07,048 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [FileSourceSplit: ... [0, 21679984) hosts=[localhost] ID=0000201373 position=null]
2024-08-19 15:23:07,064 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2024-08-19 15:23:07,069 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: ... -> ... (114/1602)#0 (...)
switched from RUNNING to FAILED with failure cause: java.util.NoSuchElementException
	at java.base/java.util.concurrent.ConcurrentHashMap$ValueIterator.next(ConcurrentHashMap.java:3471)
	at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.getRunningFetcher(SingleThreadFetcherManager.java:94)
	at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:82)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.addSplits(SourceReaderBase.java:242)
	at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:428)
	at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:70)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:83)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$19(StreamTask.java:1473)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.base/java.lang.Thread.run(Thread.java:829)
{code}

I believe this may be caused by a tiny TOCTOU (time-of-check-to-time-of-use) race in {{SingleThreadFetcherManager}}. I'll admit that I don't fully understand the execution flows through that code, but the use of atomics and {{synchronized}} indicates that it is used by multiple threads; if that's not the case, this report can be safely ignored.

The backtrace points to [https://github.com/apache/flink/blob/4faf0966766e3734792f80ed66e512aa3033cacd/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java#L165], and it looks like the concurrent hash map might be modified between the check for {{isEmpty()}} and the call to {{fetchers.values().iterator().next()}}. I would suggest a Python-style ("easier to ask forgiveness than permission") approach here instead:

{code:java}
try {
    return fetchers.values().iterator().next();
} catch (NoSuchElementException e) {
    return null;
}
{code}

This lets {{ConcurrentHashMap}} handle its synchronization internally.

For some reason we were able to reproduce this consistently with 2 Task Managers and 2 slots per node, but not with 1 Task Manager and 4 slots, if that helps construct a repro test case (presumably there is more interlocking from {{synchronized}} in a single-TM environment, but we're not sure).

-- This message was sent by Atlassian Jira (v8.20.10#820010)
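To illustrate the suspected interleaving outside of Flink, here is a small standalone sketch (the {{ToctouDemo}} class and its {{getRunningFetcher}} helper are hypothetical, not Flink code). It simulates the race deterministically on a plain {{ConcurrentHashMap}}: the emptiness check passes, the last entry is removed "by another thread", and {{iterator().next()}} then throws exactly the {{NoSuchElementException}} seen in the backtrace, while the catch-based variant returns {{null}} instead:

{code:java}
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;

public class ToctouDemo {

    // Hypothetical stand-in for SingleThreadFetcherManager.getRunningFetcher(),
    // using the suggested style: let the iterator itself signal emptiness
    // instead of a separate isEmpty() check.
    static String getRunningFetcher(ConcurrentHashMap<Integer, String> fetchers) {
        try {
            return fetchers.values().iterator().next();
        } catch (NoSuchElementException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        ConcurrentHashMap<Integer, String> fetchers = new ConcurrentHashMap<>();
        fetchers.put(157, "fetcher-157");

        // Check-then-act: the emptiness check passes here...
        boolean sawNonEmpty = !fetchers.isEmpty();
        // ...then simulate another thread removing the last fetcher
        // before next() runs (e.g. an idle fetcher being closed).
        fetchers.remove(157);

        try {
            if (sawNonEmpty) {
                // The map is empty by now, so this throws.
                fetchers.values().iterator().next();
            }
        } catch (NoSuchElementException e) {
            System.out.println("reproduced: " + e);
        }

        // The catch-based variant handles the same empty map gracefully.
        System.out.println("getRunningFetcher() -> " + getRunningFetcher(fetchers));
    }
}
{code}

In the real code the removal happens on a different thread, so the failure is timing-dependent rather than deterministic, which would explain why it only shows up at high parallelism.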