[ https://issues.apache.org/jira/browse/FLINK-36146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881648#comment-17881648 ]
Jiangjie Qin commented on FLINK-36146:
--------------------------------------

Yes, this is an apparent race condition. Are you interested in submitting a patch? The fix might be something like the following:
{code:java}
Iterator<SplitFetcher<E, SplitT>> iter = fetchers.values().iterator();
return iter.hasNext() ? iter.next() : null;
{code}

> NoSuchElement exception from SingleThreadFetcherManager
> -------------------------------------------------------
>
>                 Key: FLINK-36146
>                 URL: https://issues.apache.org/jira/browse/FLINK-36146
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>        Environment: AWS EMR/Yarn
>           Reporter: Kim Gräsman
>           Priority: Minor
>
> We're running Flink 1.14.2, but this still appears to be an issue on mainline, so I thought I'd report it.
> When running with high parallelism, we've noticed a spurious error triggered by a FileSource reader from S3:
> {code:java}
> 2024-08-19 15:23:07,044 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [0000543131]
> 2024-08-19 15:23:07,044 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [0000543131]
> 2024-08-19 15:23:07,044 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 157 because it is idle.
> 2024-08-19 15:23:07,045 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 157
> 2024-08-19 15:23:07,045 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 157 exited.
> 2024-08-19 15:23:07,048 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [FileSourceSplit: ... [0, 21679984) hosts=[localhost] ID=0000201373 position=null]
> 2024-08-19 15:23:07,064 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
> 2024-08-19 15:23:07,069 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: ... -> ... (114/1602)#0 (...) switched from RUNNING to FAILED with failure cause: java.util.NoSuchElementException
>     at java.base/java.util.concurrent.ConcurrentHashMap$ValueIterator.next(ConcurrentHashMap.java:3471)
>     at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.getRunningFetcher(SingleThreadFetcherManager.java:94)
>     at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:82)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.addSplits(SourceReaderBase.java:242)
>     at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:428)
>     at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:70)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:83)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$19(StreamTask.java:1473)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> {code}
> I believe this may be caused by a tiny TOCTOU (time-of-check-to-time-of-use) race in {{SingleThreadFetcherManager}}. I'll admit that I don't fully understand what the execution flows through that code look like, but the use of atomic and synchronized constructs indicates that it's used by multiple threads. If that's not the case, this report can be safely ignored.
> The backtrace points to https://github.com/apache/flink/blob/4faf0966766e3734792f80ed66e512aa3033cacd/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java#L165
> It looks like the concurrent hash map might be modified between the check for {{isEmpty()}} and the call to {{fetchers.values().iterator().next()}}.
> I would suggest a Python-style "ask forgiveness, not permission" approach here instead, which should let {{ConcurrentHashMap}} handle its synchronization internally:
> {code:java}
> try {
>     return fetchers.values().iterator().next();
> } catch (NoSuchElementException e) {
>     return null;
> }
> {code}
> For some reason we were able to reproduce this consistently with 2 Task Managers and 2 slots per node, but not with 1 Task Manager and 4 slots, if that helps construct a repro test case (presumably there is more interlocking from {{synchronized}} in a single-TM environment, but I'm not sure).

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
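Editor's note: the check-then-act pattern and the iterator-based fix discussed above can be sketched in isolation. This is a minimal, self-contained illustration, not Flink's actual code: the class name {{FetcherLookup}} and the {{Integer -> String}} map standing in for the fetchers map are hypothetical. The key point is that {{ConcurrentHashMap}} iterators are weakly consistent and never throw {{ConcurrentModificationException}}, so taking the iterator once and guarding with {{hasNext()}} avoids the window between {{isEmpty()}} and {{next()}} in which another thread can remove the last entry.

{code:java}
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FetcherLookup {
    // Hypothetical stand-in for the fetchers map in SingleThreadFetcherManager.
    static final Map<Integer, String> fetchers = new ConcurrentHashMap<>();

    // Racy shape (mirrors the reported bug): another thread may remove the
    // last entry between isEmpty() and iterator().next(), so next() can
    // throw NoSuchElementException even though the emptiness check passed.
    static String getRunningFetcherRacy() {
        return fetchers.isEmpty() ? null : fetchers.values().iterator().next();
    }

    // Race-free shape, as suggested in the comment above: obtain the iterator
    // once and use hasNext()/next() on that single snapshot-like view.
    static String getRunningFetcherSafe() {
        Iterator<String> iter = fetchers.values().iterator();
        return iter.hasNext() ? iter.next() : null;
    }

    public static void main(String[] args) {
        // Empty map: the safe version returns null instead of throwing.
        System.out.println(getRunningFetcherSafe());
        fetchers.put(157, "fetcher-157");
        System.out.println(getRunningFetcherSafe());
    }
}
{code}

Note that the reporter's try/catch suggestion is also correct; the iterator-based version simply avoids relying on an exception for a non-exceptional condition.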