Kim Gräsman created FLINK-36146:
-----------------------------------
Summary: NoSuchElementException from SingleThreadFetcherManager
Key: FLINK-36146
URL: https://issues.apache.org/jira/browse/FLINK-36146
Project: Flink
Issue Type: Bug
Components: API / Core
Environment: N/A
Reporter: Kim Gräsman
We're running Flink 1.14.2, but this appears to still be an issue on mainline,
so I thought I'd report it.
When running with high parallelism, we've noticed a spurious error triggered by
a FileSource reader from S3:
{code:java}
2024-08-19 15:23:07,044 INFO
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished
reading split(s) [0000543131]
2024-08-19 15:23:07,044 INFO
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] -
Finished reading from splits [0000543131]
2024-08-19 15:23:07,044 INFO
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] -
Closing splitFetcher 157 because it is idle.
2024-08-19 15:23:07,045 INFO
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] -
Shutting down split fetcher 157
2024-08-19 15:23:07,045 INFO
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split
fetcher 157 exited.
2024-08-19 15:23:07,048 INFO
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding
split(s) to reader: [FileSourceSplit: ... [0, 21679984) hosts=[localhost]
ID=0000201373 position=null]
2024-08-19 15:23:07,064 INFO
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing
Source Reader.
2024-08-19 15:23:07,069 WARN org.apache.flink.runtime.taskmanager.Task
[] - Source: ... -> ... (114/1602)#0 (...) switched from RUNNING to
FAILED with failure cause: java.util.NoSuchElementException
at
java.base/java.util.concurrent.ConcurrentHashMap$ValueIterator.next(ConcurrentHashMap.java:3471)
at
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.getRunningFetcher(SingleThreadFetcherManager.java:94)
at
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:82)
at
org.apache.flink.connector.base.source.reader.SourceReaderBase.addSplits(SourceReaderBase.java:242)
at
org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:428)
at
org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:70)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:83)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$19(StreamTask.java:1473)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Thread.java:829) {code}
I believe this may be caused by a tiny TOCTOU (time-of-check to time-of-use)
race in {{SingleThreadFetcherManager}}. I'll admit that I don't fully understand
the execution flows through that code, but the use of atomics and
{{synchronized}} indicates that it's used by multiple threads. If that's not the
case, this report can be safely ignored.
The backtrace points to
[https://github.com/apache/flink/blob/4faf0966766e3734792f80ed66e512aa3033cacd/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java#L165].
It looks like the concurrent hash map might be modified between the check
for {{isEmpty()}} and the call to {{fetchers.values().iterator().next()}}.
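The window can be shown deterministically even in a single thread by interleaving a removal between the check and the read. A minimal sketch (the map and class names below are stand-ins for illustration, not the actual field from {{SingleThreadFetcherManager}}):

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;

public class CheckThenActRace {
    // Replays the check-then-act interleaving in one thread and reports the outcome.
    static String raceDemo() {
        ConcurrentHashMap<Integer, String> fetchers = new ConcurrentHashMap<>();
        fetchers.put(157, "splitFetcher");

        // Time of check: the map is non-empty, so the guard passes.
        boolean nonEmpty = !fetchers.isEmpty();

        // Another thread closes the idle fetcher in this window (simulated inline).
        fetchers.remove(157);

        // Time of use: the iterator over the now-empty map has no elements.
        try {
            if (nonEmpty) {
                return fetchers.values().iterator().next();
            }
            return null;
        } catch (NoSuchElementException e) {
            return "NoSuchElementException";
        }
    }

    public static void main(String[] args) {
        System.out.println(raceDemo());
    }
}
```

The same interleaving across two real threads is what the backtrace suggests happened here.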
I would suggest a Python-style EAFP approach here instead:
{code:java}
try {
    return fetchers.values().iterator().next();
} catch (NoSuchElementException e) {
    return null;
}{code}
This lets {{ConcurrentHashMap}} handle its synchronization internally.
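An alternative that avoids using the exception for control flow is to take a single iterator and guard with {{hasNext()}}: once {{hasNext()}} returns true on a {{ConcurrentHashMap}} iterator, the element is already cached, so {{next()}} cannot throw even if the entry is concurrently removed. A sketch against a simplified map (not the actual field types):

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

public class GetRunningFetcherSketch {
    // Stand-in for the fetchers field; the real map is keyed by fetcher id.
    static final ConcurrentHashMap<Integer, String> fetchers = new ConcurrentHashMap<>();

    // Returns an arbitrary running fetcher, or null if none existed at the
    // moment the (weakly consistent) iterator was created.
    static String getRunningFetcher() {
        Iterator<String> it = fetchers.values().iterator();
        return it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        System.out.println(getRunningFetcher()); // null on an empty map
        fetchers.put(157, "splitFetcher");
        System.out.println(getRunningFetcher()); // splitFetcher
    }
}
```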
For some reason we were able to reproduce consistently with 2 Task Managers and
2 slots per node, but not with 1 Task Manager and 4 slots, if that helps in
constructing a repro test case (presumably there is more interlocking from
{{synchronized}} in a single-TM environment, but I'm not sure).
--
This message was sent by Atlassian Jira
(v8.20.10#820010)