[ https://issues.apache.org/jira/browse/FLINK-37663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-37663:
-----------------------------------
    Labels: pull-request-available  (was: )

> Thread Synchronization Issue in FutureCompletingBlockingQueue <> Connector Split Fetching
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37663
>                 URL: https://issues.apache.org/jira/browse/FLINK-37663
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.18.1
>            Reporter: Herbert Wang
>            Priority: Major
>              Labels: pull-request-available
>
> h1. Potential Thread Synchronization Issue in FutureCompletingBlockingQueue and Connector Split Fetching (Flink 1.18)
>
> *Context:* We are investigating potential sources of unresponsiveness or deadlocks related to connector split fetching in Flink 1.18, and our analysis points towards a possible synchronization issue within the {{FutureCompletingBlockingQueue}}. We are not certain this is the definitive cause, but we want to share our findings and reasoning for expert review.
>
> h2. Suspected Problem Description
>
> We suspect a thread synchronization issue in the {{FutureCompletingBlockingQueue}}'s interaction with split fetchers, specifically concerning the {{wakeUpPuttingThread()}} method. When this method manually wakes up a putting thread, it signals the associated condition variable but does not appear to remove it from the internal {{notFull}} waiting queue. This could leave the queue in an inconsistent state: a subsequent {{signalNextPutter()}} call may signal the condition of an already-awoken or non-waiting thread, effectively causing a "lost signal" for a genuinely waiting thread.
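>
> To make the suspected pattern concrete, below is a minimal, self-contained sketch of how we read the interaction. It is *not* the actual {{FutureCompletingBlockingQueue}} implementation: the class name, fields, and method bodies are simplified stand-ins, and only the method names ({{waitOnPut}}, {{wakeUpPuttingThread}}, {{signalNextPutter}}) mirror the real ones.
> {code:java}
> import java.util.ArrayDeque;
> import java.util.Deque;
> import java.util.concurrent.locks.Condition;
> import java.util.concurrent.locks.ReentrantLock;
>
> /** Simplified model of the suspected mechanism; NOT the real Flink queue. */
> public class LostSignalSketch {
>     private final ReentrantLock lock = new ReentrantLock();
>     // Conditions of putters believed to be waiting for free space.
>     private final Deque<Condition> notFull = new ArrayDeque<>();
>     private final Condition[] putterConditions;
>     private final boolean[] wakeUpFlags;
>
>     public LostSignalSketch(int numPutters) {
>         putterConditions = new Condition[numPutters];
>         wakeUpFlags = new boolean[numPutters];
>         for (int i = 0; i < numPutters; i++) {
>             putterConditions[i] = lock.newCondition();
>         }
>     }
>
>     /** Called by a putter while the queue is full. */
>     void waitOnPut(int putterIndex) throws InterruptedException {
>         lock.lock();
>         try {
>             Condition cond = putterConditions[putterIndex];
>             notFull.add(cond); // the condition enters the waiter queue
>             while (!wakeUpFlags[putterIndex] /* && queue still full */) {
>                 cond.await();
>             }
>             // Note: nothing removes 'cond' from notFull on this wake-up path.
>         } finally {
>             lock.unlock();
>         }
>     }
>
>     /** Manual wake-up, as triggered via FetchTask.wakeUp(). */
>     void wakeUpPuttingThread(int putterIndex) {
>         lock.lock();
>         try {
>             wakeUpFlags[putterIndex] = true;
>             putterConditions[putterIndex].signal();
>             // Suspected gap: the condition stays queued in 'notFull'.
>         } finally {
>             lock.unlock();
>         }
>     }
>
>     /** Called after a poll() frees a slot in the previously full queue. */
>     void signalNextPutter() {
>         lock.lock();
>         try {
>             Condition head = notFull.poll();
>             if (head != null) {
>                 // If 'head' belongs to the manually woken (or already closed)
>                 // putter, the signal is absorbed and the genuinely waiting
>                 // putter behind it in the queue is never signalled.
>                 head.signal();
>             }
>         } finally {
>             lock.unlock();
>         }
>     }
> }
> {code}
> In this model, if {{wakeUpPuttingThread(A)}} runs while A's condition is still queued and a different putter B then blocks in {{waitOnPut(B)}}, the next {{signalNextPutter()}} polls and signals A's stale condition and B stays asleep. The scenario below walks through how we think this can happen with real fetchers.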
>
> h2. Hypothesized Scenario Illustrating the Concern
>
> Consider the following sequence of events, based on our understanding of the code:
> # *Initial State*: The element queue (size 1) is full. A fetcher (Fetcher A) has completed its task, fetched records, and is blocked attempting to {{put()}} elements into the full queue.
> #* Fetcher A's thread calls {{waitOnPut()}} and goes to sleep.
> #* Its condition variable is added to the {{notFull}} queue.
> # *External Wake-up Trigger*: The {{SplitFetcherManager}} (or a similar component) calls {{SplitFetcher.addSplits()}}, which eventually leads to a task being enqueued and a wake-up signal.
> #* Path: {{addSplits -> enqueueTaskUnsafe -> wakeUpUnsafe -> currentTask.wakeUp()}} (assuming Fetcher A was executing {{currentTask}}).
> # *Fetcher Wake-up Path*:
> #* {{FetchTask.wakeUp()}} sets an internal {{wakeup}} flag to {{true}}.
> #* It then calls {{elementsQueue.wakeUpPuttingThread(fetcherIndex)}} for Fetcher A.
> # *Potential Inconsistency Point*: Inside {{wakeUpPuttingThread(fetcherIndex)}}:
> #* The corresponding {{fetcherIndex}}'s wake-up flag is set to {{true}}.
> #* The associated condition variable (Fetcher A's) is signaled.
> #* Fetcher A's thread wakes up from {{waitOnPut()}}, checks its wake-up flag, and returns {{false}} from {{put()}}.
> #* *Key Concern*: Our analysis suggests that Fetcher A's condition variable *remains* in the {{notFull}} queue at this point, because {{wakeUpPuttingThread}} does not appear to remove it.
> # *Fetcher State Change*: Fetcher A, having returned from {{put()}} (possibly due to the wake-up), may subsequently be closed or enter an idle state.
> # *New Fetcher Blocks*: A different fetcher (Fetcher B) becomes active, fetches data, and attempts to {{put()}} elements into the queue, which is still full.
> #* Fetcher B calls {{waitOnPut()}}.
> #* Its condition variable is added to the {{notFull}} queue, potentially *after* Fetcher A's stale condition if that remained.
> #* Fetcher B's thread goes to sleep.
> # *Consumer Action*: The source reader thread consumes an element from the queue via {{poll()}}.
> #* Path: {{getNextFetch() -> elementsQueue.poll() -> dequeue()}}
> # *Signaling Attempt*: Since the queue was full and is now not full, {{dequeue()}} calls {{signalNextPutter()}}.
> #* Path: {{dequeue() -> signalNextPutter() -> notFull.poll().signal()}}
> # *Potential Lost Signal*: *If* Fetcher A's condition remained in the {{notFull}} queue (as suspected in step 4) and is at its head, {{notFull.poll()}} retrieves and signals Fetcher A's condition variable.
> #* This signal is effectively lost, because Fetcher A is no longer waiting on that condition (it was woken up manually and may even have been closed).
> #* Fetcher B, which *is* genuinely waiting for space, remains asleep because its condition variable was neither polled nor signaled.
> #* As a result, Fetcher B (and potentially other fetchers) may never be woken up, leading to stalled data fetching or apparent deadlocks.
>
> h2. Suggested Area for Investigation / Potential Fix
>
> Based on this hypothesis, the potential inconsistency seems to stem from {{wakeUpPuttingThread}} not removing the condition from the {{notFull}} queue. If this analysis is correct, a possible solution could be to remove the condition whenever a thread is woken up manually via this path, for example by adding a line similar to:
> {code:java}
> // Inside wakeUpPuttingThread, after retrieving caf for the given fetcher index
> // (conceptual; the exact details depend on the actual implementation):
> notFull.remove(caf.condition());
> {code}
> This would keep the {{notFull}} queue consistent, containing only the conditions of genuinely waiting threads.
>
> *Request for Review:* We would greatly appreciate it if developers familiar with Flink's connector concurrency mechanisms could review this analysis. We are open to corrections if our understanding of the execution flow or synchronization logic is inaccurate; our goal is to help identify or rule out this potential issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)