Kim Gräsman created FLINK-36146:
-----------------------------------

             Summary: NoSuchElementException from SingleThreadFetcherManager
                 Key: FLINK-36146
                 URL: https://issues.apache.org/jira/browse/FLINK-36146
             Project: Flink
          Issue Type: Bug
          Components: API / Core
         Environment: N/A
            Reporter: Kim Gräsman


We're running Flink 1.14.2, but this appears to still be an issue on mainline, 
so I thought I'd report it.

When running with high parallelism, we've noticed a spurious error triggered by 
a FileSource reader reading from S3:
{code:java}
2024-08-19 15:23:07,044 INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished 
reading split(s) [0000543131]
2024-08-19 15:23:07,044 INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Finished reading from splits [0000543131]
2024-08-19 15:23:07,044 INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - 
Closing splitFetcher 157 because it is idle.
2024-08-19 15:23:07,045 INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Shutting down split fetcher 157
2024-08-19 15:23:07,045 INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split 
fetcher 157 exited.
2024-08-19 15:23:07,048 INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
split(s) to reader: [FileSourceSplit: ... [0, 21679984)  hosts=[localhost] 
ID=0000201373 position=null]
2024-08-19 15:23:07,064 INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing 
Source Reader.
2024-08-19 15:23:07,069 WARN  org.apache.flink.runtime.taskmanager.Task         
           [] - Source: ... -> ... (114/1602)#0 (...) switched from RUNNING to 
FAILED with failure cause: java.util.NoSuchElementException
        at 
java.base/java.util.concurrent.ConcurrentHashMap$ValueIterator.next(ConcurrentHashMap.java:3471)
        at 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.getRunningFetcher(SingleThreadFetcherManager.java:94)
        at 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:82)
        at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.addSplits(SourceReaderBase.java:242)
        at 
org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:428)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:70)
        at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:83)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$19(StreamTask.java:1473)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
        at java.base/java.lang.Thread.run(Thread.java:829) {code}
I believe this may be caused by a small TOCTOU (time-of-check to time-of-use) 
race in {{SingleThreadFetcherManager}}. I'll admit I don't fully understand the 
execution flows through that code, but the use of atomics and {{synchronized}} 
indicates that it is accessed from multiple threads. If that's not the case, 
this report can be safely ignored.

The backtrace points to 
[SingleThreadFetcherManager.java#L165|https://github.com/apache/flink/blob/4faf0966766e3734792f80ed66e512aa3033cacd/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java#L165].

It looks like the {{ConcurrentHashMap}} can be modified by another thread 
between the check for {{isEmpty()}} and the call to 
{{fetchers.values().iterator().next()}}.
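
For reference, the pattern at that line is roughly the following (paraphrased 
from the linked source, not verbatim; {{fetchers}} is the {{ConcurrentHashMap}} 
of split fetchers):
{code:java}
// Paraphrased sketch of the suspected pattern in getRunningFetcher():
protected SplitFetcher<E, SplitT> getRunningFetcher() {
    // TOCTOU window: the last fetcher can be removed (e.g. when an idle
    // fetcher is closed) between the isEmpty() check and iterator().next(),
    // so next() can throw NoSuchElementException.
    return fetchers.isEmpty() ? null : fetchers.values().iterator().next();
}{code}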

I would suggest a Python-style ("ask forgiveness, not permission") approach:
{code:java}
try {
  return fetchers.values().iterator().next();
} catch (NoSuchElementException e) {
  return null;
}{code}
here instead, which should let {{ConcurrentHashMap}} handle its synchronization 
internally.
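
Alternatively (just a sketch on my part, using the same {{fetchers}} map), 
taking the iterator once and checking {{hasNext()}} avoids relying on the 
exception at all, since the check and the read then act on the same 
weakly-consistent view:
{code:java}
// Method-body fragment (Iterator is java.util.Iterator): a single iterator
// ties the emptiness check and the read together.
Iterator<SplitFetcher<E, SplitT>> iterator = fetchers.values().iterator();
return iterator.hasNext() ? iterator.next() : null;{code}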

For some reason we were able to reproduce this consistently with 2 TaskManagers 
and 2 slots per node, but not with 1 TaskManager and 4 slots, in case that 
helps construct a repro test case (presumably there is more interlocking from 
{{synchronized}} in a single-TM environment, but I'm not sure).
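
To illustrate the suspected race in isolation, here is a hypothetical, 
standalone snippet (not Flink code; all names are made up) showing that 
{{isEmpty()}} followed by {{values().iterator().next()}} is not atomic on a 
{{ConcurrentHashMap}}:
{code:java}
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;

public class IsEmptyThenNextRace {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();

        // One thread keeps adding and removing the only entry...
        Thread remover = new Thread(() -> {
            for (int i = 0; i < 1_000_000; i++) {
                map.put(1, "fetcher");
                map.remove(1);
            }
        });
        remover.start();

        // ...while this thread does the isEmpty()-then-next() dance.
        for (int i = 0; i < 1_000_000; i++) {
            if (!map.isEmpty()) {
                try {
                    map.values().iterator().next();
                } catch (NoSuchElementException e) {
                    // The only entry was removed between isEmpty() and next().
                    System.out.println("Race hit on iteration " + i);
                    break;
                }
            }
        }
        remover.join();
    }
}{code}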



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
