herbherbherb opened a new pull request, #26446: URL: https://github.com/apache/flink/pull/26446
**PR Description:**

## What is the purpose of the change

This pull request addresses a potential thread-synchronization issue in `FutureCompletingBlockingQueue` (see [FLINK-37663](https://issues.apache.org/jira/browse/FLINK-37663)). The `wakeUpPuttingThread` method signals a putting thread's condition variable but does not remove that condition from the internal `notFull` condition queue. This can leave the queue inconsistent. A later signal, issued when `dequeue` frees space, may be delivered to the condition of a thread that has already been woken or has since been closed. The signal is then lost for a thread that is genuinely waiting, and source fetchers could stall indefinitely.

This change prevents the lost signal by keeping the condition queue consistent.

## Brief change log

* Modified `FutureCompletingBlockingQueue#wakeUpPuttingThread(int fetcherIndex)` to explicitly remove the corresponding condition variable from the `notFull` queue after retrieving it and before signaling it.
* *(Optional, if you added one)* Added `FutureCompletingBlockingQueueTest#testManualWakeupRemovesCondition` (or a similar name) to verify that a manually woken thread's condition is removed from the `notFull` queue and does not interfere with subsequent signals intended for other waiting threads.

## Verifying this change

*(Choose ONE of the following options and delete the others)*

**Option 1 (if you added a new test):** This change added tests and can be verified as follows:

* Added `FutureCompletingBlockingQueueTest#testManualWakeupRemovesCondition` (or your actual test name), which simulates the scenario described in [FLINK-37663](https://issues.apache.org/jira/browse/FLINK-37663):
  1. A thread blocks on `put()`.
  2. It is woken manually via `wakeUpPuttingThread()`.
  3. Another thread blocks on `put()`.
  4. An element is polled from the queue using `poll()`.
  5. The test verifies that the *second* thread (the one genuinely waiting) is woken by the signal triggered during `poll()`, confirming that the first thread's condition was removed on manual wake-up.

**Option 2 (if existing tests cover it; less likely for this specific scenario):** This change is already covered by existing tests, such as *(describe precisely which existing tests now cover this edge case after your change, e.g., the `FutureCompletingBlockingQueueTest` suite passes and implicitly covers this synchronization)*. However, a dedicated test (as in Option 1) is strongly recommended for subtle concurrency issues.

**Option 3 (if it's a trivial fix *without* tests; highly discouraged for concurrency fixes):** This change is a targeted fix for the synchronization logic described in FLINK-37663. Manual code inspection suggests it correctly addresses the potential inconsistency. (Note: the lack of specific tests might delay review.)

## Does this pull request potentially affect one of the following parts:

* Dependencies (does it add or upgrade a dependency): **no**
* The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** (`FutureCompletingBlockingQueue` is `@Internal`)
* The serializers: **no**
* The runtime per-record code paths (performance sensitive): **yes** (this queue is on the data path of source connectors; the added removal from the condition queue introduces a minor performance overhead)
* Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **yes** (if source connectors stall because of this bug, checkpointing progress and overall job health and recovery can suffer)
* The S3 file system connector: **don't know** (the change is in connector-base, so it affects all connectors built on the base source framework, potentially including S3 if it uses this mechanism)

## Documentation

* Does this pull request introduce a new feature?
**no**
* If yes, how is the feature documented? **not applicable**
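The lost-signal scenario from FLINK-37663, and the five verification steps listed under Option 1, can be illustrated with a deliberately simplified, single-threaded model. This is a hypothetical sketch, not Flink's code: `NotFullSignalModel`, `waitOnPut`, `signalOnDequeue`, and the `removeOnWakeUp` switch are illustrative names. Integer fetcher indices stand in for the per-fetcher `Condition` objects, and a `waiting` flag stands in for a thread blocked in `await()`. The model assumes `dequeue` signals the head of `notFull` only when it frees space in a full queue, as described above.

```java
import java.util.ArrayDeque;

/**
 * Hypothetical single-threaded model of the notFull bookkeeping in
 * FutureCompletingBlockingQueue. Not the real implementation.
 */
final class NotFullSignalModel {
    // Stand-in for the queue of per-fetcher Conditions: holds fetcher indices.
    private final ArrayDeque<Integer> notFull = new ArrayDeque<>();
    // waiting[i] is true while fetcher i is (modeled as) blocked in put().
    private final boolean[] waiting;
    // true models the proposed fix: wake-up also removes the queued entry.
    private final boolean removeOnWakeUp;

    NotFullSignalModel(int numFetchers, boolean removeOnWakeUp) {
        this.waiting = new boolean[numFetchers];
        this.removeOnWakeUp = removeOnWakeUp;
    }

    /** Fetcher blocks in put(): register its condition, then "await". */
    public void waitOnPut(int fetcherIndex) {
        notFull.add(fetcherIndex);
        waiting[fetcherIndex] = true;
    }

    /** Manual wake-up of one fetcher, with or without the fix. */
    public void wakeUpPuttingThread(int fetcherIndex) {
        if (removeOnWakeUp) {
            // Collection.remove(Object) drops the first matching entry.
            notFull.remove(Integer.valueOf(fetcherIndex));
        }
        waiting[fetcherIndex] = false;
    }

    /**
     * A dequeue freed space in a full queue: signal the head of notFull.
     * Returns the fetcher actually woken, or -1 if the signal was lost.
     */
    public int signalOnDequeue() {
        Integer head = notFull.poll();
        if (head == null || !waiting[head]) {
            return -1; // nobody is blocked on that condition: signal lost
        }
        waiting[head] = false;
        return head;
    }

    public boolean isWaiting(int fetcherIndex) {
        return waiting[fetcherIndex];
    }

    public static void main(String[] args) {
        for (boolean fix : new boolean[] {false, true}) {
            NotFullSignalModel m = new NotFullSignalModel(2, fix);
            m.waitOnPut(0);            // step 1: fetcher 0 blocks
            m.wakeUpPuttingThread(0);  // step 2: fetcher 0 is woken manually
            m.waitOnPut(1);            // step 3: fetcher 1 blocks
            int woken = m.signalOnDequeue(); // step 4: poll() frees space
            // step 5: check whether the genuinely waiting fetcher was woken
            System.out.println("removeOnWakeUp=" + fix + " woken=" + woken
                    + " fetcher1Waiting=" + m.isWaiting(1));
        }
    }
}
```

Running `main` prints `removeOnWakeUp=false woken=-1 fetcher1Waiting=true` (the stale entry for fetcher 0 absorbs the signal, so fetcher 1 stays blocked) and then `removeOnWakeUp=true woken=1 fetcher1Waiting=false`. Removing the entry at wake-up time keeps every entry in `notFull` tied to a fetcher that is actually waiting.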