Dear Viktor,

I hope this email finds you well. First and foremost, thank you for providing 
such a detailed explanation of the AQS issue.

Based on Archie's suggestion, I've taken the liberty of implementing a custom 
ArrayBlockingQueue3 class and created a test case that attempts to reproduce 
the scenario you described. I've particularly focused on the potential 
"barging" behavior you mentioned.

Following Archie's recommendation, I've added the following implementation to 
ArrayBlockingQueue3:

// Threads currently inside put(); only read or modified while holding lock.
private final HashSet<Thread> puttingThreads = new HashSet<>();

public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    final boolean added = puttingThreads.add(Thread.currentThread());
    try {
        while (count == items.length) {
            notFull.await();
        }
        enqueue(e);
    } finally {
        if (added)
            puttingThreads.remove(Thread.currentThread());
        lock.unlock();
    }
}

public boolean isPutting(Thread thread) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return puttingThreads.contains(thread);
    } finally {
        lock.unlock();
    }
}

Using this modified implementation, I've written a test case that attempts to 
simulate the scenario. I've attached the full test code to this email for your 
reference.



--------------------------------------------------------

The issue arises in the brief moment between when the thread performing the 
take() decreases the count and just before it sends the signal, during which 
another thread calling put() can acquire the lock via lock.lockInterruptibly(). 
I understand that the waiting spaces for Condition.await() and the 
ReentrantLock are separate, which contributes to this subtle timing problem. 
When a thread is signaled in Condition.await(), it is transferred from the 
Condition's waiting queue to the lock queue, where it must reacquire the lock 
before proceeding.
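
As a minimal sketch of these two separate waiting spaces (this is not taken from 
ArrayBlockingQueue; the class name ConditionTransferDemo and the method 
observe() are made up for illustration), the following uses only ReentrantLock's 
own monitoring methods, hasWaiters(Condition) and hasQueuedThread(Thread), to 
look at where a waiting thread sits just before and just after signal():

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not part of the JDK or of ArrayBlockingQueue3.
class ConditionTransferDemo {

    /**
     * Returns four observations, all made while the calling thread holds the lock:
     * [0] waiter is on the condition queue before signal()
     * [1] waiter is on the lock queue before signal()
     * [2] waiter is on the condition queue after signal()
     * [3] waiter is on the lock queue after signal()
     */
    static boolean[] observe() {
        ReentrantLock lock = new ReentrantLock(true); // fair mode
        Condition cond = lock.newCondition();
        boolean[] signalled = {false};                // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!signalled[0]) {
                    cond.awaitUninterruptibly();      // releases the lock while waiting
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        boolean[] seen = new boolean[4];
        boolean done = false;
        while (!done) {
            lock.lock();
            try {
                // Only proceed once the waiter is really waiting on the condition.
                if (lock.hasWaiters(cond)) {
                    seen[0] = true;
                    seen[1] = lock.hasQueuedThread(waiter);
                    signalled[0] = true;
                    cond.signal();                    // we still hold the lock here
                    seen[2] = lock.hasWaiters(cond);
                    seen[3] = lock.hasQueuedThread(waiter);
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            Thread.onSpinWait();
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        boolean[] seen = observe();
        System.out.println(seen[0] + " " + seen[1] + " " + seen[2] + " " + seen[3]);
    }
}
```

I would expect this to print "true false false true": before signal() the 
waiter is only on the condition queue, and after signal() it has been moved to 
the lock queue while the signalling thread still holds the lock.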

To better understand the timing of this issue, it may be helpful to review the 
dequeue() method that queue.take() calls:

private E dequeue() {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length) takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return e;
}

--------------------------------------------------------
Test Code

package org.main;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.ArrayList;
import java.util.List;
import org.example.ArrayBlockingQueue3;
import org.junit.jupiter.api.Test;

class ThreadStarvationTest {

    @Test
    void test() throws InterruptedException {
        // Set the capacity of the queue
        int CAPACITY = 10;
        // Create an instance of ArrayBlockingQueue3 with fairness mode enabled
        ArrayBlockingQueue3<Integer> queue =
                new ArrayBlockingQueue3<>(CAPACITY, true);
        
        // Preload the queue to its capacity
        int PRELOAD_VALUE = -1;
        for (int i = 0; i < CAPACITY; i++) {
            queue.add(PRELOAD_VALUE);
        }

        // Create threads that are already waiting to put elements
        int ALREADY_PUT_COUNT = 10;
        int ALREADY_PUT_NUMBER = 10;
        for (int i = 0; i < ALREADY_PUT_COUNT; i++) {
            Thread thread = new Thread(() -> {
                try {
                    // put() will block because the queue is full.
                    // This thread simulates T2, the thread trying to put an
                    // element while the queue is full.
                    queue.put(ALREADY_PUT_NUMBER);
                } catch (InterruptedException e) {
                    // ignore
                }
            });
            thread.start();

            // Wait until the thread is actually trying to put
            while (!queue.isPutting(thread)) {
                Thread.onSpinWait(); // busy-wait hint (Java 9+)
            }
        }

        // Give time for all threads to enter waiting state
        Thread.sleep(2000);

        // Set up new producer threads
        final int PRODUCER_PUT_NUMBER = -999;
        final int PRODUCER_COUNT = 1000;

        final Runnable producingJob = () -> {
            try {
                // T3
                queue.put(PRODUCER_PUT_NUMBER);
            } catch (InterruptedException e) {
                // ignore
            }
        };
        // Thread to start new producer threads when consumption begins
        Thread produceWhenConsumingThread = new Thread(() -> {
            for (int i = 0; i < PRODUCER_COUNT; i++) {
                Thread thd = new Thread(producingJob);
                thd.start();
            }
        });
        
        ArrayList<Integer> result = new ArrayList<>();

        // Start new producer threads simultaneously with consumption
        produceWhenConsumingThread.start();

        // Consume elements from the queue
        for (int i = 0; i < CAPACITY + ALREADY_PUT_COUNT; i++) {
            // Can be T1
            Integer take = queue.take();
            result.add(take);
        }

        // Expected result

        List<Integer> expected = new ArrayList<>();
        // First 10 elements should be -1 (preloaded values)
        for (int i = 0; i < CAPACITY; i++) {
            expected.add(PRELOAD_VALUE);
        }
        // Next 10 elements should be 10 (from already waiting threads)
        for (int i = 0; i < ALREADY_PUT_COUNT; i++) {
            expected.add(ALREADY_PUT_NUMBER);
        }

        // Assert that the actual result matches the expected result
        assertEquals(expected, result);
    }

}

Expected :[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10]
Actual   :[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 10, 10, 10, 10, -999, -999, 10, -999, -999, 10]

The test aims to recreate the following sequence:

1. The queue is full, and T2 is executing put() and is waiting in 
   Condition.await().
2. T1 calls queue.take(), removes an item from the queue, and decrements the 
   count just before signaling. At this point, the queue is no longer full 
   (count != items.length).
3. T3 acquires the lock via lock.lockInterruptibly().
4. T1 then sends the Condition.signal(). T2 is moved to the lock queue, where 
   it is still waiting to reacquire the lock.
5. T3 successfully enqueues an item because the while (count == items.length) 
   condition is not satisfied. T3 then increases the count again, making the 
   queue full (count == items.length).
6. T2 finally reacquires the lock inside await(), but since the queue is full 
   again, T2 hits the while (count == items.length) condition and goes back 
   to Condition.await().

This sequence illustrates a potential race condition where T3 acquires the lock 
just after T1 decreases the count but before T1 sends the signal. Even though 
T2 is waiting in Condition.await() and receives the signal, I understand that 
the waiting spaces for Condition.await() and ReentrantLock are separate. 
Because of this, I believe that T3 can still acquire the lock while T2 is in 
await(). As a result, T3 can proceed, enqueue an item, and refill the queue, 
causing T2, once it acquires the lock, to find the queue full again. This race 
condition seems possible even with a fair lock due to the subtle timing gap 
between when T1 lowers the count and when it sends the signal, which would 
force T2 to re-enter the await() state.
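
To make the ordering question concrete, here is a minimal sketch that uses a 
bare fair ReentrantLock rather than ArrayBlockingQueue3 (the class name 
FairReacquireOrderDemo and its run() method are made up for illustration). It 
does not model T3 taking the lock between count-- and signal(). It only checks 
a narrower case that I believe follows from the ReentrantLock.newCondition() 
Javadoc: a thread that is already queued on the lock when signal() is called 
gets the lock before the signalled waiter. The main thread plays the role of T1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; a bare fair lock, not the ArrayBlockingQueue code path.
class FairReacquireOrderDemo {

    /** Returns the order in which T2 (signalled waiter) and T3 (late arrival) ran. */
    static List<String> run() {
        ReentrantLock lock = new ReentrantLock(true); // fair mode
        Condition cond = lock.newCondition();
        boolean[] signalled = {false};                // guarded by lock
        List<String> order = new ArrayList<>();       // guarded by lock

        // T2: waits on the condition, like a put() blocked on a full queue.
        Thread t2 = new Thread(() -> {
            lock.lock();
            try {
                while (!signalled[0]) {
                    cond.awaitUninterruptibly();
                }
                order.add("T2");
            } finally {
                lock.unlock();
            }
        });

        // T3: arrives later and simply asks for the lock.
        Thread t3 = new Thread(() -> {
            lock.lock();
            try {
                order.add("T3");
            } finally {
                lock.unlock();
            }
        });

        t2.start();

        // "T1": take the lock and keep it once T2 is waiting on the condition.
        while (true) {
            lock.lock();
            if (lock.hasWaiters(cond)) {
                break;                                // leave the loop still holding the lock
            }
            lock.unlock();
            Thread.onSpinWait();
        }
        try {
            t3.start();
            // Wait until T3 is queued on the lock, then signal T2.
            while (!lock.hasQueuedThread(t3)) {
                Thread.onSpinWait();
            }
            signalled[0] = true;
            cond.signal();                            // T2 joins the lock queue behind T3
        } finally {
            lock.unlock();
        }

        try {
            t2.join();
            t3.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

I would expect this to print [T3, T2]. If so, an ordering like the one in the 
Actual output may not require T3 to get in between count-- and signal() at all; 
it may be enough for T3 to be queued on the lock before T1 signals.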

Since this test depends on very subtle timing, I found it challenging to 
reproduce the sequence consistently using only three threads. The test 
therefore uses many threads and checks a broader property: threads that had 
already called put() and were waiting should be guaranteed to enqueue before 
threads that arrive during consumption. 
I've attempted to design the test with this intention.
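
One possible refinement of the "wait until the thread is actually trying to 
put" step would be to ask the lock directly how many threads are waiting on 
notFull, instead of sleeping. The sketch below is only an illustration: 
TinyBoundedBuffer is a made-up stand-in for ArrayBlockingQueue3, and 
waitingPutters() is a hypothetical method built on 
ReentrantLock.getWaitQueueLength(Condition). The Javadoc calls that value an 
estimate, but with no timeouts or interrupts in play I would expect it to be 
exact here.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical probe; TinyBoundedBuffer only stands in for ArrayBlockingQueue3.
class ParkedPutterProbe {

    static final class TinyBoundedBuffer {
        private final ReentrantLock lock = new ReentrantLock(true); // fair mode
        private final Condition notFull = lock.newCondition();
        private final Condition notEmpty = lock.newCondition();
        private final ArrayDeque<Integer> items = new ArrayDeque<>();
        private final int capacity;

        TinyBoundedBuffer(int capacity) {
            this.capacity = capacity;
        }

        void put(int value) {
            lock.lock();
            try {
                while (items.size() == capacity) {
                    notFull.awaitUninterruptibly();
                }
                items.addLast(value);
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }

        int take() {
            lock.lock();
            try {
                while (items.isEmpty()) {
                    notEmpty.awaitUninterruptibly();
                }
                int value = items.removeFirst();
                notFull.signal();
                return value;
            } finally {
                lock.unlock();
            }
        }

        /** Number of threads currently waiting in notFull.await(). */
        int waitingPutters() {
            lock.lock();
            try {
                return lock.getWaitQueueLength(notFull);
            } finally {
                lock.unlock();
            }
        }
    }

    /** Returns {putters waiting before consumption, items taken, putters waiting at the end}. */
    static int[] run() {
        final int putters = 3;
        TinyBoundedBuffer buffer = new TinyBoundedBuffer(1);
        buffer.put(-1);                               // preload to capacity

        Thread[] threads = new Thread[putters];
        for (int i = 0; i < putters; i++) {
            threads[i] = new Thread(() -> buffer.put(10));
            threads[i].start();
        }

        // Instead of Thread.sleep(...): wait until every putter is on notFull.
        while (buffer.waitingPutters() < putters) {
            Thread.onSpinWait();
        }
        int waitingBefore = buffer.waitingPutters();

        int taken = 0;
        for (int i = 0; i < 1 + putters; i++) {
            buffer.take();
            taken++;
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {waitingBefore, taken, buffer.waitingPutters()};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

In the real test, the same idea would mean adding a similar accessor to 
ArrayBlockingQueue3 and polling it in place of the fixed two-second sleep.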

I would be incredibly grateful for your insights on whether this test 
accurately represents the scenario you described. I'm particularly interested 
in your opinion on whether it might help identify any unexpected behavior.

It's possible that this situation does not correspond to the unbounded 
unfairness you mentioned, since it occurs only in rare, extreme cases. I'm 
unsure whether this is the intended behavior or whether there might be an 
issue here. Your expertise on this matter would be invaluable.

If you need any clarification or if you think the test should be modified in 
any way, please don't hesitate to let me know. I'm more than happy to make any 
necessary adjustments based on your feedback.

Thank you once again for your time and patience in helping me understand this 
complex issue.

Best regards,

Kim Minju


> On Sep 6, 2024, at 8:47 PM, Viktor Klang <viktor.kl...@oracle.com> wrote:
> 
> Hi Kim,
> 
> The recent update to AQS reacquisition has to do with behavior if for some 
> reason there's an exception thrown (think SOE or OOM, or something like 
> that), so it isn't really applicable in this case.
> 
> 1. The queue is full, and T2 is executing put() and is waiting in 
>    Condition.await().
> 2. T1 calls queue.take(), removes an item from the queue, and is about to 
>    send a signal().
> 3. T3 is about to call put() and is just before lock.lockInterruptibly().
> 4. T1 decreases the count and sends a signal(), indicating that space is 
>    available in the queue.
> 5. T3 acquires the lock via lock.lockInterruptibly(), successfully enqueues 
>    an item because the count condition is satisfied, and releases the lock.
> 6. T2 receives the signal and wakes up, but since T3 already enqueued an 
>    item, the count has increased, and T2 must wait again in await().
> 
> I've re-read ReentrantLock and AQS, and from my understanding of the logic 
> the Condition's place in the wait queue should be maintained, which means 
> that T3 shouldn't be able to "barge". (tryLock() is documented to allow 
> barging)
> 
> Let us know if you can come up with a reproducer which says otherwise. 👍
> 
> Cheers,
> √
> 
> 
> Viktor Klang
> Software Architect, Java Platform Group
> Oracle
> 
> From: 김민주 <miiiinj...@gmail.com>
> Sent: Friday, 6 September 2024 04:43
> To: Viktor Klang <viktor.kl...@oracle.com>
> Cc: Archie Cobbs <archie.co...@gmail.com>; Daniel FUCHS 
> <daniel.fu...@oracle.com>; core-libs-dev@openjdk.org 
> <core-libs-dev@openjdk.org>
> Subject: Re: [External] : Re: [POTENTIAL BUG] Potential FIFO violation in 
> BlockingQueue under high contention and suggestion for fair mode in 
> ArrayBlockingQueue and LinkedBlockingQueue
>  
> Hi Archie,
> Thanks to your valuable suggestions, I was able to come up with a much more 
> appropriate test, and I’ve learned a great deal in the process. I truly 
> appreciate your insights! While this approach is clearly a significant 
> improvement over the previous one, I still feel there might be concerns about 
> the atomicity between notFull.await() and the signaling from outside, but I 
> can’t deny that this new approach provides much better guarantees. Your 
> feedback has been incredibly helpful, and I’m very grateful for your time and 
> effort. Thank you again!
> 
> 
> 
> 
> Hi Viktor,
> Thank you for sharing your thoughts, which have given me much to consider. I 
> have a follow-up question regarding the improvements in AQS that you 
> mentioned. Specifically, I’d like to clarify whether these improvements 
> ensure that, in the fair mode of ReentrantLock, threads waiting on a 
> Condition are placed back in the queue without acquiring the lock, or if the 
> signaling thread can now immediately acquire the lock after being signaled.
> Initially, my concern was that a thread receiving a signal might still face 
> starvation if another thread calls put() and acquires the lock before the 
> signaled thread can do so. Here's an example scenario:
> 1. The queue is full, and T2 is executing put() and is waiting in 
>    Condition.await().
> 2. T1 calls queue.take(), removes an item from the queue, and is about to 
>    send a signal().
> 3. T3 is about to call put() and is just before lock.lockInterruptibly().
> 4. T1 decreases the count and sends a signal(), indicating that space is 
>    available in the queue.
> 5. T3 acquires the lock via lock.lockInterruptibly(), successfully enqueues 
>    an item because the count condition is satisfied, and releases the lock.
> 6. T2 receives the signal and wakes up, but since T3 already enqueued an 
>    item, the count has increased, and T2 must wait again in await().
> It seems to me that this scenario could occur regardless of whether 
> ReentrantLock is in fair mode or not. Has the improvement in AQS addressed 
> this type of contention scenario to prevent such starvation issues, or is 
> this still a possible outcome?
> Your insights into "unbounded unfairness" have also provided me with a lot of 
> food for thought, and I’m grateful for the opportunity to reflect on these 
> issues. 
> In your view, would the thread contention scenario I’ve described fall under 
> the category of unbounded unfairness, or would it be considered an acceptable 
> level of contention?
> 
> Once again, thank you for all the knowledge and understanding I've gained 
> through your feedback. I'm truly grateful for your time and expertise.
> 
> 
> Best regards,
> Kim Minju
> 
> 
> On Fri, Sep 6, 2024 at 4:52 AM, Viktor Klang <viktor.kl...@oracle.com> wrote:
> Archie,
> 
> I should've been more specific: Condition-as-implemented-by-ReentrantLock (in 
> fair mode) provides stronger (for some definition of stronger) semantics than 
> the Condition interface specifies.
> 
> Since it's related, I've recently integrated a hardening of AQS and AQLS 
> reacquisition logic in await().
> 
> Given what you presented earlier about the detection of "producer parked" 
> it's likely that the conclusion is that ABQ works as expected.
> 
> Cheers,
> √
> 
> 
> Viktor Klang
> Software Architect, Java Platform Group
> Oracle
> From: Archie Cobbs <archie.co...@gmail.com>
> Sent: Thursday, 5 September 2024 21:23
> To: Viktor Klang <viktor.kl...@oracle.com>
> Cc: 김민주 <miiiinj...@gmail.com>; Daniel FUCHS <daniel.fu...@oracle.com>; 
> core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
> Subject: Re: [External] : Re: [POTENTIAL BUG] Potential FIFO violation in 
> BlockingQueue under high contention and suggestion for fair mode in 
> ArrayBlockingQueue and LinkedBlockingQueue
>  
> Apologies in advance if I'm misunderstanding anything...
> 
> On Thu, Sep 5, 2024 at 2:05 PM Viktor Klang <viktor.kl...@oracle.com> wrote:
> > Thread state polling aside, for as long as Condition::await() is allowed to 
> > spuriously wake, FIFO just cannot be "guaranteed".
>  
> What about this statement in the Javadoc for ReentrantLock.newCondition():
> 
> The ordering of lock reacquisition for threads returning from waiting methods 
> is the same as for threads initially acquiring the lock, which is in the 
> default case not specified, but for fair locks favors those threads that have 
> been waiting the longest.
> 
> So what you're saying is that a spurious wakeup on a Condition is not the 
> same thing as a spurious signal() on a Condition; if it were, then the above 
> statement would apply and FIFO ordering would be preserved.
> 
> Of course, a spurious wakeup would not find the condition being waited on 
> satisfied unless there was a big coincidence. So an ordering violation that 
> actually mattered should be exceedingly rare.
> 
> Anyway, this does seem to be a glitch in how things are supposed to work. 
> That is: there can be no guaranteed ordering for Condition waiters when there 
> can be spurious wakeups.
> 
> Maybe this corner case should be documented. Or better yet, fix the bug by 
> requiring Condition to "filter out" spurious wakeups if preserving FIFO 
> ordering (it should be possible).
> 
> -Archie
> 
> --
> Archie L. Cobbs
