Herbert Wang created FLINK-37663:
------------------------------------

             Summary: Thread Synchronization Issue in 
FutureCompletingBlockingQueue <> Connector Split Fetching
                 Key: FLINK-37663
                 URL: https://issues.apache.org/jira/browse/FLINK-37663
             Project: Flink
          Issue Type: Bug
          Components: API / Core
    Affects Versions: 1.18.1
            Reporter: Herbert Wang


# Potential Thread Synchronization Issue in FutureCompletingBlockingQueue and 
Connector Split Fetching (Flink 1.18)

**Context:** We are investigating potential sources of unresponsiveness or 
deadlocks related to connector split fetching in Flink 1.18, and our analysis 
points towards a possible synchronization issue within the 
`FutureCompletingBlockingQueue`. We are not certain this is the definitive 
cause, but we wanted to share our findings and reasoning for expert review.

## Suspected Problem Description

We suspect a potential thread synchronization issue exists in the 
`FutureCompletingBlockingQueue`'s interaction with split fetchers, specifically 
concerning the `wakeUpPuttingThread()` method. Our concern is that when this 
method manually wakes up a putting thread, it signals the associated condition 
variable but might not remove it from the internal `notFull` waiting queue. 
This could potentially lead to an inconsistent state where subsequent 
`signalNextPutter()` calls might signal an already-awoken or non-waiting 
thread's condition, effectively causing a "lost signal" for a genuinely waiting 
thread.
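
To make the suspected interaction concrete, below is a simplified, self-contained model of the logic as we read it. This is *not* the actual Flink source: the names `notFull`, `waitOnPut`, `wakeUpPuttingThread`, and `signalNextPutter` mirror the real class, but the per-fetcher `putConditions`/`wakeUpFlags` arrays are our own illustrative stand-ins.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Simplified model of the suspected logic; NOT the actual Flink source.
class PuttingThreadModel {
    private final Lock lock = new ReentrantLock();
    // Conditions of threads currently blocked in put(), in FIFO order.
    private final Queue<Condition> notFull = new ArrayDeque<>();
    private final Condition[] putConditions; // illustrative per-fetcher state
    private final boolean[] wakeUpFlags;     // illustrative per-fetcher state

    PuttingThreadModel(int numFetchers) {
        putConditions = new Condition[numFetchers];
        wakeUpFlags = new boolean[numFetchers];
        for (int i = 0; i < numFetchers; i++) {
            putConditions[i] = lock.newCondition();
        }
    }

    // Called when put() finds the queue full. Returns false if the thread
    // was woken manually rather than because capacity was freed.
    boolean waitOnPut(int fetcherIndex) throws InterruptedException {
        lock.lock();
        try {
            Condition cond = putConditions[fetcherIndex];
            notFull.add(cond); // register this thread as a waiter
            cond.await();      // sleep until signaled
            if (wakeUpFlags[fetcherIndex]) {
                wakeUpFlags[fetcherIndex] = false;
                return false;  // manual wake-up path
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Manual wake-up path (e.g. triggered via addSplits()).
    void wakeUpPuttingThread(int fetcherIndex) {
        lock.lock();
        try {
            wakeUpFlags[fetcherIndex] = true;
            putConditions[fetcherIndex].signal();
            // NOTE: the condition is NOT removed from notFull here --
            // this is the inconsistency we suspect.
        } finally {
            lock.unlock();
        }
    }

    // Called by dequeue() once the queue has space again.
    void signalNextPutter() {
        lock.lock();
        try {
            Condition next = notFull.poll();
            if (next != null) {
                next.signal(); // may signal a stale, no-longer-waiting condition
            }
        } finally {
            lock.unlock();
        }
    }
}
```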

## Hypothesized Scenario Illustrating the Concern

Consider the following sequence of events, based on our understanding of the 
code (a runnable toy reproduction follows the list):

1.  **Initial State**: The element queue (size 1) is full. A fetcher (Fetcher 
A) has completed its task, fetched records, and is blocked attempting to 
`put()` elements into the full queue.
    *   Fetcher A's thread calls `waitOnPut()` and goes to sleep.
    *   Its condition variable is added to the `notFull` queue.

2.  **External Wake-up Trigger**: The `SplitFetcherManager` (or similar 
component) calls `SplitFetcher.addSplits()`, which eventually leads to a task 
being enqueued and a wake-up signal.
    *   Path: `addSplits` -> `enqueueTaskUnsafe` -> `wakeUpUnsafe` -> 
`currentTask.wakeUp()` (assuming Fetcher A was executing `currentTask`).

3.  **Fetcher Wake-up Path**:
    *   `FetchTask.wakeUp()` sets an internal `wakeup` flag to `true`.
    *   It then calls `elementsQueue.wakeUpPuttingThread(fetcherIndex)` for 
Fetcher A.

4.  **Potential Inconsistency Point**: Inside 
`wakeUpPuttingThread(fetcherIndex)`:
    *   The corresponding `fetcherIndex`'s wake-up flag is set to `true`.
    *   The associated condition variable (Fetcher A's) is signaled.
    *   Fetcher A's thread wakes up from `waitOnPut()`, likely checks its 
wake-up flag, and returns `false` from `put()`.
    *   **Key Concern**: Our analysis suggests that the condition variable for 
Fetcher A might *remain* in the `notFull` queue at this point, as 
`wakeUpPuttingThread` doesn't appear to remove it.

5.  **Fetcher State Change**: Fetcher A, having returned from `put()` (possibly 
due to the wake-up), might subsequently be closed or enter an idle state.

6.  **New Fetcher Blocks**: A different fetcher (Fetcher B) becomes active, 
fetches data, and attempts to `put()` elements into the queue, which is still 
full.
    *   Fetcher B calls `waitOnPut()`.
    *   Its condition variable is added to the `notFull` queue (potentially 
*after* Fetcher A's condition, if it remained).
    *   Fetcher B's thread goes to sleep.

7.  **Consumer Action**: The source reader thread consumes an element from the 
queue via `poll()`.
    *   Path: `getNextFetch()` -> `elementsQueue.poll()` -> `dequeue()`

8.  **Signaling Attempt**: Since the queue was full and is now not full, 
`dequeue()` calls `signalNextPutter()`.
    *   Path: `dequeue()` -> `signalNextPutter()` -> `notFull.poll().signal()`

9.  **Potential Lost Signal**: **If** Fetcher A's condition remained in the 
`notFull` queue (as suspected in step 4) and is at the head of the queue, 
`notFull.poll()` will retrieve and signal Fetcher A's condition variable.
    *   This signal might be effectively lost because Fetcher A is no longer 
waiting on that condition (it was woken up manually or might even be closed).
    *   Fetcher B, which *is* genuinely waiting for space, remains asleep 
because its condition variable was not polled and signaled.
    *   This could lead to Fetcher B (and potentially others) never being woken 
up, resulting in stalled data fetching or apparent deadlocks.
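
The following self-contained snippet reproduces the sequence above using plain `ReentrantLock`/`Condition` objects, entirely outside of Flink. All names (`fetcherA`, `fetcherB`, `notFull`) are illustrative, and the `Thread.sleep` calls are a crude way to order the steps for the demonstration:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Toy reproduction of the hypothesized lost-signal sequence (steps 1-9).
public class LostSignalDemo {
    public static void main(String[] args) throws Exception {
        Lock lock = new ReentrantLock();
        Queue<Condition> notFull = new ArrayDeque<>();
        Condition fetcherA = lock.newCondition();
        Condition fetcherB = lock.newCondition();

        // Step 1: Fetcher A blocks in put(); its condition joins notFull.
        Thread a = new Thread(() -> {
            lock.lock();
            try {
                notFull.add(fetcherA);
                fetcherA.await(); // woken manually in step 4
            } catch (InterruptedException ignored) {
            } finally {
                lock.unlock();
            }
        });
        a.start();
        Thread.sleep(200); // crude: let A reach await()

        // Steps 2-4: manual wake-up signals A but leaves its condition queued.
        lock.lock();
        try {
            fetcherA.signal(); // the wakeUpPuttingThread() equivalent
            // fetcherA stays in notFull -- the suspected inconsistency.
        } finally {
            lock.unlock();
        }
        a.join(); // step 5: Fetcher A returns from put() and goes idle

        // Step 6: Fetcher B blocks in put(), queued *behind* A's stale entry.
        Thread b = new Thread(() -> {
            lock.lock();
            try {
                notFull.add(fetcherB);
                fetcherB.await();
                System.out.println("Fetcher B woken"); // not reached in this run
            } catch (InterruptedException ignored) {
            } finally {
                lock.unlock();
            }
        });
        b.setDaemon(true); // let the JVM exit despite B never waking
        b.start();
        Thread.sleep(200);

        // Steps 7-9: the consumer frees space; signalNextPutter() polls A's
        // stale condition instead of B's, so the signal is lost.
        lock.lock();
        try {
            notFull.poll().signal(); // signals fetcherA's condition: no waiter
        } finally {
            lock.unlock();
        }

        b.join(2000); // times out
        System.out.println("Fetcher B still waiting: " + b.isAlive());
    }
}
```

If the reasoning above is right, the final line should print `true`, mirroring the stalled-fetcher symptom.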

## Suggested Area for Investigation / Potential Fix

Based on this hypothesis, the potential inconsistency seems to stem from 
`wakeUpPuttingThread` not removing the condition from the `notFull` queue. If 
this analysis is correct, a possible solution could involve ensuring the 
condition is removed when a thread is woken up manually via this path. For 
example, adding a line similar to:

```java
// Inside wakeUpPuttingThread, after retrieving caf:
notFull.remove(caf.condition()); // conceptual; requires the correct implementation details
```
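
In terms of the simplified model above, the complete wake-up path would then look like the sketch below. This is still conceptual: `caf` stands for the per-fetcher condition-and-flag holder, and the exact names may differ in the real class.

```java
// Conceptual shape of the fix, executed under the queue's lock.
caf.setWakeUp(true);             // mark the fetcher as manually woken
notFull.remove(caf.condition()); // drop the stale waiter entry, if present
caf.condition().signal();        // then wake the putting thread
```

With the stale entry removed, a later `signalNextPutter()` can only poll conditions whose owning threads are still genuinely waiting.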

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
