herbherbherb opened a new pull request, #26446:
URL: https://github.com/apache/flink/pull/26446

   **PR Description:**
   
   ## What is the purpose of the change
   
   This pull request addresses a potential thread-synchronization issue
identified in `FutureCompletingBlockingQueue` (see
[FLINK-37663](https://issues.apache.org/jira/browse/FLINK-37663)). The
`wakeUpPuttingThread` method signals a putting thread's condition variable but
does not remove that condition from the internal `notFull` queue of waiting
putters.
   
   This leaves a stale entry in `notFull`. When `dequeue` later frees a slot in
a full queue, it signals the next condition in `notFull`, which may belong to
a thread that was already woken up (or has since shut down) rather than to a
thread that is genuinely waiting. That signal is lost, and a source fetcher
blocked in `put()` can stall indefinitely. This change prevents the lost
signal by keeping `notFull` consistent with the set of threads that are
actually waiting.
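   To make the lost signal concrete, the Java sketch below replays the scenario
against a single-threaded model of the `notFull` bookkeeping. It is an
illustration under simplifying assumptions, not Flink's code: waiting putters
are represented by hypothetical names (`"A"`, `"B"`) instead of `Condition`
objects, and `LostSignalModel` and `stillBlocked` are illustrative names only.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, single-threaded model of the notFull bookkeeping: each waiting
// putter is represented by a name instead of a real Condition object.
final class LostSignalModel {

    // Replays the FLINK-37663 scenario and returns the putters still blocked.
    static Set<String> stillBlocked(boolean removeOnWakeup) {
        Deque<String> notFull = new ArrayDeque<>(); // FIFO of waiting putters
        Set<String> blocked = new HashSet<>();

        // Putter A finds the queue full and starts waiting.
        notFull.add("A");
        blocked.add("A");

        // wakeUpPuttingThread(A): A is signaled and returns from put().
        if (removeOnWakeup) {
            notFull.remove("A"); // modeled on the change proposed in this PR
        }
        blocked.remove("A");

        // Putter B finds the queue still full and queues up behind any stale entry.
        notFull.add("B");
        blocked.add("B");

        // dequeue() frees a slot and signals the next entry in notFull.
        // If that entry is A's stale one, nobody waits on it and the signal is lost.
        String signaled = notFull.poll();
        blocked.remove(signaled);

        return blocked;
    }

    public static void main(String[] args) {
        System.out.println("without fix: " + stillBlocked(false));
        System.out.println("with fix:    " + stillBlocked(true));
    }
}
```

Running `main` prints `without fix: [B]` followed by `with fix:    []`: without
the removal, the freed slot's signal goes to A's stale entry and B stays blocked.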
   
   ## Brief change log
   
   *   Modified `FutureCompletingBlockingQueue#wakeUpPuttingThread(int 
fetcherIndex)` to explicitly remove the corresponding condition variable from 
the `notFull` queue after retrieving it and before signaling it.
   *   *(Optional, if you added one)* Added 
`FutureCompletingBlockingQueueTest#testManualWakeupRemovesCondition` (or a 
similar name) to specifically verify that a manually woken thread's condition 
is removed from the `notFull` queue and does not interfere with subsequent 
signals intended for other waiting threads.
   
   ## Verifying this change
   
   *(Choose ONE of the following options and delete the others)*
   
   **Option 1 (If you added a new test):**
   
   This change added tests and can be verified as follows:
   *   Added 
`FutureCompletingBlockingQueueTest#testManualWakeupRemovesCondition` (or your 
actual test name) which simulates the scenario described in 
[FLINK-37663](https://issues.apache.org/jira/browse/FLINK-37663):
       1.  With the queue at capacity, a thread blocks on `put()`.
       2.  It gets manually woken up via `wakeUpPuttingThread()`.
       3.  Another thread blocks on `put()`.
       4.  An element is polled from the queue using `poll()`.
       5.  The test verifies that the *second* thread (which was genuinely 
waiting) is woken up by the signal triggered during `poll()`, confirming the 
first thread's condition was removed upon manual wakeup.
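   The five steps above can be sketched as a self-contained Java program.
`SketchQueue` is a hypothetical, heavily simplified stand-in for
`FutureCompletingBlockingQueue` (capacity 1, two fetcher indices, one
`Condition` per fetcher, a FIFO `notFull` deque); its layout is an assumption
for illustration, not the actual Flink implementation, and `WakeupScenario`
and `blockedPut` are likewise hypothetical names. Detecting that a thread is
parked by polling `Thread.getState()` is a simplification for the sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, heavily simplified stand-in for FutureCompletingBlockingQueue:
// capacity 1 and two fetcher indices keep the sketch short.
final class SketchQueue {
    private static final int CAPACITY = 1;
    private final Deque<Integer> elements = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> notFull = new ArrayDeque<>(); // waiting putters, FIFO
    private final Condition[] conditions = {lock.newCondition(), lock.newCondition()};
    private final boolean[] wakeUpFlags = new boolean[2]; // set by wakeUpPuttingThread

    // Blocks while full; returns false if the caller was woken up manually.
    boolean put(int fetcherIndex, int element) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            Condition c = conditions[fetcherIndex];
            while (elements.size() >= CAPACITY) {
                if (wakeUpFlags[fetcherIndex]) {
                    wakeUpFlags[fetcherIndex] = false;
                    return false;
                }
                if (!notFull.contains(c)) { // no duplicates after spurious wakeups
                    notFull.add(c);
                }
                c.await();
            }
            elements.add(element);
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Removes the head; if the queue was full, signals the first waiting putter.
    void poll() {
        lock.lock();
        try {
            boolean wasFull = elements.size() == CAPACITY;
            elements.poll();
            if (wasFull && !notFull.isEmpty()) {
                notFull.poll().signal();
            }
        } finally {
            lock.unlock();
        }
    }

    void wakeUpPuttingThread(int fetcherIndex) {
        lock.lock();
        try {
            wakeUpFlags[fetcherIndex] = true;
            notFull.remove(conditions[fetcherIndex]); // sketch of the fix: drop the entry first
            conditions[fetcherIndex].signal();
        } finally {
            lock.unlock();
        }
    }
}

// Hypothetical driver for steps 1-5; returns true if poll() unblocks the second putter.
final class WakeupScenario {
    static boolean run() throws InterruptedException {
        SketchQueue q = new SketchQueue();
        q.put(0, 0);                                     // fill the queue (capacity 1)
        Thread first = blockedPut(q, 0, new boolean[1]); // 1. first putter blocks
        q.wakeUpPuttingThread(0);                        // 2. manual wakeup
        first.join();                                    //    its put() returned false
        boolean[] secondPut = new boolean[1];
        Thread second = blockedPut(q, 1, secondPut);     // 3. second putter blocks
        q.poll();                                        // 4. frees a slot, signals notFull
        second.join(5_000);                              // 5. second putter should finish
        boolean unblocked = !second.isAlive() && secondPut[0];
        second.interrupt();                              // no-op unless still stuck
        second.join();
        return unblocked;
    }

    // Starts a thread calling put() and waits until it is parked in Condition.await().
    static Thread blockedPut(SketchQueue q, int idx, boolean[] result)
            throws InterruptedException {
        Thread t = new Thread(() -> {
            try {
                result[0] = q.put(idx, idx);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        while (t.isAlive() && t.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }
        return t;
    }
}
```

With the `notFull.remove(...)` line in place, `WakeupScenario.run()` returns
`true`. With that line deleted, `poll()` signals the first putter's stale
condition, the second putter never wakes, and `run()` returns `false` after the
5-second join timeout.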
   
   **Option 2 (If existing tests cover it - Less likely for this specific 
scenario):**
   
   This change is already covered by existing tests, such as *(please describe 
precisely which existing tests now cover this specific edge case after your 
change, e.g., `FutureCompletingBlockingQueueTest` suite passes and implicitly 
covers this synchronization)*. However, adding a dedicated test (as in Option 
1) is strongly recommended for subtle concurrency issues.
   
   **Option 3 (If it's a trivial fix *without* tests - Highly discouraged for 
concurrency fixes):**
   
   This change is a targeted fix for the synchronization logic described in 
FLINK-37663. Manual code inspection suggests it correctly addresses the 
potential inconsistency. (Note: Lack of specific tests might delay review).
   
   ## Does this pull request potentially affect one of the following parts:
   
   *   Dependencies (does it add or upgrade a dependency): **no**
   *   The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no** (`FutureCompletingBlockingQueue` is `@Internal`)
   *   The serializers: **no**
   *   The runtime per-record code paths (performance sensitive): **yes** (this
queue is on the data path of source connectors; the added removal from
`notFull` runs only when a putting thread is woken up manually, not on every
`put()` or `poll()`, so its overhead is minor)
   *   Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **yes** (If source 
connectors stall due to this bug, it can affect checkpointing progress and 
overall job health/recovery).
   *   The S3 file system connector: **don't know** (the change is in
connector-base, so it affects every connector built on the base source
framework, which may include S3-based sources)
   
   ## Documentation
   
   *   Does this pull request introduce a new feature? **no**
   *   If yes, how is the feature documented? **not applicable**
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
