[ https://issues.apache.org/jira/browse/FLINK-37663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Herbert Wang updated FLINK-37663:
---------------------------------
Description:
h1. Potential Thread Synchronization Issue in FutureCompletingBlockingQueue and Connector Split Fetching (Flink 1.18)

*Context:* We are investigating potential sources of unresponsiveness or deadlocks related to connector split fetching in Flink 1.18, and our analysis points towards a possible synchronization issue within the {{FutureCompletingBlockingQueue}}. We are not certain this is the definitive cause, but we wanted to share our findings and reasoning for expert review.

h2. Suspected Problem Description

We suspect a potential thread synchronization issue in the {{FutureCompletingBlockingQueue}}'s interaction with split fetchers, specifically concerning the {{wakeUpPuttingThread()}} method. Our concern is that when this method manually wakes up a putting thread, it signals the associated condition variable but might not remove it from the internal {{notFull}} waiting queue. This could lead to an inconsistent state where a subsequent {{signalNextPutter()}} call signals an already-awoken or non-waiting thread's condition, effectively causing a "lost signal" for a genuinely waiting thread.

h2. Hypothesized Scenario Illustrating the Concern

Consider the following sequence of events based on our understanding of the code (a best-effort reproduction sketch follows the list):
# *Initial State*: The element queue (size 1) is full. A fetcher (Fetcher A) has completed its task, fetched records, and is blocked attempting to {{put()}} elements into the full queue.
#* Fetcher A's thread calls {{waitOnPut()}} and goes to sleep.
#* Its condition variable is added to the {{notFull}} queue.
# *External Wake-up Trigger*: The {{SplitFetcherManager}} (or similar component) calls {{SplitFetcher.addSplits()}}, which eventually leads to a task being enqueued and a wake-up signal.
#* Path: {{addSplits -> enqueueTaskUnsafe -> wakeUpUnsafe -> currentTask.wakeUp()}} (assuming Fetcher A was executing {{currentTask}}).
# *Fetcher Wake-up Path*:
#* {{FetchTask.wakeUp()}} sets an internal {{wakeup}} flag to {{true}}.
#* It then calls {{elementsQueue.wakeUpPuttingThread(fetcherIndex)}} for Fetcher A.
# *Potential Inconsistency Point*: Inside {{wakeUpPuttingThread(fetcherIndex)}}:
#* The corresponding {{fetcherIndex}}'s wake-up flag is set to {{true}}.
#* The associated condition variable (Fetcher A's) is signaled.
#* Fetcher A's thread wakes up from {{waitOnPut()}}, checks its wake-up flag, and returns {{false}} from {{put()}}.
#* *Key Concern*: Our analysis suggests that the condition variable for Fetcher A might *remain* in the {{notFull}} queue at this point, as {{wakeUpPuttingThread}} does not appear to remove it.
# *Fetcher State Change*: Fetcher A, having returned from {{put()}} (possibly due to the wake-up), might subsequently be closed or enter an idle state.
# *New Fetcher Blocks*: A different fetcher (Fetcher B) becomes active, fetches data, and attempts to {{put()}} elements into the queue, which is still full.
#* Fetcher B calls {{waitOnPut()}}.
#* Its condition variable is added to the {{notFull}} queue (potentially *after* Fetcher A's condition, if that remained).
#* Fetcher B's thread goes to sleep.
# *Consumer Action*: The source reader thread consumes an element from the queue via {{poll()}}.
#* Path: {{getNextFetch() -> elementsQueue.poll() -> dequeue()}}
# *Signaling Attempt*: Since the queue was full and is now not full, {{dequeue()}} calls {{signalNextPutter()}}.
#* Path: {{dequeue() -> signalNextPutter() -> notFull.poll().signal()}}
# *Potential Lost Signal*: *If* Fetcher A's condition remained in the {{notFull}} queue (as suspected in step 4) and is at the head of the queue, {{notFull.poll()}} will retrieve and signal Fetcher A's condition variable.
#* This signal is effectively lost, because Fetcher A is no longer waiting on that condition (it was woken up manually or might even be closed).
#* Fetcher B, which *is* genuinely waiting for space, remains asleep because its condition variable was not polled and signaled.
#* This could lead to Fetcher B (and potentially others) never being woken up, resulting in stalled data fetching or apparent deadlocks.
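To make the sequence above concrete, the following is a best-effort, timing-based sketch that walks through the same steps against the public API of {{FutureCompletingBlockingQueue}} (the capacity constructor, {{put(int, T)}}, {{poll()}}, and {{wakeUpPuttingThread(int)}}, as we understand them in 1.18). The class name, fetcher indices, and sleeps are our own choices for illustration; this has not been verified to actually reproduce the issue:
{code:java}
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;

// Illustrative sketch only; timings are best-effort and this is not a confirmed reproduction.
public class LostSignalScenarioSketch {

    public static void main(String[] args) throws Exception {
        FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(1);
        queue.put(0, 1); // capacity 1, so the queue is now full

        // Step 1: "Fetcher A" blocks in put() on the full queue.
        Thread fetcherA = new Thread(() -> {
            try {
                boolean added = queue.put(0, 2); // expected to return false after the manual wake-up
                System.out.println("Fetcher A put() returned " + added);
            } catch (InterruptedException ignored) {
            }
        }, "fetcher-A");
        fetcherA.start();
        Thread.sleep(200); // give Fetcher A time to enter waitOnPut()

        // Steps 2-4: manual wake-up, analogous to addSplits -> ... -> wakeUpPuttingThread(0).
        queue.wakeUpPuttingThread(0);
        fetcherA.join();

        // Step 6: "Fetcher B" blocks in put() on the still-full queue.
        Thread fetcherB = new Thread(() -> {
            try {
                boolean added = queue.put(1, 3);
                System.out.println("Fetcher B put() returned " + added);
            } catch (InterruptedException ignored) {
            }
        }, "fetcher-B");
        fetcherB.setDaemon(true); // daemon so the demo JVM can exit even if B never wakes up
        fetcherB.start();
        Thread.sleep(200);

        // Steps 7-8: the consumer frees a slot; dequeue() -> signalNextPutter().
        queue.poll();

        // Step 9: if Fetcher A's stale condition is signalled instead of B's, B stays blocked.
        fetcherB.join(2000);
        System.out.println("Fetcher B still blocked after poll(): " + fetcherB.isAlive());
    }
}
{code}
If the hypothesis is correct, the last line could report that Fetcher B is still blocked even though the queue has free capacity again.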
h2. Suggested Area for Investigation / Potential Fix

Based on this hypothesis, the potential inconsistency seems to stem from {{wakeUpPuttingThread}} not removing the condition from the {{notFull}} queue. If this analysis is correct, a possible solution could involve ensuring the condition is removed when a thread is woken up manually via this path. For example, adding a line similar to:
{code:java}
// Inside wakeUpPuttingThread, after retrieving caf:
notFull.remove(caf.condition()); // Conceptual - requires correct implementation details
{code}
This would aim to keep the {{notFull}} queue consistent with only genuinely waiting threads' conditions.

*Request for Review:* We would greatly appreciate it if developers familiar with Flink's connector concurrency mechanisms could review this analysis. We are open to corrections if our understanding of the execution flow or synchronization logic is inaccurate. Our goal is to help identify or rule out this potential issue.
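For reviewers who prefer to reason about the mechanism in isolation, below is a small self-contained model (plain {{ReentrantLock}}/{{Condition}}, deliberately *not* the actual Flink code) that mirrors the structure described above: a {{notFull}} queue of waiting putters' conditions, a manual wake-up that signals Fetcher A's condition without removing it, and a consumer whose {{signalNextPutter()}}-style poll then signals that stale condition. All names here are ours and purely illustrative:
{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified stand-in for the suspected mechanism; NOT the actual Flink implementation.
public class LostSignalModel {

    static final ReentrantLock lock = new ReentrantLock();
    // Conditions of putters currently waiting for space, analogous to "notFull".
    static final Queue<Condition> notFull = new ArrayDeque<>();

    static final Condition condA = lock.newCondition(); // Fetcher A's condition
    static final Condition condB = lock.newCondition(); // Fetcher B's condition
    static boolean wakeUpA = false; // guarded by lock
    static int freeSlots = 0;       // guarded by lock

    public static void main(String[] args) throws InterruptedException {
        // Fetcher A: waits for space until it is manually woken up.
        Thread fetcherA = new Thread(() -> {
            lock.lock();
            try {
                notFull.add(condA);
                while (!wakeUpA && freeSlots == 0) {
                    condA.awaitUninterruptibly();
                }
                // Returns from the simulated put() due to the wake-up flag;
                // note that condA is still sitting in notFull.
            } finally {
                lock.unlock();
            }
        });

        // Fetcher B: waits for space and relies on being signalled by the consumer.
        Thread fetcherB = new Thread(() -> {
            lock.lock();
            try {
                notFull.add(condB);
                while (freeSlots == 0) {
                    condB.awaitUninterruptibly();
                }
                freeSlots--;
            } finally {
                lock.unlock();
            }
        });
        fetcherB.setDaemon(true); // daemon so the demo JVM can exit even if B never wakes up

        fetcherA.start();
        TimeUnit.MILLISECONDS.sleep(100); // let A enqueue condA and block

        // Manual wake-up of A, analogous to wakeUpPuttingThread(indexOfA):
        lock.lock();
        try {
            wakeUpA = true;
            // The fix suggested above would correspond to: notFull.remove(condA);
            condA.signal();
        } finally {
            lock.unlock();
        }
        fetcherA.join();

        fetcherB.start();
        TimeUnit.MILLISECONDS.sleep(100); // let B enqueue condB and block

        // Consumer frees a slot, analogous to dequeue() -> signalNextPutter():
        lock.lock();
        try {
            freeSlots++;
            Condition next = notFull.poll(); // polls condA (stale) instead of condB
            if (next != null) {
                next.signal();               // nobody is waiting on condA -> signal is lost
            }
        } finally {
            lock.unlock();
        }

        fetcherB.join(1000);
        System.out.println("Fetcher B still blocked: " + fetcherB.isAlive());
    }
}
{code}
In this model Fetcher B stays blocked even though a slot is free; adding the {{notFull.remove(condA)}} line at the manual wake-up point (the change suggested above) lets the consumer's poll signal condB instead.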
> Thread Synchronization Issue in FutureCompletingBlockingQueue <> Connector Split Fetching
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37663
>                 URL: https://issues.apache.org/jira/browse/FLINK-37663
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.18.1
>            Reporter: Herbert Wang
>            Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)