[ https://issues.apache.org/jira/browse/FLINK-37663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-37663:
-----------------------------------
    Labels: pull-request-available  (was: )

> Thread Synchronization Issue in FutureCompletingBlockingQueue <> Connector 
> Split Fetching
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-37663
>                 URL: https://issues.apache.org/jira/browse/FLINK-37663
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.18.1
>            Reporter: Herbert Wang
>            Priority: Major
>              Labels: pull-request-available
>
> h1. Potential Thread Synchronization Issue in FutureCompletingBlockingQueue 
> and Connector Split Fetching (Flink 1.18)
> *Context:* We are investigating potential sources of unresponsiveness or 
> deadlocks related to connector split fetching in Flink 1.18, and our analysis 
> points towards a possible synchronization issue within the 
> {{FutureCompletingBlockingQueue}}. We are not certain this is the definitive 
> cause, but we wanted to share our findings and reasoning for expert review.
> h2. Suspected Problem Description
> We suspect a thread synchronization issue in the 
> {{FutureCompletingBlockingQueue}}'s interaction with split fetchers, 
> specifically in the {{wakeUpPuttingThread()}} method. Our concern is that 
> when this method manually wakes up a putting thread, it signals the 
> associated condition variable but does not appear to remove it from the 
> internal {{notFull}} waiting queue. This could leave the queue in an 
> inconsistent state where a subsequent {{signalNextPutter()}} call signals the 
> condition of an already-awoken or non-waiting thread, effectively causing a 
> "lost signal" for a genuinely waiting thread.
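> To make the concern concrete, below is a minimal, self-contained sketch of 
> the bookkeeping pattern as we read it. The method names ({{waitOnPut}}, 
> {{wakeUpPuttingThread}}, {{signalNextPutter}}) and the {{notFull}} queue 
> mirror the real class, but the bodies are our simplified reading (the 
> per-fetcher wake-up flag handling is omitted), not the actual Flink 
> implementation:
> {code:java}
> import java.util.ArrayDeque;
> import java.util.Queue;
> import java.util.concurrent.locks.Condition;
> import java.util.concurrent.locks.ReentrantLock;
>
> /** Simplified reading of the suspected bookkeeping; not the actual Flink code. */
> class PutterSignalingSketch {
>     private final ReentrantLock lock = new ReentrantLock();
>     // One condition per fetcher index (created lazily in the real class).
>     private final Condition[] putConditions;
>     // Conditions of putters believed to be blocked on a full element queue.
>     private final Queue<Condition> notFull = new ArrayDeque<>();
>
>     PutterSignalingSketch(int numFetchers) {
>         putConditions = new Condition[numFetchers];
>         for (int i = 0; i < numFetchers; i++) {
>             putConditions[i] = lock.newCondition();
>         }
>     }
>
>     /** A putter that found the element queue full parks here. */
>     void waitOnPut(int fetcherIndex) throws InterruptedException {
>         lock.lock();
>         try {
>             Condition cond = putConditions[fetcherIndex];
>             notFull.add(cond); // the condition is enqueued here...
>             cond.await();      // ...and the putting thread parks
>         } finally {
>             lock.unlock();
>         }
>     }
>
>     /** Manual wake-up: signals the condition but does not dequeue it. */
>     void wakeUpPuttingThread(int fetcherIndex) {
>         lock.lock();
>         try {
>             putConditions[fetcherIndex].signal(); // stale entry may stay in notFull
>         } finally {
>             lock.unlock();
>         }
>     }
>
>     /** Called after a dequeue frees space in a previously full element queue. */
>     void signalNextPutter() {
>         lock.lock();
>         try {
>             Condition next = notFull.poll(); // may now be the stale condition
>             if (next != null) {
>                 next.signal(); // lost if its owner is no longer waiting
>             }
>         } finally {
>             lock.unlock();
>         }
>     }
> }
> {code}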
> h2. Hypothesized Scenario Illustrating the Concern
> Consider the following sequence of events, based on our understanding of the 
> code (a small self-contained driver that reproduces this interleaving follows 
> the list):
> # *Initial State*: The element queue (size 1) is full. A fetcher (Fetcher A) 
> has completed its task, fetched records, and is blocked attempting to 
> {{put()}} elements into the full queue.
> #* Fetcher A's thread calls {{waitOnPut()}} and goes to sleep.
> #* Its condition variable is added to the {{notFull}} queue.
> # *External Wake-up Trigger*: The {{SplitFetcherManager}} (or similar 
> component) calls {{SplitFetcher.addSplits()}}, which eventually leads to a 
> task being enqueued and a wake-up signal.
> #* Path: {{addSplits -> enqueueTaskUnsafe -> wakeUpUnsafe -> 
> currentTask.wakeUp()}} (assuming Fetcher A was executing {{currentTask}}).
> # *Fetcher Wake-up Path*:
> #* {{FetchTask.wakeUp()}} sets an internal {{wakeup}} flag to {{true}}.
> #* It then calls {{elementsQueue.wakeUpPuttingThread(fetcherIndex)}} for 
> Fetcher A.
> # *Potential Inconsistency Point*: Inside 
> {{wakeUpPuttingThread(fetcherIndex)}}:
> #* The corresponding {{fetcherIndex}}'s wake-up flag is set to {{true}}.
> #* The associated condition variable (Fetcher A's) is signaled.
> #* Fetcher A's thread wakes up from {{waitOnPut()}}, likely checks its 
> wake-up flag, and returns {{false}} from {{put()}}.
> #* *Key Concern*: Our analysis suggests that the condition variable for 
> Fetcher A might *remain* in the {{notFull}} queue at this point, as 
> {{wakeUpPuttingThread}} doesn't appear to remove it.
> # *Fetcher State Change*: Fetcher A, having returned from {{put()}} (possibly 
> due to the wake-up), might subsequently be closed or enter an idle state.
> # *New Fetcher Blocks*: A different fetcher (Fetcher B) becomes active, 
> fetches data, and attempts to {{put()}} elements into the queue, which is 
> still full.
> #* Fetcher B calls {{waitOnPut()}}.
> #* Its condition variable is added to the {{notFull}} queue (potentially 
> *after* Fetcher A's condition, if it remained).
> #* Fetcher B's thread goes to sleep.
> # *Consumer Action*: The source reader thread consumes an element from the 
> queue via {{poll()}}.
> #* Path: {{getNextFetch() -> elementsQueue.poll() -> dequeue()}}
> # *Signaling Attempt*: Since the queue was full and is now not full, 
> {{dequeue()}} calls {{signalNextPutter()}}.
> #* Path: {{dequeue() -> signalNextPutter() -> notFull.poll().signal()}}
> # *Potential Lost Signal*: *If* Fetcher A's condition remained in the 
> {{notFull}} queue (as suspected in step 4) and is at the head of the queue, 
> {{notFull.poll()}} will retrieve and signal Fetcher A's condition variable.
> #* This signal might be effectively lost because Fetcher A is no longer 
> waiting on that condition (it was woken up manually or might even be closed).
> #* Fetcher B, which *is* genuinely waiting for space, remains asleep because 
> its condition variable was not polled and signaled.
> #* This could lead to Fetcher B (and potentially others) never being woken 
> up, resulting in stalled data fetching or apparent deadlocks.
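> The following hypothetical driver, built on the sketch class from the problem 
> description above, walks through this interleaving. It is an illustration 
> under the stated assumptions, not a test against the real 
> {{FutureCompletingBlockingQueue}}; with the suspected behaviour, Fetcher B is 
> never signaled:
> {code:java}
> /** Hypothetical driver reproducing steps 1-8 with the sketch class above. */
> public class LostSignalDemo {
>     public static void main(String[] args) throws Exception {
>         PutterSignalingSketch q = new PutterSignalingSketch(2);
>
>         Thread fetcherA = new Thread(() -> {
>             try {
>                 q.waitOnPut(0); // step 1: queue full, Fetcher A parks
>             } catch (InterruptedException ignored) {
>             }
>         });
>         fetcherA.start();
>         Thread.sleep(100); // crude ordering, for illustration only
>
>         q.wakeUpPuttingThread(0); // steps 2-4: manual wake-up; entry 0 stays queued
>         fetcherA.join();          // step 5: Fetcher A returns and goes idle
>
>         Thread fetcherB = new Thread(() -> {
>             try {
>                 q.waitOnPut(1); // step 6: Fetcher B parks behind A's stale entry
>             } catch (InterruptedException ignored) {
>             }
>         });
>         fetcherB.setDaemon(true); // let the demo exit even if B stays parked
>         fetcherB.start();
>         Thread.sleep(100);
>
>         q.signalNextPutter(); // steps 7-8: signal goes to A's stale condition
>         fetcherB.join(1000);
>         System.out.println("Fetcher B still waiting: " + fetcherB.isAlive()); // true
>     }
> }
> {code}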
> h2. Suggested Area for Investigation / Potential Fix
> Based on this hypothesis, the potential inconsistency seems to stem from 
> {{wakeUpPuttingThread}} not removing the condition from the {{notFull}} 
> queue. If this analysis is correct, a possible solution could involve 
> ensuring the condition is removed when a thread is woken up manually via this 
> path. For example, adding a line similar to:
> {code:java}
> // Inside wakeUpPuttingThread, after retrieving caf:
> // (conceptual - the exact call depends on the implementation details)
> notFull.remove(caf.condition());
> {code}
> This would keep the {{notFull}} queue limited to the conditions of threads 
> that are still genuinely waiting.
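> In terms of the sketch above (and assuming the {{notFull}} structure supports 
> removal, as a {{java.util.Queue}} of conditions would), the adjusted wake-up 
> path might look like the following. This is a sketch of the idea only; the 
> real change would have to use the actual field and helper names in 
> {{FutureCompletingBlockingQueue}}:
> {code:java}
> /** Sketch of the suggested adjustment; not the actual Flink implementation. */
> void wakeUpPuttingThread(int fetcherIndex) {
>     lock.lock();
>     try {
>         Condition cond = putConditions[fetcherIndex];
>         // Remove any stale entry so signalNextPutter() can only ever signal
>         // a condition whose owner is still parked in waitOnPut().
>         notFull.remove(cond);
>         cond.signal();
>     } finally {
>         lock.unlock();
>     }
> }
> {code}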
> *Request for Review:* We would greatly appreciate it if developers familiar 
> with Flink's connector concurrency mechanisms could review this analysis. We 
> are open to corrections if our understanding of the execution flow or 
> synchronization logic is inaccurate. Our goal is to help identify or rule out 
> this potential issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
