[ https://issues.apache.org/jira/browse/FLINK-36146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881648#comment-17881648 ]

Jiangjie Qin commented on FLINK-36146:
--------------------------------------

Yes, this is an apparent race condition. Are you interested in submitting a
patch? The fix might be something like the following:
{code:java}
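// Ask the same iterator both "is there an element?" and "give me the element",
// so no separately computed isEmpty() result can go stale in between.
Iterator<SplitFetcher<E, SplitT>> iter = fetchers.values().iterator();
return iter.hasNext() ? iter.next() : null;{code}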
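A minimal, self-contained sketch of this fix, for illustration only. Assumptions: the class {{RunningFetcherSketch}}, its {{stress}} helper, and the {{String}} values standing in for {{SplitFetcher<E, SplitT>}} are hypothetical, not Flink code. The idea is that the {{Iterator}} contract has {{hasNext()}} return true only if {{next()}} would return an element, and {{ConcurrentHashMap}} iterators are weakly consistent rather than fail-fast, so calling both on one iterator closes the check/use window:
{code:java}
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for SingleThreadFetcherManager's fetcher map;
// String values play the role of SplitFetcher<E, SplitT> instances.
final class RunningFetcherSketch {
    static final Map<Integer, String> fetchers = new ConcurrentHashMap<>();

    // Suggested fix: the same iterator answers "is there an element?" and
    // "give me the element", so no separate isEmpty() result can go stale.
    static String getRunningFetcher() {
        Iterator<String> iter = fetchers.values().iterator();
        return iter.hasNext() ? iter.next() : null;
    }

    // Starts a writer thread that keeps adding and removing one fetcher while
    // this thread polls getRunningFetcher(); returns how many polls threw.
    static int stress(int iterations) {
        AtomicBoolean stop = new AtomicBoolean(false);
        Thread writer = new Thread(() -> {
            while (!stop.get()) {
                fetchers.put(157, "fetcher-157");
                fetchers.remove(157);
            }
        });
        writer.start();
        int failures = 0;
        for (int i = 0; i < iterations; i++) {
            try {
                getRunningFetcher();
            } catch (RuntimeException e) { // e.g. NoSuchElementException
                failures++;
            }
        }
        stop.set(true);
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failures;
    }

    public static void main(String[] args) {
        System.out.println("empty map -> " + getRunningFetcher());
        System.out.println("polls that threw: " + stress(1_000_000));
    }
}
{code}
The stress loop only shows that the single-iterator version does not throw under contention; it cannot reliably demonstrate the original failure, since hitting the race window is timing-dependent.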

> NoSuchElement exception from SingleThreadFetcherManager
> -------------------------------------------------------
>
>                 Key: FLINK-36146
>                 URL: https://issues.apache.org/jira/browse/FLINK-36146
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>         Environment: AWS EMR/Yarn
>            Reporter: Kim Gräsman
>            Priority: Minor
>
> We're running Flink 1.14.2, but this still appears to be an issue on
> mainline, so I thought I'd report it.
> When running with high parallelism, we've noticed a spurious error triggered
> by a FileSource reader from S3:
> {code:java}
> 2024-08-19 15:23:07,044 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [0000543131]
> 2024-08-19 15:23:07,044 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [0000543131]
> 2024-08-19 15:23:07,044 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 157 because it is idle.
> 2024-08-19 15:23:07,045 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 157
> 2024-08-19 15:23:07,045 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 157 exited.
> 2024-08-19 15:23:07,048 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [FileSourceSplit: ... [0, 21679984)  hosts=[localhost] ID=0000201373 position=null]
> 2024-08-19 15:23:07,064 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
> 2024-08-19 15:23:07,069 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: ... -> ... (114/1602)#0 (...) switched from RUNNING to FAILED with failure cause: java.util.NoSuchElementException
>         at java.base/java.util.concurrent.ConcurrentHashMap$ValueIterator.next(ConcurrentHashMap.java:3471)
>         at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.getRunningFetcher(SingleThreadFetcherManager.java:94)
>         at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:82)
>         at org.apache.flink.connector.base.source.reader.SourceReaderBase.addSplits(SourceReaderBase.java:242)
>         at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:428)
>         at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:70)
>         at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:83)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$19(StreamTask.java:1473)
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>         at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>         at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>         at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> {code}
> I believe this may be caused by a tiny TOCTOU (time-of-check to time-of-use)
> race in {{SingleThreadFetcherManager}}. I'll admit that I don't fully
> understand what the execution flows through that code look like, but the use
> of atomics and {{synchronized}} indicates that it's used by multiple threads.
> If that's not the case, this report can safely be ignored.
> The backtrace points to {{getRunningFetcher()}} (line 94 in 1.14.2); on mainline that code is at
> [https://github.com/apache/flink/blob/4faf0966766e3734792f80ed66e512aa3033cacd/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java#L165|https://github.com/apache/flink/blob/4faf0966766e3734792f80ed66e512aa3033cacd/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java#L165].
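> It looks like the {{ConcurrentHashMap}} might be modified between the check
> for {{isEmpty()}} and the call to {{fetchers.values().iterator().next()}}: if
> the last fetcher is removed in that window, {{next()}} finds no element and
> throws {{NoSuchElementException}}.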
> I would suggest a Python-style (EAFP) approach here instead:
> {code:java}
> try {
>   return fetchers.values().iterator().next();
> } catch (NoSuchElementException e) {
>   // The map became empty: report "no running fetcher".
>   return null;
> }{code}
> This should let {{ConcurrentHashMap}} handle its synchronization internally.
> For some reason, we could reproduce this consistently with 2 TaskManagers
> and 2 slots per node, but not with 1 TaskManager and 4 slots. Perhaps that
> helps in constructing a repro test case (presumably there is more interlocking
> from {{synchronized}} in a single-TM environment, but I'm not sure).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
