Herbert Wang created FLINK-37663:
------------------------------------

             Summary: Thread Synchronization Issue in FutureCompletingBlockingQueue <> Connector Split Fetching
                 Key: FLINK-37663
                 URL: https://issues.apache.org/jira/browse/FLINK-37663
             Project: Flink
          Issue Type: Bug
          Components: API / Core
    Affects Versions: 1.18.1
            Reporter: Herbert Wang
# Potential Thread Synchronization Issue in FutureCompletingBlockingQueue and Connector Split Fetching (Flink 1.18)

**Context:** We are investigating potential sources of unresponsiveness or deadlocks related to connector split fetching in Flink 1.18, and our analysis points towards a possible synchronization issue within the `FutureCompletingBlockingQueue`. We are not certain this is the definitive cause, but we wanted to share our findings and reasoning for expert review.

## Suspected Problem Description

We suspect a thread synchronization issue in the `FutureCompletingBlockingQueue`'s interaction with split fetchers, specifically in the `wakeUpPuttingThread()` method. Our concern is that when this method manually wakes up a putting thread, it signals the associated condition variable but does not remove it from the internal `notFull` waiting queue. This could leave the queue in an inconsistent state where a subsequent `signalNextPutter()` call signals the condition of a thread that is no longer waiting, effectively causing a "lost signal" for a thread that genuinely is.

## Hypothesized Scenario Illustrating the Concern

Consider the following sequence of events, based on our understanding of the code:

1. **Initial State**: The element queue (size 1) is full. A fetcher (Fetcher A) has completed its task, fetched records, and is blocked attempting to `put()` elements into the full queue.
   * Fetcher A's thread calls `waitOnPut()` and goes to sleep.
   * Its condition variable is added to the `notFull` queue.
2. **External Wake-up Trigger**: The `SplitFetcherManager` (or a similar component) calls `SplitFetcher.addSplits()`, which eventually leads to a task being enqueued and a wake-up signal.
   * Path: `addSplits` -> `enqueueTaskUnsafe` -> `wakeUpUnsafe` -> `currentTask.wakeUp()` (assuming Fetcher A was executing `currentTask`).
3. **Fetcher Wake-up Path**:
   * `FetchTask.wakeUp()` sets an internal `wakeup` flag to `true`.
   * It then calls `elementsQueue.wakeUpPuttingThread(fetcherIndex)` for Fetcher A.
4. **Potential Inconsistency Point**: Inside `wakeUpPuttingThread(fetcherIndex)`:
   * The corresponding `fetcherIndex`'s wake-up flag is set to `true`.
   * The associated condition variable (Fetcher A's) is signaled.
   * Fetcher A's thread wakes up from `waitOnPut()`, likely checks its wake-up flag, and returns `false` from `put()`.
   * **Key Concern**: Our analysis suggests that Fetcher A's condition variable may *remain* in the `notFull` queue at this point, as `wakeUpPuttingThread` does not appear to remove it.
5. **Fetcher State Change**: Fetcher A, having returned from `put()` (possibly due to the wake-up), might subsequently be closed or enter an idle state.
6. **New Fetcher Blocks**: A different fetcher (Fetcher B) becomes active, fetches data, and attempts to `put()` elements into the queue, which is still full.
   * Fetcher B calls `waitOnPut()`.
   * Its condition variable is added to the `notFull` queue (potentially *behind* Fetcher A's stale entry, if that entry remained).
   * Fetcher B's thread goes to sleep.
7. **Consumer Action**: The source reader thread consumes an element from the queue via `poll()`.
   * Path: `getNextFetch()` -> `elementsQueue.poll()` -> `dequeue()`
8. **Signaling Attempt**: Since the queue was full and now has space, `dequeue()` calls `signalNextPutter()`.
   * Path: `dequeue()` -> `signalNextPutter()` -> `notFull.poll().signal()`
9. **Potential Lost Signal**: **If** Fetcher A's condition remained in the `notFull` queue (as suspected in step 4) and sits at its head, `notFull.poll()` retrieves and signals Fetcher A's condition variable.
   * This signal is effectively lost, because Fetcher A is no longer waiting on that condition (it was woken up manually and may even be closed).
   * Fetcher B, which *is* genuinely waiting for space, remains asleep because its condition variable was neither polled nor signaled.
   * As a result, Fetcher B (and potentially other fetchers) might never be woken up, leading to stalled data fetching or apparent deadlocks.

## Suggested Area for Investigation / Potential Fix

Based on this hypothesis, the potential inconsistency seems to stem from `wakeUpPuttingThread` not removing the condition from the `notFull` queue. If this analysis is correct, a possible solution could involve ensuring the condition is removed when a thread is woken up manually via this path. For example, adding a line similar to:

```java
// Inside wakeUpPuttingThread, after retrieving caf:
notFull.remove(caf.condition()); // Conceptual -- requires correct implementation details
```
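To make the suspected interleaving concrete, here is a minimal, deterministic model of the bookkeeping only (single-threaded, no real locking). The `notFull` queue is reduced to a deque of waiter IDs standing in for per-fetcher `Condition` objects; all names here (`LostSignalModel`, the method names, the fetcher IDs) are illustrative assumptions, not Flink's actual classes:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the hypothesized bug: wakeUpPuttingThread signals a
// waiter but leaves its entry in the notFull queue, so a later
// signalNextPutter() can target a waiter that is no longer waiting.
public class LostSignalModel {
    // Stands in for the queue of per-fetcher condition variables.
    private final Deque<String> notFull = new ArrayDeque<>();

    // A putter blocks on a full queue: its condition is enqueued.
    public void waitOnPut(String fetcher) {
        notFull.addLast(fetcher);
    }

    // Manual wake-up as wakeUpPuttingThread appears to behave:
    // the waiter stops waiting, but its entry is NOT removed from
    // notFull -- the suspected inconsistency in step 4.
    public void wakeUpPuttingThread(String fetcher) {
        // (fetcher's condition is signaled; its queue entry remains)
    }

    // dequeue() -> signalNextPutter(): signal whoever is at the head.
    public String signalNextPutter() {
        return notFull.pollFirst();
    }

    public static void main(String[] args) {
        LostSignalModel q = new LostSignalModel();
        q.waitOnPut("A");            // step 1: Fetcher A blocks
        q.wakeUpPuttingThread("A");  // steps 2-4: A woken manually, stale entry stays
        q.waitOnPut("B");            // step 6: Fetcher B blocks behind A's stale entry
        String signaled = q.signalNextPutter(); // steps 7-9
        System.out.println("signaled = " + signaled); // prints "signaled = A"
        // The signal went to A (no longer waiting); B stays asleep.
    }
}
```

Under this model, removing the waiter's entry inside `wakeUpPuttingThread` would leave only Fetcher B in `notFull`, and `signalNextPutter()` would reach the thread that actually needs the signal.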